In [0]:
from pyspark.sql.functions import col, array_contains, explode, sum, size, regexp_replace, round, stddev, mean, count, when
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
import plotly.graph_objects as go
import plotly.io as pio

patterns = spark.sql("select * from safegraph.patterns")
places = spark.sql("select * from safegraph.places")
# cen = spark.sql("select * from safegraph.censusblock_table")

# Join Data

In [0]:
df = places.join(patterns,places.placekey == patterns.placekey,"left")

# Drop Unneccesary Columns

In [0]:
columns_to_drop = ['brands', 'safegraph_brand_ids', 'store_id', 'naics_code', 'street_address', 'iso_country_code', 'opened_on', 'closed_on', 'tracking_closed_since', 'phone_number', 'wkt_area_sq_meters', 'date_range_start', 'date_range_end', 'related_same_day_brand', 'related_same_month_brand', 'device_type', 'visitor_country_of_origin', 'poi_cbg', 'parent_placekey', 'open_hours', 'visits_by_day', 'visitor_home_cbgs', 'visitor_daytime_cbgs', 'top_category', 'sub_category']
df = df.drop(*columns_to_drop)

# Location Name Filter

In [0]:
df = df.filter(
    (col("top_category") == "Religious Organizations") &
    (col("location_name").rlike("Latter|latter|Saints|saints|LDS|\b[Ww]ard\b")) &
    (col("location_name").rlike("^((?!Reorganized).)*$")) &
    (col("location_name").rlike("^((?!All Saints).)*$")) &
    (col("location_name").rlike("^((?![cC]ath).)*$")) &
    (col("location_name").rlike("^((?![Bb]ody).)*$")) &
    (col("location_name").rlike("^((?![Pp]eter).)*$")) &
    (col("location_name").rlike("^((?![Cc]atholic).)*$")) &
    (col("location_name").rlike("^((?![Pp]res).)*$")) &
    (col("location_name").rlike("^((?![Mm]inist).)*$")) &
    (col("location_name").rlike("^((?![Mm]ission).)*$")) &
    (col("location_name").rlike("^((?![Ww]orship).)*$")) &
    (col("location_name").rlike("^((?![Rr]ain).)*$")) &
    (col("location_name").rlike("^((?![Bb]aptist).)*$")) &
    (col("location_name").rlike("^((?![Mm]eth).)*$")) &
    (col("location_name").rlike("^((?![Ee]vang).)*$")) &
    (col("location_name").rlike("^((?![Ll]utheran).)*$")) &
    (col("location_name").rlike("^((?![Oo]rthodox).)*$")) &
    (col("location_name").rlike("^((?![Ee]piscopal).)*$")) &
    (col("location_name").rlike("^((?![Tt]abernacle).)*$")) &
    (col("location_name").rlike("^((?![Hh]arvest).)*$")) &
    (col("location_name").rlike("^((?![Aa]ssem).)*$")) &
    (col("location_name").rlike("^((?![Mm]edia).)*$")) &
    (col("location_name").rlike("^((?![Mm]artha).)*$")) &
    (col("location_name").rlike("^((?![Cc]hristian).)*$")) &
    (col("location_name").rlike("^((?![Uu]nited).)*$")) &
    (col("location_name").rlike("^((?![Ff]ellowship).)*$")) &
    (col("location_name").rlike("^((?![Ww]esl).)*$")) &
    (col("location_name").rlike("^((?![C]cosmas).)*$")) &
    (col("location_name").rlike("^((?![Gg]reater).)*$")) &
    (col("location_name").rlike("^((?![Pp]rison).)*$")) &
    (col("location_name").rlike("^((?![Cc]ommuni).)*$")) &
    (col("location_name").rlike("^((?![Cc]lement).)*$")) &
    (col("location_name").rlike("^((?![Vv]iridian).)*$")) &
    (col("location_name").rlike("^((?![Dd]iocese).)*$")) &
    (col("location_name").rlike("^((?![Hh]istory).)*$")) &
    (col("location_name").rlike("^((?![Ss]chool).)*$")) &
    (col("location_name").rlike("^((?![Tt]hougt).)*$")) &
    (col("location_name").rlike("^((?![Hh]oliness).)*$")) &
    (col("location_name").rlike("^((?![Mm]artyr).)*$")) &
    (col("location_name").rlike("^((?![Jj]ames).)*$")) &
    (col("location_name").rlike("^((?![Ff]ellowship).)*$")) &
    (col("location_name").rlike("^((?![Hh]ouse).)*$")) &
    (col("location_name").rlike("^((?![Gg]lory).)*$")) &
    (col("location_name").rlike("^((?![Aa]nglican).)*$")) &
    (col("location_name").rlike("^((?![Pp]oetic).)*$")) &
    (col("location_name").rlike("^((?![Ss]anctuary).)*$")) &
    (col("location_name").rlike("^((?![Ee]quipping).)*$")) &
    (col("location_name").rlike("^((?![Jj]ohn).)*$")) &
    (col("location_name").rlike("^((?![Aa]ndrew).)*$")) &
    (col("location_name").rlike("^((?![Ee]manuel).)*$")) &
    (col("location_name").rlike("^((?![Rr]edeemed).)*$")) &
    (col("location_name").rlike("^((?![Pp]erfecting).)*$")) &
    (col("location_name").rlike("^((?![Aa]ngel).)*$")) &
    (col("location_name").rlike("^((?![Aa]rchangel).)*$")) &
    (col("location_name").rlike("^((?![Mm]icheal).)*$")) &
    (col("location_name").rlike("^((?![Tt]hought).)*$")) &
    (col("location_name").rlike("^((?![Pp]ariosse).)*$")) &
    (col("location_name").rlike("^((?![Cc]osmas).)*$")) &
    (col("location_name").rlike("^((?![Dd]eliverance).)*$")) &
    (col("location_name").rlike("^((?![Ss]ociete).)*$")) &
    (col("location_name").rlike("^((?![Tt]emple).)*$")) &
    (col("location_name").rlike("^((?![Ss]eminary).)*$")) &
    (col("location_name").rlike("^((?![Ee]mployment).)*$")) &
    (col("location_name").rlike("^((?![Ii]nstitute).)*$")) &
    (col("location_name").rlike("^((?![Cc]amp).)*$")) &
    (col("location_name").rlike("^((?![Ss]tudent).)*$")) &
    (col("location_name").rlike("^((?![Ee]ducation).)*$")) &
    (col("location_name").rlike("^((?![Ss]ocial).)*$")) &
    (col("location_name").rlike("^((?![Ww]welfare).)*$")) &
    (col("location_name").rlike("^((?![Cc][Ee][Ss]).)*$")) &
    (col("location_name").rlike("^((?![Ff]amily).)*$")) &
    (col("location_name").rlike("^((?![Mm]ary).)*$")) &
    (col("location_name").rlike("^((?![Rr]ussian).)*$")) &
    (col("location_name").rlike("^((?![Bb]eautif).)*$")) &
    (col("location_name").rlike("^((?![Hh]eaven).)*$")) &    
    (col("location_name").rlike("^((?!Inc).)*$")) &
    (col("location_name").rlike("^((?!God).)*$"))
  )

