# Feature Development Challenge 

In [0]:
%pip install "databricks-mosaic<0.4,>=0.3"

Python interpreter will be restarted.
Collecting databricks-mosaic<0.4,>=0.3
  Downloading databricks_mosaic-0.3.14-py3-none-any.whl (81.5 MB)
Collecting keplergl==0.3.2
  Downloading keplergl-0.3.2.tar.gz (9.7 MB)
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
    Preparing wheel metadata: started
    Preparing wheel metadata: finished with status 'done'
Collecting h3==3.7.3
  Downloading h3-3.7.3-cp39-cp39-manylinux2010_x86_64.whl (806 kB)
Collecting Shapely>=1.6.4.post2
  Downloading shapely-2.0.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.5 MB)
Collecting geopandas>=0.5.0
  Downloading geopandas-0.14.3-py3-none-any.whl (1.1 MB)
Collecting traittypes>=0.2.1
  Downloading traittypes-0.2.1-py2.py3-none-any.whl (8.6 kB)
Collecting fiona>=1.8.21
  Downloading fiona-1.9.6-cp39-cp39-manylinux2014_x86_64.wh

In [0]:
import pyspark.sql.functions as F
import mosaic as mos
mos.enable_mosaic(spark, dbutils)

import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px

from pyspark.sql.functions import col, substring, when, avg, count, lit
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType

from pyspark.sql.functions import pandas_udf
import pandas as pd
import numpy as np


                Please use a Databricks:
                    - Photon-enabled Runtime for performance benefits
                    - Runtime ML for spatial AI benefits
                Mosaic will stop working on this cluster after v0.3.x.




In [0]:
poi = spark.read.parquet("dbfs:/data/one_file/safegraph_poi.parquet")
spend = spark.read.parquet("dbfs:/data/safegraph_spend/*")
poi_cbg = spark.read.parquet("dbfs:/data/one_file/safegraph_poi_cbg.parquet")
placekey = spark.read.parquet("dbfs:/data/one_file/passby_placekey.parquet")
visitors = spark.read.parquet("dbfs:/data/one_file/passby_visitors.parquet")
cbg = spark.read.format("json")\
  .load('dbfs:/data/census/BG/tl_2020_16_bg.geojson')\
  .withColumn("geometry", mos.st_geomfromgeojson(F.to_json(F.col("geometry"))))\
  .select("properties.*", "geometry")\
  .drop("shape_area", "shape_leng")\
  .dropna(subset="GEOID")

## Build Target Table

722511 Full-Service Restaurants

In [0]:
cols = ["BRAND", "CITY", "STATE", "ZIP_CODE", "MONTH_STARTING", "MONTH_ENDING", "month", "year"]

placekey = placekey.drop('NAME')

visitors_pk = visitors.withColumn("month", F.month("MONTH_STARTING")) \
                   .withColumn("year", F.year("MONTH_STARTING")) \
                   .withColumn("MONTH_ID", F.concat(F.lpad(F.col("month"), 2, "0"), F.col("year"))) \
                   .drop(*cols) \
                   .join(placekey, on=['STORE_ID', "STREET_ADDRESS"], how='left')

In [0]:
cols = ['month', 'year']

spend_df = spend.withColumn("month", F.month("SPEND_DATE_RANGE_START")) \
             .withColumn("year", F.year("SPEND_DATE_RANGE_START")) \
             .withColumn("MONTH_ID", F.concat(F.lpad(F.col("month"), 2, "0"), F.col("year"))) \
             .drop(*cols) \
             .filter(F.col("year") == 2023)

In [0]:
poi_merged = poi.join(poi_cbg, on=["PLACEKEY", "LATITUDE", "LONGITUDE"], how="inner") \

poi_spend = poi_merged.join(spend_df, on=["PLACEKEY", "CITY", "REGION"], how='left') \
                      .drop('STORE_ID') \
                      .withColumnRenamed('LOCATION_NAME', 'NAME')

In [0]:
poi_spend_visits = poi_spend.join(visitors_pk, on=['PLACEKEY', 'NAME', 'STREET_ADDRESS', 'MONTH_ID'], how='left')

In [0]:
target = poi.join(poi_cbg, on="PLACEKEY", how="inner") \
        .filter(F.col("NAICS_CODE").rlike("^722511")) \
        .groupBy("TRACTCE") \
        .agg(F.count("NAICS_CODE").alias("count")) \
        .groupBy('TRACTCE') \
        .agg(F.expr("percentile_approx(count, 0.5)").alias("median_count")) \
        .join(cbg.select('TRACTCE'), on="TRACTCE", how="right") \
        .fillna(0) \
        .dropDuplicates() \
        .sort("TRACTCE")

display(target.limit(5))

TRACTCE,median_count
101,139
102,492
200,32
201,8
202,8


