## Spark ML Exploration

## Learning Outcomes
Objectives:

-  Use ML piplenes
-  Improve a Random Forest model
-  Perform Hyperparameter tuning

Analysis 1: Predicting Farmer's Markets Locations

From our preliminary analysis using the ML pipeline, we noted certain links but observed results that could be further optimized. Despite the limitations like potential invalid assumptions such as zip code shortening, there's room for refinement. One proposition is to consider neighboring zip codes or utilize distance measures to predict the likelihood of a farmer's market within a certain radius from affluent zip codes.

The challenge is not strictly about enhancing the model's performance, but also about making iterative changes for potential improvements. The dataset in focus is the Farmers Markets dataset, which can be reviewed https://catalog.data.gov/dataset/farmers-markets-directory-and-geographic-data. For a baseline, refer to the model setup in this https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/5915990090493625/2446126855165611/6085673883631125/latest.html.

Analysis 2: Predicting Diamond Prices

Leverage the power of the Apache Spark ML pipeline to construct a model predicting diamond prices based on the provided features. Detailed dataset information is available in this https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/5915990090493625/4396972618536508/6085673883631125/latest.html.

###For this assignment, I have shifted focus towards selecting features that could provide a more accurate representation of income and population in each zip code. To achieve this, data integration was carried out from two sources - the zcta_coordinates dataset for zip code coordinates, and the Census Bureau's API for population details corresponding to these zip codes. A haversine algorithm was employed to decrease granularity. Furthermore, a square root transformation was used to tackle data skewness, enhancing the model's performance and computational efficiency.

In [0]:
# Read the data into a DataFrame
taxes2013 = spark.read \
  .format("csv") \
  .option("header", "true") \
  .load("dbfs:/databricks-datasets/data.gov/irs_zip_code_data/data-001/2013_soi_zipcode_agi.csv")

# Create a temporary view
taxes2013.createOrReplaceTempView("taxes2013")


In [0]:
# Read the data into a DataFrame
markets = spark.read \
  .format("csv") \
  .option("header", "true") \
  .load("dbfs:/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/market_data.csv")

# Create a temporary view
markets.createOrReplaceTempView("markets")

### Adding coordinates for zip codes 

In [0]:
# Read the data into a DataFrame
zip_to_coords_df = spark.read.format("csv").option("header", "true").load("/FileStore/tables/zcta_coordinates-1.csv")

# Convert columns to appropriate types if necessary
from pyspark.sql.functions import col
zip_to_coords_df = zip_to_coords_df.withColumn("lon", col("lon").cast("double"))
zip_to_coords_df = zip_to_coords_df.withColumn("lat", col("lat").cast("double"))

# Create a temporary view for Spark SQL
zip_to_coords_df.createOrReplaceTempView("zip_to_coords_view")

##API to populate tax zipcodes 

In [0]:
%python
import requests
import json
from pyspark.sql import Row

# Define the base URL for the API
base_url = "https://api.census.gov/data/2019/acs/acs5"

# Define the parameters for the API call
params = {
    "get": "NAME,B01001_001E",
    "for": "zip code tabulation area:*"
}

# Make the API call
response = requests.get(base_url, params=params)

if response.status_code == 200:
    data = json.loads(response.text)
    
    # Convert the data to a DataFrame
    rdd = sc.parallelize(data[1:])
    df = rdd.map(lambda x: Row(Name=x[0], Population=int(x[1]), State=x[2], Zipcode=x[3])).toDF()

    # Register the DataFrame as a SQL temporary view
    df.createOrReplaceTempView("census_data")
    
    df.show()
else:
    print(f"Failed to retrieve data. Status code: {response.status_code}")

