In [14]:
import pandas as pd
import numpy as np
import glob

import os, gzip, shutil

from tqdm import tqdm

import polars as pl

import matplotlib.pyplot as plt
import itertools

import time

In [15]:
in_dir = "/Volumes/TOSHIBA EXT/2020/03/**/*.gz"
files = glob.glob(in_dir, recursive=True)
print(len(files))

116


In [16]:
print(files[0])

/Volumes/TOSHIBA EXT/2020/03/02/SAFEGRAPH/WP/core_poi-geometry-patterns-part1.csv.gz


In [17]:
fields = ['date_range_start', 'date_range_end', 'postal_code', 'visitor_home_aggregation', 'visits_by_day']
start_time = time.time()
df = pd.read_csv(files[0], usecols=fields)
end_time = time.time()
elapsed_time = end_time - start_time
print(f"pd.concat took {elapsed_time:.4f} seconds")

pd.concat took 4.2405 seconds


In [24]:
na_rows = sum(df.isna().sum(axis=1) > 0)
print(f"{na_rows} ({na_rows / df.shape[0] * 100 :.2f}%) rows out of {df.shape[0]} have NAs")

338382 (65.61%) rows out of 515778 have NAs


In [27]:
515778 - 338382

177396

In [28]:
df = df.dropna()

In [32]:
sum(df.visitor_home_aggregation == '{}')

35818

In [33]:
sum(df.visitor_home_aggregation == '')

0

In [48]:
import argparse
import pandas as pd
import json
from tqdm import tqdm
import time
import glob
import sys
import os

# user-defined arguments
batchsize = 5

# Load file paths
in_dir = "/Volumes/TOSHIBA EXT/2020/03/**/*.gz"
output_dir = "../data/processed/safegraph/hardrive/"
files = glob.glob(in_dir, recursive=True)

# Split files into batches
batch_list = [files[i:i+batchsize] for i in range(0, len(files), batchsize)]

# Ensure the batch index is within range
# if batch_index >= len(batch_list):
#     print(f'Batch index {batch_index} is out of range.')
#     exit(1)

# Load mappings
zip_county_map = pd.read_csv('../data/raw/ZIP_COUNTY_CROSSWALK.csv')
tract_zip_map = pd.read_csv('../data/raw/ZIP_TRACT_032020.csv')

# Process the mappings
zip_county_map = zip_county_map.sort_values(by=['ZIP', 'TOT_RATIO'], ascending=False)
zip_county_map = zip_county_map.drop_duplicates(subset='ZIP', keep='first')
zip_county_map['ZIP'] = zip_county_map['ZIP'].astype(str)
zip_county_map['COUNTY'] = zip_county_map['COUNTY'].astype(str)
tract_zip_map['ZIP'] = tract_zip_map['ZIP'].astype(str)
tract_zip_map['TRACT'] = tract_zip_map['TRACT'].astype(str)
tract_county_map = zip_county_map.merge(tract_zip_map[['ZIP', 'TRACT']], left_on='ZIP', right_on='ZIP', how='right')

# Function to process each row
def pandas_sum_values_or_zero(x):
    if pd.isna(x["visitor_home_aggregation"]):
        pass
    else:
        row_dict = json.loads(x["visitor_home_aggregation"])
        df = pd.DataFrame(list(row_dict.items()), columns=['tract', 'visitor_home_aggregation'])
        df['COUNTY_DEST'] = x['COUNTY']
        df['date_range_start'] = x['date_range_start']
        df['date_range_end'] = x['date_range_end']
        days = json.loads(x['visits_by_day'])
        for i in range(len(days)):
            df[f'VISITS_DAY_{i}'] = days[i]
        return df


fields = ['date_range_start', 'date_range_end', 'postal_code', 'visitor_home_aggregation', 'visits_by_day']

