In [49]:
pip install pyspark


Defaulting to user installation because normal site-packages is not writeableNote: you may need to restart the kernel to use updated packages.



[notice] A new release of pip available: 22.3.1 -> 24.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip





In [50]:
pip install findspark


Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip available: 22.3.1 -> 24.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [51]:
# Initialize findspark and Spark session
import findspark
findspark.init()

# Import required libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from datetime import datetime
# Data analysis libraries
import pandas as pd
import numpy as np

# Data visualization libraries
import matplotlib.pyplot as plt
import seaborn as sns

# Machine learning preprocessing from scikit-learn
from sklearn.preprocessing import StandardScaler as SklearnStandardScaler


In [52]:
from pyspark.sql.functions import col, when, to_date, year, month


In [53]:
# Create a Spark session 

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Demeter") \
    .getOrCreate()
print(spark)


<pyspark.sql.session.SparkSession object at 0x000001C8BD646050>


In [54]:
def load_data(file_path):
    """
    Load the dataset with semicolon separator
    
    Args:
        file_path (str): Path to the dataset
    
    Returns:
        Spark DataFrame
    """
    # Read CSV with semicolon separator
    df = spark.read.option("sep", ";") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .csv(file_path)
    
    # Print original column names
    print("Original Column Names:", df.columns)
    
    # Rename columns to remove any unwanted characters
    for old_col in df.columns:
        new_col = old_col.strip()  # Remove leading/trailing whitespace
        df = df.withColumnRenamed(old_col, new_col)
    
    # Print cleaned column names
    print("Cleaned Column Names:", df.columns)
    
    return df

In [55]:
def preprocess_data2(df):
    """
    Preprocess the data:
    1. Handle missing values
    2. Detect and handle outliers
    3. Feature engineering
    
    Args:
        df (Spark DataFrame): Input DataFrame
    
    Returns:
        Preprocessed Spark DataFrame
    """
    # 3.1 Missing Value Handling
    # Replace missing values with median for numeric columns
    numeric_cols = ['Soil_pH', 'Temperature', 'Humidity', 'Wind_Speed', 
                    'N', 'P', 'K', 'Soil_Quality']
    
    # Function to calculate median safely
    from pyspark.sql.functions import col, percentile_approx
    
    # Replace missing values
    for col_name in numeric_cols:
        # Calculate median using percentile_approx
        median_val = df.select(percentile_approx(col(col_name), 0.5)).first()[0]
        df = df.na.fill({col_name: median_val})
    
    # 3.2 Outlier Detection and Handling (Using IQR method)
    def remove_outliers(dataframe, columns):
        for col_name in columns:
            # Calculate Q1, Q3, and IQR
            q1 = dataframe.approxQuantile(col_name, [0.25], 0.01)[0]
            q3 = dataframe.approxQuantile(col_name, [0.75], 0.01)[0]
            iqr = q3 - q1
            
            # Define outlier bounds
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
            
            # Filter out outliers
            dataframe = dataframe.filter(
                (col(col_name) >= lower_bound) & 
                (col(col_name) <= upper_bound)
            )
        return dataframe
    
    df = remove_outliers(df, numeric_cols)
    
    # 3.3 Feature Engineering
    # Parse Date column
    df = df.withColumn("Date", expr("to_date(Date, 'yyyy-MM-dd')"))
    
    # Extract month and year from Date
    df = df.withColumn("Month", month(col("Date")))
    df = df.withColumn("Year", year(col("Date")))
    
    return df

In [56]:
def preprocess_data(df):
    """
    Preprocess the data:
    1. Handle missing values
    2. Detect and handle outliers
    3. Feature engineering
    
    Args:
        df (Spark DataFrame): Input DataFrame
    
    Returns:
        Preprocessed Spark DataFrame
    """
    from pyspark.sql.functions import col, percentile_approx, lit, expr

    # List of numeric columns
    numeric_cols = ['Soil_pH', 'Temperature', 'Humidity', 'Wind_Speed', 
                    'N', 'P', 'K', 'Soil_Quality']

    # Handle missing values for numeric columns by replacing with median
    for col_name in numeric_cols:
        median_val = df.select(percentile_approx(col(col_name), 0.5)).first()[0]
        if median_val is not None:  # Fill only if median is computed
            df = df.na.fill({col_name: median_val})

    # Categorical columns (fill missing with a placeholder or default value)
    df = df.na.fill({"Crop_Type": "Unknown", "Soil_Type": "Unknown"})

    # Handle missing values for Date (fill with a default date if needed)
    df = df.withColumn("Date", expr("nvl(Date, '2000-01-01')"))

    # Remove outliers using IQR for numeric columns
    def remove_outliers(dataframe, columns):
        for col_name in columns:
            q1, q3 = dataframe.approxQuantile(col_name, [0.25, 0.75], 0.01)
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
            dataframe = dataframe.filter((col(col_name) >= lower_bound) & 
                                         (col(col_name) <= upper_bound))
        return dataframe

    df = remove_outliers(df, numeric_cols)

    # Feature Engineering: Convert Date and extract Month, Year
    df = df.withColumn("Date", expr("to_date(Date, 'yyyy-MM-dd')"))
    df = df.withColumn("Month", month(col("Date")))
    df = df.withColumn("Year", year(col("Date")))

    return df