+-----------+----------+-----+-------+
|       Name|Population|State|Zipcode|
+-----------+----------+-----+-------+
|ZCTA5 25245|       600|   54|  25245|
|ZCTA5 25268|       964|   54|  25268|
|ZCTA5 25286|      1700|   54|  25286|
|ZCTA5 25303|      6764|   54|  25303|
|ZCTA5 25311|     10964|   54|  25311|
|ZCTA5 25419|     11062|   54|  25419|
|ZCTA5 25434|      2821|   54|  25434|
|ZCTA5 25446|      1193|   54|  25446|
|ZCTA5 25106|       598|   54|  25106|
|ZCTA5 25501|      1198|   54|  25501|
|ZCTA5 25507|       776|   54|  25507|
|ZCTA5 25515|      2042|   54|  25515|
|ZCTA5 25520|      1497|   54|  25520|
|ZCTA5 25535|      2796|   54|  25535|
|ZCTA5 25564|      1174|   54|  25564|
|ZCTA5 25601|      5565|   54|  25601|
|ZCTA5 25606|      1531|   54|  25606|
|ZCTA5 25608|      1108|   54|  25608|
|ZCTA5 25617|       706|   54|  25617|
|ZCTA5 25621|      1626|   54|  25621|
+-----------+----------+-----+-------+
only showing top 20 rows



In [0]:
%python
taxes_selected = spark.sql("""
    SELECT 
        STATE, 
        zipcode, 
        agi_stub, 
        N1
    FROM taxes2013
    WHERE zipcode != 0 AND zipcode != 99999
""")

taxes_selected.show()

+-----+-------+--------+---------+
|STATE|zipcode|agi_stub|       N1|
+-----+-------+--------+---------+
|   AL|  35004|       1|1530.0000|
|   AL|  35004|       2|1330.0000|
|   AL|  35004|       3| 910.0000|
|   AL|  35004|       4| 610.0000|
|   AL|  35004|       5| 510.0000|
|   AL|  35004|       6|  40.0000|
|   AL|  35005|       1|1390.0000|
|   AL|  35005|       2|1030.0000|
|   AL|  35005|       3| 470.0000|
|   AL|  35005|       4| 230.0000|
|   AL|  35005|       5| 180.0000|
|   AL|  35005|       6|   0.0000|
|   AL|  35006|       1| 440.0000|
|   AL|  35006|       2| 330.0000|
|   AL|  35006|       3| 190.0000|
|   AL|  35006|       4| 130.0000|
|   AL|  35006|       5| 140.0000|
|   AL|  35006|       6|   0.0000|
|   AL|  35007|       1|4300.0000|
|   AL|  35007|       2|2680.0000|
+-----+-------+--------+---------+
only showing top 20 rows



###I have decided to shift the focus of my feature selection to fields that could provide a more accurate representation of the income for each zip code. Specifically, I have used N1 and agi_stub from the tax dataset, representing the number of tax returns and the respective income brackets. Using a weighted average approach, I estimated the adjusted gross income (AGI) for each zip code

In [0]:
%python
from pyspark.sql.functions import when, col, sum

# Define the median income for each agi_stub
income_bracket = {1: 12500, 2: 37500, 3: 62500, 4: 87500, 5: 112500, 6: 212500}

# Create a new column for the median income of each agi_stub
for bracket, income in income_bracket.items():
    taxes2013 = taxes2013.withColumn('agi_stub_{}'.format(bracket), 
                                     when(col('agi_stub') == bracket, col('N1') * income))

# Group by zipcode and calculate the total count and total AGI
taxes_aggregate = taxes2013.groupBy('zipcode').agg(
    sum(col('N1')).alias('totalCount'),
    sum(col('agi_stub_1')).alias('totalAGI_1'),
    sum(col('agi_stub_2')).alias('totalAGI_2'),
    sum(col('agi_stub_3')).alias('totalAGI_3'),
    sum(col('agi_stub_4')).alias('totalAGI_4'),
    sum(col('agi_stub_5')).alias('totalAGI_5'),
    sum(col('agi_stub_6')).alias('totalAGI_6')
)

# Calculate the estimated AGI for each zipcode
taxes_aggregate = taxes_aggregate.withColumn('estimated_AGI', 
    (col('totalAGI_1') + col('totalAGI_2') + col('totalAGI_3') + col('totalAGI_4') + col('totalAGI_5') + col('totalAGI_6')) / col('totalCount'))

taxes_aggregate.createOrReplaceTempView("taxes_aggregate_view")

