In [None]:
# ------------------------------------------
# 1. Import Libraries
# ------------------------------------------
import math

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, when, sum


In [None]:
# ------------------------------------------
# 2. Initialize Spark Session
# ------------------------------------------
# Reuse an existing session if one is running; otherwise start one with
# 4g requested for both the executors and the driver.
spark = (
    SparkSession.builder
    .appName("Crop Yield Prediction - Decision Tree")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)


In [None]:
# ------------------------------------------
# 3. Load Dataset
# ------------------------------------------
# Read the CSV with a header row and let Spark infer the column types.
file_path = "crop_yield.csv"
data = spark.read.csv(file_path, header=True, inferSchema=True)

# head(1) stops after the first row; count() would scan the entire dataset
# just to answer "is there at least one row?".
if data.head(1):
    print("Dataset loaded successfully!")
else:
    print("No data found in the dataset.")





Dataset loaded successfully!


In [None]:
# Print the column names and the types Spark inferred while reading the CSV
data.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Soil_Type: string (nullable = true)
 |-- Crop: string (nullable = true)
 |-- Rainfall_mm: double (nullable = true)
 |-- Temperature_Celsius: double (nullable = true)
 |-- Fertilizer_Used: boolean (nullable = true)
 |-- Irrigation_Used: boolean (nullable = true)
 |-- Weather_Condition: string (nullable = true)
 |-- Days_to_Harvest: integer (nullable = true)
 |-- Yield_tons_per_hectare: double (nullable = true)



In [None]:
# Preview the first 5 rows of the loaded dataset
data.show(5)

+------+---------+-------+-----------------+-------------------+---------------+---------------+-----------------+---------------+----------------------+
|Region|Soil_Type|   Crop|      Rainfall_mm|Temperature_Celsius|Fertilizer_Used|Irrigation_Used|Weather_Condition|Days_to_Harvest|Yield_tons_per_hectare|
+------+---------+-------+-----------------+-------------------+---------------+---------------+-----------------+---------------+----------------------+
|  West|    Sandy| Cotton|897.0772391101236| 27.676966373377603|          false|           true|           Cloudy|            122|     6.555816258223593|
| South|     Clay|   Rice|992.6732816189208|  18.02614225436302|           true|           true|            Rainy|            140|       8.5273409063236|
| North|     Loam| Barley|147.9980252926104|  29.79404241557257|          false|          false|            Sunny|            106|     1.127443335982929|
| North|    Sandy|Soybean|986.8663313367325|  16.64419019137728|          fa

In [None]:
# ------------------------------------------
# 4. Check for Missing Values
# ------------------------------------------
print("\nMissing Values in Each Column:")

# One aggregate per column: add 1 for every null cell, 0 otherwise.
# `sum` here is pyspark.sql.functions.sum (imported above), not the builtin.
null_counts = [
    sum(when(col(name).isNull(), 1).otherwise(0)).alias(name)
    for name in data.columns
]
missing_values = data.select(null_counts)
missing_values.show()



Missing Values in Each Column:
+------+---------+----+-----------+-------------------+---------------+---------------+-----------------+---------------+----------------------+
|Region|Soil_Type|Crop|Rainfall_mm|Temperature_Celsius|Fertilizer_Used|Irrigation_Used|Weather_Condition|Days_to_Harvest|Yield_tons_per_hectare|
+------+---------+----+-----------+-------------------+---------------+---------------+-----------------+---------------+----------------------+
|     0|        0|   0|          0|                  0|              0|              0|                0|              0|                     0|
+------+---------+----+-----------+-------------------+---------------+---------------+-----------------+---------------+----------------------+



In [None]:
# ------------------------------------------
# 5. Handle Categorical Columns
# ------------------------------------------
# NOTE: this cell rebinds `data`. Running it a second time without reloading
# the dataset fails, because the source string columns are dropped below.
categorical_columns = ["Region", "Soil_Type", "Weather_Condition"]  # Exclude 'Crop'
crop_column = "Crop"

# Index every string column (Crop included), adding "<column>_Index" each time.
indexers = []
for name in categorical_columns + [crop_column]:
    fitted_indexer = StringIndexer(inputCol=name, outputCol=f"{name}_Index").fit(data)
    indexers.append(fitted_indexer)
    data = fitted_indexer.transform(data)

# Keep the crop label under "Crop_Name" for reference during prediction and
# drop the other raw string columns.
data = data.withColumnRenamed(crop_column, "Crop_Name").drop(*categorical_columns)

# One-hot encode the indexed categorical columns (Crop_Index is not encoded).
index_cols = [f"{name}_Index" for name in categorical_columns]
ohe_cols = [f"{name}_OHE" for name in categorical_columns]
encoder = OneHotEncoder(inputCols=index_cols, outputCols=ohe_cols)
data = encoder.fit(data).transform(data)


