In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285388 sha256=df7462d561cf30dd16f128ff7c5e3b436194dab691e0e0deaeb769e305236376
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [None]:
from pyspark.sql import SparkSession
from pyspark import ml
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor, GBTRegressor
from pyspark.sql.functions import when, col, isnan, count, mean
import pyspark.sql.types as pyspark_type

In [None]:
# Create a SparkSession
spark_sess= SparkSession.builder.appName("Farmers Market Analysis").getOrCreate()

dataframe = spark_sess.read.csv("/content/data/farmers_markets_from_usda.csv", header=True, inferSchema=True)

In [None]:
dataframe.show()

+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+-----------+-----------+-----------+-----------+----------+---------+--------------------+------+---+-------+-----+----+-------+----------+------+------+-------+----+-------+-----+----------+-----+----+-----+----+-------+----+------+-------+--------+----+-----+----+------+-----+------+------+------+---------+-------+----+-------------+----------------+
|   FMID|          MarketName|             Website|            Facebook|             Twitter|             Youtube|          OtherMedia|              street|        city|              County|               State|  zip|         Season1Date|         Season1Time|         Season2Date|         Season2Time|Season3Date|Season3Time|Season4Date|Season4

In [None]:
# Print the number of rows in the dataframe
print('Number of rows:', dataframe.count())

# Print the number of columns in the dataframe
print('Number of columns:', len(dataframe.columns))

Number of rows: 8804
Number of columns: 59


In [None]:
dataframe.describe().show()

+-------+-----------------+--------------------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+-----------+---------+-------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+--------------------+------+----+-------+-----+----+-------+----------+------+------+-------+----+-------+-----+----------+-----+----+-----+----+-------+----+------+-------+--------+----+-----+----+------+-----+------+------+------+---------+-------+----+-------------+-------------------+
|summary|             FMID|          MarketName|             Website|            Facebook|    Twitter|             Youtube|          OtherMedia|              street|       city|   County|  State|               zip|         Season1Date|         Season1Time|         Season2Date|         Seas

In [None]:
#drop the columns with more than 40% null data
dataframe = dataframe.select([col_name for col_name in dataframe.columns if dataframe.select(col(col_name)).na.drop().count() / dataframe.count() >= 0.4])

In [None]:
dataframe.show()

+-------+--------------------+--------------------+--------------------+--------------------+------------+--------------------+--------------------+-----+--------------------+--------------------+----------+---------+------+---+-------+-----+----+-------+----------+------+------+-------+----+-------+-----+----------+-----+----+-----+----+-------+----+------+-------+--------+----+-----+----+------+-----+------+------+------+---------+-------+----+-------------+----------------+
|   FMID|          MarketName|             Website|            Facebook|              street|        city|              County|               State|  zip|         Season1Date|         Season1Time|         x|        y|Credit|WIC|WICcash|SFMNP|SNAP|Organic|Bakedgoods|Cheese|Crafts|Flowers|Eggs|Seafood|Herbs|Vegetables|Honey|Jams|Maple|Meat|Nursery|Nuts|Plants|Poultry|Prepared|Soap|Trees|Wine|Coffee|Beans|Fruits|Grains|Juices|Mushrooms|PetFood|Tofu|WildHarvested|      updateTime|
+-------+--------------------+------

In [None]:
dataframe.select([count(when(isnan(dataframe_col) | col(dataframe_col).isNull(), dataframe_col)).alias(dataframe_col) for dataframe_col in dataframe.columns]).show()

+----+----------+-------+--------+------+----+------+-----+---+-----------+-----------+---+---+------+---+-------+-----+----+-------+----------+------+------+-------+----+-------+-----+----------+-----+----+-----+----+-------+----+------+-------+--------+----+-----+----+------+-----+------+------+------+---------+-------+----+-------------+----------+
|FMID|MarketName|Website|Facebook|street|city|County|State|zip|Season1Date|Season1Time|  x|  y|Credit|WIC|WICcash|SFMNP|SNAP|Organic|Bakedgoods|Cheese|Crafts|Flowers|Eggs|Seafood|Herbs|Vegetables|Honey|Jams|Maple|Meat|Nursery|Nuts|Plants|Poultry|Prepared|Soap|Trees|Wine|Coffee|Beans|Fruits|Grains|Juices|Mushrooms|PetFood|Tofu|WildHarvested|updateTime|
+----+----------+-------+--------+------+----+------+-----+---+-----------+-----------+---+---+------+---+-------+-----+----+-------+----------+------+------+-------+----+-------+-----+----------+-----+----+-----+----+-------+----+------+-------+--------+----+-----+----+------+-----+------+-