taxes_aggregate.show()

In [0]:
from pyspark.sql.functions import col, sum
# Adding Latitude and Longitude to the dataset

joined_df = spark.sql("""
    SELECT 
        f.zipcode,
        ROUND(f.estimated_AGI) as estimated_AGI,
        c.Population,
        c.State,
        z.lat as Latitude,
        z.lon as Longitude
    FROM taxes_aggregate_view AS f
    JOIN census_data AS c
    ON f.zipcode = c.Zipcode
    LEFT JOIN zip_to_coords_view AS z
    ON f.zipcode = z.GEOID10
""")

joined_df.createOrReplaceTempView("joined_df")

joined_df.show()



+-------+-------------+----------+-----+------------------+------------------+
|zipcode|estimated_AGI|Population|State|          Latitude|         Longitude|
+-------+-------------+----------+-----+------------------+------------------+
|  25817|      45192.0|       708|   54|         37.760812|-81.40611103544494|
|  25501|      39881.0|      1198|   54|         38.159839|-81.96371335314505|
|  25601|      43029.0|      5565|   54|        37.8546745|-81.99475838541792|
|  25434|      39024.0|      2821|   54|        39.4573195|-78.41869813005647|
|  25671|      38141.0|      1471|   54|37.896817999999996|-82.15554925432181|
|  25311|      45595.0|     10964|   54|         38.362155|-81.54456745557027|
|  25564|      44363.0|      1174|   54| 38.27144799999999|-81.90408735156251|
|  25446|      54688.0|      1193|   54|          39.23663|  -77.944949777423|
|  25515|      39286.0|      2042|   54|38.751594499999996|-82.14917737648028|
|  25106|      34722.0|       598|   54|        38.7

In [0]:
# Check for null values in each column
for column in joined_df.columns:
    print(column, "with null values: ", joined_df.where(col(column).isNull()).count())


zipcode with null values:  0
estimated_AGI with null values:  0
Population with null values:  0
State with null values:  0
Latitude with null values:  0
Longitude with null values:  0


####I will pick up the number of farmer markets for every zip code

In [0]:
%python
result = spark.sql("""
SELECT 
    COUNT(*) as count, 
    INT(Zip / 10) as zip 
FROM markets 
GROUP BY INT(Zip / 10)
""")

result.createOrReplaceTempView("cleaned_markets")

result = spark.sql("SELECT * FROM cleaned_markets LIMIT 10")
result.show()

+-----+----+
|count| zip|
+-----+----+
|    5|4900|
|    2|7240|
|    8|4818|
|    1|9852|
|    2|5300|
|    5|2122|
|    2|9900|
|    1|8592|
|    1|1580|
|    1|3175|
+-----+----+



###The final table consolidates relevant data from multiple sources into one comprehensive dataset. This dataset provides a detailed snapshot of each zip code area and features the following columns: zipcode;estimated_AGI;Population;Latitude;Longitude;count, and zip. 

In [0]:
%python
# Perform the join operation
final_table = spark.sql("""
    SELECT 
        a.zipcode, 
        a.estimated_AGI, 
        a.Population, 
        a.Latitude,
        a.Longitude,
        b.count, 
        b.zip 
    FROM joined_df a 
    LEFT OUTER JOIN cleaned_markets b 
    ON(a.zipcode=b.zip)
""")
final_table.show()

+-------+-------------+----------+------------------+-------------------+-----+----+
|zipcode|estimated_AGI|Population|          Latitude|          Longitude|count| zip|
+-------+-------------+----------+------------------+-------------------+-----+----+
|  35004|      49721.0|     12045|         33.615643| -86.48450973252687| null|null|
|  35444|      44643.0|      3972|         33.330044| -87.29786008162166| null|null|
|  35640|      47651.0|     25654|        34.4321505| -86.93239004931354| null|null|
|  35670|      45462.0|      6594|34.462768499999996| -86.70405302288616| null|null|
|  36067|      46077.0|     27601|        32.5138015| -86.59358225063184| null|null|
|  36526|      57043.0|     34565|        30.6089265| -87.86754901099667| null|null|
|  85022|      48409.0|     51920|        33.6250205|-112.05306184353742| null|null|
|  72472|      34731.0|      9167|         35.575924| -90.53634864884637| null|null|
|  90022|      28355.0|     67014|         34.029567|-118.1586931

