Our next objective is to determine the distance people traveled to grocery stores by census block group (CBG) using Safegraph data. In particular, we would like to know for each CBG, the average distance they traveled to the listed grocery stores in March 2019, October 2019, March 2020, and October 2020. (We select March and October to avoid summer and holidays with more noise from tourists and festivity shopping). To be consistent with Safegraph, we will use haversine distance for this project.

In [1]:
!gdown --id 1h9e1lLMVhQp1dguiuyhOMYa_zRS6dKIn -O weekly-patterns-nyc-2019-2020-sample.csv 
!gdown --id 1w9-wx4qjwdxxVVomVYbvVTfZUATt9kmS -O nyc_supermarkets.csv
!gdown --id 1_70Rh-j_ttFlfQ_VH3vbjnRrvcRxQdB7 -O nyc_cbg_centroids.csv
!gdown --id 1jkhqKXzSczLyBe-n-prdwwjZypEMYcbQ -O core-places-nyc.csv

!pip install pyspark
!pip install pyproj

Downloading...
From: https://drive.google.com/uc?id=1h9e1lLMVhQp1dguiuyhOMYa_zRS6dKIn
To: /content/weekly-patterns-nyc-2019-2020-sample.csv
100% 114M/114M [00:00<00:00, 203MB/s]
Downloading...
From: https://drive.google.com/uc?id=1w9-wx4qjwdxxVVomVYbvVTfZUATt9kmS
To: /content/nyc_supermarkets.csv
100% 170k/170k [00:00<00:00, 20.5MB/s]
Downloading...
From: https://drive.google.com/uc?id=1_70Rh-j_ttFlfQ_VH3vbjnRrvcRxQdB7
To: /content/nyc_cbg_centroids.csv
100% 326k/326k [00:00<00:00, 49.1MB/s]
Downloading...
From: https://drive.google.com/uc?id=1jkhqKXzSczLyBe-n-prdwwjZypEMYcbQ
To: /content/core-places-nyc.csv
100% 46.3M/46.3M [00:00<00:00, 124MB/s]
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 31 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████

In [2]:
import csv
import json
import pandas as pd
import numpy as np
from pyproj import Proj, transform
from math import radians, cos, sin, asin, sqrt, acos

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql import types as T
sc = pyspark.SparkContext.getOrCreate()
spark = SparkSession(sc)
spark

In [3]:
#get a list of all the supermarkets to include
nyc_supermarkets = spark.read.load('nyc_supermarkets.csv', format = 'csv', header = True, inferSchema = True) \
                  .select('safegraph_placekey').distinct().rdd.flatMap(lambda x: x).collect()

#list of all the dates to include, Only visit patterns where date_range_start or date_range_end overlaps with Mar 2019, Oct 2019, Mar 2020, Oct 2020
date_list = ['2019-03', '2019-10', '2020-03', '2020-10']

#filter the visits in weeklyPatterns by nyc_supermarkets and date_list
weeklyPatterns = spark.read.option("header", True).option("escape", "\"").csv('weekly-patterns-nyc-2019-2020-sample.csv') \
          .select('placekey', 
                  'poi_cbg', 
                  'visitor_home_cbgs',
                  F.substring('date_range_start',0,7).alias('date_range_start'),
                  F.substring('date_range_end',0,7).alias('date_range_end')) \
          .filter(F.col('placekey').isin(nyc_supermarkets)) \
          .filter((F.col('date_range_start').isin(date_list)) |
                  (F.col('date_range_end').isin(date_list)))
          
#make one date column
weeklyPatterns = weeklyPatterns.withColumn('year_month', F.when(F.col('date_range_start').isin(date_list), F.col('date_range_start')).otherwise(F.col('date_range_end'))) \
                        .drop('date_range_start', 'date_range_end')




#User defined function to load json column
def loadJson(string):
  return json.loads(string)

jsonUDF = F.udf(loadJson, T.MapType(T.StringType(), T.IntegerType()))

#use the udf to convert visitor_home_cbgs to json and explode it
weeklyPatterns = weeklyPatterns.select('placekey', 
                                      'poi_cbg', 
                                      F.explode(jsonUDF('visitor_home_cbgs')).alias('visitor_home_cbgs', 'num_visitors'),
                                      'year_month')