In [None]:
d1 = dataframe.drop_duplicates().count()
d2 = dataframe.count()
print('Total number of duplicate rows', d2  - d1)
dataframe = dataframe.drop_duplicates()

Total number of duplicate rows 0


In [None]:
dataframe.show(2)

+-------+--------------------+--------------------+--------+--------------------+---------+------+-------------+-----+--------------------+--------------------+----------+---------+------+---+-------+-----+----+-------+----------+------+------+-------+----+-------+-----+----------+-----+----+-----+----+-------+----+------+-------+--------+----+-----+----+------+-----+------+------+------+---------+-------+----+-------------+---------------+
|   FMID|          MarketName|             Website|Facebook|              street|     city|County|        State|  zip|         Season1Date|         Season1Time|         x|        y|Credit|WIC|WICcash|SFMNP|SNAP|Organic|Bakedgoods|Cheese|Crafts|Flowers|Eggs|Seafood|Herbs|Vegetables|Honey|Jams|Maple|Meat|Nursery|Nuts|Plants|Poultry|Prepared|Soap|Trees|Wine|Coffee|Beans|Fruits|Grains|Juices|Mushrooms|PetFood|Tofu|WildHarvested|     updateTime|
+-------+--------------------+--------------------+--------+--------------------+---------+------+------------

In [None]:
# List of column names to be processed
columns = ["Credit", 'WIC', 'WICcash', 'SFMNP', 'SNAP', 'Organic', 'Bakedgoods', 'Cheese', 'Crafts', 'Flowers', 'Eggs', 'Seafood', 'Herbs', 'Vegetables', 'Honey', 'Jams', 'Maple', 'Meat', 'Nursery',
           'Plants', 'Poultry', 'Prepared', 'Soap', 'Trees', 'Wine', 'Coffee', 'Beans', 'Fruits', 'Grains', 'Juices', 'Mushrooms', 'PetFood', 'Tofu', 'WildHarvested', 'NUTS']

# Loop through each column to process
for cols_name in columns:
    # Apply transformation to the current column
    dataframe = dataframe.withColumn(cols_name,
                                     # Check if values are "Y", "true", or "True", replace with 1
                                     when(col(cols_name).isin("Y", "true", "True"), 1)
                                     # Check if values are "N", "False", "No", "NOT APPLICABLE", or "-", replace with 0
                                     .when(col(cols_name).isin("N", "False", "No", "NOT APPLICABLE", "-"), 0)
                                     # If value doesn't match any condition, set to None
                                     .otherwise(None))


In [None]:
dataframe.show()

+-------+--------------------+--------------------+--------------------+--------------------+-----------+--------------+--------------+-----+--------------------+--------------------+----------+---------+------+---+-------+-----+----+-------+----------+------+------+-------+----+-------+-----+----------+-----+----+-----+----+-------+----+------+-------+--------+----+-----+----+------+-----+------+------+------+---------+-------+----+-------------+-------------------+
|   FMID|          MarketName|             Website|            Facebook|              street|       city|        County|         State|  zip|         Season1Date|         Season1Time|         x|        y|Credit|WIC|WICcash|SFMNP|SNAP|Organic|Bakedgoods|Cheese|Crafts|Flowers|Eggs|Seafood|Herbs|Vegetables|Honey|Jams|Maple|Meat|Nursery|NUTS|Plants|Poultry|Prepared|Soap|Trees|Wine|Coffee|Beans|Fruits|Grains|Juices|Mushrooms|PetFood|Tofu|WildHarvested|         updateTime|
+-------+--------------------+--------------------+-----