###I dropped zip becuase it is equivalent to the zipcode column and handle null values

In [0]:
%python
final_table = final_table.drop('zip')
final_table = final_table.na.fill(0)
final_table.show()

+-------+-------------+----------+------------------+-------------------+-----+
|zipcode|estimated_AGI|Population|          Latitude|          Longitude|count|
+-------+-------------+----------+------------------+-------------------+-----+
|  35004|      49721.0|     12045|         33.615643| -86.48450973252687|    0|
|  35444|      44643.0|      3972|         33.330044| -87.29786008162166|    0|
|  35640|      47651.0|     25654|        34.4321505| -86.93239004931354|    0|
|  35670|      45462.0|      6594|34.462768499999996| -86.70405302288616|    0|
|  36067|      46077.0|     27601|        32.5138015| -86.59358225063184|    0|
|  36526|      57043.0|     34565|        30.6089265| -87.86754901099667|    0|
|  85022|      48409.0|     51920|        33.6250205|-112.05306184353742|    0|
|  72472|      34731.0|      9167|         35.575924| -90.53634864884637|    0|
|  90022|      28355.0|     67014|         34.029567|-118.15869319521369|    0|
|  91910|      45019.0|     74855|      

In [0]:
final_table.describe().show()

+-------+------------------+------------------+------------------+-----------------+-------------------+-------------------+
|summary|           zipcode|     estimated_AGI|        Population|         Latitude|          Longitude|              count|
+-------+------------------+------------------+------------------+-----------------+-------------------+-------------------+
|  count|             27687|             27687|             27687|            27687|              27687|              27687|
|   mean| 48848.03991042727| 47430.19789070683|11640.184888214686|38.75023619422482| -90.26027897302043|0.06540975909271499|
| stddev|27019.969998839104|13666.317327861723| 15360.77427909072|4.927206460175357| 14.048502786906592| 0.4598502109486731|
|    min|             01001|           12500.0|                 0|        19.306708|-159.50799237610255|                  0|
|    max|             99901|          141042.0|            128294|       66.1803405| -67.01129174440739|                 18|


###Granularity and Grouping:The dataset's granularity, where each row corresponds to a single zip code, could potentially cause problems due to socioeconomic and geographic factors varying greatly even within small areas. To address this, I use the Haversine formula to group zip codes within a 10-km radius, thus creating a more representative local context. The resultant features for each group include: average neighborhood Adjusted Gross Income (AGI), average population, and count of neighboring markets. This approach should offer a more detailed and comprehensive basis for the model.

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType
import math

# Define Haversine UDF
def haversine_udf(lon1, lat1, lon2, lat2):
    """
    Calculate the great circle distance in kilometers between two points 
    on the earth (specified in decimal degrees)
    """
    # convert decimal degrees to radians 
    lon1, lat1, lon2, lat2 = map(math.radians, [lon1, lat1, lon2, lat2])

    # haversine formula 
    dlon = lon2 - lon1 
    dlat = lat2 - lat1 
    a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
    c = 2 * math.asin(math.sqrt(a)) 
    r = 6371 # Radius of earth in kilometers.
    return c * r

In [0]:
# Register the function as a UDF
haversine_udf = F.udf(haversine_udf, DoubleType())

# Add alias to differentiate the columns
df1 = final_table.alias('df1')
df2 = final_table.alias('df2')

# Define a maximum distance (in kilometers)
max_distance = 10


In [0]:
# Cross-join with distance condition and filter out self-join results
neighbors = df1.crossJoin(df2).where(
    (haversine_udf(F.col("df1.Longitude"), F.col("df1.Latitude"), F.col("df2.Longitude"), F.col("df2.Latitude")) <= max_distance) &
    (F.col("df1.zipcode") != F.col("df2.zipcode")) # Exclude self-join results
)

