In [1]:
import glob
from tqdm import tqdm
import pickle
import json
from collections import Counter
import pandas as pd
import numpy as np

In [2]:
from google.colab import drive as mountGoogleDrive 
mountGoogleDrive.mount('/content/drive')

Mounted at /content/drive


In [3]:
in_dir = r'/content/drive/MyDrive/Research/Safegraph/scratch/20221205/'
out_dir = r'/content/drive/MyDrive/Research/Safegraph/scratch/20221205/'

In [4]:
def print_bad_files(in_dir):
    """Combines pickle files of lists of json files that didn't load
    """
    pickle_files = glob.glob(f'{in_dir}/*.pickle')
    bad_files = []
    for picklepath in pickle_files:
        with open(picklepath, 'rb') as f:
            bad_files.append(pickle.load(f))
    bad_files = [item for sublist in bad_files for item in sublist]
    print(bad_files)


def combine_batch_csv(dir):
    """Combines Safegraph dataframes into one
    """
    csv_files = glob.glob(f'{dir}/*.csv')

    df_list = []

    for filename in tqdm(csv_files):
        df = pd.read_csv(filename, header=0, index_col=0)
        df = df[~df['node.safegraph_core.postal_code'].isnull()]
        df = df[~df['poi_cbg'].isnull()]
        df_list.append(df)

    df = pd.concat(df_list, axis=0)

    return df

In [5]:
print_bad_files(f"{in_dir}/visitor_home_aggregation/")

['New York,NYfrom2021-06-14to2021-06-21PART51.json', 'New York,NYfrom2021-07-05to2021-07-12PART138.json', 'New York,NYfrom2021-07-12to2021-07-19PART7.json', 'New York,NYfrom2021-11-01to2021-11-08PART49.json', 'New York,NYfrom2021-11-08to2021-11-15PART70.json']


In [6]:
df = combine_batch_csv(f"{in_dir}/visitor_home_aggregation/")

100%|██████████| 9/9 [01:23<00:00,  9.29s/it]


In [7]:
df['node.safegraph_core.postal_code'] = df['node.safegraph_core.postal_code'].apply(str)

In [8]:
zcta_map = pd.read_csv(r'https://raw.githubusercontent.com/nychealth/coronavirus-data/master/Geography-resources/ZCTA-to-MODZCTA.csv',
                       dtype={'ZCTA': str, 'MODZCTA': str})
df = df.merge(zcta_map, left_on='node.safegraph_core.postal_code', right_on='ZCTA', how='left')

In [9]:
uhf_map = pd.read_csv('https://raw.githubusercontent.com/nychealth/coronavirus-data/master/Geography-resources/UHF_resources/uhf-to-zip%20crosswalk.csv')
modzcta_map = {}

for uhf_id in np.unique(uhf_map['UHF_id']):
  modzcta_ids = uhf_map.loc[uhf_map['UHF_id'] == uhf_id,'Zipcodes'].values[0].split(",")
  for m in modzcta_ids:
    modzcta_map[int(m)] = uhf_id

modzcta_map = pd.DataFrame.from_dict(modzcta_map, orient='index', columns=['UHF_id']).reset_index()
modzcta_map = modzcta_map.rename(columns={"index": "MODZCTA"})
modzcta_map['MODZCTA'] = modzcta_map['MODZCTA'].apply(str)
modzcta_map['UHF_id'] = modzcta_map['UHF_id'].apply(str)

In [10]:
df = df.merge(modzcta_map, on='MODZCTA', how='left')

WEIRD there's some zip codes zip codes in NYC that aren't mapped to MODZCTAs

In [11]:
np.unique(df[df['MODZCTA'].isnull()]['node.safegraph_core.postal_code'])

array(['10008', '10041', '10045', '10048', '10055', '10078', '10079',
       '10080', '10104', '10105', '10106', '10107', '10108', '10109',
       '10116', '10118', '10120', '10121', '10122', '10123', '10150',
       '10151', '10155', '10156', '10158', '10163', '10166', '10175',
       '10176', '10178', '10184', '10275', '10281', '10286', '10549',
       '10927', '11019', '11026', '11028', '11030', '11096', '11242',
       '11249', '11250', '11252', '11439', '11740', '11758', '11764',
       '11788', '11978', '12065', '12345', '12489', '12723', '12734',
       '12977', '13165', '13214', '13323', '13413', '13417', '13424',
       '13495', '14413', '14516', '14534', '14870', '7002'], dtype=object)

In [12]:
df.shape

(5360882, 10)

In [13]:
df[df['UHF_id'].isnull()].shape

(44242, 10)

In [14]:
44242 / 5360882

0.008252746469704052