# Popularity By Day Filter

In [0]:
days_of_week = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]

for day in days_of_week:
    df = df.withColumn(day, col("popularity_by_day").getItem(day))

df = df.drop("popularity_by_day")

In [0]:
days_of_week = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"]
for day in days_of_week:
    df = df.filter((col('Sunday') * 2) > col(day))

df = df.filter(df.Sunday > 5)

# Websites Filter

In [0]:
# unique_websites = df.select(col('websites')).distinct()
# display(unique_websites)

df = df.withColumn('websites', regexp_replace(col('websites'), r'[\[\]]', ''))
df = df.filter((col('websites').like('%lds.org%')) | (col('websites').like('%mormon.org%')) | (col('websites').like('%churchofjesuschrist.org%')) | (col('websites').like('%comeuntochrist.org%')) | (col('websites') == ''))

# unique_websites = df.select(col('websites')).distinct()
# display(unique_websites)

# Create Tract Column

In [0]:
df = df.select("*", explode(df.visitor_home_aggregation))\
    .withColumnRenamed('key', 'tract')\
    .withColumnRenamed('value', 'tract_visitors')

# State Scaling

In [0]:
df = df.withColumn('visit_visitor_ratio', col('raw_visit_counts') / col('raw_visitor_counts'))\
    .withColumn('tract_visitors_to_visits', col('tract_visitors') * col('visit_visitor_ratio'))\
    .withColumn('normalized_visit_ratio', col('normalized_visits_by_state_scaling') / col('raw_visit_counts'))\
    .withColumn('tract_visits', col('tract_visitors_to_visits') * col('normalized_visit_ratio'))\
    .withColumn('adjusted_tract_visitors', round(col('tract_visits') / col('visit_visitor_ratio'), 0))