In [None]:
# ------------------------------------------
# 6. Assemble Features
# ------------------------------------------
# Everything except the label, the crop name and the crop index is a feature.
# NOTE: this keeps both the "<column>_Index" and "<column>_OHE" versions of
# each categorical column in the feature vector.
non_feature_columns = {"Yield_tons_per_hectare", "Crop_Name", crop_column + "_Index"}
feature_columns = [name for name in data.columns if name not in non_feature_columns]

assembler = VectorAssembler(outputCol="features", inputCols=feature_columns)
assembled = assembler.transform(data)
full_data = assembled.select("features", "Yield_tons_per_hectare", "Crop_Name")


In [None]:
# ------------------------------------------
# 7. Train/Test Split
# ------------------------------------------
# 80% train / 20% test, with a fixed seed for the random assignment.
split_frames = full_data.randomSplit([0.8, 0.2], seed=42)
train_data, test_data = split_frames


In [None]:
# ------------------------------------------
# 8. Train Initial Decision Tree
# ------------------------------------------
# Depth-5 regression tree over the full assembled feature vector.
dt_params = {
    "featuresCol": "features",
    "labelCol": "Yield_tons_per_hectare",
    "maxDepth": 5,
    "seed": 42,
}
dt = DecisionTreeRegressor(**dt_params)
dt_model = dt.fit(train_data)


In [None]:
# ------------------------------------------
# 9. Evaluate Initial Model
# ------------------------------------------
predictions = dt_model.transform(test_data)

# `evaluator` itself stays configured for RMSE (a later cell reuses it that
# way); the other metrics are requested through a per-call parameter override.
evaluator = RegressionEvaluator(
    labelCol="Yield_tons_per_hectare",
    predictionCol="prediction",
    metricName="rmse"
)
rmse = evaluator.evaluate(predictions)
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})

# Reported as "accuracy" below: this is simply R^2 expressed as a percentage.
accuracy_percentage = r2 * 100

print(f"RMSE (Full Model) = {rmse:.3f}")
print(f"R² (Full Model)  = {r2:.3f}")
print(f"Mean Absolute Error (Decision Tree Full Model) = {mae:.3f}")
print(f"Decision Tree Model Accuracy = {accuracy_percentage:.2f}%")


RMSE (Full Model) = 0.546
R² (Full Model)  = 0.896
Mean Absolute Error (Decision Tree Full Model) = 0.436
Decision Tree Model Accuracy = 89.65%


In [None]:
# ------------------------------------------
# 10. Extract Feature Importance
# ------------------------------------------
# featureImportances holds one value per SLOT of the assembled vector, not one
# per input column: a one-hot encoded column occupies several slots. Zipping
# the array against `feature_columns` would therefore misalign names and drop
# the trailing slots, so slots are mapped back to their source column and
# summed per column instead.
importances_array = dt_model.featureImportances.toArray()

# Slot index -> slot name, read from the ML attribute metadata that
# VectorAssembler attaches to the "features" column.
features_attrs = (
    full_data.schema["features"].metadata.get("ml_attr", {}).get("attrs", {})
)
slot_names = {
    attr["idx"]: attr["name"]
    for attr_group in features_attrs.values()
    for attr in attr_group
    if "name" in attr
}


def _source_column(slot_name):
    """Return the input column a vector slot came from (longest name match)."""
    owners = [
        name for name in feature_columns
        if slot_name == name or slot_name.startswith(name + "_")
    ]
    return max(owners, key=len) if owners else slot_name


if slot_names:
    # Start every column at 0.0 so unused features are still listed.
    feature_importance_dict = {name: 0.0 for name in feature_columns}
    for slot_idx, imp in enumerate(importances_array):
        owner = _source_column(slot_names.get(slot_idx, f"slot_{slot_idx}"))
        feature_importance_dict[owner] = feature_importance_dict.get(owner, 0.0) + float(imp)
elif len(importances_array) == len(feature_columns):
    # No slot metadata, but one slot per column: a positional match is safe.
    feature_importance_dict = {
        name: float(imp) for name, imp in zip(feature_columns, importances_array)
    }
else:
    raise ValueError(
        f"Cannot map {len(importances_array)} importance values onto "
        f"{len(feature_columns)} feature columns without vector metadata."
    )

sorted_features_by_imp = sorted(
    feature_importance_dict.items(),
    key=lambda item: item[1],
    reverse=True
)

print("\nDecision Tree Feature Importances:")
for feat, imp_val in sorted_features_by_imp:
    print(f"{feat}: {imp_val}")

# Identify the top 3 features (only real input columns, so they can be
# selected by name when the model is retrained)
top_3_features = [
    feat for feat, _ in sorted_features_by_imp if feat in feature_columns
][:3]
print("\nTop 3 Features:", top_3_features)



