In [None]:
import os
import json
import glob
import pandas as pd
import numpy as np
import seaborn as sns
import dask.dataframe as dd
from dask.distributed import LocalCluster
from matplotlib import pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from matplotlib.cm import tab10
from matplotlib.ticker import PercentFormatter

In [None]:
BASE_OUT_DIR = "/biostats_share/hillandr/data/WW_Mobility_2025_04_04/"
# os.makedirs also creates any missing parent directories, so this one call ensures both
# BASE_OUT_DIR and its "processed" subdirectory exist (no error if they already do).
os.makedirs(os.path.join(BASE_OUT_DIR, "processed"), exist_ok=True)

## CBG->Sewershed Mapping

In [None]:
# JSON file holding the CBG GEOID -> sewershed (wwtp) mapping; written in this section and read back
# before the Dask pipeline. NOTE(review): the file name carries a 2025_03_03 date although it lives
# under BASE_OUT_DIR (2025_04_04) — confirm intended.
CBG_MAP_FILE = os.path.join(BASE_OUT_DIR, "processed/2025_03_03_geoid_to_sewershed.json")

In [None]:
# Coverage table: one row per (CBG GEOID, sewershed) overlap. GEOID is read as a string so leading zeros survive.
overlap = pd.read_csv("/biostats_share/hillandr/data/WW_Mobility_2024_11_19/new_overlap_jan2025.csv", dtype={"GEOID": str}, index_col="GEOID")
overlap["pct_covered"] = overlap["PERCENTAGE"]/100
# Minimum covered fraction for a CBG to be assigned to any sewershed.
# NOTE(review): 0.002 is 0.2% of the CBG (PERCENTAGE >= 0.2) — confirm this was not meant to be 0.2 (20%).
MIN_PCT_COVERED = 0.002
# A CBG's winner is the overlap row(s) with the largest coverage for that GEOID, provided it clears the threshold.
# Vectorised via transform("max") instead of a per-group apply (whose output order depended on pandas'
# transform detection). The stable sort yields GEOID order, with each GEOID's rows kept in file order.
max_pct_for_geoid = overlap.groupby(level="GEOID")["pct_covered"].transform("max")
is_winner = (overlap["pct_covered"] == max_pct_for_geoid) & (overlap["pct_covered"] >= MIN_PCT_COVERED)
winner_sewershed = overlap.loc[is_winner.to_numpy()].sort_index(kind="mergesort")
# If several sewersheds tie for the maximum, all tied rows are kept above; the LAST one (file order) wins here.
geoid_to_sewershed = dict(zip(winner_sewershed.index, winner_sewershed["wwtp"]))

In [None]:
# Persist the GEOID -> sewershed mapping so later sections can reload it without recomputing the overlap.
with open(CBG_MAP_FILE, mode="w") as map_fh:
    json.dump(geoid_to_sewershed, map_fh)

## Monthly Patterns

In [None]:
# Glob pattern for the raw monthly neighborhood-pattern CSVs (read lazily with Dask below).
MONTHLY_PATTERNS_FILES = "/biostats_share/hillandr/data/WW_Mobility_2025_03_03/raw_neigh_patterns/*.csv"
# Output: per-sewershed monthly visitor device totals; this notebook deletes and rewrites it.
# NOTE(review): this output sits under the 2025_03_03 directory, not BASE_OUT_DIR (2025_04_04), so
# re-running overwrites that earlier run's file — confirm intended.
MONTHLY_VISIT_COUNTS_FILE = "/biostats_share/hillandr/data/WW_Mobility_2025_03_03/processed/colorado_visit_devices_sum_agg.csv"

In [None]:
# Start a local Dask cluster and obtain a client connected to it. `client` is never referenced
# explicitly; presumably it is kept so the Dask computations below run on this cluster.
cluster = LocalCluster()
client = cluster.get_client()

