In [None]:
!pip install pyspark
!pip install pyngrok

In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the notebook-wide Spark session, with the Spark UI bound to port 4050.
spark = (
    SparkSession.builder
    .appName("CropYieldAnalysis")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)

In [None]:

# Read the raw CSV: the first line is the header and, because schema inference
# is switched off, every column is loaded as a string.
# NOTE(review): later cells select "numeric" columns by dtype ("int"/"double");
# with an all-string schema those selections are empty — confirm this is intended.
df = (
    spark.read
    .options(header=True, inferSchema=False)
    .csv("uncleaned_crop_yield.csv")
)

# Preview the first five rows exactly as loaded
print("Initial Data:")
df.show(5)

In [None]:
# Keep one copy of each fully identical row
df = df.distinct()

# Column names and types
print("DataFrame Info:")
df.printSchema()

# Basic statistics per column (count, mean, stddev, min, max)
print("Data Summary:")
summary_stats = df.describe()
summary_stats.show()

In [None]:
# Handle missing values
from pyspark.sql.functions import avg, col, sum as spark_sum

# Numeric columns: replace nulls with the column mean.
numeric_columns = [col_name for col_name, dtype in df.dtypes if dtype in ("int", "double")]
if numeric_columns:
    # One aggregation job for all columns instead of one job per column.
    mean_row = df.agg(*[avg(col(c)).alias(c) for c in numeric_columns]).first()
    # avg() is NULL when a column has no non-null values; fillna cannot take None,
    # so such columns are left untouched.
    mean_fill = {c: mean_row[c] for c in numeric_columns if mean_row[c] is not None}
    if mean_fill:
        df = df.fillna(mean_fill)

# String columns: replace nulls with a fixed placeholder.
categorical_columns = [col_name for col_name, dtype in df.dtypes if dtype == "string"]
df = df.fillna({col_name: "Unknown" for col_name in categorical_columns})

# Verify: number of remaining nulls per column (all zeros means nothing is missing).
# A per-row boolean table would only cover the first 20 rows and proves nothing.
print("Missing values after filling:")
df.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()

In [None]:
from pyspark.sql.functions import trim

# Remove leading/trailing whitespace from every column listed in
# `categorical_columns` (defined in an earlier cell); other columns pass through
# unchanged and the column order is preserved.
columns_to_trim = set(categorical_columns)
df = df.select(
    [
        trim(col(name)).alias(name) if name in columns_to_trim else col(name)
        for name in df.columns
    ]
)

# Show the trimmed result without truncating cell contents
print("DataFrame after trimming whitespace from string columns:")
df.show(truncate=False)


In [None]:
from pyspark.sql.functions import col, count, when, upper, isnan

# Assume `df` is the PySpark DataFrame

# Convert all string columns to uppercase
string_columns = [col_name for col_name, dtype in df.dtypes if dtype == "string"]
for col_name in string_columns:
    df = df.withColumn(col_name, upper(col(col_name)))

# Display a sample of the data after converting strings to uppercase
print("Sample data after converting strings to uppercase:")
df.show(5, truncate=False)

# Count missing values per column after the transformation.
# NaN only exists for floating-point columns, so `isnan` is applied to those alone;
# on other types it would depend on an implicit cast or be rejected outright.
floating_columns = {name for name, dtype in df.dtypes if dtype in ("float", "double")}
missing_counters = []
for c in df.columns:
    is_missing = col(c).isNull()
    if c in floating_columns:
        is_missing = is_missing | isnan(col(c))
    # count() ignores NULLs, so this counts the flagged rows and yields 0
    # (not NULL) for a column with nothing missing.
    missing_counters.append(count(when(is_missing, 1)).alias(c))
null_counts_after_upper = df.select(missing_counters)

print("Null counts per column after converting strings to uppercase:")
null_counts_after_upper.show()

# Count the number of rows after the transformation
row_count_after_upper = df.count()
print(f"Number of rows after converting strings to uppercase: {row_count_after_upper}")

# Collect and display a sample of the processed data
processed_data_after_upper = df.take(5)  # Collect the first 5 rows
for row in processed_data_after_upper:
    print(row)


In [None]:
from pyspark.sql.types import StringType

# Make sure both label columns are typed as strings
# (a no-op for columns that are already strings).
for text_column in ("Region", "Crop"):
    df = df.withColumn(text_column, col(text_column).cast(StringType()))

# Confirm the resulting types
df.printSchema()

# Preview just the two converted columns
df.select("Region", "Crop").show()