show_df = df.take(5)
display(show_df)

placekey,location_name,category_tags,latitude,longitude,city,region,postal_code,websites,placekey.1,raw_visit_counts,raw_visitor_counts,distance_from_home,median_dwell,bucketed_dwell_times,popularity_by_hour,visitor_home_aggregation,normalized_visits_by_state_scaling,normalized_visits_by_region_naics_visits,normalized_visits_by_region_naics_visitors,normalized_visits_by_total_visits,Monday,Tuesday,Wednesday,Thursday,Friday,Saturday,Sunday,tract,tract_visitors,visit_visitor_ratio,tract_visitors_to_visits,normalized_visit_ratio,tract_visits,adjusted_tract_visitors
zzy-222@5w9-hk8-54v,The Church of Jesus Christ of Latter day Saints,Churches,43.66527,-116.394127,Meridian,ID,83646,,zzy-222@5w9-hk8-54v,188.0,64.0,1871.0,82.0,"Map(5-10 -> 7, 21-60 -> 40, 61-120 -> 70, <5 -> 1, >240 -> 3, 11-20 -> 12, 121-240 -> 55)","List(1, 0, 0, 1, 0, 0, 0, 2, 38, 51, 80, 70, 64, 31, 25, 15, 11, 7, 20, 22, 23, 5, 0, 1)","Map(16027021903 -> 4, 16001010331 -> 32, 16001010334 -> 4, 16001010313 -> 4, 16001010335 -> 4, 16001010225 -> 4, 16015950200 -> 4)",3028.717947813511,0.0013868090850748,0.0036148979945007,2.261060343491147e-05,11,9,28,7,6,6,121,16001010331,32,2.9375,94.0,16.110201850071867,1514.3589739067554,516.0
zzy-222@5w9-hk8-54v,The Church of Jesus Christ of Latter day Saints,Churches,43.66527,-116.394127,Meridian,ID,83646,,zzy-222@5w9-hk8-54v,188.0,64.0,1871.0,82.0,"Map(5-10 -> 7, 21-60 -> 40, 61-120 -> 70, <5 -> 1, >240 -> 3, 11-20 -> 12, 121-240 -> 55)","List(1, 0, 0, 1, 0, 0, 0, 2, 38, 51, 80, 70, 64, 31, 25, 15, 11, 7, 20, 22, 23, 5, 0, 1)","Map(16027021903 -> 4, 16001010331 -> 32, 16001010334 -> 4, 16001010313 -> 4, 16001010335 -> 4, 16001010225 -> 4, 16015950200 -> 4)",3028.717947813511,0.0013868090850748,0.0036148979945007,2.261060343491147e-05,11,9,28,7,6,6,121,16001010313,4,2.9375,11.75,16.110201850071867,189.29487173834443,64.0
zzy-222@5w9-hk8-54v,The Church of Jesus Christ of Latter day Saints,Churches,43.66527,-116.394127,Meridian,ID,83646,,zzy-222@5w9-hk8-54v,188.0,64.0,1871.0,82.0,"Map(5-10 -> 7, 21-60 -> 40, 61-120 -> 70, <5 -> 1, >240 -> 3, 11-20 -> 12, 121-240 -> 55)","List(1, 0, 0, 1, 0, 0, 0, 2, 38, 51, 80, 70, 64, 31, 25, 15, 11, 7, 20, 22, 23, 5, 0, 1)","Map(16027021903 -> 4, 16001010331 -> 32, 16001010334 -> 4, 16001010313 -> 4, 16001010335 -> 4, 16001010225 -> 4, 16015950200 -> 4)",3028.717947813511,0.0013868090850748,0.0036148979945007,2.261060343491147e-05,11,9,28,7,6,6,121,16001010225,4,2.9375,11.75,16.110201850071867,189.29487173834443,64.0
zzy-222@5w9-hk8-54v,The Church of Jesus Christ of Latter day Saints,Churches,43.66527,-116.394127,Meridian,ID,83646,,zzy-222@5w9-hk8-54v,188.0,64.0,1871.0,82.0,"Map(5-10 -> 7, 21-60 -> 40, 61-120 -> 70, <5 -> 1, >240 -> 3, 11-20 -> 12, 121-240 -> 55)","List(1, 0, 0, 1, 0, 0, 0, 2, 38, 51, 80, 70, 64, 31, 25, 15, 11, 7, 20, 22, 23, 5, 0, 1)","Map(16027021903 -> 4, 16001010331 -> 32, 16001010334 -> 4, 16001010313 -> 4, 16001010335 -> 4, 16001010225 -> 4, 16015950200 -> 4)",3028.717947813511,0.0013868090850748,0.0036148979945007,2.261060343491147e-05,11,9,28,7,6,6,121,16001010334,4,2.9375,11.75,16.110201850071867,189.29487173834443,64.0
zzy-222@5w9-hk8-54v,The Church of Jesus Christ of Latter day Saints,Churches,43.66527,-116.394127,Meridian,ID,83646,,zzy-222@5w9-hk8-54v,188.0,64.0,1871.0,82.0,"Map(5-10 -> 7, 21-60 -> 40, 61-120 -> 70, <5 -> 1, >240 -> 3, 11-20 -> 12, 121-240 -> 55)","List(1, 0, 0, 1, 0, 0, 0, 2, 38, 51, 80, 70, 64, 31, 25, 15, 11, 7, 20, 22, 23, 5, 0, 1)","Map(16027021903 -> 4, 16001010331 -> 32, 16001010334 -> 4, 16001010313 -> 4, 16001010335 -> 4, 16001010225 -> 4, 16015950200 -> 4)",3028.717947813511,0.0013868090850748,0.0036148979945007,2.261060343491147e-05,11,9,28,7,6,6,121,16027021903,4,2.9375,11.75,16.110201850071867,189.29487173834443,64.0