In [None]:
# Loop through each column in the DataFrame
for col_name in dataframe.columns:
    # Get the data type of the current column
    col_dtype = dataframe.schema[col_name].dataType

    # Print the current column name
    print(f'Column: {col_name}')

    # Check the data type and apply appropriate handling
    if col_dtype in (int, float):
        # For numeric columns (int or float), fill null values with mean
        mean_value = 0  # You might want to calculate the actual mean value here
        dataframe = dataframe.withColumn(col_name, when(col(col_name).isNull(), mean_value).otherwise(col(col_name)))
    elif str(col_dtype) == 'DoubleType()':
        # For DoubleType columns, fill null values with most common value (mode)
        most_common_value = 0.0  # You might want to calculate the actual most common value here
        dataframe = dataframe.withColumn(col_name, when(col(col_name).isNull(), most_common_value).otherwise(col(col_name)))
    elif str(col_dtype) == 'StringType()':
        # For StringType columns, fill null values with 'NOT APPLICABLE'
        most_common_value = 'NOT APPLICABLE'
        dataframe = dataframe.withColumn(col_name, when(col(col_name).isNull(), most_common_value).otherwise(col(col_name)))
    elif str(col_dtype) == 'BoolType()':
        # For BoolType columns, fill null values with False
        most_common_value = False
        dataframe = dataframe.withColumn(col_name, when(col(col_name).isNull(), most_common_value).otherwise(col(col_name)))


Column: FMID
Column: MarketName
Column: Website
Column: Facebook
Column: street
Column: city
Column: County
Column: State
Column: zip
Column: Season1Date
Column: Season1Time
Column: x
Column: y
Column: Credit
Column: WIC
Column: WICcash
Column: SFMNP
Column: SNAP
Column: Organic
Column: Bakedgoods
Column: Cheese
Column: Crafts
Column: Flowers
Column: Eggs
Column: Seafood
Column: Herbs
Column: Vegetables
Column: Honey
Column: Jams
Column: Maple
Column: Meat
Column: Nursery
Column: NUTS
Column: Plants
Column: Poultry
Column: Prepared
Column: Soap
Column: Trees
Column: Wine
Column: Coffee
Column: Beans
Column: Fruits
Column: Grains
Column: Juices
Column: Mushrooms
Column: PetFood
Column: Tofu
Column: WildHarvested
Column: updateTime


In [None]:
dataframe = dataframe.dropna()

In [None]:
dataframe = dataframe.drop('FMID', 'MarketName','Website','Facebook', 'street', 'city', 'Twitter','Youtube','OtherMedia', 'Season1Date', 'Season1Time', 'Nuts', 'updateTime')

In [None]:
dataframe.show()

+--------------+--------------+-----+----------+---------+------+---+-------+-----+----+-------+----------+------+------+-------+----+-------+-----+----------+-----+----+-----+----+-------+------+-------+--------+----+-----+----+------+-----+------+------+------+---------+-------+----+-------------+
|        County|         State|  zip|         x|        y|Credit|WIC|WICcash|SFMNP|SNAP|Organic|Bakedgoods|Cheese|Crafts|Flowers|Eggs|Seafood|Herbs|Vegetables|Honey|Jams|Maple|Meat|Nursery|Plants|Poultry|Prepared|Soap|Trees|Wine|Coffee|Beans|Fruits|Grains|Juices|Mushrooms|PetFood|Tofu|WildHarvested|
+--------------+--------------+-----+----------+---------+------+---+-------+-----+----+-------+----------+------+------+-------+----+-------+-----+----------+-----+----+-----+----+-------+------+-------+--------+----+-----+----+------+-----+------+------+------+---------+-------+----+-------------+
|        Ramsey|     Minnesota|55109|  -93.0877|44.960899|     0|  0|      0|    0|   0|      0| 

In [None]:
# Convert the 'zip' column to float data type
dataframe = dataframe.withColumn('zip', dataframe['zip'].cast('int'))

# Convert the 'x' column to float data type
dataframe = dataframe.withColumn('x', dataframe['x'].cast('float'))