In [57]:
# Step 4: Feature Selection and Correlation Analysis
def feature_correlation(df):
    """
    Visualize feature correlations
    
    Args:
        df (Spark DataFrame): Input DataFrame
    """
    # Convert to Pandas for correlation visualization
    pandas_df = df.toPandas()
    
    # Select numeric columns
    numeric_cols = ['Soil_pH', 'Temperature', 'Humidity', 'Wind_Speed', 
                    'N', 'P', 'K', 'Soil_Quality', 'Crop_Yield']
    
    # Create correlation matrix
    correlation_matrix = pandas_df[numeric_cols].corr()
    
    # Visualize correlation matrix
    plt.figure(figsize=(12, 10))
    sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', linewidths=0.5)
    plt.title('Feature Correlation Matrix')
    plt.tight_layout()
    plt.savefig('correlation_matrix.png')
    plt.close()

In [58]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline

def prepare_and_train_model(df):
    """
    Prepare data for training and train the model.
    
    Args:
        df (Spark DataFrame): Preprocessed DataFrame
    
    Returns:
        dict: Contains the trained model and test data
    """
    # Validate data
    required_columns = [
        'Crop_Type', 'Soil_Type', 'Soil_pH', 'Temperature', 
        'Humidity', 'Wind_Speed', 'N', 'P', 'K', 'Crop_Yield'
    ]
    df = df.dropna(subset=required_columns)
    
    # Convert data types
    numeric_columns = ['Soil_pH', 'Temperature', 'Humidity', 'Wind_Speed', 'N', 'P', 'K', 'Crop_Yield']
    for col_name in numeric_columns:
        df = df.withColumn(col_name, col(col_name).cast("double"))
    categorical_columns = ['Crop_Type', 'Soil_Type']
    for col_name in categorical_columns:
        df = df.withColumn(col_name, col(col_name).cast("string"))

    # Categorical encoding
    indexers = [
        StringIndexer(inputCol=column, outputCol=f"{column}_indexed", handleInvalid="keep")
        for column in ["Crop_Type", "Soil_Type"]
    ]

    # Define feature columns
    feature_columns = [
        'Soil_pH', 'Temperature', 'Humidity', 'Wind_Speed', 
        'N', 'P', 'K', 'Crop_Type_indexed', 'Soil_Type_indexed'
    ]

    # Assemble features
    assembler = VectorAssembler(
        inputCols=feature_columns, 
        outputCol="features", 
        handleInvalid="skip"
    )

    # Scale features
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

    # Random Forest Regressor
    rf = RandomForestRegressor(
        featuresCol="scaled_features", 
        labelCol="Crop_Yield", 
        maxDepth=10, 
        numTrees=50
    )

    # Build pipeline
    pipeline = Pipeline(stages=indexers + [assembler, scaler, rf])

    # Split data
    train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

    # Ensure training data is not empty
    if train_data.count() == 0:
        raise ValueError("Training data is empty. Check the input DataFrame or split ratio.")

    # Train the model
    model = pipeline.fit(train_data)

    return {"model": model, "test_data": test_data}


In [60]:
df = load_data("C:/Users/rabhi/Downloads/big_data/crop-yield-dataset.csv")



Original Column Names: ['Date', 'Crop_Type', 'Soil_Type', 'Soil_pH', 'Temperature', 'Humidity', 'Wind_Speed', 'N', 'P', 'K', 'Crop_Yield', 'Soil_Quality']
Cleaned Column Names: ['Date', 'Crop_Type', 'Soil_Type', 'Soil_pH', 'Temperature', 'Humidity', 'Wind_Speed', 'N', 'P', 'K', 'Crop_Yield', 'Soil_Quality']


In [61]:
df.show(5)

