In [1]:
import pandas as pd
import numpy as np
import json

import time

import os
import sys
from glob import glob
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, explode
from pyspark.sql import functions as sf
from pyspark.sql.types import *

from matplotlib import pyplot as plt
import statsmodels.formula.api as smf
from functools import reduce



# suppress warnings for readability
import warnings
warnings.filterwarnings('ignore')
# turn off scientific notation in pandas objects
pd.set_option('display.float_format', lambda x: '%.8f' % x)

In [2]:
# Create (or reuse) the Spark session for this notebook; the driver is given 15 GB of memory
spark = (
    SparkSession.builder
    .appName('SafeGraph')
    .config("spark.driver.memory", "15g")
    .getOrCreate()
)

In [3]:
# Path to the 2019 ACS 5-year population table (cbg_b01.csv) from SafeGraph's open census data.
# Nothing is read here: this only stores the location, and despite the name it points at a
# single CSV file rather than a directory.
census_dir = "/Users/esrieves/Documents/school/Research/foot_traffic/data/census/safegraph_open_census_data_2019/data/cbg_b01.csv"

In [None]:
## FUNCTIONS

In [4]:
## Read patterns data + clean
def read_patterns_data(all_csv_files):
    """Read gzip-compressed patterns CSVs into one DataFrame and drop unusable rows.

    Parameters
    ----------
    all_csv_files : iterable of str
        Paths to gzip-compressed CSV files. Each file must contain the
        ``visitor_home_cbgs``, ``poi_cbg`` and ``placekey`` columns.

    Returns
    -------
    pandas.DataFrame
        The rows of every file stacked together, without the rows whose
        ``visitor_home_cbgs``, ``poi_cbg`` or ``placekey`` is missing.

    Raises
    ------
    ValueError
        If ``all_csv_files`` yields no files.
    """
    # Converters keep the identifier columns as plain strings (leading zeros
    # survive). Side effect: an empty cell arrives as '' instead of NaN.
    converters = {'poi_cbg': str, 'naics_code': str}
    frames = [pd.read_csv(f, compression="gzip", converters=converters) for f in all_csv_files]
    if not frames:
        raise ValueError("read_patterns_data: no pattern files were supplied")
    df = pd.concat(frames, ignore_index=True)

    # dropna() cannot see the '' produced by the converter, so blank poi_cbg
    # values have to be filtered out explicitly before the NaN-based drop.
    has_poi_cbg = df['poi_cbg'].str.strip() != ''
    df = df[has_poi_cbg].dropna(subset=["visitor_home_cbgs", "poi_cbg", "placekey"])
    return df

def trim_patterns_columns(patterns_df):
    """Keep the analysis columns and shorten four of them to fixed-length prefixes.

    Parameters
    ----------
    patterns_df : pandas.DataFrame
        Patterns table containing at least the eight columns kept below; the
        four shortened columns must hold strings.

    Returns
    -------
    pandas.DataFrame
        A new frame with only the kept columns, in the order listed.
    """
    keep_cols = [
        'placekey', 'naics_code', 'region', 'date_range_start',
        'date_range_end', 'raw_visitor_counts', 'poi_cbg', 'visitor_home_cbgs'
    ]

    # number of leading characters retained for each shortened column
    prefix_lengths = {
        'date_range_start': 10,  # everything after the first 10 characters is dropped
        'date_range_end': 10,
        'naics_code': 2,         # broader 2-digit NAICS code
        'poi_cbg': 12,           # removes any excess after the 12th character
    }
    shortened = {
        column: patterns_df[column].str[:length]
        for column, length in prefix_lengths.items()
    }

    return patterns_df[keep_cols].assign(**shortened)

def read_and_trim_patterns(patterns_dir):
    """Load the patterns files and reduce them to the trimmed analysis columns.

    ``patterns_dir`` is passed straight to ``read_patterns_data``, so despite
    its name it is the collection of file paths to read.
    """
    raw_patterns = read_patterns_data(patterns_dir)
    return trim_patterns_columns(raw_patterns)

