# **Farmers Market ML Model Using Spark ML**

In [None]:
!pip install pyspark



In [None]:
# import modules from pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SQLContext
from pyspark.sql.functions import col

# Set up the Spark entry points used throughout the notebook.
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
# Only report errors so cell output is not flooded with INFO/WARN log lines.
spark.sparkContext.setLogLevel("ERROR")
# SQLContext's first positional argument is a SparkContext, not a SparkSession.
# Pass the context explicitly and hand over the existing session as the second
# argument so the SQLContext is bound to the session created above.
sqlContext = SQLContext(spark.sparkContext, spark)



In [None]:
# Both inputs are read as CSV with a header row and inferred column types.
csv_options = {"header": "true", "inferSchema": "true"}

markets = spark.read.options(**csv_options).csv("farmers_markets_from_usda.csv.gz")

# The author's original note next to this path was: "2013_soi_zipcode_agi.csv"
taxes2013 = spark.read.options(**csv_options).csv("zipcodeagi13.csv.gz")

# Register both DataFrames as Spark SQL temp views under their variable names.
taxes2013.createOrReplaceTempView("taxes2013")
markets.createOrReplaceTempView("markets")

In [None]:
# Compared to the previous model, this version adds the AGI and
# number_of_returns_with_total_income features, and keeps only rows whose
# zipcode lies in the real zipcode range 00501-99950 (inclusive).

# (source column, cast type, output column name)
tax_columns = [
    ("zipcode", "int", "zipcode"),
    ("mars1", "int", "single_returns"),
    ("mars2", "int", "joint_returns"),
    ("numdep", "int", "numdep"),
    ("A00100", "double", "AGI"),
    ("A02650", "double", "total_income_amount"),
    ("A00300", "double", "taxable_interest_amount"),
    ("N02650", "double", "number_of_returns_with_total_income"),
    ("a01000", "double", "net_capital_gains"),
    ("a00900", "double", "biz_net_income"),
]

# The range filter is applied to the raw zipcode column, before the cast/rename.
in_zip_range = col("zipcode").between(501, 99950)

cleaned_taxes = (
    taxes2013
    .filter(in_zip_range)
    .select([col(src).cast(dtype).alias(name) for src, dtype, name in tax_columns])
)

cleaned_taxes.createOrReplaceTempView("cleaned_taxes")
pd_cleaned_taxes = cleaned_taxes.toPandas()
pd_cleaned_taxes

Unnamed: 0,zipcode,single_returns,joint_returns,numdep,AGI,total_income_amount,taxable_interest_amount,number_of_returns_with_total_income,net_capital_gains,biz_net_income
0,35004,950,260,710,19524.0,19851.0,183.0,1530.0,4.0,1657.0
1,35004,590,410,860,48895.0,49338.0,172.0,1330.0,54.0,788.0
2,35004,290,490,620,55761.0,56170.0,185.0,910.0,139.0,584.0
3,35004,90,490,530,52579.0,52977.0,89.0,610.0,173.0,339.0
4,35004,40,460,450,63848.0,64329.0,205.0,510.0,709.0,1720.0
...,...,...,...,...,...,...,...,...,...,...
166123,83414,30,60,60,2709.0,2850.0,34.0,60.0,28.0,8.0
166124,83414,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0
166125,83414,0,0,30,1787.0,1829.0,0.0,20.0,0.0,0.0
166126,83414,0,40,40,4263.0,4580.0,242.0,30.0,1394.0,743.0


In [None]:
# Cache the "cleaned_taxes" temp view through the session catalog; this is the
# SparkSession-era replacement for the deprecated SQLContext.cacheTable.
spark.catalog.cacheTable("cleaned_taxes")

# Read the cached view back as a DataFrame (same rows as SELECT * FROM cleaned_taxes).
cleanedTaxes = spark.table("cleaned_taxes")
pd_cleanedTaxes = cleanedTaxes.toPandas()
pd_cleanedTaxes

