# MODELLING

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('FoodPricePrediction').master('local[*]').getOrCreate()

spark.sparkContext.appName

25/04/15 04:15:33 WARN Utils: Your hostname, codespaces-ebd91c resolves to a loopback address: 127.0.0.1; using 10.0.1.23 instead (on interface eth0)
25/04/15 04:15:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/15 04:15:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/04/15 04:15:34 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


'FoodPricePrediction'

In [2]:
path = "data/FoodPriceData"
food_price_data = spark.read.csv(path, inferSchema=True, header=True)

In [3]:
food_price_data.show(5)

+----+-----+-------+--------+--------+------------------+-------------+-----+---------+------+---------------+----------------+--------------------+
|year|month| region|  county|  market|          category|    commodity| unit|pricetype| price|avg_rainfall_mm|normalized_price|log_normalized_price|
+----+-----+-------+--------+--------+------------------+-------------+-----+---------+------+---------------+----------------+--------------------+
|2014|    1|  Coast| Mombasa| Mombasa|cereals and tubers|        Maize|   KG|Wholesale| 38.44|         259.33|           38.44|  3.6747805297344347|
|2014|    1|  Coast| Mombasa| Mombasa|   pulses and nuts|        Beans|   KG|Wholesale| 79.99|         259.33|           79.99|  4.3943256902608985|
|2014|    1|  Coast| Mombasa| Mombasa|   pulses and nuts|  Beans (dry)|90 KG|Wholesale|5738.0|         259.33|           63.76|   4.170688128809434|
|2014|    1|Eastern|   Kitui|   Kitui|   pulses and nuts|  Beans (dry)|   KG|   Retail|  74.0|         259

In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import PipelineModel

In [5]:
# Drop nulls from essential columns
model_data = food_price_data.dropna(subset=[
    "region", "county", "market", "category", "commodity", "unit", "pricetype",
    "log_normalized_price", "avg_rainfall_mm"
])


In [6]:
# Index categorical columns
categorical_cols = ["region", "county", "market", "category", "commodity", "unit", "pricetype"]
indexers = [StringIndexer(inputCol=col, outputCol=col+"_indexed", handleInvalid="keep") for col in categorical_cols]

In [7]:
# Assemble features
feature_cols = [col+"_indexed" for col in categorical_cols] + ["month", "year", "avg_rainfall_mm"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

In [8]:
# Define model
dt = DecisionTreeRegressor(featuresCol="features", labelCol="log_normalized_price")

In [9]:
# Build pipeline
pipeline = Pipeline(stages=indexers + [assembler, dt])

In [10]:
# Split the data
train_data, test_data = model_data.randomSplit([0.8, 0.2], seed=42)

In [11]:
# Evaluator
evaluator = RegressionEvaluator(
    labelCol="log_normalized_price",
    predictionCol="prediction",
    metricName="rmse"
)

In [12]:
# Cross-validation with updated param grid
paramGrid = (
    ParamGridBuilder()
    .addGrid(dt.maxDepth, [5, 10])
    .addGrid(dt.maxBins, [100])
    .build()
)

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3
)

In [13]:
# Train model with CV
cv_model = cv.fit(train_data)

25/04/15 04:15:51 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [14]:
# Predict
predictions = cv_model.transform(test_data)

In [15]:
# Evaluate
rmse = evaluator.evaluate(predictions)
print(f"Cross-Validated RMSE: {rmse:.2f}")

Cross-Validated RMSE: 0.24


In [16]:
# Save entire model pipeline
cv_model.bestModel.write().overwrite().save("model/log_price_pipeline_model")

                                                                                