In [None]:
def read_clean_patterns(all_csv_files):
    """Spark reader for the patterns CSVs, with basic cleaning applied.

    Reads the files with a header row and schema inference, removes rows whose
    ``visitor_home_cbgs``, ``poi_cbg`` or ``placekey`` is null as well as rows
    whose ``visitor_home_cbgs`` is the literal ``{}``, then adds two columns:
    ``dest_cbg`` and ``naics``.
    """
    reader = (
        spark.read
        .option('header', 'True')
        .option('inferSchema', 'True')
        .option('escape', "\"")
    )
    patterns = reader.csv(all_csv_files)

    required_columns = ["visitor_home_cbgs", "poi_cbg", "placekey"]
    patterns = (
        patterns
        .dropna(subset=required_columns)
        .where("visitor_home_cbgs!='{}'")
    )

    return (
        patterns
        # dest_cbg: poi_cbg zero-padded to 12 digits (restores the leading zero lost in csv format)
        .withColumn("dest_cbg", sf.format_string("%012d", "poi_cbg"))
        # naics: only the first two characters of naics_code
        .withColumn("naics", sf.col("naics_code").substr(1, 2))
    )

In [None]:
def census_process(census_dir):
    """Spark version of the census step: total population per state.

    Parameters
    ----------
    census_dir : str
        Path of the census CSV; it must contain the ``census_block_group`` and
        ``B01001e1`` columns.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per ``region`` (two-letter state abbreviation) with the summed
        ``population``. Block groups whose state FIPS prefix is not in the
        lookup table end up under a null ``region``.
    """
    # local import: `chain` is not imported at the top of the notebook
    from itertools import chain

    df = spark.read.option('header', 'True') \
        .option('inferSchema', 'True') \
        .option('escape', "\"") \
        .csv(census_dir)

    # select cbg and population columns, rename
    df = df.select('census_block_group', 'B01001e1')\
        .withColumnRenamed('B01001e1', 'population')

    fips_state_map = {
        '01': 'AL', '02': 'AK', '04': 'AZ', '05': 'AR', '06': 'CA',
        '08': 'CO', '09': 'CT', '10': 'DE', '11': 'DC', '12': 'FL',
        '13': 'GA', '15': 'HI', '16': 'ID', '17': 'IL', '18': 'IN',
        '19': 'IA', '20': 'KS', '21': 'KY', '22': 'LA', '23': 'ME',
        '24': 'MD', '25': 'MA', '26': 'MI', '27': 'MN', '28': 'MS',
        '29': 'MO', '30': 'MT', '31': 'NE', '32': 'NV', '33': 'NH',
        '34': 'NJ', '35': 'NM', '36': 'NY', '37': 'NC', '38': 'ND',
        '39': 'OH', '40': 'OK', '41': 'OR', '42': 'PA', '44': 'RI',
        '45': 'SC', '46': 'SD', '47': 'TN', '48': 'TX', '49': 'UT',
        '50': 'VT', '51': 'VA', '53': 'WA', '54': 'WV', '55': 'WI',
        '56': 'WY', '72': 'PR'
    }

    # create_map takes alternating key/value literals: lit('01'), lit('AL'), ...
    mapping_fips = sf.create_map([sf.lit(x) for x in chain(*fips_state_map.items())])

    # With inferSchema the block-group id may be read as a number, which drops
    # its leading zero; cast back to text and left-pad to the 12-digit form so
    # the first two characters are the state FIPS code.
    cbg_text = sf.lpad(sf.col('census_block_group').cast('string'), 12, '0')

    state_pop = (
        df
        .withColumn('region', mapping_fips[cbg_text.substr(1, 2)])
        .groupBy('region')
        .agg(sf.sum('population').alias('population'))
    )

    return state_pop

In [5]:
## Read and process census data