# Group Tract by Visitors

In [0]:
tract_df = df.select(['tract', 'adjusted_tract_visitors'])
tract_df = tract_df.groupBy('tract').agg(sum('adjusted_tract_visitors').alias('adjusted_tract_visitors'))
tract_df.show(n=5)

+-----------+-----------------------+
|      tract|adjusted_tract_visitors|
+-----------+-----------------------+
|16029960100|                15624.0|
|49035113406|                  138.0|
|49057200800|                  317.0|
|16031950100|                 6123.0|
|49049000102|                  427.0|
+-----------+-----------------------+
only showing top 5 rows



# Outliers

In [0]:
std_value, avg = tract_df.select(stddev(col('adjusted_tract_visitors'))).collect()[0][0], tract_df.select(mean(col('adjusted_tract_visitors'))).collect()[0][0]

upper_limit, lower_limit = avg + (3 * std_value), avg - (3 * std_value)

tract_df = tract_df.withColumn(
    'adjusted_visitors',
    round(when(col('adjusted_tract_visitors') > upper_limit, upper_limit)
    .when(col('adjusted_tract_visitors') < lower_limit, lower_limit)
    .otherwise(col('adjusted_tract_visitors')), 0)
)
tract_df.drop('adjusted_tract_visitors')

DataFrame[tract: string, adjusted_visitors: double]

# Joing Data back into Safegraph

In [0]:
combined = tract_df.join(other=df, on='tract', how='inner')

# Map Visual

In [0]:
latitudes = combined.select('latitude').rdd.flatMap(lambda x: x).collect()
longitudes = combined.select('longitude').rdd.flatMap(lambda x: x).collect()
import plotly.express as px
fig = px.scatter_geo(
    lat=latitudes,
    lon=longitudes,
    color=combined.select('adjusted_visitors').rdd.flatMap(lambda x: x).collect(),
    locationmode="USA-states"
)
fig.update_layout(
    title='Estimated Active Member Counts for Each Tract',
    geo_scope='usa',
)
fig.show()