#project longitutde and latidute for all cbgs to espg 2263
cbgs_pandas = pd.read_csv('nyc_cbg_centroids.csv')
inProj, outProj = Proj(init='epsg:4326'), Proj(init='epsg:2263')
cbgs_pandas['newLon'], cbgs_pandas['newLat'] = transform(inProj, outProj, cbgs_pandas['longitude'].tolist(), cbgs_pandas['latitude'].tolist())

nyc_cbg_centroids = spark.createDataFrame(cbgs_pandas)

#inner join with nyc_cbg_centroids so that we have the projected longitude and latidude for all store cbgs
weeklyPatterns = weeklyPatterns.join(nyc_cbg_centroids, weeklyPatterns['poi_cbg'] == nyc_cbg_centroids['cbg_fips'], 'inner') \
                          .withColumnRenamed('newLon', 'store_lon') \
                          .withColumnRenamed('newLat', 'store_lat') \
                          .drop('cbg_fips', 'latitude', 'longitude')

#inner join with nyc_cbg_centroids so that we have the projected longitude and latidude for all visitor cbgs in weekly pattern that exist in nyc_cbg_centroids
weeklyPatterns = weeklyPatterns.join(nyc_cbg_centroids, weeklyPatterns['visitor_home_cbgs'] == nyc_cbg_centroids['cbg_fips'], 'inner') \
                          .withColumnRenamed('newLon', 'visitor_lon') \
                          .withColumnRenamed('newLat', 'visitor_lat') \
                          .drop('poi_cbg', 'latitude', 'longitude')




#Compute Haversince distance between visitor and store cbgs
# def haversine_distance(start_lng, start_lat, end_lng, end_lat):
#   start_lng, start_lat, end_lng, end_lat = map(radians, [start_lng, start_lat, end_lng, end_lat])
#   radius = 3961 #in miles
#   distance = (radius * acos((cos(start_lat)*cos(end_lat)*cos(end_lng-start_lng))+sin((start_lat)*sin(end_lat))))
#   distance = distance/5280
#   return(abs(round(distance, 2)))

# udf_haversine_distance = F.udf(haversine_distance)

#Compute Euclidean distances distance between visitor and store cbgs
def euclidean_distance(start_lng, start_lat, end_lng, end_lat):
  distance = sqrt((end_lng-start_lng)**2+(end_lat-start_lat)**2)
  #convert from feet to miles
  distance = distance/5280
  return(abs(round(distance, 2)))

udf_euclidean_distance = F.udf(euclidean_distance)




#Add column computing distance between start and end
weeklyPatterns = weeklyPatterns.withColumn('distance', udf_euclidean_distance('visitor_lon', 'visitor_lat', 'store_lon', 'store_lat').cast('double')) \
                            .drop('placekey', 'visitor_home_cbgs', 'store_lon', 'store_lat', 'visitor_lon', 'visitor_lat')

#grouped by cbg, pivot to create a column for each month_year with avg distance as the value
weeklyPatterns_meanAvg = weeklyPatterns.groupBy('cbg_fips') \
              .pivot('year_month') \
              .agg(((F.sum('distance') * F.sum('num_visitors'))/F.sum('num_visitors')).alias('avg_distance')) \
              .drop('year_month') \
              .sort(F.col('cbg_fips').asc())

weeklyPatterns_meanAvg.write.csv('final_challenge/final_challenge_atkins.csv', header=True)    

  in_crs_string = _prepare_from_proj_string(in_crs_string)
  in_crs_string = _prepare_from_proj_string(in_crs_string)


In [4]:
#Extra Credit 1: Compute the median traveled distances instead of the mean. 

weeklyPatterns_medianAvg = weeklyPatterns.withColumn('distance', F.expr('explode(array_repeat(distance,int(num_visitors)))')) \
              .groupBy('cbg_fips') \
              .pivot('year_month') \
              .agg((F.expr("percentile(distance, 0.5)")).alias('median_distance')) \
              .drop('year_month') \
              .sort(F.col('cbg_fips').asc())

weeklyPatterns_meanAvg.write.csv('final_challenge/extraCredit1_atkins.csv', header=True)

In [5]:
#Extra Credit 2: Compute averge distance traveled for all grocery stores in the Safegraph data, match all placekeys with naics_code starts with 4451 in the core-places-nyc.
#filter to only stores in nyc 5 boros