# read 2019 census data and find population by state
def read_census_pop(census_dir):
    """Load the block-group population table from the census CSV.

    Parameters
    ----------
    census_dir : str
        Path of the CSV file; it must contain ``census_block_group`` and
        ``B01001e1``.

    Returns
    -------
    pandas.DataFrame
        Two columns: ``census_block_group`` (read as text) and ``population``
        (the ``B01001e1`` column, read as integers and renamed).
    """
    column_types = {'census_block_group': str, 'B01001e1': int}
    census_table = pd.read_csv(census_dir, dtype=column_types)

    selected = census_table[['census_block_group', 'B01001e1']]
    return selected.rename(columns={'B01001e1': 'population'})

def find_state_population(census_df):
    """Sum block-group populations up to the state level.

    Parameters
    ----------
    census_df : pandas.DataFrame
        Needs a string ``census_block_group`` column (its first two characters
        are used as the state FIPS code) and a ``population`` column.

    Returns
    -------
    pandas.DataFrame
        Columns ``region`` (two-letter abbreviation) and ``population``.

    Raises
    ------
    KeyError
        If a block group starts with a FIPS code missing from the lookup table.
    """
    fips_state_map = {
        '01': 'AL', '02': 'AK', '04': 'AZ', '05': 'AR', '06': 'CA',
        '08': 'CO', '09': 'CT', '10': 'DE', '11': 'DC', '12': 'FL',
        '13': 'GA', '15': 'HI', '16': 'ID', '17': 'IL', '18': 'IN',
        '19': 'IA', '20': 'KS', '21': 'KY', '22': 'LA', '23': 'ME',
        '24': 'MD', '25': 'MA', '26': 'MI', '27': 'MN', '28': 'MS',
        '29': 'MO', '30': 'MT', '31': 'NE', '32': 'NV', '33': 'NH',
        '34': 'NJ', '35': 'NM', '36': 'NY', '37': 'NC', '38': 'ND',
        '39': 'OH', '40': 'OK', '41': 'OR', '42': 'PA', '44': 'RI',
        '45': 'SC', '46': 'SD', '47': 'TN', '48': 'TX', '49': 'UT',
        '50': 'VT', '51': 'VA', '53': 'WA', '54': 'WV', '55': 'WI',
        '56': 'WY', '72': 'PR'
    }

    # strict lookup: an unknown FIPS prefix raises instead of becoming NaN
    fips_prefix = census_df['census_block_group'].str[:2]
    state_abbrev = fips_prefix.map(fips_state_map.__getitem__)

    with_region = census_df.assign(region=state_abbrev)
    population_by_region = with_region.groupby('region', as_index=False)['population']

    return population_by_region.sum()

def get_census_state_population(census_dir):
    """Read the census CSV at ``census_dir`` and return its population summed per state."""
    block_group_population = read_census_pop(census_dir)
    return find_state_population(block_group_population)

# read home_panel_summary file and summarize by date and state
def read_home_panel(all_csv_files_panel, monthly = False):
    """Read the home-panel summary CSVs into a single DataFrame.

    Parameters
    ----------
    all_csv_files_panel : iterable of str
        Paths of the CSV files to stack.
    monthly : bool, default False
        When True, a ``date_range_start`` column is built from the ``year``
        and ``month`` columns, set to the first day of that month.

    Returns
    -------
    pandas.DataFrame
        All rows of all files with a fresh index.
    """
    column_types = {
        'month': str,
        'year': str,
        'region': str,
        'census_block_group': str,
        'number_devices_residing': int,
    }

    frames = [pd.read_csv(path, dtype=column_types) for path in all_csv_files_panel]
    home_panel = pd.concat(frames, ignore_index=True)

    if not monthly:
        return home_panel

    # year + month + a constant day of 1 -> timestamp of the month start
    month_start_parts = home_panel[['year', 'month']].assign(day=1)
    return home_panel.assign(date_range_start=pd.to_datetime(month_start_parts))
    