In [None]:
def get_home_areas(group: pd.DataFrame, column: str = "device_home_areas") -> pd.DataFrame:
    """Return a copy of ``group`` with the JSON strings in ``column`` decoded.

    The default keeps the original lower-case column name. The pattern data in this
    notebook uses ``DEVICE_HOME_AREAS``, so pass ``column="DEVICE_HOME_AREAS"`` there.

    Args:
        group: Frame containing a column of JSON-encoded strings.
        column: Name of that column.

    Returns:
        A new DataFrame with ``column`` decoded; ``group`` itself is not modified.

    Raises:
        KeyError: if ``column`` is not in ``group``.
        TypeError / json.JSONDecodeError: if a cell is not a valid JSON string (e.g. NaN).
    """
    decoded = group.copy()
    decoded[column] = decoded[column].apply(json.loads)
    return decoded

In [None]:
# Reload the GEOID -> sewershed name mapping saved earlier (JSON object keys are GEOID strings).
with open(CBG_MAP_FILE) as map_fh:
    geoid_to_sewershed = json.loads(map_fh.read())

In [None]:
# Lazily read every monthly neighborhood-pattern CSV; AREA is forced to str so GEOIDs are not parsed as numbers.
month_patterns = dd.read_csv(MONTHLY_PATTERNS_FILES, dtype={"AREA": str})
# Repair GEOIDs that start with "8" by prepending "0" (presumably Colorado IDs, state FIPS 08,
# that lost their leading zero upstream).
month_patterns["AREA"] = month_patterns["AREA"].mask(
    month_patterns["AREA"].str.startswith("8"),
    "0" + month_patterns["AREA"],
)
# Look up each CBG's sewershed; CBGs missing from the mapping get None...
month_patterns["AREA_SEWERSHED"] = month_patterns["AREA"].apply(
    lambda geoid: geoid_to_sewershed.get(geoid),
    meta=("AREA", str),
)
# ...and are dropped, since they cannot be attributed to any sewershed.
month_patterns = month_patterns.dropna(subset=["AREA_SEWERSHED"])

# Day Patterns

In [None]:
def parse_stops_by_day(list_str: str):
    """Decode one STOPS_BY_DAY cell with json.loads.

    Returns the decoded JSON value (a list for a JSON array). Raises json.JSONDecodeError
    for malformed JSON and TypeError for non-string input.
    """
    decoded = json.loads(list_str)
    return decoded

In [None]:
# Output CSV of summed daily stops per (sewershed, month, day), written later in this section.
DAY_VISIT_SUM_OUTPUT_PATH = os.path.join(BASE_OUT_DIR, "processed/2025_04_04_daily_visits_sum.csv")

In [None]:
# Decode each row's STOPS_BY_DAY JSON; missing values are passed through untouched (na_action="ignore").
month_patterns["STOPS_BY_DAY_L"] = month_patterns["STOPS_BY_DAY"].map(
    parse_stops_by_day,
    na_action="ignore",
    meta=("STOPS_BY_DAY", object),
)
# For each row, the daily DatetimeIndex from DATE_RANGE_START up to (not including) DATE_RANGE_END.
month_patterns["DAY"] = month_patterns.apply(
    lambda rec: pd.date_range(start=rec.DATE_RANGE_START, end=rec.DATE_RANGE_END, freq="D", inclusive="left"),
    axis=1,
    meta=("DAY", object),
)

In [None]:
# Explode to one row per CBG per calendar day: DAY and STOPS_BY_DAY_L are unpacked in lock-step.
# NOTE(review): pandas requires the exploded columns to have matching element counts per row; a NaN
# STOPS_BY_DAY (left as NaN by the parse step) next to a multi-day DAY list would raise — confirm none occur.
day_patterns_sewershed = month_patterns[
    ["AREA", "AREA_SEWERSHED", "STOPS_BY_DAY_L", "DATE_RANGE_START", "DATE_RANGE_END", "DAY"]
].explode(column=["DAY", "STOPS_BY_DAY_L"])
# Exploded values come out as object dtype; cast so Dask treats the daily stop counts as floats.
day_patterns_sewershed["STOPS_BY_DAY_L"] = day_patterns_sewershed["STOPS_BY_DAY_L"].astype("float64")

