## Assignment 4 - Spark ML

## Learning Outcomes
In this assignment you will: 

-  Use ML piplenes
-  Improve a Random Forest model
-  Perform Hyperparameter tuning

**Submission Format**

- If you used databricks, please submit the **published** notebook link in a word or pdf document. Do not submit HTML, Jupyter notebook, or archive (DBC) formats.
- If you used a local instance of spark, please submit a Jupyter notebook.

#### Question 1:  (5 marks)

In our learning from this module, we have identified a fairly significant link by leveraging the ML pipeline, a more sophisticated model, and better hyperparameter tuning. However these results are still a bit disappointing. With that being said, we're working with very few features and we've likely made some assumptions that just aren't quite valid (like zip code shortening). Also, just because a rich zip code exists doesn't mean that the farmer's market would be held in that zip code too. In fact we might want to start looking at neighboring zip codes or doing some sort of distance measure to predict whether or not there exists a farmer's market in a certain mile radius from a wealthy zip code.

With that being said, we've got a lot of other potential features and plenty of other parameters to tune on our random forest so play around with the above pipeline and see if you can improve it further! Note: adding a feaure for the distance measure is just an example and not a mandatory change to improve the model's performance. We also aren't concerned about if the model's perforamnce is actually improved! We simply want to see if changes have been made to the code for possible improvements. 

Learn mode about the Farmers Markets dataset, here: https://catalog.data.gov/dataset/farmers-markets-directory-and-geographic-data
    
You may use the same classifier we built in the notebook in this module.

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/5915990090493625/2446126855165611/6085673883631125/latest.html

#### Question 2 ( 7 marks)


Using the Apache Spark ML pipeline, build a model to predict the price of a diamond based on the available features.

Read from the following notebook for details about dataset.

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/5915990090493625/4396972618536508/6085673883631125/latest.html

Note:  
- If you receive an R_Squared value that is negative, that is okay. This may occur due to the low sample size of the data.

# Question 1

In the original pipeline, we grouped ZIP codes by truncating them to four digits (ZIP3), which helped with data sparsity but introduced a spatial assumption: that ZIPs beginning with the same 4 digits are similar. In reality, this can merge distinct communities with very different income levels or access to markets.

We also assumed that a farmers market exists in the same ZIP as the customers it serves. However, people frequently cross ZIP boundaries to attend markets. Some markets may serve nearby affluent areas while being located in lower-income ZIPs.

To improve the model, we reverted to using full 5-digit ZIPs to retain geographic precision. We also used an additional dataset https://simplemaps.com/data/us-zips to grab x y coordinates for the 2013_soi_zipcode_agi dataset. Now we can create the feature how many farmer markets within 10km. 

We achieved an accuracy of 50% which is marginally better but we still have a long way to go. 

In [1]:
from pyspark.sql import SparkSession
from IPython.display import display, HTML
from pyspark.sql.functions import col, lpad

import os
os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 8g pyspark-shell"

spark = SparkSession.builder \
    .appName("VSCodeTest") \
    .master("local[*]") \
    .getOrCreate()

print("✅ Spark session running in VS Code!")
print("Spark version:", spark.version)

✅ Spark session running in VS Code!
Spark version: 4.0.0


In [2]:
from pyspark.sql.functions import col, lpad

# Load the datasets
taxes2013 = (spark.read \
    .option("header", "true") \
    .csv("C:/Users/jverc/Downloads/2013_soi_zipcode_agi.csv")
    .filter(col("zipcode").isNotNull())  # filter out rows with null zip codes
    .withColumn("zipcode", lpad(col("zipcode").cast("string"), 5, "0")) # pad with zeros
    .filter(col("zipcode") != "00000")
)

markets = (spark.read \
    .option("header", "true") \
    .csv("C:/Users/jverc/Downloads/market_data.csv")
    .filter(col("x") != "null") # filter out rows with null x values
)

zip_long_lat = (spark.read \
    .option("header", "true") \
    .csv("C:/Users/jverc/Downloads/uszips.csv")
    .withColumn("zip", lpad(col("zip").cast("string"), 5, "0")) # pad with zeros
)

