James Harrison, 2023-06-02

This is a modified version of 'run_weekly_aggregates.ipynb' which runs the aggregates using flowmachine directly, so that aggregates can optionally be unredacted.

This notebook is used to produce the following aggregates:
- Resident counts per 7-day rolling window
- Home relocations between consecutive 7-day rolling windows (i.e. offset by 1 day, unless some 7-day windows are skipped due to missing data)
- Home relocations between disjoint 7-day rolling windows (i.e. offset by 7 days)

for each day in the specified date range (by default, from the first day of the previous calendar month up to the day on which the notebook is run).

These aggregates are intended to be produced on an ongoing basis in preparation for crisis response work.

In [None]:
import concurrent.futures
import datetime
import json
import warnings
from pathlib import Path

import flowmachine as fm
import numpy as np
import pandas as pd
from apply_subscriber_set import ApplySubscriberSet
from dateutil.relativedelta import relativedelta
from flowclient.aggregates import (
    flows_spec,
    inflows_spec,
    outflows_spec,
    spatial_aggregate_spec,
)
from flowmachine.core.union import Union
from flowmachine.features.subscriber.per_subscriber_aggregate import (
    PerSubscriberAggregate,
)
from flowmachine.features.utilities.unique_values_from_queries import (
    UniqueValuesFromQueries,
)
from total_locatable_periods import TotalLocatablePeriods
from utils import (
    _write_query_result,
    daily_home_location_specs,
    find_dates_to_exclude,
    get_date_in_month,
    rolling_window_over_date_range,
)

# Setup

## Parameters

In [None]:
# Timestamp of this run; the default start_date and logical_date parameters are derived from it
datetime_now = datetime.datetime.now()
datetime_now

In [None]:
# Parameters
author = "James Harrison <james.harrison@flowminder.org>"

start_date = get_date_in_month(
    datetime_now, day_of_month=1, month_offset=-1
)  # Start date of the data interval (inclusive)
logical_date = datetime_now  # Reference date of the run; end_date is derived from it as logical_date + 1 day
window_length = (
    7  # Length in days of the rolling window used to compute average call days
)
min_call_days = (
    2  # Minimum median number of call days per window for a subscriber to be kept in the active subset
)
latest_truncation_threshold = (
    "18:00:00"  # Threshold for excluding temporally-truncated data
)

aggregation_unit = "lon-lat"  # Spatial aggregation unit
mapping_table = "geography.cell_to_admin_via_clusters_1km_20221025"
geom_table = "geography.clusters_1km_20221025"
geom_table_join_column = "cluster_id"
event_types = ["calls"]  # Event types to use

flowmachine_log_level = "info"  # Flowmachine log level
shared_data_dir = "./"  # Writable output directory
outputs_subdir = "aggregates/crisis_response"  # Subdirectory of shared data dir to which results of aggregate queries will be written
output_format = "csv"  # 'csv' or 'netcdf'
overwrite = False  # Set True to overwrite previously-saved aggregates for this month (with overwrite=False, conflicting aggregate files will be renamed)
calculate_relocations = True  # Set False to skip running the home relocations aggregate
require_latest_data = True  # If True, computation will not proceed if the last required day of data is later than the most recent available date
include_subsetted = True  # Set False to skip calculating aggregates using an "active subset" of subscribers
redact = False  # Set True to redact small counts from the aggregate outputs (as would be the case for results retrieved through the API)
aggregates_to_run = None  # Optionally specify a subset of aggregate kinds to run (ideally we wouldn't have this option and calculate_relocations as separate parameters)

In [None]:
# start/end date parameters may be strings, so convert to datetime.date
start_date = pd.Timestamp(start_date).date()
logical_date = pd.Timestamp(logical_date).date()
# end_date is the day after logical_date
end_date = logical_date + relativedelta(days=1)

(start_date, end_date)

In [None]:
# Build the output directory path for this run. The directory itself is only
# created later, immediately before results are written.
run_dir_name = f"weekly_aggregates_{aggregation_unit}_{logical_date:%Y-%m-%d}"
outputs_path = Path(shared_data_dir).joinpath(outputs_subdir, run_dir_name)

outputs_path

## Connect