# Convert the 'y' column to float data type
dataframe = dataframe.withColumn('y', dataframe['y'].cast('float'))


In [None]:
dataframe.show()

+--------------+--------------+-----+----------+---------+------+---+-------+-----+----+-------+----------+------+------+-------+----+-------+-----+----------+-----+----+-----+----+-------+------+-------+--------+----+-----+----+------+-----+------+------+------+---------+-------+----+-------------+
|        County|         State|  zip|         x|        y|Credit|WIC|WICcash|SFMNP|SNAP|Organic|Bakedgoods|Cheese|Crafts|Flowers|Eggs|Seafood|Herbs|Vegetables|Honey|Jams|Maple|Meat|Nursery|Plants|Poultry|Prepared|Soap|Trees|Wine|Coffee|Beans|Fruits|Grains|Juices|Mushrooms|PetFood|Tofu|WildHarvested|
+--------------+--------------+-----+----------+---------+------+---+-------+-----+----+-------+----------+------+------+-------+----+-------+-----+----------+-----+----+-----+----+-------+------+-------+--------+----+-----+----+------+-----+------+------+------+---------+-------+----+-------------+
|        Ramsey|     Minnesota|55109|  -93.0877|  44.9609|     0|  0|      0|    0|   0|      0| 

In [None]:
ml_string_columns = [dataframe_col for dataframe_col, col_dtype in dataframe.dtypes if col_dtype == 'string']

string_encoder = [StringIndexer(inputCol=single_col, outputCol=single_col+"_").setHandleInvalid("keep") for single_col in ml_string_columns]

pipeline = Pipeline(stages=string_encoder)

ml_dataframe = pipeline.fit(dataframe).transform(dataframe)

In [None]:
ml_col_to_use = [col + "_" for col in ml_string_columns]

ml_numerical_cols = [dataframe_col for dataframe_col, col_dtype in dataframe.dtypes if col_dtype in ["int", "float", "double"]]

ml_col_to_use += ml_numerical_cols

In [None]:
ml_dataframe.select(ml_col_to_use).show()

+-------+------+-----+----------+---------+------+---+-------+-----+----+-------+----------+------+------+-------+----+-------+-----+----------+-----+----+-----+----+-------+------+-------+--------+----+-----+----+------+-----+------+------+------+---------+-------+----+-------------+
|County_|State_|  zip|         x|        y|Credit|WIC|WICcash|SFMNP|SNAP|Organic|Bakedgoods|Cheese|Crafts|Flowers|Eggs|Seafood|Herbs|Vegetables|Honey|Jams|Maple|Meat|Nursery|Plants|Poultry|Prepared|Soap|Trees|Wine|Coffee|Beans|Fruits|Grains|Juices|Mushrooms|PetFood|Tofu|WildHarvested|
+-------+------+-----+----------+---------+------+---+-------+-----+----+-------+----------+------+------+-------+----+-------+-----+----------+-----+----+-----+----+-------+------+-------+--------+----+-----+----+------+-----+------+------+------+---------+-------+----+-------------+
|  192.0|  14.0|55109|  -93.0877|  44.9609|     0|  0|      0|    0|   0|      0|         0|     0|     0|      0|   0|      0|    0|         

In [None]:
ml_dataframe = ml_dataframe.drop('WICcash', 'SFMNP', 'SNAP', 'WildHarvested', 'zip')

In [None]:
ml_col_to_use = list(set(ml_col_to_use).intersection(set(ml_dataframe.columns)))

In [None]:
ml_dataframe.select(ml_col_to_use).show()