In [3]:
taxes2013.createOrReplaceTempView("taxes2013")
markets.createOrReplaceTempView("markets")
zip_long_lat.createOrReplaceTempView("zip_long_lat")

In [4]:
spark.table("taxes2013").describe().show()

+-------+------------------+------+------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+-----

In [5]:
spark.table("zip_long_lat").describe().show()

+-------+------------------+-----------------+------------------+----------+--------+----------+-----+-----------+------------------+------------------+------------------+-----------+------------------+-----------------+------------------+--------------------+--------------------+--------------------+
|summary|               zip|              lat|               lng|      city|state_id|state_name| zcta|parent_zcta|        population|           density|       county_fips|county_name|    county_weights| county_names_all|   county_fips_all|           imprecise|            military|            timezone|
+-------+------------------+-----------------+------------------+----------+--------+----------+-----+-----------+------------------+------------------+------------------+-----------+------------------+-----------------+------------------+--------------------+--------------------+--------------------+
|  count|             33782|            33782|             33782|     33782|   33782|     3

In [6]:
df_display = spark.sql("""
                       
SELECT 
  state, 
  SUBSTRING(zipcode, 1, 3) AS zip3, 
  STRING(zipcode) AS zipcode, 
                       
  -- X, Y coordinates:
  CAST(lat AS DOUBLE) AS latitude,
  CAST(lng AS DOUBLE) longitude,                                 

  -- Raw demographic & financial columns
  CAST(CAST(mars1 AS DOUBLE) AS INT) AS single_returns, 
  CAST(CAST(mars2 AS DOUBLE) AS INT) AS joint_returns, 
  CAST(CAST(numdep AS DOUBLE) AS INT) AS numdep, 
  CAST(A02650 AS DOUBLE) AS total_income_amount,
  CAST(A00200 AS DOUBLE) AS salaries_wages_amount,
  CAST(A04470 AS DOUBLE) AS itemized_deductions,
  CAST(A10600 AS DOUBLE) AS total_tax_payments,
  CAST(A59660 AS DOUBLE) AS earned_income_credit

FROM taxes2013 LEFT JOIN zip_long_lat ON taxes2013.zipcode = zip_long_lat.zip

WHERE lat IS NOT NULL
""")

df_display.show()

df_display.createOrReplaceTempView("cleaned_taxes")
spark.catalog.cacheTable("cleaned_taxes")

+-----+----+-------+--------+---------+--------------+-------------+------+-------------------+---------------------+-------------------+------------------+--------------------+
|state|zip3|zipcode|latitude|longitude|single_returns|joint_returns|numdep|total_income_amount|salaries_wages_amount|itemized_deductions|total_tax_payments|earned_income_credit|
+-----+----+-------+--------+---------+--------------+-------------+------+-------------------+---------------------+-------------------+------------------+--------------------+
|   AL| 350|  35004|33.60369|-86.49333|           950|          260|   710|            19851.0|              14383.0|             1848.0|            3275.0|              1479.0|
|   AL| 350|  35004|33.60369|-86.49333|           590|          410|   860|            49338.0|              41998.0|             5354.0|            5652.0|               569.0|
|   AL| 350|  35004|33.60369|-86.49333|           290|          490|   620|            56170.0|              4

In [7]:
spark.table("markets").describe().show()

+-------+------------------+--------------------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+---------+-------+-------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+-----------------+--------------------+------+----+-------+-----+----+-------+----------+------+------+-------+----+-------+-----+----------+-----+----+-----+----+-------+----+------+-------+--------+----+-----+----+------+-----+------+------+------+---------+-------+----+-------------+--------------------+
|summary|              FMID|          MarketName|             Website|            Facebook|    Twitter|             Youtube|          OtherMedia|              street|     city| County|  State|               zip|         Season1Date|         Season1Time|         Season2Date|         Season2Time

In [8]:
cleanedTaxes = spark.sql("SELECT * FROM cleaned_taxes")

# Haversine formula 

We will need to first cross join all the avaliable longitude and latitude between both datasets. This then allows us to compute the distance between each longitude and latitude for each zip_code and each farmers market. We can then group by and count the number of farmers markets within 10 km (driving distance) of each zip code longitude and latitude. 

# Step 1 - Cross Join Coordinates