Decision Tree Feature Importances:
Rainfall_mm: 0.6422548834665147
Fertilizer_Used: 0.21861117683157716
Irrigation_Used: 0.13913393970190813
Temperature_Celsius: 0.0
Days_to_Harvest: 0.0
Region_Index: 0.0
Soil_Type_Index: 0.0
Weather_Condition_Index: 0.0
Region_OHE: 0.0
Soil_Type_OHE: 0.0
Weather_Condition_OHE: 0.0

Top 3 Features: ['Rainfall_mm', 'Fertilizer_Used', 'Irrigation_Used']


In [None]:
# ------------------------------------------
# 11. Retrain Model with Top Features
# ------------------------------------------
# NOTE: this tree is grown with maxDepth=30 while the initial model used
# maxDepth=5, so metric differences between the two models reflect the depth
# change as well as the reduced feature set.
label_col_3 = "Yield_tons_per_hectare"

selected_data = data.select(*top_3_features, label_col_3)

assembler_3 = VectorAssembler(inputCols=top_3_features, outputCol="features")
final_data_3 = assembler_3.transform(selected_data).select("features", label_col_3)

split_frames_3 = final_data_3.randomSplit([0.8, 0.2], seed=42)
train_data_3, test_data_3 = split_frames_3

dt_3 = DecisionTreeRegressor(
    featuresCol="features",
    labelCol=label_col_3,
    maxDepth=30,
    seed=42
)
dt_model_3 = dt_3.fit(train_data_3)

In [None]:


# Evaluate retrained model
predictions_3 = dt_model_3.transform(test_data_3)

# Reuse the evaluator created for the initial model; each metric is selected
# through a per-call override, so the evaluator's own setting is untouched.
metrics_3 = {
    metric: evaluator.evaluate(predictions_3, {evaluator.metricName: metric})
    for metric in ("rmse", "r2", "mae")
}
rmse_3 = metrics_3["rmse"]
r2_3 = metrics_3["r2"]
mae_3 = metrics_3["mae"]

# Reported as "accuracy" below: this is simply R^2 expressed as a percentage.
accuracy_percentage_3 = r2_3 * 100

print(f"RMSE (Top 3 Features) = {rmse_3:.3f}")
print(f"R² (Top 3 Features)  = {r2_3:.3f}")
print(f"Mean Absolute Error (Reduced Decision Tree Model) = {mae_3:.3f}")
print(f"Reduced Decision Tree Model Accuracy = {accuracy_percentage_3:.2f}%")


RMSE (Top 3 Features) = 0.523
R² (Top 3 Features)  = 0.905
Mean Absolute Error (Reduced Decision Tree Model) = 0.418
Reduced Decision Tree Model Accuracy = 90.52%


In [None]:
# ------------------------------------------
# 12. Predict Crop Yield with User Input
# ------------------------------------------
def validate_input(value, feature_name, min_value, max_value):
    """Parse a raw entry as a finite float and check it against optional bounds.

    Args:
        value: Raw entry, typically the string returned by ``input()``.
        feature_name: Feature name used in the printed error messages.
        min_value: Inclusive lower bound, or None for no lower bound.
        max_value: Inclusive upper bound, or None for no upper bound.

    Returns:
        The parsed float, or None (after printing an explanation) when the
        entry is not numeric, is NaN/infinite, or lies outside the bounds.
    """
    try:
        number = float(value)
    except (TypeError, ValueError):
        # TypeError covers non-string, non-numeric values such as None.
        print(f"Invalid input for {feature_name}. Please enter a numeric value.")
        return None

    # float() happily parses "nan" and "inf". NaN compares False against every
    # bound, so without this check it would pass any range test.
    if not math.isfinite(number):
        print(f"Invalid input for {feature_name}. Please enter a finite numeric value.")
        return None

    # Each bound is enforced independently, so a one-sided range also works.
    too_low = min_value is not None and number < min_value
    too_high = max_value is not None and number > max_value
    if too_low or too_high:
        if min_value is not None and max_value is not None:
            print(f"Error: {feature_name} value must be between {min_value} and {max_value}.")
        elif min_value is not None:
            print(f"Error: {feature_name} value must be at least {min_value}.")
        else:
            print(f"Error: {feature_name} value must be at most {max_value}.")
        return None

    return number