In [None]:
# Import under an alias: a bare `round` import would replace Python's builtin
# round() for every later cell in this kernel.
from pyspark.sql.functions import round as spark_round

# Round the 'Rainfall_mm' column to 2 decimal places
df = df.withColumn("Rainfall_mm", spark_round(col("Rainfall_mm"), 2))

# Display the updated DataFrame
df.show(5)

In [None]:
from pyspark.sql.functions import col

# Identify numeric columns
numeric_columns = [col_name for col_name, dtype in df.dtypes if dtype in ("int", "double")]

# Remove rows falling outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR] in any numeric column.
# All quartiles are computed in a single pass over the *unfiltered* data, so the
# bounds for one column do not depend on which columns were processed before it.
if numeric_columns:
    quartiles = df.approxQuantile(numeric_columns, [0.25, 0.75], 0.01)

    keep_row = None
    for col_name, bounds in zip(numeric_columns, quartiles):
        if len(bounds) < 2:
            # No non-null values in this column: no quartiles, nothing to bound.
            continue
        q1, q3 = bounds
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr

        # between() is inclusive on both ends; a NULL value does not satisfy it,
        # so rows with a NULL in a bounded column are dropped as well.
        in_range = col(col_name).between(lower_bound, upper_bound)
        keep_row = in_range if keep_row is None else keep_row & in_range

    if keep_row is not None:
        df = df.filter(keep_row)

# Show the DataFrame information
print("Filtered DataFrame:")
df.printSchema()
df.show()


In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import coalesce, col, lit, udf
from pyspark.sql.types import FloatType

# Define categorical columns to encode
categorical_columns = ['Region', 'Soil_Type', 'Crop', 'Weather_Condition', 'Irrigation_Used', 'Fertilizer_Used']

# Names of the intermediate columns produced by the indexer / encoder stages
index_columns = [f"{name}_Index" for name in categorical_columns]
encoded_columns = [f"{name}_Encoded" for name in categorical_columns]

# Step 1: Remove existing index/encoded columns if present (keeps the cell re-runnable)
df = df.drop(*[c for c in index_columns + encoded_columns if c in df.columns])

# Step 2: Apply StringIndexer and OneHotEncoder
indexers = [
    StringIndexer(inputCol=name, outputCol=index_name)
    for name, index_name in zip(categorical_columns, index_columns)
]
# dropLast=False keeps one vector slot per category.
encoders = [
    OneHotEncoder(inputCol=index_name, outputCol=encoded_name, dropLast=False)
    for index_name, encoded_name in zip(index_columns, encoded_columns)
]

# Combine into a pipeline
pipeline = Pipeline(stages=indexers + encoders)

# Fit and transform the DataFrame
pipeline_model = pipeline.fit(df)
encoded_df = pipeline_model.transform(df)

# Step 3: Convert OneHotEncoded vectors to separate binary columns named <column>_<i>.
# The built-in vector_to_array replaces one Python UDF per category, which also
# removes a lambda that captured the loop variable `i` by reference.
for col_name, encoded_name in zip(categorical_columns, encoded_columns):
    num_categories = encoded_df.select(encoded_name).first()[0].size  # Get number of categories
    as_array = vector_to_array(col(encoded_name))
    for i in range(num_categories):
        # A missing vector maps to 0.0; the result is cast to float so the
        # column type stays FloatType.
        encoded_df = encoded_df.withColumn(
            f"{col_name}_{i}",
            coalesce(as_array[i], lit(0.0)).cast(FloatType()),
        )

# Step 4: Drop original, indexed, and encoded columns
columns_to_drop = categorical_columns + index_columns + encoded_columns
encoded_df = encoded_df.drop(*columns_to_drop)

# Show the final DataFrame
encoded_df.show(truncate=False)
encoded_df.printSchema()


In [None]:
from pyspark.sql.functions import coalesce, col, lit

# Columns to convert to double (they are converted here, not scaled)
columns_to_convert = ["Days_to_Harvest", "Yield_tons_per_hectare"]

# Step 1: Cast to double, then replace anything missing with 0.0.
# The cast has to come first: a value that cannot be parsed as a number becomes
# NULL only *after* casting, so a null check on the original column misses it.
# NOTE(review): this also puts 0.0 into the target column Yield_tons_per_hectare
# — confirm that imputing rather than dropping those rows is intended.
for col_name in columns_to_convert:
    encoded_df = encoded_df.withColumn(
        col_name,
        coalesce(col(col_name).cast("double"), lit(0.0)),
    )


In [None]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import coalesce, col, lit, when