+----------+---------+---------+-------+------------------+-----------------+------------------+-----------------+----+----+------------------+------------------+
|      Date|Crop_Type|Soil_Type|Soil_pH|       Temperature|         Humidity|        Wind_Speed|                N|   P|   K|        Crop_Yield|      Soil_Quality|
+----------+---------+---------+-------+------------------+-----------------+------------------+-----------------+----+----+------------------+------------------+
|01/01/2014|    Wheat|    Peaty|    5.5| 9.440599409765397|             80.0| 10.95670655406815|60.50000000000001|45.0|31.5|               0.0|22.833333333333332|
|01/01/2014|     Corn|    Loamy|    6.5|20.052576424032633|79.94742357596736| 8.591576842195144|             84.0|66.0|50.0|104.87131032861858| 66.66666666666667|
|01/01/2014|     Rice|    Peaty|    5.5|12.143099171229291|             80.0| 7.227751486758685|             71.5|54.0|38.5|               0.0|27.333333333333332|
|01/01/2014|   Barley|

In [62]:
preprocessed_df = preprocess_data2(df)

In [63]:
# Correlation Analysis
feature_correlation(preprocessed_df)

In [24]:
# Prepare Data for Training
train_data, test_data, pipeline = prepare_model_data2(preprocessed_df)

NameError: name 'prepare_model_data2' is not defined

In [64]:
# Prepare Data for Training
result = prepare_and_train_model(preprocessed_df)

# Extract the training data, test data, and pipeline from the result
train_data = result["model"].stages[0].transform(preprocessed_df)  # Get the trained model's stage
test_data = result["test_data"]
pipeline = result["model"]


In [65]:
model, test_data = prepare_and_train_model(preprocessed_df)

In [66]:
result = prepare_and_train_model(preprocessed_df)
model = result["model"]
test_data = result["test_data"]


In [67]:
# Make predictions on the test data
predictions = model.transform(test_data)

In [68]:
# Initialize evaluator for R2
evaluator_r2 = RegressionEvaluator(
    labelCol="Crop_Yield", 
    predictionCol="prediction", 
    metricName="r2"
)

# Calculate R2
r2 = evaluator_r2.evaluate(predictions)

# Print R2
print(f"R2: {r2}")

R2: 0.9526619630853745


In [69]:
import os
# Define the model path (use raw string or forward slashes)
model_path ="C:/big_data/Demeter_Prediction_Model"  # Raw string for Windows paths

# Check if directory exists, if not create it
if not os.path.exists(model_path):
    os.makedirs(model_path)

# Save the model with overwrite option
try:
    model.write().overwrite().save(model_path)
    print(f"Model successfully saved to {model_path}")
except Exception as e:
    print(f"Error saving model: {str(e)}")


Error saving model: An error occurred while calling o4404.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:106)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1091)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1089)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1062)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RD

In [70]:
import os

# Définir le chemin du modèle (utilisez des chaînes brutes ou des slashs pour les chemins Windows)
model_path = "C:/big_data/Demeter_Prediction_Model"

# Vérifier si le répertoire existe, sinon le créer
if not os.path.exists(model_path):
    os.makedirs(model_path)

# Sauvegarder le modèle
try:
    model.save(model_path)  # Sauvegarde du modèle
    print(f"Model successfully saved to {model_path}")
except Exception as e:
    print(f"Error saving model: {str(e)}")


Error saving model: An error occurred while calling o4529.save.
: java.io.IOException: Path C:/big_data/Demeter_Prediction_Model already exists. To overwrite it, please use write.overwrite().save(path) for Scala and use write().overwrite().save(path) for Java and Python.
	at org.apache.spark.ml.util.FileSystemOverwrite.handleOverwrite(ReadWrite.scala:683)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:167)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.super$save(Pipeline.scala:344)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.$anonfun$save$4(Pipeline.scala:344)
	at org.apache.spark.ml.MLEvents.withSaveInstanceEvent(events.scala:174)
	at org.apache.spark.ml.MLEvents.withSaveInstanceEvent$(events.scala:169)
	at org.apache.spark.ml.util.Instrumentation.withSaveInstanceEvent(Instrumentation.scala:42)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.$anonfun$save$3(Pipeline.scala:344)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.$an

In [33]:
model_path = r"C:\Users\rabhi\Downloads\big_data\Demeter_Prediction_Model"


In [34]:
import os

# Check if directory exists, if not create it
if not os.path.exists(model_path):
    os.makedirs(model_path)


In [44]:
model_path = "C:/big_data/Demeter_Prediction_Model"

In [45]:
if not os.path.exists(model_path):
    os.makedirs(model_path)

In [47]:
try:
    model.write().overwrite().save(model_path)
    print(f"Model successfully saved to {model_path}")
except Exception as e:
    print(f"Error saving model: {str(e)}")

Error saving model: An error occurred while calling o2842.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:106)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1091)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1089)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1062)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RD