In [1]:
import os
import attr
import math
from pprint import pprint as pp
from glob import glob
import numpy as np
import pandas as pd

# 20210629_lccf

----

The purpose of this notebook is to generate test data for LCCF performance scaling experiments on the model run in the script `scripts/20210623_lccf.py`.

In this notebook, I will load the files `data/lccf/*.csv` and copy their rows to emulate large-scale SafeGraph data ingests. The starting point of this notebook looks like:

```bash
$ ls ../data/lccf
census_pop1_rows1.csv   contacts_pop1_rows1.csv travel_pop1_rows1.csv
```

The goal is to make `*rows2.csv`, `*rows4.csv`, etc. files that have 2x, 4x, etc. as many rows as the `*rows1.csv` files.

----

To test performance scaling as a function of population size, I will also make `*_pop2_rows1.csv`, `*_pop4_rows1.csv`, etc., which are equivalent to `*_pop1_rows1.csv` but with 2x, 4x, etc. greater contact/travel/census population. `pop1` is implied if no `pop` identifier is included in the file name.

## Work Plan

- Input: how many ZCTAs in the simulation?
- Determine number of census DF replicates we need to concat.
- Concat m replicates of census DF, with unique ZCTA IDs
    - `prefix` column confers uniqueness
- Concat m replicates of travel DF, with unique ZCTA IDs
    - `prefix` column confers uniqueness
    - Remember that this assumes no travel between replicates (cities)

In [2]:
# Age-bin labels crossed with GEOID pairs when fabricating travel rows.
AGE_GROUPS = ('<5', '5-17', '18-49', '50-64', '65+')
# Target row count requested from get_extended_census further down.
N_ROWS = 10_000

In [3]:
!ls ../data/lccf

census_pop1_rows1.csv   contacts_pop1_rows1.csv travel_pop1_rows1.csv