core_places_nyc = spark.read.load('core-places-nyc.csv', format = 'csv', header = True, inferSchema = True) \
                  .filter(F.substring('naics_code',0,4) == 4451) \
                  .select('placekey').distinct().rdd.flatMap(lambda x: x).collect()

date_list = ['2019-03', '2019-10', '2020-03', '2020-10']
nyc_cbg_list = ['36061', '36005', '36047', '36081', '36085']

weeklyPatterns = spark.read.option("header", True).option("escape", "\"").csv('weekly-patterns-nyc-2019-2020-sample.csv') \
          .select('placekey', 
                  'poi_cbg', 
                  'visitor_home_cbgs',
                  F.substring('date_range_start',0,7).alias('date_range_start'),
                  F.substring('date_range_end',0,7).alias('date_range_end')) \
          .filter(F.col('placekey').isin(core_places_nyc)) \
          .filter(F.substring('poi_cbg',0,5).isin(nyc_cbg_list)) \
          .filter((F.col('date_range_start').isin(date_list)) |
                  (F.col('date_range_end').isin(date_list)))
          
weeklyPatterns = weeklyPatterns.withColumn('year_month', F.when(F.col('date_range_start').isin(date_list), F.col('date_range_start')).otherwise(F.col('date_range_end'))) \
                        .drop('date_range_start', 'date_range_end')

def loadJson(string):
  return json.loads(string)

jsonUDF = F.udf(loadJson, T.MapType(T.StringType(), T.IntegerType()))

weeklyPatterns = weeklyPatterns.select('placekey', 
                                      'poi_cbg', 
                                      F.explode(jsonUDF('visitor_home_cbgs')).alias('visitor_home_cbgs', 'num_visitors'),
                                      'year_month')

cbgs_pandas = pd.read_csv('nyc_cbg_centroids.csv')
inProj, outProj = Proj(init='epsg:4326'), Proj(init='epsg:2263')
cbgs_pandas['newLon'], cbgs_pandas['newLat'] = transform(inProj, outProj, cbgs_pandas['longitude'].tolist(), cbgs_pandas['latitude'].tolist())

nyc_cbg_centroids = spark.createDataFrame(cbgs_pandas)

weeklyPatterns = weeklyPatterns.join(nyc_cbg_centroids, weeklyPatterns['poi_cbg'] == nyc_cbg_centroids['cbg_fips'], 'inner') \
                          .withColumnRenamed('newLon', 'store_lon') \
                          .withColumnRenamed('newLat', 'store_lat') \
                          .drop('cbg_fips', 'latitude', 'longitude')

weeklyPatterns = weeklyPatterns.join(nyc_cbg_centroids, weeklyPatterns['visitor_home_cbgs'] == nyc_cbg_centroids['cbg_fips'], 'inner') \
                          .withColumnRenamed('newLon', 'visitor_lon') \
                          .withColumnRenamed('newLat', 'visitor_lat') \
                          .drop('poi_cbg', 'latitude', 'longitude')

def euclidean_distance(start_lng, start_lat, end_lng, end_lat):
  distance = sqrt((end_lng-start_lng)**2+(end_lat-start_lat)**2)
  distance = distance/5280
  return(abs(round(distance, 2)))

udf_euclidean_distance = F.udf(euclidean_distance)

weeklyPatterns = weeklyPatterns.withColumn('distance', udf_euclidean_distance('visitor_lon', 'visitor_lat', 'store_lon', 'store_lat').cast('double')) \
                            .drop('placekey', 'visitor_home_cbgs', 'store_lon', 'store_lat', 'visitor_lon', 'visitor_lat')

weeklyPatterns_meanAvg = weeklyPatterns.groupBy('cbg_fips') \
              .pivot('year_month') \
              .agg(((F.sum('distance') * F.sum('num_visitors'))/F.sum('num_visitors')).alias('avg_distance')) \
              .drop('year_month') \
              .sort(F.col('cbg_fips').asc())

weeklyPatterns_meanAvg.write.csv('final_challenge/extraCredit2_atkins.csv', header=True)    

  in_crs_string = _prepare_from_proj_string(in_crs_string)
  in_crs_string = _prepare_from_proj_string(in_crs_string)