Unnamed: 0,zipcode,single_returns,joint_returns,numdep,AGI,total_income_amount,taxable_interest_amount,number_of_returns_with_total_income,net_capital_gains,biz_net_income
0,35004,950,260,710,19524.0,19851.0,183.0,1530.0,4.0,1657.0
1,35004,590,410,860,48895.0,49338.0,172.0,1330.0,54.0,788.0
2,35004,290,490,620,55761.0,56170.0,185.0,910.0,139.0,584.0
3,35004,90,490,530,52579.0,52977.0,89.0,610.0,173.0,339.0
4,35004,40,460,450,63848.0,64329.0,205.0,510.0,709.0,1720.0
...,...,...,...,...,...,...,...,...,...,...
166123,83414,30,60,60,2709.0,2850.0,34.0,60.0,28.0,8.0
166124,83414,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0
166125,83414,0,0,30,1787.0,1829.0,0.0,20.0,0.0,0.0
166126,83414,0,40,40,4263.0,4580.0,242.0,30.0,1394.0,743.0


In [None]:
# Collapse the tax rows to one row per zipcode by summing every measure column.
# The grouping key itself is excluded: a bare .sum() would also add up the
# zipcode values and emit a meaningless "sum(zipcode)" column.
measure_cols = [name for name in cleanedTaxes.columns if name != "zipcode"]
summedTaxes = cleanedTaxes.groupBy("zipcode").sum(*measure_cols)
pd_summedTaxes = summedTaxes.toPandas()
pd_summedTaxes

Unnamed: 0,zipcode,sum(zipcode),sum(single_returns),sum(joint_returns),sum(numdep),sum(AGI),sum(total_income_amount),sum(taxable_interest_amount),sum(number_of_returns_with_total_income),sum(net_capital_gains),sum(biz_net_income)
0,35071,210426,2890,3270,4540,425459.0,429825.0,2588.0,7290.0,3278.0,10490.0
1,36525,219150,320,370,690,39742.0,40117.0,91.0,890.0,23.0,480.0
2,36538,219228,50,40,120,5035.0,5067.0,4.0,140.0,0.0,0.0
3,85253,511518,3540,3820,4070,3131454.0,3182950.0,100700.0,7860.0,576957.0,103310.0
4,85321,511926,610,430,1130,49119.0,49500.0,218.0,1370.0,142.0,732.0
...,...,...,...,...,...,...,...,...,...,...,...
27683,25119,150714,80,90,150,7317.0,7360.0,2.0,210.0,0.0,0.0
27684,25607,153642,90,180,230,12877.0,12918.0,19.0,280.0,0.0,0.0
27685,26431,158586,1140,1140,1600,119880.0,120676.0,453.0,2670.0,628.0,1893.0
27686,54515,327090,110,120,130,10188.0,10462.0,31.0,250.0,290.0,308.0


In [None]:
# Number of markets per zip, with the count stored as a double.
markets_per_zip = markets.groupBy("zip").count()
cleanedMarkets = markets_per_zip.select(
    col("count").cast("double").alias("count"),
    col("zip"),
)

# Inner join: keep only zips that appear in both the market counts and the
# per-zipcode tax sums.
zip_matches = cleanedMarkets["zip"] == summedTaxes["zipcode"]
joined = cleanedMarkets.join(summedTaxes, on=zip_matches, how="inner")
pd_joined = joined.toPandas()
pd_joined

Unnamed: 0,count,zip,zipcode,sum(zipcode),sum(single_returns),sum(joint_returns),sum(numdep),sum(AGI),sum(total_income_amount),sum(taxable_interest_amount),sum(number_of_returns_with_total_income),sum(net_capital_gains),sum(biz_net_income)
0,1.0,35071,35071,210426,2890,3270,4540,425459.0,429825.0,2588.0,7290.0,3278.0,10490.0
1,1.0,85321,85321,511926,610,430,1130,49119.0,49500.0,218.0,1370.0,142.0,732.0
2,1.0,91367,91367,548202,10360,7610,10400,1870437.0,1915969.0,20817.0,20270.0,98110.0,105361.0
3,2.0,95476,95476,572856,7930,6000,8680,1397084.0,1430340.0,19539.0,15630.0,192523.0,94644.0
4,2.0,80033,80033,480198,7370,3950,5750,680721.0,691183.0,5231.0,13100.0,21096.0,22839.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
6058,1.0,29639,29639,177834,200,230,350,24505.0,24800.0,134.0,510.0,374.0,447.0
6059,1.0,57430,57430,344580,540,450,500,74532.0,76631.0,815.0,1090.0,7833.0,3171.0
6060,1.0,23109,23109,138654,400,470,510,62323.0,63225.0,649.0,1010.0,1709.0,1600.0
6061,1.0,54515,54515,327090,110,120,130,10188.0,10462.0,31.0,250.0,290.0,308.0