In [4]:
!head ../data/lccf/*.csv

==> ../data/lccf/census_pop1_rows1.csv <==
"","GEOID","NAME","age_bin","group_pop"
"1","75001","ZCTA5 75001","<5",794
"2","75001","ZCTA5 75001","18-49",9420
"3","75001","ZCTA5 75001","5-17",1404
"4","75001","ZCTA5 75001","50-64",2259
"5","75001","ZCTA5 75001","65+",1115
"6","75002","ZCTA5 75002","<5",4227
"7","75002","ZCTA5 75002","18-49",29659
"8","75002","ZCTA5 75002","5-17",15710
"9","75002","ZCTA5 75002","50-64",14706

==> ../data/lccf/contacts_pop1_rows1.csv <==
,age1,age2,daily_per_capita_contacts
0,<5,<5,2.160940833918119
1,5-17,<5,0.5973413405271149
2,18-49,<5,0.3822025191217617
3,50-64,<5,0.3523966597811896
4,65+,<5,0.18975609071541075
5,<5,5-17,2.164117384279739
6,5-17,5-17,8.146970087503425
7,18-49,5-17,2.431391745980527
8,50-64,5-17,1.885100325362032

==> ../data/lccf/travel_pop1_rows1.csv <==
,Unnamed: 0,source,destination,age,n,date,destination_type
30555,30555,76511,76511,<5,35.05384615384615,2020-03-11,local
30556,30556,76511,76511,18-49,472.2846153846154,2020-03-11,loc

Are ZCTAs from census a subset of travel? Vice versa?

In [8]:
@attr.s
class ScalingExp:
    """One scaling experiment.

    Bundles the paths of the three reference CSVs (census, travel, contacts)
    with the column subsets to read from each. Every CSV is parsed at most
    once: the ``census`` / ``travel`` / ``contacts`` properties return the
    cached frame after the first access. Call the matching ``parse_*`` method
    to force a re-read.
    """

    ref_census_csv = attr.ib(type=str)
    ref_travel_csv = attr.ib(type=str)
    ref_contacts_csv = attr.ib(type=str)
    # Columns to read from each CSV (forwarded to pd.read_csv as `usecols`).
    # An empty selection means "read every column". attr.Factory builds a
    # fresh list per instance instead of sharing one default list object.
    census_usecols = attr.ib(type=list, default=attr.Factory(list))
    # Despite their names, these two hold the travel and contacts column
    # selections. The names are kept so existing positional/keyword callers
    # keep working; read them through `travel_usecols` / `contacts_usecols`.
    census_travel = attr.ib(type=list, default=attr.Factory(list))
    census_contacts = attr.ib(type=list, default=attr.Factory(list))
    pop_factor = attr.ib(type=int, default=1)
    n_zcta = attr.ib(type=int, default=None)

    @property
    def travel_usecols(self) -> list:
        """Column selection for the travel CSV (alias of `census_travel`)."""
        return self.census_travel

    @property
    def contacts_usecols(self) -> list:
        """Column selection for the contacts CSV (alias of `census_contacts`)."""
        return self.census_contacts

    @staticmethod
    def _usecols_or_none(usecols):
        """Map an empty column selection to None so pandas reads all columns."""
        return usecols if usecols else None

    def get_m_replicates(self):
        """Placeholder for computing how many census replicates are needed.

        Not implemented yet; always returns None.
        """
        # TODO: derive the replicate count from `self.n_zcta`.
        return None

    def get_extended_travel(self) -> pd.DataFrame:
        """Inner-join travel rows to census rows on travel.source == census.GEOID."""
        return self.travel.merge(self.census, left_on='source', right_on='GEOID')

    @property
    def census(self) -> pd.DataFrame:
        """Census frame, parsed on first access and cached afterwards."""
        if not hasattr(self, '_census'):
            self.parse_census()
        return self._census

    @property
    def contacts(self) -> pd.DataFrame:
        """Contacts frame, parsed on first access and cached afterwards."""
        if not hasattr(self, '_contacts'):
            self.parse_contacts()
        return self._contacts

    @property
    def travel(self) -> pd.DataFrame:
        """Travel frame, parsed on first access and cached afterwards."""
        if not hasattr(self, '_travel'):
            self.parse_travel()
        return self._travel

    def parse_census(self) -> pd.DataFrame:
        """Read the census CSV, require it to be null-free, and cache it.

        Raises:
            AssertionError: if any selected column contains a null value.
        """
        df = pd.read_csv(
            self.ref_census_csv,
            usecols=self._usecols_or_none(self.census_usecols),
        )
        null_flags = df.isna().any()
        assert not null_flags.any(), ('found null values in df', null_flags)
        # TODO: filter to the ZCTAs we want to model in the simulation.
        self._census = df
        return df

    def parse_contacts(self) -> pd.DataFrame:
        """Read the contacts CSV and cache it."""
        self._contacts = pd.read_csv(
            self.ref_contacts_csv,
            usecols=self._usecols_or_none(self.contacts_usecols),
        )
        return self._contacts

    def parse_travel(self) -> pd.DataFrame:
        """Read the travel CSV and cache it."""
        self._travel = pd.read_csv(
            self.ref_travel_csv,
            usecols=self._usecols_or_none(self.travel_usecols),
        )
        return self._travel

In [9]:
# Build an experiment from the three reference CSVs, leaving every optional
# field at its default, then join the travel rows onto the census rows.
# NOTE(review): the TypeError saved under this cell names three *required*
# arguments, but the class definition gives those fields defaults — the saved
# output looks stale relative to the class; re-run on a fresh kernel to confirm.
exp = ScalingExp(
    ref_census_csv='../data/lccf/census_pop1_rows1.csv',
    ref_travel_csv='../data/lccf/travel_pop1_rows1.csv',
    ref_contacts_csv='../data/lccf/contacts_pop1_rows1.csv',
)
exp.get_extended_travel()

TypeError: __init__() missing 3 required positional arguments: 'census_usecols', 'census_travel', and 'census_contacts'

In [None]:
# Registry of the three CSV schemas, keyed by schema name.
#   usecols   - columns handed to pd.read_csv by the parse_* helpers
#   glob      - filled in by create_scaled_csvs with the matching CSV path(s)
#   keys      - set later for 'census' once the extended frame exists
#   value_col - not populated yet (still an open question in this notebook)
schemas = dict(
    census=dict(
        value_col=None,
        usecols=("GEOID", "age_bin", "group_pop"),
        glob=[],
        keys=None,
    ),
    contacts=dict(
        value_col=None,
        usecols=('age1', 'age2', 'daily_per_capita_contacts'),
        glob=[],
        keys=None,
    ),
    travel=dict(
        value_col=None,
        usecols=('source', 'destination', 'age', 'n', 'date', 'destination_type'),
        glob=[],
        keys=None,
    ),
)

In [None]:
def create_scaled_csvs(dir_fp: str, scales: tuple = (2, 4, 8), verbose=2):
    """Locate the single reference CSV for every schema inside `dir_fp`.

    For each key of the module-level `schemas` registry, globs
    ``<schema>*.csv`` in `dir_fp`, requires exactly one match, and stores the
    match list under ``schemas[<schema>]['glob']``.

    Args:
        dir_fp: directory that holds the reference CSVs; must exist.
        scales: accepted but not used by the current implementation.
        verbose: when truthy, pretty-print the registry after the lookup.

    Raises:
        AssertionError: if `dir_fp` is not a directory, or if a schema does
            not match exactly one CSV.
    """
    assert os.path.isdir(dir_fp)
    for name, spec in schemas.items():
        query = f"{name}*.csv"
        hits = glob(os.path.join(dir_fp, query))
        n_hits = len(hits)
        assert n_hits == 1, f"found {n_hits} hits for glob query {query}, expected 1"
        spec['glob'] = hits
    if verbose:
        print('schemas:')
        pp(schemas)

In [None]:
# Resolve the one CSV per schema under ../data/lccf and record it in schemas[...]['glob'].
create_scaled_csvs('../data/lccf')

**Question**: what should we use as the `value_col` for each schema? Answering this also serves as a reminder of what each schema looks like. Let's `head` the files...

We're going to need more than just copying rows. Specifically:
- Census data probably needs unique `primaryKey == [GEOID, age_bin]`
- Travel data needs n^2 rows where n is the number of GEOIDs

So what we really mean by "scaling rows" is scaling `n`, the number of GEOIDs in the census CSV. An easy way to do this is to prepend a non-zero integer to each GEOID.

----

## Parsers for each schema

In [None]:
def parse_census(csv_fp: str) -> pd.DataFrame:
    """Read the census CSV, keeping only the columns listed in the registry.

    Args:
        csv_fp: path (or file-like object) accepted by ``pd.read_csv``.

    Returns:
        Frame restricted to ``schemas['census']['usecols']``.

    Raises:
        AssertionError: if any retained column contains a null value.
    """
    census = pd.read_csv(csv_fp, usecols=schemas['census']['usecols'])
    null_flags = census.isna().any()
    assert not null_flags.any(), ('found null values in df', census.isna().any())
    # TODO: filter to the ZCTAs we want to model in the simulation (vertex coords);
    # renaming GEOID/age_bin to vertex/age_group and indexing on them was considered.
    return census

In [None]:
def parse_contacts(csv_fp: str) -> pd.DataFrame:
    """Read the contacts CSV, keeping only ``schemas['contacts']['usecols']``."""
    wanted_columns = schemas['contacts']['usecols']
    contacts = pd.read_csv(csv_fp, usecols=wanted_columns)
    return contacts

In [None]:
def parse_travel(csv_fp: str) -> pd.DataFrame:
    """Read the travel CSV, keeping only ``schemas['travel']['usecols']``."""
    wanted_columns = schemas['travel']['usecols']
    travel = pd.read_csv(csv_fp, usecols=wanted_columns)
    return travel

In [None]:
# Attach each schema's parse_<schema> function under its 'parser' key. The
# function is looked up by name in globals(), so parse_census, parse_contacts
# and parse_travel must already be defined when this cell runs.
for schema in schemas:
    schemas[schema]['parser'] = globals()[f'parse_{schema}']
pp(schemas)

## Extend census data

In [None]:
def get_extended_census(census_df: pd.DataFrame, n: int = 0) -> pd.DataFrame:
    """Return `n` census rows built by replicating `census_df` with unique IDs.

    The input is stacked as many times as needed to reach `n` rows. Every
    replicate gets its own integer `prefix` (0, 1, 2, ...), and a
    `unique_GEOID` column is formed by writing that prefix in front of the
    GEOID, zero-padded to a common width. Replicate 0 therefore keeps the
    original GEOIDs, and no two replicates can share an ID. The input frame is
    left untouched, so the function can be called repeatedly on the same frame.

    Args:
        census_df: reference census rows; needs a `GEOID` column and must not
            already have a `prefix` column.
        n: number of rows to return. A falsy value (the default 0) means
            "as many rows as `census_df`".

    Returns:
        A new frame with the input columns plus `prefix` and `unique_GEOID`,
        truncated to the first `n` rows and carrying a fresh 0..n-1 index.

    Raises:
        AssertionError: if `census_df` already has a `prefix` column, or if
            the generated IDs are not unique across replicates.
        ValueError: if `n` is negative, or if rows are requested from an
            empty `census_df`.
    """
    assert 'prefix' not in census_df.columns
    base_len = len(census_df)
    n = n if n else base_len
    if n < 0:
        raise ValueError(f'n must be non-negative, got {n}')
    if base_len == 0 and n > 0:
        # Replicating nothing can never reach n rows.
        raise ValueError('cannot extend an empty census frame')

    # Ceiling division: the smallest number of whole copies covering n rows.
    n_replicates = max(1, -(-n // base_len)) if base_len else 1

    # assign() returns a copy, so the caller's frame never gains a 'prefix'
    # column; one concat call replaces repeated concatenation in a loop.
    replicates = [census_df.assign(prefix=i) for i in range(n_replicates)]
    out_df = pd.concat(replicates, ignore_index=True)

    # Zero-pad GEOIDs to one width so "prefix + GEOID" cannot collide when
    # GEOIDs have different digit counts.
    geoid_str = out_df['GEOID'].astype(str)
    width = int(geoid_str.str.len().max()) if len(out_df) else 0
    out_df['unique_GEOID'] = (
        out_df['prefix'].astype(str) + geoid_str.str.zfill(width)
    ).astype(int)

    n_ids = out_df['unique_GEOID'].nunique()
    n_expected = n_replicates * census_df['GEOID'].nunique()
    assert n_ids == n_expected, (
        f'expected {n_expected} distinct unique_GEOID values, found {n_ids}'
    )
    return out_df.iloc[:n]

In [None]:
# Load the reference census, report its row count, and extend it to N_ROWS rows.
census_df = parse_census('../data/lccf/census_pop1_rows1.csv')
pp(len(census_df))
extended_census = get_extended_census(census_df, n=N_ROWS)
# NOTE(review): this stores the original 'GEOID' column (repeated across
# replicates), not 'unique_GEOID' — confirm which one is meant to be the key set.
schemas['census']['keys'] = extended_census['GEOID']
extended_census

## Generate unique pairwise combinations of GEOIDs for travel data

We assume the worst-case scenario here: a length-n^2 index for n GEOIDs, multiplied by any other demographic coordinates (in this case, age group). In reality there will probably be somewhat fewer rows, since the graph of travel between GEOIDs is not complete.

In [None]:
def get_unique_pairs(geoids: pd.Series, age_groups: pd.Series) -> pd.DataFrame:
    """Cross `geoids` with itself and with `age_groups` (like itertools.product).

    Args:
        geoids: GEOID values; used for both the `source` and the
            `destination` column. Values are taken as given — repeated
            entries are not de-duplicated.
        age_groups: age-group labels, emitted in the `age` column.

    Returns:
        Frame with columns `source`, `destination`, `age` holding
        ``len(geoids) ** 2 * len(age_groups)`` rows.
    """
    sources = geoids.to_frame(name='source')
    destinations = geoids.to_frame(name='destination')
    ages = age_groups.to_frame(name='age')
    geoid_pairs = sources.merge(destinations, how='cross')
    return geoid_pairs.merge(ages, how='cross')

Testing:

In [None]:
# NOTE(review): `ref_travel` is first assigned further down, in the testing cell
# of the "Fabricate Travel Matrix" section, so this cell raises NameError on a
# fresh top-to-bottom run.
ref_travel

In [None]:
# Join travel rows to census rows on travel.source == census.unique_GEOID.
# NOTE(review): parse_census does not load a 'unique_GEOID' column and
# `ref_travel` is assigned in a later cell, so this depends on state left
# behind by other cells — confirm on a fresh kernel.
ref_travel.merge(census_df, left_on='source', right_on='unique_GEOID')

In [None]:
# Display the extended census frame built in the "Extend census data" section.
extended_census

In [None]:
# Cross the census keys stored on the registry with themselves and with every
# age group; the stored keys are passed as-is (no de-duplication).
test_get_pairs = get_unique_pairs(schemas['census']['keys'], pd.Series(AGE_GROUPS))
test_get_pairs

In [None]:
# One row per (source, destination, age) combination: the key Series (expected
# to hold N_ROWS entries) is crossed with itself and then with every age group,
# so the age factor comes from AGE_GROUPS rather than a hard-coded 5.
expected_rows = (N_ROWS ** 2) * len(AGE_GROUPS)
print(f'We expect there to be {expected_rows} rows in the above frame')
assert test_get_pairs.shape[0] == expected_rows

## Fabricate Travel Matrix

In [None]:
def get_extended_travel(ref_travel: pd.DataFrame, census_df: pd.DataFrame) -> pd.DataFrame:
    """Work-in-progress stub: hands back `census_df` unchanged.

    The travel matrix is not fabricated yet. Earlier revisions built the full
    (source, destination, age) cross product of `census_df['unique_GEOID']`
    here and then discarded it before returning `census_df`; that
    quadratic-size frame is no longer built, and the unreachable second
    `return` has been removed. The value handed back is the same as before.

    Args:
        ref_travel: reference travel rows; currently unused.
        census_df: extended census rows; must have a `unique_GEOID` column.

    Returns:
        `census_df` itself (the same object, not a copy).

    Raises:
        KeyError: if `census_df` has no `unique_GEOID` column.
    """
    # Keep the column requirement the discarded cross product used to enforce.
    if 'unique_GEOID' not in census_df.columns:
        raise KeyError('unique_GEOID')

    # TODO (plan carried over from the original inline notes):
    #   1. build every (source, destination, age) combination with
    #      get_unique_pairs(census_df['unique_GEOID'], pd.Series(AGE_GROUPS))
    #      and start it off with n = 0;
    #   2. index both frames and merge `ref_travel` in
    #      (left_on='source', right_on='unique_GEOID');
    #   3. fill n with zero wherever `ref_travel` has no matching row.
    return census_df

Testing:

In [None]:
# Parse the reference travel CSV, pass it with the extended census through
# get_extended_travel, and inspect the 'unique_GEOID' column of the result.
ref_travel = parse_travel('../data/lccf/travel_pop1_rows1.csv')
extended_travel = get_extended_travel(ref_travel, extended_census)
extended_travel.unique_GEOID

In [None]:
# Inspect the 'source' column of the reference travel data.
ref_travel.source

In [None]:
# Display the reference travel frame.
ref_travel

# Sandbox

In [None]:
# Inspect the census keys stored on the schema registry.
schemas['census']['keys']

In [None]:
# Re-parse the reference census CSV for ad-hoc inspection.
df = parse_census('../data/lccf/census_pop1_rows1.csv')

In [None]:
# Summary statistics for the census frame loaded in the previous sandbox cell.
df.describe()

In [None]:
# Summary statistics for the 'n' column of the reference travel CSV.
parse_travel('../data/lccf/travel_pop1_rows1.csv')['n'].describe()