# Filter pairs where at least one has a farmer's market
neighbors_with_markets = neighbors.where((F.col("df1.count") > 0) | (F.col("df2.count") > 0))


In [0]:
# Aggregate functions to compute the mean of estimated_AGI, Population, and the count of neighbors with markets for each zipcode
aggregations = [F.round(F.mean("df2.estimated_AGI")).alias("avg_neighbor_agi"),
                F.round(F.mean("df2.Population")).alias("avg_neighbor_population"),
                F.count("df2.zipcode").alias("num_neighbors_with_markets")]

# Compute the average estimated_AGI, Population, and the count of neighbors with markets for each zipcode
neighbors_with_features = neighbors_with_markets.groupby("df1.zipcode").agg(*aggregations)

neighbors_with_features.show()

+-------+----------------+-----------------------+--------------------------+
|zipcode|avg_neighbor_agi|avg_neighbor_population|num_neighbors_with_markets|
+-------+----------------+-----------------------+--------------------------+
|  19121|         27640.0|                22874.0|                         3|
|  19147|         41711.0|                16273.0|                         6|
|  19102|         32640.0|                17916.0|                         4|
|  19122|         27640.0|                22874.0|                         3|
|  19136|         39040.0|                14868.0|                         2|
|  19030|         52793.0|                 4123.0|                         1|
|  19076|         58483.0|                 7642.0|                         1|
|  19141|         33627.0|                18753.0|                         1|
|  19140|         29281.0|                23343.0|                         2|
|  19137|         41376.0|                21427.0|              

In [0]:
# Check for null values in each column
for column in neighbors_with_features.columns:
    print(column, "with null values: ", neighbors_with_features.where(col(column).isNull()).count())

zipcode with null values:  0
avg_neighbor_agi with null values:  0
avg_neighbor_population with null values:  0
num_neighbors_with_markets with null values:  0


### I will examine the data for outliers, apply any necessary filters, then run the correlation analysis and random forest model.

In [0]:
# Note: this requires the matplotlib and seaborn libraries
import matplotlib.pyplot as plt
import seaborn as sns

# Selecting the columns of interest
data_df = neighbors_with_features.select(["avg_neighbor_agi", "avg_neighbor_population"]).toPandas()

# Creating the boxplot
plt.figure(figsize=(12, 6))
sns.boxplot(data=data_df)
plt.xticks(rotation=90)  # Rotate the x labels for better readability if needed
plt.show()


In [0]:
from pyspark.sql.functions import expr

# Calculate bounds for avg_neighbor_agi
quantiles_agi = neighbors_with_features.approxQuantile("avg_neighbor_agi", [0.25, 0.75], 0.05)
IQR_agi = quantiles_agi[1] - quantiles_agi[0]
bounds_agi = [quantiles_agi[0] - 1.5 * IQR_agi, quantiles_agi[1] + 1.5 * IQR_agi]

# Print bounds for avg_neighbor_agi
print(f"Lower bound for avg_neighbor_agi: {bounds_agi[0]}")
print(f"Upper bound for avg_neighbor_agi: {bounds_agi[1]}")

# Calculate bounds for avg_neighbor_population
quantiles_pop = neighbors_with_features.approxQuantile("avg_neighbor_population", [0.25, 0.75], 0.05)
IQR_pop = quantiles_pop[1] - quantiles_pop[0]
bounds_pop = [quantiles_pop[0] - 1.5 * IQR_pop, quantiles_pop[1] + 1.5 * IQR_pop]

# Print bounds for avg_neighbor_population
print(f"Lower bound for avg_neighbor_population: {bounds_pop[0]}")
print(f"Upper bound for avg_neighbor_population: {bounds_pop[1]}")

# Adjust the lower bound for avg_neighbor_population
bounds_pop[0] = max(0, bounds_pop[0])
print(f"Adjusted lower bound for avg_neighbor_population: {bounds_pop[0]}")


Lower bound for avg_neighbor_agi: 20497.0
Upper bound for avg_neighbor_agi: 91313.0
Lower bound for avg_neighbor_population: -20032.5
Upper bound for avg_neighbor_population: 42683.5
Adjusted lower bound for avg_neighbor_population: 0