In [None]:
# Even if we're not using a subscriber subset, a flowmachine connection is required to get the earliest/latest event time per day
fm.connect(
    flowdb_connection_pool_overflow=20,
    flowdb_connection_pool_size=5,
    log_level=flowmachine_log_level,
)

## Check dates

In [None]:
# Find the dates to exclude; the result is compared against each rolling window below
# and passed on to the home location specs and the subscriber subset queries.
dates_to_skip = find_dates_to_exclude(
    flowdb_connection=fm.core.context.get_db(),
    # Kludge fix (John): the checked range is extended back by window_length + 30 days, because something else
    # appears to look relative to the start of the month and otherwise isn't getting checks for missing dates.
    start_date=start_date - relativedelta(days=window_length + 30),
    end_date=end_date,
    event_types=event_types,
    latest_truncation_threshold=latest_truncation_threshold,
    fail_on_missing_latest=require_latest_data,
)
dates_to_skip

In [None]:
# Get rolling windows. The range starts window_length days before start_date; windows dated
# before start_date are filtered out of the resident counts further down.
# The result is used below as a mapping from an ISO date string to a value whose
# first two items are the bounds of the window.
rolling_windows = rolling_window_over_date_range(
    start_date=start_date - relativedelta(days=window_length),
    end_date=end_date,
    window_length=window_length,
)

In [None]:
# Find windows in which every date is excluded, i.e. windows with no usable data
empty_windows = []
for window_label in sorted(rolling_windows):
    window_bounds = rolling_windows[window_label]
    # Dates covered by the window: lower bound included, upper bound excluded
    window_dates = {
        str(timestamp.date())
        for timestamp in pd.date_range(
            window_bounds[0], window_bounds[1], inclusive="left"
        )
    }
    # The window is empty when nothing remains after removing the skipped dates
    if not window_dates.difference(dates_to_skip):
        empty_windows.append(window_label)

if empty_windows:
    warnings.warn(
        f"Windows for dates {empty_windows} have no data. Aggregates will not be produced for these dates."
    )

# Subscriber subset

Subscriber subsets have to be defined and run using flowmachine directly, and then the query IDs can be used to subset FlowAPI queries.

The subscriber subset is the set of subscribers whose median number of active days per non-empty `window_length`-day rolling window, taken over the specified date range, is at least `min_call_days`.

## Define subscriber subset queries

In [None]:
if include_subsetted:
    # Event tables that subscriber activity is read from
    tables = [f"events.{event_type}" for event_type in event_types]
    # Convert FlowAPI aggregation unit parameters to a flowmachine spatial unit.
    # The geography arguments are the same for every unit type.
    spatial_unit_kwargs = dict(
        mapping_table=mapping_table,
        geom_table=geom_table,
        geom_table_join_on=geom_table_join_column,
    )
    if "admin" in aggregation_unit:
        # For admin units the level is read from the final character of the unit name
        spatial_unit_kwargs.update(
            spatial_unit_type="admin",
            level=int(aggregation_unit[-1]),
        )
    else:
        spatial_unit_kwargs.update(spatial_unit_type=aggregation_unit)
    spatial_unit = fm.core.spatial_unit.make_spatial_unit(**spatial_unit_kwargs)

In [None]:
# Get subset of subscribers with median call days per non-empty window >= `min_call_days`

if include_subsetted:
    # Count call days per subscriber per window over the month
    # (excluding the first window_length windows because these belong to the previous month)
    active_periods_queries = []
    for window in sorted(
        d for d in rolling_windows if datetime.date.fromisoformat(d) >= start_date
    ):
        try:
            active_periods_query = TotalLocatablePeriods(
                start=rolling_windows[window][0],
                total_periods=window_length,
                period_length=1,
                period_unit="days",
                spatial_unit=spatial_unit,
                table=tables,
                periods_to_exclude=dates_to_skip,
            )
        except ValueError as exc:
            # If all dates in this window are excluded, skip it. Those windows are
            # already listed in empty_windows; a ValueError for any other window is
            # unexpected, so surface it as a warning instead of hiding it.
            if window not in empty_windows:
                warnings.warn(
                    f"Window {window} was skipped when building the subscriber subset: {exc}"
                )
        else:
            active_periods_queries.append(active_periods_query)

    # Need to fill counts with 0 for windows where a subscriber was inactive,
    # so that the median can be calculated correctly. For this we need the set of
    # all subscribers active in any of the windows.
    all_active_subscribers_query = UniqueValuesFromQueries(
        query_list=active_periods_queries,
        column_names="subscriber",
    )
    active_periods_for_all_subscribers_queries = [
        ApplySubscriberSet(
            parent=active_periods_query,
            subscriber_set=all_active_subscribers_query,
            fill_values={"value": 0},
        )
        for active_periods_query in active_periods_queries
    ]

    # Find subset of subscribers that were active at least min_call_days days per window (median)
    subset_query = PerSubscriberAggregate(
        subscriber_query=Union(*active_periods_for_all_subscribers_queries),
        agg_column="value",
        agg_method="median",
    ).numeric_subset(high=np.inf, low=min_call_days, col="value")