# Define the columns to scale
columns_to_scale = ["Rainfall_mm", "Temperature_Celsius"]

# Step 1: Cast to double, then replace anything missing with 0.0.
# Casting first matters: an unparseable value only turns into NULL after the
# cast, and any NULL left behind makes VectorAssembler fail in the next step.
for col_name in columns_to_scale:
    encoded_df = encoded_df.withColumn(
        col_name,
        coalesce(col(col_name).cast("double"), lit(0.0)),
    )

# Step 2: Create a VectorAssembler to combine the columns into a single vector
assembler = VectorAssembler(inputCols=columns_to_scale, outputCol="features_to_scale")

# Transform the DataFrame to add the 'features_to_scale' column
df = assembler.transform(encoded_df)

# Step 3: Initialize the MinMaxScaler
scaler = MinMaxScaler(inputCol="features_to_scale", outputCol="scaled_features")

# Step 4: Fit and transform the scaler to scale the features
scaler_model = scaler.fit(df)
scaled_df = scaler_model.transform(df)

# Step 5: Convert the vector column to an array to extract individual elements
scaled_df = scaled_df.withColumn("scaled_features_array", vector_to_array(col("scaled_features")))

# Step 6: Extract the scaled features back into individual columns.
# Array position i corresponds to columns_to_scale[i] (the assembler's input order).
for i, col_name in enumerate(columns_to_scale):
    scaled_df = scaled_df.withColumn(col_name + "_scaled", col("scaled_features_array")[i])

# Step 7: Drop the intermediate columns and the unscaled originals.
# The originals are taken from columns_to_scale so the two lists cannot drift apart.
scaled_df = scaled_df.drop(
    "features_to_scale", "scaled_features", "scaled_features_array", *columns_to_scale
)

# Show the resulting DataFrame
scaled_df.show(truncate=False)

# Print schema to verify new scaled columns
scaled_df.printSchema()


In [None]:
import re

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rank rows by yield (highest first) within each region.
# Region is only available as one-hot columns Region_0, Region_1, ... at this
# point. Partitioning by Region_0 alone gives just two groups ("region 0" and
# "everything else"); partitioning by all of them gives one group per region.
region_columns = [c for c in scaled_df.columns if re.fullmatch(r"Region_\d+", c)]
if not region_columns:
    raise ValueError("scaled_df has no one-hot 'Region_<i>' columns to partition by")

window_spec = Window.partitionBy(*region_columns).orderBy(F.desc("Yield_tons_per_hectare"))
df = scaled_df.withColumn("Rank", F.rank().over(window_spec))
df.show()


Part 2: Model training and evaluation

In [None]:
from pyspark.sql.functions import col

# X holds every column except the target; Y holds only the target column.
X = scaled_df.drop("Yield_tons_per_hectare")
Y = scaled_df.select("Yield_tons_per_hectare")

# Print summary statistics for the features first, then for the target
for heading, frame in (("Feature Summary:", X), ("Target Summary:", Y)):
    print(heading)
    frame.describe().show()

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Step 1: Pack every non-target column into one "features" vector and expose
# the target under the name "label".
feature_columns = [name for name in scaled_df.columns if name != "Yield_tons_per_hectare"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
assembled_df = assembler.transform(scaled_df)
dataset = assembled_df.select("features", col("Yield_tons_per_hectare").alias("label"))

# Step 2: 80/20 train/test split with a fixed seed (features and label stay together)
train_df, test_df = dataset.randomSplit([0.8, 0.2], seed=42)

# Step 3: Standardise the features. The scaler is fitted on the training split
# only and then applied to both splits.
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withMean=True,
    withStd=True,
)
scaler_model = scaler.fit(train_df)
train_df, test_df = [
    scaler_model.transform(split).select("scaled_features", "label")
    for split in (train_df, test_df)
]

In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Fit a linear regression on the standardised training features
linear_reg = LinearRegression(
    featuresCol="scaled_features",
    labelCol="label",
    predictionCol="prediction",
)
linear_model = linear_reg.fit(train_df)

# Score the held-out test split
predictions = linear_model.transform(test_df)

# Evaluate both metrics with a single evaluator; the metric is overridden per
# call, so the evaluator's own configuration is left unchanged.
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction")
metric_values = {
    metric: evaluator.evaluate(predictions, {evaluator.metricName: metric})
    for metric in ("mse", "r2")
}
mse = metric_values["mse"]
r2 = metric_values["r2"]

# Report the results
print(f"Mean Squared Error (MSE): {mse}")
print(f"R Squared (R²): {r2}")