## Personal Coding Notes below

In [0]:
df = spark.createDataFrame([
    Row(tract=400, visits=90),
    Row(tract=400, visits=80),
    Row(tract=200, visits=20)
])
display(df)

result_df = df.groupBy('tract').agg(sum('visits').alias('total_visits'))
display(result_df)

tract,visits
400,90
400,80
200,20


tract,total_visits
400,170
200,20


In [0]:
df = spark.createDataFrame([
    Row(popularity_by_day={"Wednesday": 15, "Monday": 21, "Saturday": 30, "Thursday": 16, "Tuesday": 26, "Friday": 15, "Sunday": 103}, visits=90),
    Row(popularity_by_day={"Wednesday": 16, "Monday": 22, "Saturday": 40, "Thursday": 18, "Tuesday": 28, "Friday": 16, "Sunday": 113}, visits=80),
    Row(popularity_by_day={"Wednesday": 14, "Monday": 20, "Saturday": 50, "Thursday": 19, "Tuesday": 30, "Friday": 14, "Sunday": 123}, visits=20)
])
display(df)

days_of_week = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]

for day in days_of_week:
    df = df.withColumn(day, col("popularity_by_day").getItem(day))

df = df.drop("popularity_by_day")

display(df)

popularity_by_day,visits
"Map(Wednesday -> 15, Monday -> 21, Saturday -> 30, Thursday -> 16, Tuesday -> 26, Friday -> 15, Sunday -> 103)",90
"Map(Wednesday -> 16, Monday -> 22, Saturday -> 40, Thursday -> 18, Tuesday -> 28, Friday -> 16, Sunday -> 113)",80
"Map(Wednesday -> 14, Monday -> 20, Saturday -> 50, Thursday -> 19, Tuesday -> 30, Friday -> 14, Sunday -> 123)",20


visits,Monday,Tuesday,Wednesday,Thursday,Friday,Saturday,Sunday
90,21,26,15,16,15,30,103
80,22,28,16,18,16,40,113
20,20,30,14,19,14,50,123


In [0]:
df = spark.createDataFrame([
    Row(tract=400, visits=500),
    Row(tract=400, visits=80),
    Row(tract=200, visits=20)
])
df = df.filter(col('tract') > (col('visits') * 0.5))
display(df)

tract,visits
400,500
400,80
200,20


In [0]:
lds_websites = ['churchofjesuschrist.org', 'mormon.org', 'lds.org', 'comeuntochrist.org', '[]']

df = spark.createDataFrame([
    Row(websites='[lds]', visits=500),
    Row(websites='[junk.org]', visits=80),
    Row(websites='[]', visits=80)
])

# df = df.withColumn('websites', regexp_replace(col('websites'), r'[\[\]]', ''))

df = df.filter(col('websites').isin(lds_websites))
display(df)


websites,visits
[],80


In [0]:
df = spark.createDataFrame([
    Row(tract=400, visits=10000),
    Row(tract=500, visits=80),
    Row(tract=500, visits=80),
    Row(tract=500, visits=80),
    Row(tract=500, visits=80),
    Row(tract=500, visits=80),
    Row(tract=500, visits=80),
    Row(tract=500, visits=80),
    Row(tract=500, visits=80),
    Row(tract=500, visits=80),
    Row(tract=500, visits=80),
    Row(tract=500, visits=80),
    Row(tract=500, visits=80),
    Row(tract=500, visits=80),
    Row(tract=500, visits=80),
    Row(tract=500, visits=80),
    Row(tract=200, visits=20)
])
std_value, avg = df.select(stddev(col('visits'))).collect()[0][0], df.select(mean(col('visits'))).collect()[0][0]

upper_limit, lower_limit = avg + (3 * std_value), avg - (3 * std_value)
print(upper_limit)
print(lower_limit)

df = df.withColumn(
    'visits',
    round(when(col('visits') > upper_limit, upper_limit)
    .when(col('visits') < lower_limit, lower_limit)
    .otherwise(col('visits')), 0)
)
display(df)


7880.720185687851
-6560.720185687851


tract,visits
400,7881.0
500,80.0
500,80.0
500,80.0
500,80.0
500,80.0
500,80.0
500,80.0
500,80.0
500,80.0