def process_batch(i):
    # Select the batch of files to process
    files = batch_list[i]
    li = []
    print('reading files')
    for file in tqdm(files, file=sys.stdout):
        df = pd.read_csv(file, usecols=fields)
        # drop na rows
        na_rows = sum(df.isna().sum(axis=1) > 0)
        nrows = df.shape[0]
        print(f"{na_rows} ({na_rows / nrows * 100 :.2f}%) rows out of {nrows} have NAs")
        df = df.dropna()
        # drop empty strings
        df = df.loc[df["visitor_home_aggregation"] != '{}']
        df = df.loc[df["visits_by_day"] != ""]
        df = df.loc[df["visitor_home_aggregation"] != ""]
        df = df.merge(zip_county_map, left_on='postal_code', right_on='ZIP', how='left')
        li.append(df)
    batch_df = pd.concat(li, axis=0, ignore_index=True)

    print('unloading json visitor_home_aggregation')
    tqdm.pandas(desc="json unloaded", file=sys.stdout)
    df_list = batch_df.progress_apply(pandas_sum_values_or_zero, axis=1, raw=False, result_type="reduce")
    
    print('concatentating data frames')
    start_time = time.time()
    sum_df = pd.concat(df_list.tolist())
    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"pd.concat took {elapsed_time:.4f} seconds")

    print('merging county info')
    start_time = time.time()
    sum_df = sum_df.merge(tract_county_map[['TRACT', 'COUNTY']], left_on='tract', right_on='TRACT', how='left')
    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"pd.merge took {elapsed_time:.4f} seconds")
    print('percent rows with no tract', sum(sum_df.TRACT.isna()) / sum_df.shape[0])

    sum_df.drop(['tract', 'TRACT'], axis=1, inplace=True)
    sum_df.rename(columns={'COUNTY' : 'COUNTY_ORIG'}, inplace=True)
    print('pd.groupby')
    start_time = time.time()
    sum_df = sum_df.groupby(['date_range_start', 'date_range_end', 'COUNTY_ORIG', 'COUNTY_DEST']).sum().reset_index()
    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"pd.groupby took {elapsed_time:.4f} seconds")
    
    os.makedirs(output_dir, exist_ok=True)
    output_file = os.path.join(output_dir, f'batch_{i}.csv')
    print(f'saving batch {i}')
    sum_df.to_csv(output_file, index=False)

    return sum_df

In [49]:
# Process the batch
process_batch(0)

reading files
  0%|                                                                                             | 0/5 [00:00<?, ?it/s]338382 (65.61%) rows out of 515778 have NAs
 20%|█████████████████                                                                    | 1/5 [00:04<00:16,  4.24s/it]339671 (65.75%) rows out of 516621 have NAs
 40%|██████████████████████████████████                                                   | 2/5 [00:12<00:19,  6.58s/it]339599 (65.87%) rows out of 515591 have NAs
 60%|███████████████████████████████████████████████████                                  | 3/5 [00:16<00:11,  5.53s/it]339001 (65.81%) rows out of 515145 have NAs
 80%|████████████████████████████████████████████████████████████████████                 | 4/5 [00:20<00:05,  5.02s/it]338129 (65.72%) rows out of 514532 have NAs
100%|█████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:25<00:00,  5.06s/it]
unloading json visitor_home_aggregation
json 

Unnamed: 0,date_range_start,date_range_end,COUNTY_ORIG,COUNTY_DEST,visitor_home_aggregation,VISITS_DAY_0,VISITS_DAY_1,VISITS_DAY_2,VISITS_DAY_3,VISITS_DAY_4,VISITS_DAY_5,VISITS_DAY_6
0,2020-03-02T00:00:00+10:00,2020-03-09T00:00:00+10:00,15003,60010,8,58,74,90,108,104,136,94
1,2020-03-02T00:00:00+10:00,2020-03-09T00:00:00+10:00,55059,60010,32,368,416,384,364,500,404,308
2,2020-03-02T00:00:00-05:00,2020-03-09T00:00:00-04:00,10001,10001,34599,153088,225436,158693,221919,242704,251697,204858
3,2020-03-02T00:00:00-05:00,2020-03-09T00:00:00-04:00,10001,10003,5302,97761,103734,98641,105434,122288,160216,111946
4,2020-03-02T00:00:00-05:00,2020-03-09T00:00:00-04:00,10001,10005,4598,23187,21490,22487,23216,25771,38430,24080
...,...,...,...,...,...,...,...,...,...,...,...,...
304225,2020-03-02T00:00:00-10:00,2020-03-09T00:00:00-10:00,56005,15003,4,23,26,26,27,28,49,67
304226,2020-03-02T00:00:00-10:00,2020-03-09T00:00:00-10:00,56021,15003,8,338,4040,3932,3868,4076,4152,4042
304227,2020-03-02T00:00:00-10:00,2020-03-09T00:00:00-10:00,56025,15001,4,18,11,7,12,17,16,18
304228,2020-03-02T00:00:00-10:00,2020-03-09T00:00:00-10:00,56039,15007,4,16,26,19,19,29,39,27