## Run subscriber subset query

In [None]:
if include_subsetted:
    # Store the subset query together with its dependencies, and wait for the result
    subset_query.store(store_dependencies=True).result()
# Kept at top level so the notebook displays the subset size: an expression nested
# inside the `if` body is evaluated but never echoed.
len(subset_query) if include_subsetted else None

## Wrap in Table object so that flowmachine server can unpickle

In [None]:
if include_subsetted:
    # The query id of the table wrapper is what the home location specs take as subscriber_subset
    subscriber_subset_table = subset_query.get_table()
    subscriber_subset_query_id = subscriber_subset_table.query_id
# Kept at top level so the notebook displays the id: an expression nested
# inside the `if` body is evaluated but never echoed.
subscriber_subset_query_id if include_subsetted else None

Subset query id can now be passed on to API queries.

# FlowAPI side

## Define queries

In [None]:
# Maps an output label to the query spec of the aggregate to run for it
api_specs = {}

### Home location sub-queries

In [None]:
if include_subsetted:
    # Home location specs per window, restricted to the active subscriber subset
    home_location_specs_subset = daily_home_location_specs(
        rolling_windows=rolling_windows,
        aggregation_unit=aggregation_unit,
        mapping_table=mapping_table,
        geom_table=geom_table,
        geom_table_join_column=geom_table_join_column,
        subscriber_subset=subscriber_subset_query_id,
        event_types=event_types,
        dates_to_exclude=dates_to_skip,
    )
# Kept at top level so the notebook displays the specs: an expression nested
# inside the `if` body is evaluated but never echoed.
home_location_specs_subset if include_subsetted else None

In [None]:
# Home location specs per window, using all subscribers (no subscriber subset applied)
home_location_specs_nosubset = daily_home_location_specs(
    rolling_windows=rolling_windows,
    aggregation_unit=aggregation_unit,
    mapping_table=mapping_table,
    geom_table=geom_table,
    geom_table_join_column=geom_table_join_column,
    subscriber_subset=None,
    event_types=event_types,
    dates_to_exclude=dates_to_skip,
)
home_location_specs_nosubset

### Resident counts

In [None]:
if include_subsetted:
    # One resident counts spec per window, built from the subsetted home locations
    api_specs.update(
        {
            f"resident-counts_subset_{window_date}": spatial_aggregate_spec(
                locations=window_spec,
            )
            for window_date, window_spec in home_location_specs_subset.items()
            # Don't produce resident counts for windows before start_date
            if datetime.date.fromisoformat(window_date) >= start_date
        }
    )

In [None]:
# One resident counts spec per window, built from the un-subsetted home locations
api_specs.update(
    {
        f"resident-counts_nosubset_{window_date}": spatial_aggregate_spec(
            locations=window_spec,
        )
        for window_date, window_spec in home_location_specs_nosubset.items()
        # Don't produce resident counts for windows before start_date
        if datetime.date.fromisoformat(window_date) >= start_date
    }
)

### Home relocations matrix