Creates all possible ZIP–market combinations and exposes all latitude/longitude pairs needed for distance calculations.

In [9]:
zip_market_pairs = cleanedTaxes.crossJoin(
    markets.selectExpr("cast(x as double) as x", "cast(y as double) as y", "zip as market_zip")
)


# Check row counts

In [None]:
print("Rows in cleanedTaxes:", cleanedTaxes.count())
print("Rows in markets:", markets.count())

print("Expected cross join rows:", cleanedTaxes.count() * markets.count())
print("Actual zip_market_pairs rows:", zip_market_pairs.count())

Rows in cleanedTaxes: 166116
Rows in markets: 8487
Expected cross join rows: 1409826492
Actual zip_market_pairs rows: 1409826492


# Step 2 - Compute Distance with Haversine Formula

In [None]:
from pyspark.sql.functions import radians, sin, cos, sqrt, asin, col

R = 6371  # Radius of the Earth in kilometers

with_distances = zip_market_pairs.withColumn("distance_km", 2 * R * asin(sqrt(
    sin((radians(col("y") - col("latitude")) / 2)) ** 2 +
    cos(radians(col("latitude"))) * cos(radians(col("y"))) *
    sin((radians(col("x") - col("longitude")) / 2)) ** 2
)))


# Step 3 - Filter and aggregate for nearby markets

In [12]:
# Markets within 10 km for driving distance threshold
nearby_counts = with_distances.filter("distance_km < 10") \
    .groupBy("zipcode").count() \
    .withColumnRenamed("count", "markets_within_10km")


In [13]:
nearby_counts.describe().show()

+-------+-----------------+-------------------+
|summary|          zipcode|markets_within_10km|
+-------+-----------------+-------------------+
|  count|            13136|              13136|
|   mean|44304.53882460414|  30.90849573690621|
| stddev| 29432.8179200954|   59.5588464182051|
|    min|            01001|                  6|
|    max|            99824|                654|
+-------+-----------------+-------------------+



In [14]:
nearby_counts.show()

+-------+-------------------+
|zipcode|markets_within_10km|
+-------+-------------------+
|  35004|                 12|
|  35640|                  6|
|  85022|                  6|
|  90022|                102|
|  91910|                 36|
|  92027|                 12|
|  72070|                  6|
|  90807|                 54|
|  91326|                 24|
|  91767|                 36|
|  85323|                 12|
|  90094|                126|
|  90211|                168|
|  35054|                  6|
|  36869|                 12|
|  72110|                  6|
|  91780|                 54|
|  35976|                  6|
|  36610|                  6|
|  36744|                  6|
+-------+-------------------+
only showing top 20 rows


In [15]:
summedTaxes = cleanedTaxes.groupBy("zipcode").sum()

# Join nearby market count with ZIP-level tax data
joined = (
    summedTaxes.join(nearby_counts, on="zipcode", how="left")
    .na.fill({"markets_within_10km": 0})  # fill nulls if any ZIPs had no nearby markets
    .drop("sum(latitude)", "sum(longitude)")
)

In [16]:
joined.show()

+-------+-------------------+------------------+-----------+------------------------+--------------------------+------------------------+-----------------------+-------------------------+-------------------+
|zipcode|sum(single_returns)|sum(joint_returns)|sum(numdep)|sum(total_income_amount)|sum(salaries_wages_amount)|sum(itemized_deductions)|sum(total_tax_payments)|sum(earned_income_credit)|markets_within_10km|
+-------+-------------------+------------------+-----------+------------------------+--------------------------+------------------------+-----------------------+-------------------------+-------------------+
|  35004|               1960|              2150|       3200|                258024.0|                  211893.0|                 30225.0|                33431.0|                   2048.0|                 12|
|  35444|                480|               710|       1190|                 76844.0|                   58186.0|                  4820.0|                 9837.0|       

In [17]:
joined.describe().show()