+-------+------+----------+-----+-------+---------+-------+---------+-------+----+-----+-----+-----+------+------+--------+-------+------+------+-----+-------+----+------+------+-------+------+---+----+-----+------+----+----+----------+----------+----+
|Seafood|Coffee|Bakedgoods|Honey|Nursery|        y|Poultry|Mushrooms|County_|Tofu|  zip|Herbs|Maple|State_|Fruits|Prepared|Flowers|Juices|Crafts|Beans|PetFood|Eggs|Plants|Credit|Organic|Cheese|WIC|Wine|Trees|Grains|Jams|Meat|         x|Vegetables|Soap|
+-------+------+----------+-----+-------+---------+-------+---------+-------+----+-----+-----+-----+------+------+--------+-------+------+------+-----+-------+----+------+------+-------+------+---+----+-----+------+----+----+----------+----------+----+
|      0|     0|         0|    0|      0|  44.9609|      0|        0|  192.0|   0|55109|    0|    0|  14.0|     0|       0|      0|     0|     0|    0|      0|   0|     0|     0|      0|     0|  0|   0|    0|     0|   0|   0|  -93.0877|     

In [None]:
dataframe_final = VectorAssembler(inputCols=ml_col_to_use, outputCol="features").transform(ml_dataframe)

In [None]:
dataframe_final.show()

+--------------+--------------+----------+---------+------+---+-------+----------+------+------+-------+----+-------+-----+----------+-----+----+-----+----+-------+------+-------+--------+----+-----+----+------+-----+------+------+------+---------+-------+----+-------+------+--------------------+
|        County|         State|         x|        y|Credit|WIC|Organic|Bakedgoods|Cheese|Crafts|Flowers|Eggs|Seafood|Herbs|Vegetables|Honey|Jams|Maple|Meat|Nursery|Plants|Poultry|Prepared|Soap|Trees|Wine|Coffee|Beans|Fruits|Grains|Juices|Mushrooms|PetFood|Tofu|County_|State_|            features|
+--------------+--------------+----------+---------+------+---+-------+----------+------+------+-------+----+-------+-----+----------+-----+----+-----+----+-------+------+-------+--------+----+-----+----+------+-----+------+------+------+---------+-------+----+-------+------+--------------------+
|        Ramsey|     Minnesota|  -93.0877|  44.9609|     0|  0|      0|         0|     0|     0|      0|  

In [None]:
(dataframe_train, dataframe_test) = dataframe_final.randomSplit([0.9, 0.1])
regression_r2_eval = RegressionEvaluator(labelCol="x", predictionCol="prediction", metricName="r2")

In [None]:
# Print a message indicating the start of linear regression training
print('Linear regression running')

# Train a Linear Regression model using the specified features and label
linear_model = LinearRegression(featuresCol='features', labelCol="x").fit(dataframe_train)

# Print a message indicating the prediction for the tested data
print('Prediction for data tested')

# Evaluate the Linear Regression model using R2 score on the test data and print the result
print("Linear regression model >> R2 score:", regression_r2_eval.evaluate(linear_model.transform(dataframe_test)))

Linear regression training
Prediction for data tested
Linear regression model >> R2 score: 1.0


In [None]:
# Print a message indicating the start of training the Random Forest model with training data
print('Training Random Forest model with train data')

# Train a Random Forest Regressor model using the specified features and label, with a specified number of bins
random_forest = RandomForestRegressor(featuresCol='features', labelCol="x", maxBins=7063).fit(dataframe_train)

# Print a message indicating prediction for the test data
print('Prediction for test data')

# Evaluate the Random Forest Regressor model using R2 score on the test data and print the result
print("Random regression forest model >> R2 score:", regression_r2_eval.evaluate(random_forest.transform(dataframe_test)))


Training Random Forest model with train data
Prediction for test data
Random regression forest model >> R2 score: 0.9405754815272025


In [None]:
# Print a message indicating the start of training the Gradient Boosting model with training data
print('Training Gradient Boosting model with train data')

# Train a Gradient Boosting Regressor model using the specified features and label, with a specified number of bins
gradient_boosting = GBTRegressor(featuresCol='features', labelCol="x", maxBins=7507).fit(dataframe_train)

# Print a message indicating prediction for the test data
print('Prediction for test data')

# Evaluate the Gradient Boosting Regressor model using R2 score on the test data and print the result
print("Gradient Boosting model >> R2 score:", regression_r2_eval.evaluate(gradient_boosting.transform(dataframe_test)))


Training Gradient Boosting model with train data
Prediction for test data
Gradient Boosting model >> R2 score: 0.9978746498249951