## Average Spend Residential 

I created a feature called "Average Spend per Customer on Residential Buildings and Dwellings." This feature helps us understand how much money people spend on homes in different areas. First, I looked at the "poi_spend_visits" data and only kept information about places related to NAICS code 531110, which includes businesses that rent out homes. Then, I grouped this data by the "TRACTCE" column, which shows which area each home is in, and found the average amount of money each customer spends using the "MEDIAN_SPEND_PER_CUSTOMER" column. I rounded this number to two decimal places and called it "avg_spend_residential." After that, I combined this information with the "target" data, which shows the number of businesses in each area. I filled in any missing information with zeros and sorted the data by area. Here's a simple outline of what I did:


** **

```
1. Filtered poi_spend_visits to include only NAICS code 531110 (rental businesses).
2. Grouped the data by area (TRACTCE) and found the average spend per customer.
3. Joined this data with the target data to include the number of businesses in each area.
4. Filled in missing information with zeros and sorted the data by area.
```


Reference Code: 
* https://stackoverflow.com/questions/58708230/plotly-how-to-plot-a-regression-line-using-plotly-and-plotly-express
* Code taken from the feature challenge 

In [0]:
feature_table = poi_spend_visits.filter(F.col("NAICS_CODE") == "531110") \
            .groupBy('TRACTCE') \
            .agg(F.avg("MEDIAN_SPEND_PER_CUSTOMER").alias("avg_spend_residential")) \
            .withColumn('avg_spend_residential', F.round(F.col('avg_spend_residential'), 2)) \
            .join(target, on="TRACTCE", how="right") \
            .na.fill(0) \
            .select("TRACTCE", "avg_spend_residential", "median_count") \
            .sort(F.asc("TRACTCE"))

In [0]:
feature_table_df = feature_table.toPandas()

fig = px.scatter(feature_table_df, x='avg_spend_residential', y='median_count', trendline="ols",
             labels={'avg_spend_residential': 'Average Spend per Customer on Residential Buildings', 
                     'median_count': 'Median Count of Businesses'},
             title='Relationship between Average Spend per Customer on Residential Buildings and Median Count of Full-Service Restaurants')

fig.update_traces(line=dict(color='red'))

fig.show()

In [0]:
display(feature_table.limit(5))

TRACTCE,avg_spend_residential,median_count
101,0.0,139
102,39.45,492
200,0.0,32
201,0.0,8
202,0.0,8



## Income Spend Map

This feature, called "Income Spend Map", provides insights into the average spend per customer based on different income ranges. 


Here's how it was created:
```
1. Unmap BUCKETED_CUSTOMER_INCOMES: Extract income data from `poi_spend_visits` by exploding the `BUCKETED_CUSTOMER_INCOMES` column to separate income ranges and their corresponding values.

2. Unmap MEAN_SPEND_PER_CUSTOMER_BY_INCOME: Similarly, extract spend data from `poi_spend_visits` by exploding the `MEAN_SPEND_PER_CUSTOMER_BY_INCOME` column to obtain spend values for each income range.

3. Aggregate Data: Join the unmapped income and spend data based on "TRACTCE" and "income" columns. Then, group the data by "TRACTCE" and pivot the "income" column to create separate columns for each income range. Aggregate the spend values using the first value encountered for each income range.

4. Define Income Ranges: Define the income ranges to be considered, such as "<25k", "25-45k", "45-60k", etc.

5. Create Map Column: Using `create_map()` function, create a map where each income range is paired with its corresponding spend value. This map will serve as the "Income Spend Map" feature.

6. Join with feature_table: Join the newly created income spend map with the existing `feature_table` based on the "TRACTCE" column, ensuring that each region gets its income spend map.

7. Handle Missing Values: Fill any missing values resulting from the join operation with zeros to maintain data integrity.
```


In [0]:
from pyspark.sql import functions as F

unmapped_incomes = poi_spend_visits.select(
    F.col("TRACTCE"),
    F.explode("BUCKETED_CUSTOMER_INCOMES").alias("income", "income_value")
)


unmapped_spend = poi_spend_visits.select(
    F.col("TRACTCE"),
    F.explode("MEAN_SPEND_PER_CUSTOMER_BY_INCOME").alias("income", "spend_value")
)


agg_income_spend = unmapped_incomes.join(
    unmapped_spend,
    ["TRACTCE", "income"]
).groupBy("TRACTCE").pivot("income").agg(
    F.first("spend_value").alias("mean_spend_per_customer_by_income")
)


income_ranges = ["<25k", "25-45k", "45-60k", "60-75k", "75-100k", "100-150k", ">150k"]


income_spend_map = F.create_map(*sum([[F.lit(income), F.col(income)] for income in income_ranges], []))


agg_income_spend = agg_income_spend.withColumn("income_spend_map", income_spend_map).select("TRACTCE", "income_spend_map")