+-------+------------------+-------------------+------------------+------------------+------------------------+--------------------------+------------------------+-----------------------+-------------------------+-------------------+
|summary|           zipcode|sum(single_returns)|sum(joint_returns)|       sum(numdep)|sum(total_income_amount)|sum(salaries_wages_amount)|sum(itemized_deductions)|sum(total_tax_payments)|sum(earned_income_credit)|markets_within_10km|
+-------+------------------+-------------------+------------------+------------------+------------------------+--------------------------+------------------------+-----------------------+-------------------------+-------------------+
|  count|             27686|              27686|             27686|             27686|                   27686|                     27686|                   27686|                  27686|                    27686|              27686|
|   mean| 48851.35830383587| 2348.5772592646103|1879.35924293866

In [18]:
# joined = joined.filter(col("markets_within_10km") > 0)

In [19]:
from pyspark.sql.functions import col, when

summed_with_features = (
    joined
    .withColumn("income_per_return", 
        col("sum(total_income_amount)") / 
        (col("sum(single_returns)") + col("sum(joint_returns)"))
    )
    .withColumn("effective_tax_rate", 
        col("sum(total_tax_payments)") / 
        col("sum(total_income_amount)")
    )
    .withColumn("avg_dependents_per_return", 
        col("sum(numdep)") / 
        (col("sum(single_returns)") + col("sum(joint_returns)"))
    )
    .withColumn("credit_ratio", 
        col("sum(earned_income_credit)") / 
        col("sum(total_tax_payments)")
    )
    .withColumn("market_access_label", 
        when(col("markets_within_10km") == 0, 0)
        .when(col("markets_within_10km") <= 5, 1)
        .when(col("markets_within_10km") <= 20, 2)
        .otherwise(3)
    )
)


In [21]:
summed_with_features.show()

+-------+-------------------+------------------+-----------+------------------------+--------------------------+------------------------+-----------------------+-------------------------+-------------------+------------------+-------------------+-------------------------+-------------------+-------------------+
|zipcode|sum(single_returns)|sum(joint_returns)|sum(numdep)|sum(total_income_amount)|sum(salaries_wages_amount)|sum(itemized_deductions)|sum(total_tax_payments)|sum(earned_income_credit)|markets_within_10km| income_per_return| effective_tax_rate|avg_dependents_per_return|       credit_ratio|market_access_label|
+-------+-------------------+------------------+-----------+------------------------+--------------------------+------------------------+-----------------------+-------------------------+-------------------+------------------+-------------------+-------------------------+-------------------+-------------------+
|  35004|               1960|              2150|       3200| 

In [22]:
summed_with_features.describe().show()

+-------+------------------+-------------------+------------------+------------------+------------------------+--------------------------+------------------------+-----------------------+-------------------------+-------------------+------------------+--------------------+-------------------------+-------------------+-------------------+
|summary|           zipcode|sum(single_returns)|sum(joint_returns)|       sum(numdep)|sum(total_income_amount)|sum(salaries_wages_amount)|sum(itemized_deductions)|sum(total_tax_payments)|sum(earned_income_credit)|markets_within_10km| income_per_return|  effective_tax_rate|avg_dependents_per_return|       credit_ratio|market_access_label|
+-------+------------------+-------------------+------------------+------------------+------------------------+--------------------------+------------------------+-----------------------+-------------------------+-------------------+------------------+--------------------+-------------------------+-------------------+-

In [23]:
nonFeatureCols = [
    "zipcode", 
    "markets_within_10km",
]
featureCols = [col for col in summed_with_features.columns if col not in nonFeatureCols]

In [24]:
# VectorAssembler Assembles all of these columns into one single vector. To do this, set the input columns and output column. Then that assembler will be used to transform the prepped data to the final dataset.
from pyspark.ml.feature import VectorAssembler

assembler = (VectorAssembler()
  .setInputCols(featureCols)
  .setOutputCol("features"))

finalPrep = assembler.transform(summed_with_features)

In [25]:
training, test = finalPrep.randomSplit([0.7, 0.3])

#  Going to cache the data to make sure things stay snappy!
training.cache()
test.cache()

print(training.count()) # Why execute count here??
print(test.count())

19319
8367


In [26]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml import Pipeline

rfModel = (RandomForestRegressor()
  .setLabelCol("markets_within_10km")
  .setFeaturesCol("features"))

paramGrid = (ParamGridBuilder()
  .addGrid(rfModel.maxDepth, [5, 10])
  .addGrid(rfModel.numTrees, [20, 60])
  .build())