In [None]:
# Total daily stops per destination sewershed, per source month, per calendar day.
day_visit_counts_sum = (
    day_patterns_sewershed
    .groupby(["AREA_SEWERSHED", "DATE_RANGE_START", "DAY"])["STOPS_BY_DAY_L"]
    .agg("sum")
)

In [None]:
# Clear out any earlier export, then let Dask write all partitions into one CSV.
if os.path.isfile(DAY_VISIT_SUM_OUTPUT_PATH):
    os.unlink(DAY_VISIT_SUM_OUTPUT_PATH)
day_visit_counts_sum.to_csv(DAY_VISIT_SUM_OUTPUT_PATH, single_file=True)

# Extract Monthly Visit/Home Device Counts

In [None]:
# Decode DEVICE_HOME_AREAS; its keys are used below as home (source) CBG GEOIDs and its values as device counts.
# NOTE(review): json.loads raises on missing (NaN) cells; STOPS_BY_DAY parsing guards NaN with
# na_action="ignore" but this column does not — confirm there are no NaNs.
month_patterns["DEVICE_HOME_AREAS_D"] = month_patterns["DEVICE_HOME_AREAS"].apply(
    json.loads,
    meta=("DEVICE_HOME_AREAS", object),
)
# Split each dict into two parallel lists (same key order) so they can be exploded together.
month_patterns["SOURCE_AREA"] = month_patterns["DEVICE_HOME_AREAS_D"].map(
    lambda home_areas: list(home_areas.keys()),
    meta=("DEVICE_HOME_AREAS_D", object),
)
month_patterns["SOURCE_AREA_DEVICE_COUNT"] = month_patterns["DEVICE_HOME_AREAS_D"].map(
    lambda home_areas: list(home_areas.values()),
    meta=("DEVICE_HOME_AREAS_D", object),
)

In [None]:
# Explode into one row per (destination CBG, home/source CBG) pair; SOURCE_AREA and its device count unpack together.
month_patterns_sewershed = month_patterns[["AREA", "AREA_SEWERSHED", "DATE_RANGE_START", "DATE_RANGE_END", "SOURCE_AREA", "SOURCE_AREA_DEVICE_COUNT"]].explode(column=["SOURCE_AREA", "SOURCE_AREA_DEVICE_COUNT"])
# Empty DEVICE_HOME_AREAS dicts explode to a NaN row; drop those. Source CBGs that do not map to a
# sewershed are kept and later count as visitors from outside any sewershed.
month_patterns_sewershed = month_patterns_sewershed.dropna(subset=["SOURCE_AREA"])
# Apply the same leading-zero repair used for AREA, so the AREA == SOURCE_AREA home test and the sewershed
# lookup compare GEOIDs in one format. Assumes no correctly formatted ID starts with "8" (no US state
# FIPS code does), making this a no-op for well-formed IDs.
month_patterns_sewershed["SOURCE_AREA"] = month_patterns_sewershed["SOURCE_AREA"].mask(
    month_patterns_sewershed["SOURCE_AREA"].str.startswith("8", na=False),
    "0" + month_patterns_sewershed["SOURCE_AREA"],
)
# Map Source Area CBG IDs to Sewershed (None when the CBG is not in the mapping).
month_patterns_sewershed["SOURCE_AREA_SEWERSHED"] = month_patterns_sewershed["SOURCE_AREA"].apply(lambda v: geoid_to_sewershed.get(v), meta=("SOURCE_AREA", str))
# Convert the count to float so Dask knows it's a float.
month_patterns_sewershed["SOURCE_AREA_DEVICE_COUNT"] = month_patterns_sewershed["SOURCE_AREA_DEVICE_COUNT"].astype(float)