In [0]:
# Filter out rows where the avg_neighbor_agi is greater than 91313.0
neighbors_with_features_filtered = neighbors_with_features.filter(neighbors_with_features.avg_neighbor_agi <= 91313.0)

# Filter out rows where the avg_neighbor_population is greater than 42683.5
neighbors_with_features_filtered = neighbors_with_features_filtered.filter(neighbors_with_features_filtered.avg_neighbor_population <= 42683.5)

# Filter out rows where avg_neighbor_population is less than 0
neighbors_with_features_filtered = neighbors_with_features_filtered.filter(neighbors_with_features_filtered.avg_neighbor_population >= 0)



In [0]:
# Write the DataFrame to a Parquet file
neighbors_with_features_filtered.write.parquet("/FileStore/neighbors_with_features_filtered.parquet")


In [0]:
# Read the Parquet file into a DataFrame
neighbors_with_features_filtered = spark.read.parquet("/FileStore/neighbors_with_features_filtered.parquet")


In [0]:

columns = [col for col in neighbors_with_features_filtered.columns if col != 'zipcode']

# Compute and print the correlation for each pair of columns
for i in range(len(columns)):
    for j in range(i + 1, len(columns)):
        print(f"Correlation between {columns[i]} and {columns[j]}: {neighbors_with_features_filtered.stat.corr(columns[i], columns[j])}")


Correlation between avg_neighbor_agi and avg_neighbor_population: 0.3060124063084086
Correlation between avg_neighbor_agi and num_neighbors_with_markets: 0.19340331846648293
Correlation between avg_neighbor_population and num_neighbors_with_markets: 0.4476717926180829


###The correlation values indicate that 'avg_neighbor_population' (0.448) has a stronger relationship with 'num_neighbors_with_markets' than 'avg_neighbor_agi' (0.193). These correlations are relatively modest, implying that further feature engineering might be beneficial. Nevertheless, this process highlights the importance of iterative feature selection and refinement in model development

In [0]:
from pyspark.ml.feature import VectorAssembler

# Exclude non-feature columns
featureCols = ["avg_neighbor_agi", "avg_neighbor_population"]

# Use VectorAssembler to combine feature columns into a single vector column
vectorAssembler = VectorAssembler(inputCols=featureCols, outputCol="features")

# Include label in the dataframe for modeling
df_vector = vectorAssembler.transform(neighbors_with_features_filtered).select("features", "num_neighbors_with_markets")


In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Set the model
rfModel = RandomForestRegressor(labelCol="num_neighbors_with_markets", featuresCol="features")

# Set the pipeline
pipeline = Pipeline().setStages([rfModel])

# Split the dataset into a training set and a test set
training, test = df_vector.randomSplit([0.7, 0.3])

# Specify the hyperparameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(rfModel.numTrees, [10, 20]) \
    .addGrid(rfModel.maxDepth, [5]) \
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol="num_neighbors_with_markets"),
                          numFolds=3)

cvModel = crossval.fit(training)

# Make predictions on the test data
predictions = cvModel.transform(test)

# List of metrics we want to compute
metrics = ["rmse", "mae", "mse", "r2"]

# Evaluate the model on multiple metrics
for metric in metrics:
    evaluator = RegressionEvaluator(labelCol="num_neighbors_with_markets", metricName=metric)
    score = evaluator.evaluate(predictions)
    print(f"{metric.upper()} on test data = {score}")


RMSE on test data = 7.254474305541948
MAE on test data = 4.007210198984731
MSE on test data = 52.62739744976834
R2 on test data = 0.2736195127336781


###Given the computational demands of running a more extensive hyperparameter grid, I had to streamline the parameters to ensure the model would run efficiently.
###The model exhibits some predictive capability, but it's clear that there is substantial room for improvement. Notably, a significant amount of error and unexplained variance are present in the predictions.
###As a pathway towards enhancing the model's performance, future work could consider:
###Introducing additional features to the model
###Undertaking more extensive feature engineering
###Experimenting with different types of models