# Note, that this parameter grid will take a long time
# to run in the community edition due to limited number
# of workers available! Be patient for it to run!
# If you want it to run faster, remove some of
# the above parameters and it'll speed right up!

stages = [rfModel]

pipeline = Pipeline().setStages(stages)

cv = (CrossValidator() # you can feel free to change the number of folds used in cross validation as well
  .setEstimator(pipeline) # the estimator can also just be an individual model rather than a pipeline
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(RegressionEvaluator().setLabelCol("markets_within_10km")))

pipelineFitted = cv.fit(training)

In [27]:
print("The Best Parameters:\n--------------------")
print(pipelineFitted.bestModel.stages[0])
pipelineFitted.bestModel.stages[0].extractParamMap()

The Best Parameters:
--------------------
RandomForestRegressionModel: uid=RandomForestRegressor_092eab2879f8, numTrees=60, numFeatures=13


{Param(parent='RandomForestRegressor_092eab2879f8', name='bootstrap', doc='Whether bootstrap samples are used when building trees.'): True,
 Param(parent='RandomForestRegressor_092eab2879f8', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False,
 Param(parent='RandomForestRegressor_092eab2879f8', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10,
 Param(parent='RandomForestRegressor_092eab2879f8', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supporte

In [28]:
pipelineFitted.bestModel

PipelineModel_36d63e932d6b

In [29]:
holdout2 = (pipelineFitted.bestModel
  .transform(test)
  .selectExpr("prediction as raw_prediction", 
    "double(round(prediction)) as prediction", 
    "markets_within_10km ", 
    """CASE double(round(prediction)) = markets_within_10km  
  WHEN true then 1
  ELSE 0
END as equal"""))
  
holdout2.show()

+-------------------+----------+-------------------+-----+
|     raw_prediction|prediction|markets_within_10km|equal|
+-------------------+----------+-------------------+-----+
|  6.634076050144942|       7.0|                  6|    0|
|  54.09309758926638|      54.0|                 60|    0|
|  14.60166220926718|      15.0|                 18|    0|
| 0.2683262488631826|       0.0|                  0|    1|
|  56.47780543037787|      56.0|                 54|    0|
|0.07385659437463538|       0.0|                  0|    1|
| 0.1470891236377311|       0.0|                  0|    1|
| 10.225444271402885|      10.0|                  6|    0|
|   8.63691886773068|       9.0|                 12|    0|
|  8.618569508942755|       9.0|                  6|    0|
|0.03481121096305478|       0.0|                  0|    1|
|0.03481121096305478|       0.0|                  0|    1|
|  47.56071160655977|      48.0|                 30|    0|
|0.12068025859237393|       0.0|                  0|    

In [30]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate Mean Squared Error
evaluator = RegressionEvaluator(
    labelCol="markets_within_10km", 
    predictionCol="prediction", 
    metricName="mse"
)
mse = evaluator.evaluate(holdout2)

mae = evaluator.setMetricName("mae").evaluate(holdout2)
rmse = evaluator.setMetricName("rmse").evaluate(holdout2)
r2 = evaluator.setMetricName("r2").evaluate(holdout2)

print(f"MSE: {mse}")
print(f"MAE: {mae}")
print(f"RMSE: {rmse}")
print(f"R^2: {r2}")


MSE: 785.6073861599141
MAE: 7.54714951595554
RMSE: 28.028688627189002
R^2: 0.5489172314240276


In [31]:
from pyspark.sql.functions import col, abs

# Mean Absolute Percentage Error (only on non-zero true values)
mape_df = holdout2.withColumn(
    "pct_error", abs(col("prediction") - col("markets_within_10km")) / col("markets_within_10km")
).filter(col("markets_within_10km") != 0)

mape = mape_df.selectExpr("avg(pct_error)").first()[0]
print(f"MAPE: {mape * 100:.2f}%")

MAPE: 50.63%


In [32]:
holdout2.selectExpr("sum(equal)/sum(1)").show()

+---------------------+
|(sum(equal) / sum(1))|
+---------------------+
|   0.5208557427990916|
+---------------------+



In [33]:
# spark.stop(