### Cases for device counting
1. Device is in its home CBG, and therefore in its home sewershed (`AREA` == `SOURCE_AREA`) - count as a non-visitor (excluded from the visitor totals).
2. Device is outside its home CBG but inside its home sewershed (`AREA` != `SOURCE_AREA` AND `AREA_SEWERSHED` == `SOURCE_AREA_SEWERSHED`) - don't count (counting it could double-count devices).
3. Device is outside both its home CBG and its home sewershed (`AREA` != `SOURCE_AREA` AND `AREA_SEWERSHED` != `SOURCE_AREA_SEWERSHED`) - count as a visitor. This includes devices whose home CBG does not map to any sewershed.

In [None]:
# Classify each (destination CBG, home CBG) row; exactly one of the three flags is True per row.
month_patterns_sewershed["IS_HOME_DEVICE"] = month_patterns_sewershed["AREA"].eq(month_patterns_sewershed["SOURCE_AREA"])
month_patterns_sewershed["IS_SAME_SEWERSHED_DEVICE"] = month_patterns_sewershed["AREA_SEWERSHED"].eq(month_patterns_sewershed["SOURCE_AREA_SEWERSHED"]) & ~month_patterns_sewershed["IS_HOME_DEVICE"]
month_patterns_sewershed["IS_VISITOR_DEVICE"] = ~(month_patterns_sewershed["IS_HOME_DEVICE"] | month_patterns_sewershed["IS_SAME_SEWERSHED_DEVICE"])

In [None]:
# Keep visitor rows only, sum device counts per (destination sewershed, month, source CBG), then
# collapse the source CBGs into one visitor total per (destination sewershed, month).
visitor_counts_all = month_patterns_sewershed[month_patterns_sewershed["IS_VISITOR_DEVICE"]]
agg_visitor_counts = (
    visitor_counts_all
    .groupby(["AREA_SEWERSHED", "DATE_RANGE_START", "SOURCE_AREA"])["SOURCE_AREA_DEVICE_COUNT"]
    .sum()
)
sewershed_visitor_counts = agg_visitor_counts.groupby(["AREA_SEWERSHED", "DATE_RANGE_START"]).sum()

In [None]:
# Clear out any earlier export, then let Dask write all partitions into one CSV.
if os.path.isfile(MONTHLY_VISIT_COUNTS_FILE):
    os.unlink(MONTHLY_VISIT_COUNTS_FILE)
sewershed_visitor_counts.to_csv(MONTHLY_VISIT_COUNTS_FILE, single_file=True)

## Monthly Supplement

In [None]:
# Monthly supplement (neighborhood-pattern metadata) files. glob order is filesystem-dependent, so sort
# the list to make the appended output's row order and the progress numbering reproducible.
MONTHLY_SUPPLEMENT_FILES = sorted(glob.glob("/biostats_share/hillandr/data/WW_Mobility_2025_03_03/raw_neigh_patterns_meta/*.csv.gz"))
# NOTE(review): output goes under the 2025_03_03 directory rather than BASE_OUT_DIR (2025_04_04) — confirm intended.
MONTHLY_SUPPLEMENT_OUTPUT_FILE = "/biostats_share/hillandr/data/WW_Mobility_2025_03_03/processed/colorado_monthly_supplement.csv"

In [None]:
# Delete existing file: rows are appended below, so a stale file would otherwise keep growing.
if os.path.isfile(MONTHLY_SUPPLEMENT_OUTPUT_FILE):
    print(f"Deleting existing file '{MONTHLY_SUPPLEMENT_OUTPUT_FILE}'...")
    os.remove(MONTHLY_SUPPLEMENT_OUTPUT_FILE)