feature_table = feature_table.join(
    agg_income_spend,
    on="TRACTCE",
    how="left"
)

feature_table = feature_table.na.fill(0)

In [0]:
display(feature_table.limit(5))

TRACTCE,avg_spend_residential,median_count,income_spend_map
21801,0.0,0,"Map(45-60k -> 34.49, 100-150k -> 42.82, 25-45k -> 32.73, <25k -> 33.96, 75-100k -> 42.13, >150k -> 49.36, 60-75k -> 40.62)"
21103,0.0,8,"Map(45-60k -> 22.7, 100-150k -> 27.11, 25-45k -> 22.75, <25k -> 20.5, 75-100k -> 24.36, >150k -> 27.86, 60-75k -> 21.67)"
960103,0.0,32,"Map(45-60k -> 42.61, 100-150k -> 42.0, 25-45k -> 58.12, <25k -> 25.0, 75-100k -> 45.5, >150k -> 24.0, 60-75k -> 30.0)"
950203,0.0,88,"Map(45-60k -> 18.45, 100-150k -> 32.4, 25-45k -> 19.0, <25k -> 27.14, 75-100k -> 13.34, >150k -> 17.98, 60-75k -> 21.5)"
20601,0.0,0,"Map(45-60k -> 10.5, 100-150k -> 37.1, 25-45k -> 16.96, <25k -> 12.09, 75-100k -> 9.54, >150k -> 141.67, 60-75k -> 170.0)"


In [0]:
feature_table_sample = feature_table.select(
    F.col("TRACTCE"),
    F.explode("income_spend_map").alias("income_range", "count")
)

In [0]:
feature_table_sample = feature_table_sample.groupBy("TRACTCE").pivot("income_range").agg(F.first("count"))

In [0]:
display(feature_table_sample.limit(5))

TRACTCE,100-150k,25-45k,45-60k,60-75k,75-100k,<25k,>150k
970400,20.56,100.0,225.0,71.08,47.74,47.74,94.02
21801,42.82,32.73,34.49,40.62,42.13,33.96,49.36
2223,4.13,17.96,40.16,31.17,23.87,25.56,54.91
1203,18.5,14.0,10.5,66.65,29.99,36.59,33.66
21103,27.11,22.75,22.7,21.67,24.36,20.5,27.86


In [0]:
feature_table = feature_table.join(feature_table_sample, on="TRACTCE", how="right").drop("income_spend_map")

In [0]:
display(feature_table.limit(5))

TRACTCE,avg_spend_residential,median_count,100-150k,25-45k,45-60k,60-75k,75-100k,<25k,>150k,100-150k.1,25-45k.1,45-60k.1,60-75k.1,75-100k.1,<25k.1,>150k.1
1203,0.0,56,18.5,14.0,10.5,66.65,29.99,36.59,33.66,18.5,14.0,10.5,66.65,29.99,36.59,33.66
2223,0.0,12,4.13,17.96,40.16,31.17,23.87,25.56,54.91,4.13,17.96,40.16,31.17,23.87,25.56,54.91
21103,0.0,8,27.11,22.75,22.7,21.67,24.36,20.5,27.86,27.11,22.75,22.7,21.67,24.36,20.5,27.86
21801,0.0,0,42.82,32.73,34.49,40.62,42.13,33.96,49.36,42.82,32.73,34.49,40.62,42.13,33.96,49.36
970400,0.0,68,20.56,100.0,225.0,71.08,47.74,47.74,94.02,20.56,100.0,225.0,71.08,47.74,47.74,94.02


In [0]:
feature_table_df = feature_table.toPandas()

fig = px.scatter(feature_table_df, x='100-150k', y='median_count', trendline="ols")
fig.update_traces(line=dict(color='red'))

fig.show()

In [0]:
fig = px.scatter(feature_table_df, x='25-45k', y='median_count', trendline="ols")
fig.update_traces(line=dict(color='red'))

fig.show()

In [0]:
fig = px.scatter(feature_table_df, x='45-60k', y='median_count', trendline="ols")
fig.update_traces(line=dict(color='red'))

fig.show()

In [0]:
fig = px.scatter(feature_table_df, x='60-75k', y='median_count', trendline="ols")
fig.update_traces(line=dict(color='red'))

fig.show()

In [0]:
fig = px.scatter(feature_table_df, x='75-100k', y='median_count', trendline="ols")
fig.update_traces(line=dict(color='red'))

fig.show()

In [0]:
fig = px.scatter(feature_table_df, x='<25k', y='median_count', trendline="ols")
fig.update_traces(line=dict(color='red'))

fig.show()

In [0]:
fig = px.scatter(feature_table_df, x='>150k', y='median_count', trendline="ols")
fig.update_traces(line=dict(color='red'))

fig.show()