In [15]:
df = df[~df['UHF_id'].isnull()]

In [16]:
df['start'] = pd.to_datetime(df.start, format='%Y-%m-%d')
df['end'] = pd.to_datetime(df.end, format='%Y-%m-%d')

df['poi_cbg'] = df['poi_cbg'].astype(int).astype(str)
df['UHF_id'] = df['UHF_id'].astype(int).astype(str)

In [17]:
df.head()

Unnamed: 0,node.placekey,node.safegraph_core.postal_code,poi_cbg,visitor_home_cbgs,visitor_home_aggregation,start,end,ZCTA,MODZCTA,UHF_id
0,22p-222@627-s4w-3dv,10027,360610222001,"{'360050449011': 4, '360610222001': 4}",{'36061020800': 4},2018-01-01,2018-01-08,10027,10027,302
1,22h-222@627-s8j-975,10001,360610091002,{'360811301004': 4},{'36061023900': 4},2018-01-01,2018-01-08,10001,10001,306
2,222-225@627-wbz-3qz,10004,360610013002,{},{},2018-01-01,2018-01-08,10004,10004,310
3,22d-223@627-s8q-tjv,10019,360610104001,"{'090091404001': 4, '360610160021': 4, '340030...","{'34025802800': 4, '06037310100': 4, '36103158...",2018-01-01,2018-01-08,10019,10019,306
4,22c-225@627-s4n-skf,10128,360610146023,"{'360050338001': 4, '360610152005': 4, '360610...",{},2018-01-01,2018-01-08,10128,10128,305


In [18]:
tract_zip_map = pd.read_csv(r'/content/drive/MyDrive/Research/Safegraph/scratch/TRACT_ZIP_122021.csv',
                            dtype={'tract': str, 'zip': str})
tract_zip_map = tract_zip_map.drop_duplicates(subset='tract', keep="first")
tract_zip_map = tract_zip_map[['tract', 'zip']].rename(columns={"zip": "ZCTA"})

In [19]:
weeks = np.unique(df['end'])

import os
files = [os.path.basename(x) for x in glob.glob(f"{out_dir}/mobility/*.csv")]
pd.to_datetime(pd.Series([file[0:10] for file in files]), format='%Y-%m-%d')
done = np.array([file[0:10] for file in files], dtype='datetime64')

weeks = [week for week in weeks if week not in done]

In [20]:
len(weeks)

5

In [21]:
batchsize = 25
batch_list = []
for i in range(0, len(weeks), batchsize):
    batch = weeks[i:i+batchsize]
    batch_list.append(batch)

In [22]:
len(batch_list)

1

In [23]:
with open(f'{out_dir}/mobility/weeks.pickle', 'wb') as handle:
  pickle.dump(weeks, handle, protocol=pickle.HIGHEST_PROTOCOL)

In [24]:
def create_mobility_matrix(weeks):
  for week in tqdm(weeks):
    week_df = df.loc[df['end'] == week, ['UHF_id','visitor_home_aggregation']]
    week_df['visitor_home_aggregation'] = week_df['visitor_home_aggregation'].str.replace("\'", "\"")

    mobility_df = pd.DataFrame()
    for index, row in week_df.iterrows():
      row_dict = json.loads(row['visitor_home_aggregation'])
      row_df = pd.DataFrame(list(row_dict.items()), columns=['tract','visitor_home_aggregation'])
      row_df['destination'] = row['UHF_id']
      mobility_df = pd.concat([mobility_df, row_df])

    mobility_df = mobility_df.merge(tract_zip_map, on='tract', how='left')
    mobility_df = mobility_df.merge(zcta_map, on='ZCTA', how='left')
    mobility_df = mobility_df.merge(modzcta_map, on='MODZCTA', how='left')

    mobility_df = mobility_df[~mobility_df['UHF_id'].isnull()]

    mobility_df = mobility_df.groupby(['destination','UHF_id'])['visitor_home_aggregation'].sum().reset_index()

    mobility_df['end'] = week
    mobility_df = mobility_df.rename(columns={"UHF_id":"origin"})

    mobility_df.to_csv(f"{out_dir}/mobility/{pd.to_datetime(week).date()}_mobility.csv", index=False)

In [25]:
create_mobility_matrix(batch_list[0])

100%|██████████| 5/5 [14:15<00:00, 171.04s/it]


In [None]:
create_mobility_matrix(batch_list[1])

100%|██████████| 25/25 [35:23<00:00, 84.92s/it]


In [None]:
create_mobility_matrix(batch_list[2])

 76%|███████▌  | 19/25 [29:46<08:28, 84.69s/it]

In [None]:
create_mobility_matrix(batch_list[3])