In [None]:
# Show the column names and types of the joined markets/taxes DataFrame.
joined.printSchema()

root
 |-- count: double (nullable = false)
 |-- zip: string (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- sum(zipcode): long (nullable = true)
 |-- sum(single_returns): long (nullable = true)
 |-- sum(joint_returns): long (nullable = true)
 |-- sum(numdep): long (nullable = true)
 |-- sum(AGI): double (nullable = true)
 |-- sum(total_income_amount): double (nullable = true)
 |-- sum(taxable_interest_amount): double (nullable = true)
 |-- sum(number_of_returns_with_total_income): double (nullable = true)
 |-- sum(net_capital_gains): double (nullable = true)
 |-- sum(biz_net_income): double (nullable = true)



In [None]:
from pyspark.sql import functions as F

# Data engineering on the joined frame:
#   * drop the 'zipcode' and 'sum(zipcode)' columns
#   * total_taxpayers       = single_returns + 2 * joint_returns
#   * average_income_amount = total_income_amount / total_taxpayers
single_returns = F.col("sum(single_returns)")
joint_returns = F.col("sum(joint_returns)")
total_income = F.col("sum(total_income_amount)")

joined = (
    joined
    .drop("zipcode", "sum(zipcode)")
    .withColumn("total_taxpayers", single_returns + 2 * joint_returns)
    .withColumn("average_income_amount", total_income / F.col("total_taxpayers"))
)

pd_joined = joined.toPandas()
pd_joined

Unnamed: 0,count,zip,sum(single_returns),sum(joint_returns),sum(numdep),sum(AGI),sum(total_income_amount),sum(taxable_interest_amount),sum(number_of_returns_with_total_income),sum(net_capital_gains),sum(biz_net_income),total_taxpayers,average_income_amount
0,1.0,35071,2890,3270,4540,425459.0,429825.0,2588.0,7290.0,3278.0,10490.0,9430,45.580594
1,1.0,85321,610,430,1130,49119.0,49500.0,218.0,1370.0,142.0,732.0,1470,33.673469
2,1.0,91367,10360,7610,10400,1870437.0,1915969.0,20817.0,20270.0,98110.0,105361.0,25580,74.901056
3,2.0,95476,7930,6000,8680,1397084.0,1430340.0,19539.0,15630.0,192523.0,94644.0,19930,71.768189
4,2.0,80033,7370,3950,5750,680721.0,691183.0,5231.0,13100.0,21096.0,22839.0,15270,45.264113
...,...,...,...,...,...,...,...,...,...,...,...,...,...
6058,1.0,29639,200,230,350,24505.0,24800.0,134.0,510.0,374.0,447.0,660,37.575758
6059,1.0,57430,540,450,500,74532.0,76631.0,815.0,1090.0,7833.0,3171.0,1440,53.215972
6060,1.0,23109,400,470,510,62323.0,63225.0,649.0,1010.0,1709.0,1600.0,1340,47.182836
6061,1.0,54515,110,120,130,10188.0,10462.0,31.0,250.0,290.0,308.0,350,29.891429


In [None]:
# Re-check the schema after the column drops and the two derived columns were added.
joined.printSchema()

root
 |-- count: double (nullable = false)
 |-- zip: string (nullable = true)
 |-- sum(single_returns): long (nullable = true)
 |-- sum(joint_returns): long (nullable = true)
 |-- sum(numdep): long (nullable = true)
 |-- sum(AGI): double (nullable = true)
 |-- sum(total_income_amount): double (nullable = true)
 |-- sum(taxable_interest_amount): double (nullable = true)
 |-- sum(number_of_returns_with_total_income): double (nullable = true)
 |-- sum(net_capital_gains): double (nullable = true)
 |-- sum(biz_net_income): double (nullable = true)
 |-- total_taxpayers: long (nullable = true)
 |-- average_income_amount: double (nullable = true)



In [None]:
from pyspark.sql.functions import col, count, when

# Build one aggregate per column that counts the rows where that column is null.
null_checks = []
for name in joined.columns:
    null_checks.append(count(when(col(name).isNull(), name)).alias(name))

null_counts = joined.select(null_checks)
null_counts.show()

# Discard every row that has a null in any column.
prepped = joined.na.drop()

pd_prepped = prepped.toPandas()
pd_prepped

+-----+---+-------------------+------------------+-----------+--------+------------------------+----------------------------+----------------------------------------+----------------------+-------------------+---------------+---------------------+
|count|zip|sum(single_returns)|sum(joint_returns)|sum(numdep)|sum(AGI)|sum(total_income_amount)|sum(taxable_interest_amount)|sum(number_of_returns_with_total_income)|sum(net_capital_gains)|sum(biz_net_income)|total_taxpayers|average_income_amount|
+-----+---+-------------------+------------------+-----------+--------+------------------------+----------------------------+----------------------------------------+----------------------+-------------------+---------------+---------------------+
|    0|  0|                  0|                 0|          0|       0|                       0|                           0|                                       0|                     0|                  0|              0|                    0|
+-----+-

Unnamed: 0,count,zip,sum(single_returns),sum(joint_returns),sum(numdep),sum(AGI),sum(total_income_amount),sum(taxable_interest_amount),sum(number_of_returns_with_total_income),sum(net_capital_gains),sum(biz_net_income),total_taxpayers,average_income_amount
0,1.0,35071,2890,3270,4540,425459.0,429825.0,2588.0,7290.0,3278.0,10490.0,9430,45.580594
1,1.0,85321,610,430,1130,49119.0,49500.0,218.0,1370.0,142.0,732.0,1470,33.673469
2,1.0,91367,10360,7610,10400,1870437.0,1915969.0,20817.0,20270.0,98110.0,105361.0,25580,74.901056
3,2.0,95476,7930,6000,8680,1397084.0,1430340.0,19539.0,15630.0,192523.0,94644.0,19930,71.768189
4,2.0,80033,7370,3950,5750,680721.0,691183.0,5231.0,13100.0,21096.0,22839.0,15270,45.264113
...,...,...,...,...,...,...,...,...,...,...,...,...,...
6058,1.0,29639,200,230,350,24505.0,24800.0,134.0,510.0,374.0,447.0,660,37.575758
6059,1.0,57430,540,450,500,74532.0,76631.0,815.0,1090.0,7833.0,3171.0,1440,53.215972
6060,1.0,23109,400,470,510,62323.0,63225.0,649.0,1010.0,1709.0,1600.0,1340,47.182836
6061,1.0,54515,110,120,130,10188.0,10462.0,31.0,250.0,290.0,308.0,350,29.891429


In [None]:
from pyspark.ml.feature import VectorAssembler

# Every column except these two is used as a model input.
nonFeatureCols = ["zip", "count"]
featureCols = [name for name in prepped.columns if name not in nonFeatureCols]

# VectorAssembler packs all feature columns into a single vector column.
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")

finalPrep = assembler.transform(prepped)

In [None]:
# 70/30 train/test split. A fixed seed is passed so that re-running the notebook
# on the same input produces the same split (and therefore comparable metrics)
# instead of a fresh random partition each time.
training, test = finalPrep.randomSplit([0.7, 0.3], seed=42)

# Going to cache the data to make sure things stay snappy!
training.cache()
test.cache()

# cache() only marks the DataFrames for caching; nothing is computed until an
# action runs. count() is such an action, so calling it here materializes both
# caches up front (and reports the split sizes as a sanity check).
print(training.count())
print(test.count())

4294
1769


In [None]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

# Random forest regressor predicting the market count from the assembled features.
# The seed is fixed so the forest's random choices are reproducible across runs.
rfModel = (RandomForestRegressor()
  .setLabelCol("count")
  .setFeaturesCol("features")
  .setSeed(42))

# Hyper-parameter grid: 2 depths x 2 forest sizes = 4 candidate models.
paramGrid = (ParamGridBuilder()
  .addGrid(rfModel.maxDepth, [5, 10])
  .addGrid(rfModel.numTrees, [20, 60])
  .build())

stages = [rfModel]

pipeline = Pipeline().setStages(stages)

# Cross-validate the pipeline over the grid, scoring with RegressionEvaluator
# against the "count" label. The seed fixes the fold assignment so the selected
# best model does not change between runs on the same training data.
cv = (CrossValidator()
  .setEstimator(pipeline)
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(RegressionEvaluator().setLabelCol("count"))
  .setSeed(42))

pipelineFitted = cv.fit(training)

# stages[0] of the best pipeline is the fitted random forest model.
print("The Best Parameters:\n--------------------")
print(pipelineFitted.bestModel.stages[0])
pipelineFitted.bestModel.stages[0].extractParamMap()

The Best Parameters:
--------------------
RandomForestRegressionModel: uid=RandomForestRegressor_9e2d8535a8a3, numTrees=60, numFeatures=11


{Param(parent='RandomForestRegressor_9e2d8535a8a3', name='bootstrap', doc='Whether bootstrap samples are used when building trees.'): True,
 Param(parent='RandomForestRegressor_9e2d8535a8a3', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False,
 Param(parent='RandomForestRegressor_9e2d8535a8a3', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10,
 Param(parent='RandomForestRegressor_9e2d8535a8a3', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supporte

In [None]:
# Score the held-out test set with the best cross-validated pipeline.
best_model = pipelineFitted.bestModel
scored_test = best_model.transform(test)

# 1 when the rounded prediction equals the actual count, 0 otherwise.
equal_flag_expr = """CASE double(round(prediction)) = count
  WHEN true then 1
  ELSE 0
END as equal"""

# Keep the raw prediction, the prediction rounded to a whole number, the actual
# count, and the exact-match flag.
holdout = scored_test.selectExpr(
    "prediction as raw_prediction",
    "double(round(prediction)) as prediction",
    "count",
    equal_flag_expr,
)

pd_holdout = holdout.toPandas()
pd_holdout

Unnamed: 0,raw_prediction,prediction,count,equal
0,1.515545,2.0,1.0,0
1,2.088867,2.0,1.0,0
2,1.625337,2.0,1.0,0
3,1.482662,1.0,1.0,1
4,1.100080,1.0,1.0,1
...,...,...,...,...
1764,1.286251,1.0,4.0,0
1765,1.151596,1.0,5.0,0
1766,1.311554,1.0,5.0,0
1767,1.263667,1.0,6.0,0


In [None]:
from pyspark.mllib.evaluation import RegressionMetrics

# Build (prediction, observation) pairs for RegressionMetrics.
# NOTE(review): "prediction" here is the rounded prediction from the holdout
# frame, not "raw_prediction" — confirm that scoring the rounded values is intended.
rm = RegressionMetrics(holdout.select("prediction", "count").rdd.map(lambda x:  (x[0], x[1])))

print("MSE: ", rm.meanSquaredError)
print("MAE: ", rm.meanAbsoluteError)
# rootMeanSquaredError is the RMSE itself (the square root of the MSE above),
# so it is labelled "RMSE" rather than "RMSE Squared".
print("RMSE: ", rm.rootMeanSquaredError)
print("R Squared: ", rm.r2)
print("Explained Variance: ", rm.explainedVariance, "\n")



MSE:  0.4030525720746185
MAE:  0.250423968343697
RMSE Squared:  0.6348642154623447
R Squared:  -0.16700962440576284
Explained Variance:  0.0849096668616998 



In [None]:
# Fraction of holdout rows whose rounded prediction exactly matches the actual
# count: sum of the 0/1 "equal" flag divided by the number of rows.
holdout.selectExpr("sum(equal)/sum(1)").show()

+---------------------+
|(sum(equal) / sum(1))|
+---------------------+
|   0.8027133973996609|
+---------------------+