def predict_crop_yield(dt_model, feature_columns, crop_mapping):
    """Prompt for feature values, run the model on them and print the result.

    The user is asked for one value per entry of ``feature_columns``. If
    "Crop_Index" is among them, the matching crop name from ``crop_mapping``
    is printed next to the predicted yield.

    Args:
        dt_model: Fitted Spark ML model whose ``transform`` adds a
            "prediction" column.
        feature_columns: Feature names to prompt for, in prompt order.
        crop_mapping: Dict mapping crop index -> crop name.

    Raises:
        ValueError: If "Crop_Index" is requested but ``crop_mapping`` is
            empty, or if the number of features cannot be reconciled with the
            number of features the model reports it was trained on.
    """
    crop_feature = "Crop_Index"
    binary_features = ("Fertilizer_Used", "Irrigation_Used")

    if crop_feature in feature_columns and not crop_mapping:
        raise ValueError("crop_mapping is empty, so no Crop_Index can be validated.")

    feature_ranges = {
        "Rainfall_mm": (100.0, 999.99),
        "Fertilizer_Used": (0.0, 1.0),
        "Irrigation_Used": (0.0, 1.0),
    }
    if crop_mapping:
        feature_ranges[crop_feature] = (0, max(crop_mapping.keys()))

    # Only columns the model was trained on may go into the feature vector.
    # A lone extra "Crop_Index" is treated as display-only; any other size
    # mismatch is an error rather than a silently misaligned vector.
    model_columns = list(feature_columns)
    expected_size = getattr(dt_model, "numFeatures", -1)  # -1 means "unknown"
    if expected_size is not None and expected_size >= 0 and len(model_columns) != expected_size:
        if crop_feature in model_columns and len(model_columns) - 1 == expected_size:
            model_columns.remove(crop_feature)
        else:
            raise ValueError(
                f"Model expects {expected_size} features but "
                f"{len(feature_columns)} were supplied: {list(feature_columns)}"
            )

    print("\nEnter the following feature values:")
    answers = {}
    for feature in feature_columns:
        low, high = feature_ranges.get(feature, (None, None))
        while True:
            raw_value = input(f"Enter value for {feature}: ")
            value = validate_input(raw_value, feature, low, high)
            if value is None:
                continue
            if feature in binary_features and value not in (0.0, 1.0):
                print(f"Error: {feature} must be 0 (no) or 1 (yes).")
                continue
            # Membership also rejects fractional indices such as 2.5, which a
            # plain range check would accept and int() would truncate.
            if feature == crop_feature and value not in crop_mapping:
                print(f"Error: {feature} must be one of the listed crop indices.")
                continue
            answers[feature] = value
            break

    # One-row DataFrame built directly from local data (no RDD round-trip);
    # relies on the notebook-level `spark` session.
    user_input_df = spark.createDataFrame(
        [tuple(answers[name] for name in model_columns)],
        schema=model_columns,
    )

    assembler = VectorAssembler(inputCols=model_columns, outputCol="features")
    user_input_vector = assembler.transform(user_input_df).select("features")

    prediction = dt_model.transform(user_input_vector).select("prediction").first()["prediction"]
    print(f"\nPredicted Crop Yield (tons per hectare): {prediction:.3f}")

    # The crop is looked up by feature name, not by position in the list.
    # NOTE: this echoes the crop the user selected; it is not a model output.
    if crop_feature in answers:
        predicted_crop = crop_mapping[answers[crop_feature]]
        print(f"Predicted Crop: {predicted_crop}")

# Crop index -> crop name, collected from the distinct pairs present in `data`.
crop_mapping = {
    row["Crop_Index"]: row["Crop_Name"]
    for row in data.select("Crop_Name", "Crop_Index").distinct().collect()
}

# Print in index order so the menu is easy to read.
print("\nCrop Index Mapping:")
for crop_index, crop_name in sorted(crop_mapping.items()):
    print(f"{int(crop_index)} = {crop_name}")

# Example Usage
# Prompt for the features dt_model_3 was trained on, plus the crop index.
# A separate name is used so `top_3_features` is not overwritten: clobbering
# it would change what the retraining cell selects if that cell is re-run.
prediction_features = [
    name for name in top_3_features if name != "Crop_Index"
] + ["Crop_Index"]
predict_crop_yield(dt_model_3, prediction_features, crop_mapping)



Crop Index Mapping:
4 = Cotton
1 = Rice
2 = Barley
5 = Soybean
0 = Maize
3 = Wheat

Enter the following feature values:
Enter value for Rainfall_mm: 500
Enter value for Fertilizer_Used: 0
Enter value for Irrigation_Used: 1
Enter value for Crop_Index: 3

Predicted Crop Yield (tons per hectare): 4.246
Predicted Crop: Wheat


In [None]:
# Keep only the actual and predicted yield from the initial model's predictions
export_columns = ["Yield_tons_per_hectare", "prediction"]
export_data = predictions.select(*export_columns)

# Write a single partition as CSV with a header row, replacing any previous
# export. Spark treats the target path as an output directory.
(
    export_data
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv("dt_predictions.csv")
)

print("Dataset exported successfully to 'dt_predictions.csv'")


Dataset exported successfully to 'dt_predictions.csv'