# Process files into single file: keep US / Colorado rows whose CBG maps to a sewershed and append
# them to MONTHLY_SUPPLEMENT_OUTPUT_FILE, writing the CSV header only once.
is_first_file = True
for i, supp_file_path in enumerate(MONTHLY_SUPPLEMENT_FILES):
    tmp_df = pd.read_csv(supp_file_path, dtype={"CENSUS_BLOCK_GROUP": str})
    tmp_df = tmp_df.loc[(tmp_df.ISO_COUNTRY_CODE == "US") & (tmp_df.REGION == "co")].copy()
    if len(tmp_df) == 0:
        print(f"Skipping {supp_file_path}...")
        continue
    # First day of the month as "YYYY-MM-DD", the DATE_RANGE_START key used when joining with visitor counts.
    tmp_df["DATE_RANGE_START"] = tmp_df[["YEAR", "MONTH"]].apply(lambda row: f"{row.YEAR:02d}-{row.MONTH:02d}-01", axis=1)
    tmp_df = tmp_df.rename(columns={"CENSUS_BLOCK_GROUP": "AREA"})
    # Same leading-zero repair as applied to the neighborhood-pattern AREA column, so both sources query
    # geoid_to_sewershed with identically formatted GEOIDs (a no-op for IDs already starting "08").
    tmp_df["AREA"] = tmp_df["AREA"].mask(tmp_df["AREA"].str.startswith("8", na=False), "0" + tmp_df["AREA"])
    tmp_df["AREA_SEWERSHED"] = tmp_df["AREA"].map(geoid_to_sewershed)
    tmp_df = tmp_df.dropna(subset=["AREA_SEWERSHED"])
    tmp_df.to_csv(MONTHLY_SUPPLEMENT_OUTPUT_FILE, mode="a", header=is_first_file, index=False)
    is_first_file = False
    print(f"\r{i+1}/{len(MONTHLY_SUPPLEMENT_FILES)} complete...", end="")
print()

## Combined Monthly Visitation/Home Devices data

In [None]:
# Final joined monthly output, built from BASE_OUT_DIR like the other 2025_04_04 outputs instead of a
# duplicated hard-coded prefix (resolves to the same path as before).
MONTHLY_VISITATION_JOINED = os.path.join(BASE_OUT_DIR, "processed/2025_04_04_monthly_devices.csv")

In [None]:
# Reload both monthly outputs, parsing the month key as datetimes so the two sides share a key dtype.
month_visitation = pd.read_csv(filepath_or_buffer=MONTHLY_VISIT_COUNTS_FILE, parse_dates=["DATE_RANGE_START"])
month_supplement = pd.read_csv(filepath_or_buffer=MONTHLY_SUPPLEMENT_OUTPUT_FILE, parse_dates=["DATE_RANGE_START"])

In [None]:
# Sum resident and primary-daytime device counts over all CBGs of each sewershed, per month.
month_supp_group = (
    month_supplement
    .groupby(["AREA_SEWERSHED", "DATE_RANGE_START"])[["NUMBER_DEVICES_RESIDING", "NUMBER_DEVICES_PRIMARY_DAYTIME"]]
    .sum()
    .reset_index()
)

In [None]:
# Inner join: keep only (sewershed, month) pairs present in both the visitor totals and the supplement.
# Both inputs were aggregated to one row per key, so validate="one_to_one" makes duplicate keys (e.g. month
# strings that parse to the same date) fail loudly instead of silently multiplying rows.
month_data_joined = pd.merge(month_visitation, month_supp_group, on=["AREA_SEWERSHED", "DATE_RANGE_START"], how="inner", validate="one_to_one")

In [None]:
# Panel completeness check: a complete panel has one row per sewershed per month.
# NOTE(review): 61 months (5 years + 1) and 71 sewersheds are hard-coded — confirm against the data.
N_SEWERSHEDS = 71
n_months = (12 * 5) + 1
# Display (expected rows, actual rows); the two should match for a complete panel.
(n_months * N_SEWERSHEDS, len(month_data_joined))

In [None]:
# True if any joined (sewershed, month) row lacks a visitor device total.
month_data_joined["SOURCE_AREA_DEVICE_COUNT"].isna().any()

In [None]:
# Order rows by sewershed, then chronologically within each sewershed (rebinding keeps the cell re-runnable).
month_data_joined = month_data_joined.sort_values(["AREA_SEWERSHED", "DATE_RANGE_START"])

In [None]:
# Write the joined table: per (sewershed, month) visitor totals plus resident / primary-daytime device counts.
month_data_joined.to_csv(path_or_buf=MONTHLY_VISITATION_JOINED, index=False)