In [54]:
import sys
sys.path.append("/usr/local/lib/python3.10/dist-packages")  

import findspark
findspark.init("/opt/spark/spark-3.5.4-bin-hadoop3")  # Path to your Spark installation


from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator


spark = SparkSession.builder.appName("Yield Prediction").getOrCreate()


25/01/22 10:02:22 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [55]:
data = spark.read.csv("./yield_df.csv", header=True, inferSchema=True)
data.show()

+----+-------+-----------+----+-----------+-----------------------------+-----------------+--------+
|s_no|   Area|       Item|Year|hg/ha_yield|average_rain_fall_mm_per_year|pesticides_tonnes|avg_temp|
+----+-------+-----------+----+-----------+-----------------------------+-----------------+--------+
|   0|Albania|      Maize|1990|      36613|                       1485.0|            121.0|   16.37|
|   1|Albania|   Potatoes|1990|      66667|                       1485.0|            121.0|   16.37|
|   2|Albania|Rice, paddy|1990|      23333|                       1485.0|            121.0|   16.37|
|   3|Albania|    Sorghum|1990|      12500|                       1485.0|            121.0|   16.37|
|   4|Albania|   Soybeans|1990|       7000|                       1485.0|            121.0|   16.37|
|   5|Albania|      Wheat|1990|      30197|                       1485.0|            121.0|   16.37|
|   6|Albania|      Maize|1991|      29068|                       1485.0|            121.0|

In [56]:
# Handle missing values (if any)
data = data.dropna()


In [57]:
from pyspark.ml.feature import StringIndexer



# Preprocess the data (index, encode, assemble)
area_indexer = StringIndexer(inputCol="Area", outputCol="Area_indexed").fit(data)


item_indexer = StringIndexer(inputCol="Item", outputCol="Item_indexed").fit(data)



encoder = OneHotEncoder(inputCols=["Area_indexed", "Item_indexed"], 
                        outputCols=["Area_encoded", "Item_encoded"])



assembler = VectorAssembler(
    inputCols=["Area_encoded", "Item_encoded", "average_rain_fall_mm_per_year", 
               "pesticides_tonnes", "avg_temp"], 
    outputCol="features")



# Create pipeline and transform data
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[area_indexer, item_indexer, encoder, assembler])
prepared_data = pipeline.fit(data).transform(data)

# Split into training and test sets
train_data, test_data = prepared_data.randomSplit([0.8, 0.2])

In [58]:
data.printSchema()

root
 |-- s_no: integer (nullable = true)
 |-- Area: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- hg/ha_yield: integer (nullable = true)
 |-- average_rain_fall_mm_per_year: double (nullable = true)
 |-- pesticides_tonnes: double (nullable = true)
 |-- avg_temp: double (nullable = true)



In [59]:
#from pyspark.ml.regression import LinearRegression

#lr = LinearRegression(featuresCol="features", labelCol="hg/ha_yield")

#model = lr.fit(train_data)

#predictions = model.transform(test_data)
#predictions.show()




In [60]:
lr = LinearRegression(featuresCol="features", labelCol="hg/ha_yield", regParam=0.1)
lr_model = lr.fit(train_data)

# Predictions and Evaluation
lr_predictions = lr_model.transform(test_data)
lr_evaluator = RegressionEvaluator(labelCol="hg/ha_yield", predictionCol="prediction", metricName="rmse")
lr_rmse = lr_evaluator.evaluate(lr_predictions)
lr_r2 = lr_evaluator.setMetricName("r2").evaluate(lr_predictions)

print(f"Linear Regression: RMSE = {lr_rmse:.2f}, R2 = {lr_r2:.2f}")



lr_predictions.show(5)


Linear Regression: RMSE = 42503.12, R2 = 0.75
+----+-------+--------+----+-----------+-----------------------------+-----------------+--------+------------+------------+----------------+-------------+--------------------+------------------+
|s_no|   Area|    Item|Year|hg/ha_yield|average_rain_fall_mm_per_year|pesticides_tonnes|avg_temp|Area_indexed|Item_indexed|    Area_encoded| Item_encoded|            features|        prediction|
+----+-------+--------+----+-----------+-----------------------------+-----------------+--------+------------+------------+----------------+-------------+--------------------+------------------+
|   6|Albania|   Maize|1991|      29068|                       1485.0|            121.0|   15.36|        71.0|         1.0|(100,[71],[1.0])|(9,[1],[1.0])|(112,[71,101,109,...| 27541.69234901284|
|  19|Albania|Potatoes|1993|      98446|                       1485.0|            121.0|   16.05|        71.0|         0.0|(100,[71],[1.0])|(9,[0],[1.0])|(112,[71,100,109,...

In [61]:
dt = DecisionTreeRegressor(featuresCol="features", labelCol="hg/ha_yield", maxBins=128)
dt_model = dt.fit(train_data)

# Predictions and Evaluation
dt_predictions = dt_model.transform(test_data)
dt_evaluator = RegressionEvaluator(labelCol="hg/ha_yield", predictionCol="prediction", metricName="rmse")
dt_rmse = dt_evaluator.evaluate(dt_predictions)
dt_r2 = dt_evaluator.setMetricName("r2").evaluate(dt_predictions)

print(f"Decision Tree: RMSE = {dt_rmse:.2f}, R2 = {dt_r2:.2f}")


Decision Tree: RMSE = 40047.70, R2 = 0.78


In [62]:
rf = RandomForestRegressor(featuresCol="features", labelCol="hg/ha_yield", maxBins=128, numTrees=50)
rf_model = rf.fit(train_data)

# Predictions and Evaluation
rf_predictions = rf_model.transform(test_data)
rf_evaluator = RegressionEvaluator(labelCol="hg/ha_yield", predictionCol="prediction", metricName="rmse")
rf_rmse = rf_evaluator.evaluate(rf_predictions)
rf_r2 = rf_evaluator.setMetricName("r2").evaluate(rf_predictions)

print(f"Random Forest: RMSE = {rf_rmse:.2f}, R2 = {rf_r2:.2f}")



spark.stop()

Random Forest: RMSE = 44487.19, R2 = 0.72