def group_home_panel(home_panel_df):
    """Total the residing-device counts per region and start date.

    ``region`` is upper-cased and ``date_range_start`` is converted to text and
    cut to its first 10 characters before grouping. The input frame is left
    untouched.

    Returns a DataFrame with the columns ``region``, ``date_range_start`` and
    ``number_devices_residing``.
    """
    normalized = home_panel_df.copy()
    normalized['region'] = home_panel_df['region'].str.upper()
    normalized['date_range_start'] = home_panel_df['date_range_start'].astype(str).str[:10]

    grouping_keys = ['region', 'date_range_start']
    device_totals = normalized.groupby(grouping_keys, as_index=False)['number_devices_residing'].sum()

    return device_totals

def read_and_group_home_panel(home_panel_dir, monthly = False):
    """Read the home-panel files and total their device counts per region and start date.

    ``home_panel_dir`` and ``monthly`` are forwarded unchanged to ``read_home_panel``.
    """
    raw_home_panel = read_home_panel(home_panel_dir, monthly = monthly)
    return group_home_panel(raw_home_panel)

# join state census populations and group home_panel summary
def join_census_home_panel(state_pop_df, state_home_panel_df):
    """Attach state populations to the grouped home panel and derive the state multiplier.

    The two frames are inner-joined on ``region`` (home-panel columns first),
    and ``state_multiplier`` is added as ``population`` divided by
    ``number_devices_residing``.
    """
    joined = pd.merge(state_home_panel_df, state_pop_df, on='region')
    joined['state_multiplier'] = joined['population'] / joined['number_devices_residing']

    return joined

# put it all together in a single function
def read_and_join_census_home_panel(census_dir, home_panel_dir, monthly = False):
    """Build the table of state multipliers from the census file and the home-panel files.

    Chains the three steps: state populations from ``census_dir``, grouped
    device counts from ``home_panel_dir``, then the join of the two.
    """
    state_population = get_census_state_population(census_dir)
    grouped_home_panel = read_and_group_home_panel(home_panel_dir, monthly = monthly)

    return join_census_home_panel(state_population, grouped_home_panel)

In [6]:
## parser function
# input: JSON dictionary element
# output: parsed JSON element ready for explosion
def parser(element):
    """Parse one JSON-encoded cell so it can be exploded afterwards.

    Parameters
    ----------
    element : str or None
        JSON text, e.g. the content of a ``visitor_home_cbgs`` cell.

    Returns
    -------
    object or None
        The decoded JSON value, or ``None`` when ``element`` is ``None`` or a
        blank string.

    Raises
    ------
    json.JSONDecodeError
        If ``element`` is non-blank text that is not valid JSON.
    """
    # The missing-value check has to come before json.loads: loads() raises
    # TypeError on None and JSONDecodeError on an empty string.
    if element is None:
        return None
    if isinstance(element, str) and not element.strip():
        return None

    return json.loads(element)

In [7]:
## combine census data, patterns, and panel data to create state rate
def census_patterns(pattern_files, census_file, home_panel_files):
    """Attach the monthly state multiplier to every trimmed patterns row.

    The patterns are left-joined to the census/home-panel table on
    ``date_range_start`` and ``region``, so patterns rows without a matching
    state and month are kept with missing values in the joined columns.
    """
    # trimmed patterns rows
    trimmed_patterns = read_and_trim_patterns(pattern_files)

    # state populations joined to the monthly home-panel device counts
    state_multipliers = read_and_join_census_home_panel(census_file, home_panel_files, monthly = True)

    join_keys = ['date_range_start', 'region']
    return pd.merge(trimmed_patterns, state_multipliers, how = 'left', on = join_keys)

In [8]:
## Aggregate spark data to create visitor flows
def spark_agg(spark_df):
    """Sum ``norm_visitors`` per sender tract, start date and NAICS code.

    The result has the three grouping columns plus the total, named
    ``monthly_visitors_per_naics_tracts_NORMALIZED``.
    """
    grouping_columns = ["sender_tract", "date_range_start", "naics_code"]
    visitor_total = sf.sum("norm_visitors").alias("monthly_visitors_per_naics_tracts_NORMALIZED")

    return spark_df.groupby(grouping_columns).agg(visitor_total)