In [None]:
if calculate_relocations:
    # The three relocation aggregates built for every pair of windows:
    # (label prefix, spec constructor, join type)
    relocation_aggregates = (
        ("home-relocations", flows_spec, "full outer"),
        ("home-relocations-in", inflows_spec, "inner"),
        ("home-relocations-out", outflows_spec, "inner"),
    )
    # Home location specs to build relocations from: subset first (when enabled),
    # then nosubset, which fixes the order in which labels are added to api_specs
    relocation_sources = []
    if include_subsetted:
        relocation_sources.append(("subset", home_location_specs_subset))
    relocation_sources.append(("nosubset", home_location_specs_nosubset))

    def _add_relocation_specs(offset_kind, d_from, d_to):
        """Add flows, inflows and outflows specs between two windows to api_specs.

        offset_kind is the label component describing how the two windows relate
        ("consecutive" or "disjoint"); d_from and d_to are the keys of the origin
        and destination windows in the home location specs.
        """
        for subset_label, home_location_specs in relocation_sources:
            for label_prefix, make_spec, join_type in relocation_aggregates:
                api_specs[
                    f"{label_prefix}_{offset_kind}_{subset_label}_from{d_from}_to{d_to}"
                ] = make_spec(
                    from_location=home_location_specs[d_from],
                    to_location=home_location_specs[d_to],
                    join_type=join_type,
                )

    non_empty_windows = sorted(home_location_specs_nosubset.keys())
    # Set copy for constant-time membership checks inside the loop
    non_empty_window_set = set(non_empty_windows)
    for d_from, d_to in zip(non_empty_windows[:-1], non_empty_windows[1:]):
        # Home relocations between consecutive windows (don't produce relocations for 'to' windows before start_date)
        if datetime.date.fromisoformat(d_to) >= start_date:
            _add_relocation_specs("consecutive", d_from, d_to)
        # Home relocations between disjoint windows (the 'to' window is window_length days after the 'from' window)
        d_to_disjoint = str(
            (pd.Timestamp(d_from) + pd.Timedelta(days=window_length)).date()
        )
        if d_to_disjoint in non_empty_window_set:
            _add_relocation_specs("disjoint", d_from, d_to_disjoint)

In [None]:
# Display every query spec that has been defined, keyed by output label
api_specs

## Run queries

In [None]:
from flowmachine.core.server.query_schemas import FlowmachineQuerySchema

In [None]:
# Build a flowmachine query object for every selected aggregate spec,
# keeping the spec alongside it for the output metadata
fm_queries = {}
for label, query_spec in api_specs.items():
    print(label)
    # TODO: would be better to skip creating the query specs altogether for non-required aggregates,
    # but I want to add this functionality with minimal changes for now
    # NOTE: this is a prefix match, so "home-relocations" also selects the
    # "home-relocations-in" and "home-relocations-out" labels.
    is_selected = aggregates_to_run is None or any(
        label.startswith(agg_name) for agg_name in aggregates_to_run
    )
    if not is_selected:
        print("Skipped")
        continue
    fm_query_obj = FlowmachineQuerySchema().load(query_spec)._flowmachine_query_obj
    # When redaction is off, use the query object's redaction target instead of the query object itself
    fm_queries[label] = (
        fm_query_obj if redact else fm_query_obj.redaction_target,
        query_spec,
    )

In [None]:
# Start storing every selected query (with store_dependencies=True), then wait on the returned futures
futures = [
    query.store(store_dependencies=True) for query, _spec in fm_queries.values()
]
concurrent.futures.wait(futures)

## Get results and write to files

In [None]:
# Create the output directory (and any missing parents); it is not an error if it already exists
outputs_path.mkdir(exist_ok=True, parents=True)

In [None]:
# Fetch each query result and write it out together with its metadata
for label, (query, spec) in fm_queries.items():
    print(label)
    # Metadata passed to the writer as `attrs` alongside the data
    attrs = {
        "created_at": datetime.datetime.now().isoformat(),
        "flowmachine_version": fm.__version__,
        "parameters": json.dumps(spec),
        "author": author,
        "redacted": str(redact),
        "query_id": query.query_id,
        "excluded_dates": sorted(dates_to_skip),
    }
    # Unredacted outputs get an "_unredacted" suffix in the file name
    filepath = outputs_path / (label if redact else f"{label}_unredacted")
    _write_query_result(
        query.get_dataframe(),
        filepath,
        file_format=output_format,
        overwrite=overwrite,
        attrs=attrs,
    )