In [1]:
from cdptools import CDPInstance, configs

# Connect to the Seattle CDP instance (database + file store) and display its repr
seattle = CDPInstance(configs.SEATTLE)
seattle

<CDPInstance [database: <CloudFirestoreDatabase [cdp-seattle]>, file_store: <GCSFileStore [cdp-seattle.appspot.com]>]>

In [2]:
from cdptools.indexers import Indexer
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import List, Dict
import pandas as pd
from datetime import timedelta
import altair as alt

def _get_indexed_event_term_references(term: str, instance: CDPInstance):
    return instance.database.select_rows_as_list("indexed_event_term", filters=[("term", term)])

def _get_event_details(event_id: str, instance: CDPInstance):
    return instance.database.select_row_by_id("event", event_id)

def _compute_windowed_relevance(indexed_terms: pd.DataFrame, window_size: int) -> pd.DataFrame:
    """
    Score each row by the summed ``value`` of same-topic rows inside a window centered on it.

    Expects ``datetime``, ``topic`` and ``value`` columns. Returns a copy of every input row
    plus a ``relevance`` column. The window is open, so rows exactly ``window_size / 2``
    days away are excluded.
    """
    # Half of the total window on each side. True division keeps odd windows at their full
    # width; floor division would turn window_size=1 into a zero-width window that excludes
    # even the row itself.
    half_window = timedelta(days=window_size / 2)

    results = []
    for _, row in indexed_terms.iterrows():
        # Rows of the same topic whose datetime falls strictly inside the window
        datetime_topic_mask = (
            (indexed_terms["datetime"] > row["datetime"] - half_window)
            & (indexed_terms["datetime"] < row["datetime"] + half_window)
            & (indexed_terms["topic"] == row["topic"])
        )

        this_row_result = dict(row)
        this_row_result["relevance"] = indexed_terms.loc[datetime_topic_mask, "value"].sum()
        results.append(this_row_result)

    # Passing explicit columns keeps the schema even when there are no rows
    return pd.DataFrame(results, columns=[*indexed_terms.columns, "relevance"])


def _render_trend_chart(results: pd.DataFrame) -> alt.Chart:
    """Render relevance-over-time as one colored line per topic."""
    return alt.Chart(results).mark_line().encode(
        x="datetime",
        y="relevance",
        color="topic",
        tooltip=["topic", "datetime", "relevance"]
    )


def get_trends(topics: List[str], window_size: int = 14, instance_config: Dict[str, str] = configs.SEATTLE):
    """
    Build a line chart of moving-window relevance for each topic over time.

    Parameters
    ----------
    topics: List[str]
        Raw topic strings. Each is passed through ``Indexer.clean_text_for_indexing``.
        Topics that clean to the same term are searched only once, so their references
        are not double counted.
    window_size: int
        Total width, in days, of the window centered on each reference (default 14).
    instance_config: Dict[str, str]
        Configuration passed to ``CDPInstance`` (default ``configs.SEATTLE``).

    Returns
    -------
    alt.Chart
        Line chart with ``datetime`` on x, ``relevance`` on y and one line per topic.
        A point's relevance is the sum of ``value`` over every reference to the same
        topic strictly within the window around that point. If nothing matches, the
        chart has the same encoding and no data.
    """
    # Connect to instance
    instance = CDPInstance(instance_config)

    # Clean / stem topics, then drop duplicates while keeping first-seen order
    # (e.g. "park" and "parks" may stem to the same term)
    cleaned_topics = list(dict.fromkeys(Indexer.clean_text_for_indexing(topic) for topic in topics))

    # Search each topic concurrently, then flatten the per-topic result lists into one
    get_term = partial(_get_indexed_event_term_references, instance=instance)
    with ThreadPoolExecutor() as exe:
        indexed_topics_lists = list(exe.map(get_term, cleaned_topics))
    indexed_terms = [term for search_result in indexed_topics_lists for term in search_result]

    # With no matches, a DataFrame built from [] has no "event_id" column and the merge below
    # would raise, so return an empty chart with the same encoding instead
    if not indexed_terms:
        return _render_trend_chart(
            pd.DataFrame(columns=["datetime", "topic", "body", "value", "relevance"])
        )

    # Fetch details for each unique event referenced by the matches
    get_event = partial(_get_event_details, instance=instance)
    unique_event_ids = {it["event_id"] for it in indexed_terms}
    with ThreadPoolExecutor() as exe:
        events = list(exe.map(get_event, unique_event_ids))

    # Read bodies from this function's own instance (not a notebook global) so that
    # `instance_config` is honored for every lookup
    bodies = instance.database.select_rows_as_dataframe("body")

    # Merge terms -> events -> bodies into a single dataframe
    merged = (
        pd.DataFrame(indexed_terms)
        .merge(pd.DataFrame(events), on="event_id", suffixes=("_indexed_term", "_event"))
        .merge(bodies, on="body_id", suffixes=("_event", "_body"))
    )

    # Reduce to the required fields and give them chart-friendly names
    trend_rows = merged[["event_datetime", "term", "name", "value"]].rename(columns={
        "event_datetime": "datetime",
        "term": "topic",
        "name": "body"
    })

    return _render_trend_chart(_compute_windowed_relevance(trend_rows, window_size))

In [6]:
# Chart relevance trends (default 14-day window) for three example topics
get_trends(topics=["housing", "police", "parks"])

In [4]:
from cdptools.databases import WhereOperators
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def _get_indexed_event_terms_for_event(event_id: str, instance: CDPInstance):
    return instance.database.select_rows_as_list("indexed_event_term", filters=[("event_id", event_id)])

def get_top_recent_topics(days=30, instance_config: Dict[str, str] = configs.SEATTLE):
    """
    Rank the terms indexed for events held in the last ``days`` days.

    Parameters
    ----------
    days: int
        Look-back period. Events whose ``event_datetime`` is at or after
        ``datetime.utcnow() - timedelta(days=days)`` are included (default 30).
    instance_config: Dict[str, str]
        Configuration passed to ``CDPInstance`` (default ``configs.SEATTLE``).

    Returns
    -------
    List[str]
        Distinct terms ordered from highest to lowest score. A term's score is the
        maximum ``value`` it received in any single recent event. Empty when no recent
        events or indexed terms are found.
    """
    # Construct instance
    instance = CDPInstance(instance_config)

    # Naive UTC cutoff; events on or after it count as "recent"
    target_dt = datetime.utcnow() - timedelta(days=days)
    events = instance.database.select_rows_as_list(
        "event",
        filters=[("event_datetime", WhereOperators.gteq, target_dt)],
    )

    # Fetch indexed_event_terms for each event concurrently. The results are materialized
    # inside the executor context, consistent with get_trends.
    get_terms_for_event = partial(_get_indexed_event_terms_for_event, instance=instance)
    with ThreadPoolExecutor() as exe:
        indexed_event_term_lists = list(exe.map(get_terms_for_event, [e["event_id"] for e in events]))

    # Keep the highest value seen for each term across all recent events.
    # NOTE(review): a cumulative (summed) score may have been intended here; this keeps the
    # existing max behavior. Confirm which ranking is wanted.
    results = {}
    for search_result in indexed_event_term_lists:
        for term_details in search_result:
            term, value = term_details["term"], term_details["value"]
            results[term] = max(results.get(term, value), value)

    # Highest-scoring ("top") terms first
    return sorted(results, key=results.get, reverse=True)