## Process w spark (json explode, use state multiplier, clean)
def spark_process(df_nrms):
    """Turn the merged pandas table into aggregated, normalized visitor flows.

    Parameters
    ----------
    df_nrms : pandas.DataFrame
        Must contain ``naics_code``, ``date_range_start``, ``state_multiplier``
        and the JSON text column ``visitor_home_cbgs``.

    Returns
    -------
    pyspark.sql.DataFrame
        The output of ``spark_agg``: one row per ``sender_tract``,
        ``date_range_start`` and ``naics_code`` with the summed normalized
        visitors.
    """
    # reads pd df to spark df
    df = spark.createDataFrame(df_nrms)

    # Parse the JSON text into a map column, then explode it into one row per
    # map entry; explode() names the two new columns `key` and `value`.
    jsonudf = udf(parser, MapType(StringType(), IntegerType()))
    parsed = df.withColumn("parsed_visitor_home_cbgs", jsonudf("visitor_home_cbgs"))
    exploded = parsed.select(
        "naics_code", "date_range_start", "state_multiplier",
        explode("parsed_visitor_home_cbgs")
    )

    # sender_tract: first 11 characters of the map key
    # norm_visitors: map value scaled by the state multiplier, rounded to whole visitors
    tract_visits = exploded.select(
        "naics_code",
        "date_range_start",
        sf.col("key").substr(1, 11).alias("sender_tract"),
        sf.round(sf.col("value") * sf.col("state_multiplier"), 0).alias("norm_visitors"),
    )

    # Return the aggregated flows; handing back the row-level frame would
    # discard the aggregation step entirely.
    visitor_flows = spark_agg(tract_visits)

    return visitor_flows



In [9]:
# get patterns files for each MSA
def get_pattern_files(MSA, input_root="/Users/esrieves/Documents/school/Research/foot_traffic/data/Inputs"):
    """List every ``core_poi-patterns.csv.gz`` file stored under one MSA's input folder.

    Parameters
    ----------
    MSA : str
        MSA label; the folder searched is ``<input_root>/<MSA>_MSA``.
    input_root : str, optional
        Directory holding the per-MSA folders. Defaults to the original
        hard-coded location, so existing calls behave as before.

    Returns
    -------
    list of str
        Sorted paths of the matching files, found at any depth. Empty when the
        folder does not exist or holds no such file.
    """
    file_path = os.path.join(input_root, "%s_MSA" % MSA)
    file_ext_patterns = "core_poi-patterns.csv.gz"

    # os.walk already lists each directory's files, so test membership directly
    # instead of globbing (glob would misread '[' or '*' inside a folder name).
    all_pattern_files = [
        os.path.join(path, file_ext_patterns)
        for path, _subdirs, files in os.walk(file_path)
        if file_ext_patterns in files
    ]

    # os.walk order is unspecified; sort so repeated runs read files in the same order
    return sorted(all_pattern_files)

In [10]:
# get panel files for each MSA
def get_panel_files(MSA, input_root="/Users/esrieves/Documents/school/Research/foot_traffic/data/Inputs"):
    """List every ``home_panel_summary.csv`` file stored under one MSA's input folder.

    Parameters
    ----------
    MSA : str
        MSA label; the folder searched is ``<input_root>/<MSA>_MSA``.
    input_root : str, optional
        Directory holding the per-MSA folders. Defaults to the original
        hard-coded location, so existing calls behave as before.

    Returns
    -------
    list of str
        Sorted paths of the matching files, found at any depth. Empty when the
        folder does not exist or holds no such file.
    """
    file_path = os.path.join(input_root, "%s_MSA" % MSA)
    file_ext_panel = "home_panel_summary.csv"

    # os.walk already lists each directory's files, so test membership directly
    # instead of globbing (glob would misread '[' or '*' inside a folder name).
    all_panel_files = [
        os.path.join(path, file_ext_panel)
        for path, _subdirs, files in os.walk(file_path)
        if file_ext_panel in files
    ]

    # os.walk order is unspecified; sort so repeated runs read files in the same order
    return sorted(all_panel_files)

In [11]:
# RUN analyses of aggregating data, cleaning in spark
def run_MSA(patterns,census,panel):
    """Run the whole pipeline for one MSA: pandas merge first, Spark processing second.

    ``patterns``, ``census`` and ``panel`` are forwarded to ``census_patterns``;
    its result is handed to ``spark_process``, whose output is returned.
    """
    merged_patterns = census_patterns(patterns, census, panel)
    return spark_process(merged_patterns)

In [12]:
import time

In [None]:
# Build the NYC visitor flows, write them out as CSV, and time the whole run.
start_time = time.time()
pattern_files = get_pattern_files("NYC")
panel_files = get_panel_files("NYC")
flows = run_MSA(pattern_files,census_dir,panel_files)
flows.write\
    .mode("overwrite")\
    .option("mapreduce.fileoutputcommitter.marksuccessfuljobs","false")\
    .option("header","True")\
    .csv("/Users/esrieves/Documents/school/Research/foot_traffic/data/Outputs/test2/NYC_MSA_18to21_visitor_flows")
end_time = time.time()

# elapsed wall-clock seconds: end minus start, so the printed value is positive
print(end_time - start_time)

In [None]:
import time
## RUN
# Build and write the visitor flows for each remaining MSA, timing the whole batch.
msas = ['LA','Chicago','Dallas','Houston','DC']

start_time = time.time()

for msa in msas:
    msa_str = msa  # entries of `msas` are already strings
    print(msa_str)
    pattern_files = get_pattern_files(msa_str)
    panel_files = get_panel_files(msa_str)
    #output_string = "/Users/esrieves/Documents/school/Research/foot_traffic/data/Outputs/top_20_normalized/%s_MSA_18to21_visitor_flows" %msa_str
    #print(output_string)
    print("finished file accumulation, starting flows")

    flows = run_MSA(pattern_files,census_dir,panel_files)
    print("finished with flows, now writing %s" %msa_str)
    (
        flows.write
        .mode("overwrite")
        .option("mapreduce.fileoutputcommitter.marksuccessfuljobs","false")
        .option("header","True")
        .csv("/Users/esrieves/Documents/school/Research/foot_traffic/data/Outputs/test2/%s_MSA_18to21_visitor_flows" %msa_str)
    )
    print("finished writing file %s" %msa_str)

end_time = time.time()

# elapsed wall-clock seconds: end minus start, so the printed value is positive
print(end_time - start_time)

LA
finished file accumulation, starting flows
finished with flows, now writing LA


In [55]:
df.show()

+----------+----------------+------------+-------------+
|naics_code|date_range_start|sender_tract|norm_visitors|
+----------+----------------+------------+-------------+
|        52|      2018-07-01| 36061011100|         45.0|
|        52|      2018-07-01| 13121009801|         45.0|
|        52|      2018-07-01| 13121008800|         45.0|
|        52|      2018-07-01| 13121008902|         45.0|
|        52|      2018-07-01| 13121010603|         45.0|
|        81|      2018-07-01| 13015960700|         45.0|
|        54|      2018-07-01| 13117130408|         45.0|
|        54|      2018-07-01| 13121011504|         45.0|
|        54|      2018-07-01| 13121011503|         79.0|
|        54|      2018-07-01| 13121011503|         45.0|
|        54|      2018-07-01| 13121011611|         45.0|
|        54|      2018-07-01| 13117130302|         45.0|
|        54|      2018-07-01| 13057090502|         79.0|
|        54|      2018-07-01| 13121011619|         45.0|
|        54|      2018-07-01| 1