In [4]:
import pyspark
print(pyspark.__version__)

3.5.4


In [5]:
! python --version

Python 3.7.4


In [1]:
#spark packages
from pyspark import sql, SparkConf, SparkContext
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

In [None]:
#initialize spark session
spark_session = sql.SparkSession.builder.appName("HDFS").getOrCreate()
spark_context = SparkContext.getOrCreate(SparkConf().setAppName("HDFS"))
logs = spark_context.setLogLevel("ERROR")
print("Spark session initialize")

In [None]:
#connection parameters for spark to Amazon S3
spark_session._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "AKIAUGQ637RYKQTW47FH")
spark_session._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "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")
spark_session._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem")
spark_session._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark_session._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")
spark_session._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.eu-west-1.amazonaws.com")
print("Connection to S3 Completed")

In [None]:
#code to load and display dataset from S3 using spark session object
dataset = spark_session.read.csv(r"C:\Users\sai ganesh\OneDrive\Desktop\project\yield_df.csv", inferSchema=True, header=True)
dataset.show()

In [None]:
#describing dataset with details like count, mean, standard deviation of each dataset attributes
dataset.toPandas().describe()

In [None]:
#visualizing distribution of numerical data
dataset.toPandas().hist(figsize=(10, 8))
plt.title("Histogram distribution of dataset values")
plt.show()

In [None]:
#graph of different countries found in dataset for making crop yield
from pyspark.sql import functions
areas = dataset.select('Area').filter(functions.col('Area').isNotNull()).toPandas().values.ravel()
names, count = np.unique(areas, return_counts = True)
height = count
bars = names
y_pos = np.arange(len(bars))
plt.figure(figsize = (14, 3)) 
plt.bar(y_pos, height)
plt.xticks(y_pos, bars)
plt.xlabel("Different Area Graph for Crop Yield")
plt.ylabel("Count")
plt.xticks(rotation=90)
plt.show()

In [None]:
#visualizing graph of different crops found in dataset
from pyspark.sql import SQLContext
sql = SQLContext(spark_session)
#converting spark dataframe to spark sql query
dataset.registerTempTable("crop")
df = sql.sql("SELECT Item from crop")
df = df.toPandas()
unique, count = np.unique(df['Item'], return_counts=True)
values = []
for i in range(len(unique)):
    values.append([unique[i], count[i]])
values = pd.DataFrame(values, columns = ['Crop', 'Count'])   
plt.figure(figsize=(8,3))
sns.barplot(x='Crop',y='Count', data=values)
plt.title('Most Common Crop Yield by Different Countries')
plt.xticks(rotation=90)
plt.show()

In [None]:
#query to visualize different yield of crop by different countries
df = sql.sql("SELECT Area, Item, yield from crop")
df = df.toPandas()
data = df.groupby(['Item', 'Area'])['yield'].sum().sort_values(ascending=False).nlargest(30).reset_index()
sns.catplot(x="Item", y="yield", hue='Area', data=data, kind='point')
plt.title("Crop Yield Graphs of Different Countries")
plt.xticks(rotation=90)
plt.show()

In [None]:
#graph of Top 20 highest average rainfall area wise 
df = sql.sql("SELECT Area, average_rain_fall_mm_per_year from crop")
df = df.toPandas()
df = df.groupby('Area')['average_rain_fall_mm_per_year'].mean().sort_values(ascending=False).nlargest(20).reset_index()
plt.figure(figsize=(8,4))
plt.plot(df['Area'], df['average_rain_fall_mm_per_year'])
plt.title("Top 20 Area Wise Average Rainfall Graph")
plt.xticks(rotation=90)
plt.show()


In [None]:
#graph of Top 20 highest area wise pesticides consumption 
df = sql.sql("SELECT Area, pesticides_tonnes from crop")
df = df.toPandas()
df = df.groupby('Area')['pesticides_tonnes'].mean().sort_values(ascending=False).nlargest(20).reset_index()
plt.figure(figsize=(8,4))
plt.plot(df['Area'], df['pesticides_tonnes'])
plt.title("Top 20 Area Wise Average Pesticides Consumption Graph")
plt.xticks(rotation=90)
plt.show()

In [None]:
#split dataset into train and test where application using 80% dataset for training and 20% for testing
#extracting train and test features from dataset
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CropYieldPrediction").getOrCreate()
#extracing data for selected country and crop and in above line we are getting all maize yield from India
df = spark.sql("SELECT * from crop where Area='India' and Item='Potatoes'")#==============
#converting Area and Item column from string to numeric vector
indexer = StringIndexer(inputCol="Area", outputCol="AreaEncode")
encoder = indexer.fit(df)
df = encoder.transform(df)
#converting Area and Item column from string to numeric vector
indexer = StringIndexer(inputCol="Item", outputCol="ItemEncode")
encoder = indexer.fit(df)
df = encoder.transform(df)
#giving required features for training to select
requiredColumns = ['Year', 'yield', 'average_rain_fall_mm_per_year', 'pesticides_tonnes','avg_temp', 'AreaEncode', 'ItemEncode']
vec_assembler = VectorAssembler(inputCols=requiredColumns, outputCol='train',handleInvalid="skip")
transformed = vec_assembler.transform(df)
indexer = StringIndexer(inputCol="yield",outputCol="predict",handleInvalid="skip")
transformed = indexer.fit(transformed).transform(transformed)
#normalizing extracted crop features
scaler = MinMaxScaler(inputCol="train", outputCol="scaled_train")
transformed = scaler.fit(transformed).transform(transformed)
#splitting dataset into train and test
(X_train, X_test) = transformed.randomSplit([0.8, 0.2])
print("80% dataset for training : "+str(X_train.count()))
print("20% dataset for testing  : "+str(X_test.count()))

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler, MinMaxScaler, StringIndexer
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Create Spark Session
spark = SparkSession.builder.appName("CropYieldPrediction").getOrCreate()

# Load and preprocess data (similar to your previous code)
# ... (Your code for loading and preprocessing data goes here) ...

# Define features for training
requiredColumns = ['Year', 'average_rain_fall_mm_per_year', 'pesticides_tonnes', 'avg_temp', 'AreaEncode', 'ItemEncode']
vec_assembler = VectorAssembler(inputCols=requiredColumns, outputCol='features', handleInvalid="skip")
transformed = vec_assembler.transform(df)

# Split dataset into train and test
train, test = transformed.randomSplit([0.8, 0.2])

# Define models
lr = LinearRegression(featuresCol='features', labelCol='yield')
dt = DecisionTreeRegressor(featuresCol='features', labelCol='yield')


# Train models
lr_model = lr.fit(train)
dt_model = dt.fit(train)


# Make predictions
lr_predictions = lr_model.transform(test)
dt_predictions = dt_model.transform(test)


# Evaluate models
evaluator = RegressionEvaluator(labelCol="yield", predictionCol="prediction", metricName="rmse")

lr_rmse = evaluator.evaluate(lr_predictions)
dt_rmse = evaluator.evaluate(dt_predictions)


# Print RMSE values
print("Linear Regression RMSE:", lr_rmse)
print("Decision Tree RMSE:", dt_rmse)


# Select the best model based on RMSE
best_model = min([(lr_rmse, 'Linear Regression'), (dt_rmse, 'Decision Tree')], key=lambda x: x[0])[1]
print("Best Model:", best_model)



In [None]:
# Get true and predicted yield values for Decision Tree and Linear Regression
true_yield_dt = dt_predictions.select("yield").collect()
pred_yield_dt = dt_predictions.select("prediction").collect()

true_yield_lr = lr_predictions.select("yield").collect()
pred_yield_lr = lr_predictions.select("prediction").collect()

# Convert lists to numpy arrays
true_yield_dt = np.array([row['yield'] for row in true_yield_dt])
pred_yield_dt = np.array([row['prediction'] for row in pred_yield_dt])

true_yield_lr = np.array([row['yield'] for row in true_yield_lr])
pred_yield_lr = np.array([row['prediction'] for row in pred_yield_lr])

# Print first 20 values for Decision Tree
print("Decision Tree:")
for i in range(20):
    print(f"True Yield = {true_yield_dt[i]}, Predicted Yield = {pred_yield_dt[i]}")

# Print first 20 values for Linear Regression
print("Linear Regression:")
for i in range(20):
    print(f"True Yield = {true_yield_lr[i]}, Predicted Yield = {pred_yield_lr[i]}")

In [None]:
import matplotlib.pyplot as plt

# Assuming you have the true_yield_dt and pred_yield_dt arrays

# Create the plot
plt.figure(figsize=(8, 6))  # Adjust figure size as needed

# Plot the true yield
plt.plot(true_yield_dt, color='red', label='Original Crop Yield')

# Plot the predicted yield
plt.plot(pred_yield_dt, color='green', label='Decision Tree Crop Yield')

# Set labels and title
plt.xlabel('Test Data')
plt.ylabel('Crop Yield')
plt.title('Decision Tree True & Predicted Crop Yield Graph')

# Add legend
plt.legend()

# Show the plot
plt.show()

In [None]:
import matplotlib.pyplot as plt

# Assuming you have the true_yield_lr and pred_yield_lr arrays 

# Create the plot
plt.figure(figsize=(8, 6))  # Adjust figure size as needed

# Plot the true yield
plt.plot(true_yield_lr, color='red', label='Original Crop Yield') 

# Plot the predicted yield
plt.plot(pred_yield_lr, color='green', label='Linear Regression Crop Yield')

# Set labels and title
plt.xlabel('Test Data')
plt.ylabel('Crop Yield')
plt.title('Linear Regression True & Predicted Crop Yield Graph')

# Add legend
plt.legend()

# Show the plot
plt.show()

In [None]:
import matplotlib.pyplot as plt


# Data for the bar chart
algorithms = ['Decision Tree RMSE', 'Linear Regression RMSE']
rmse_values = [dt_rmse, lr_rmse]

# Create the bar chart
plt.figure(figsize=(6, 4))  # Adjust figure size if needed
plt.bar(algorithms, rmse_values)

# Set labels and title
plt.xlabel('Algorithm Names')
plt.ylabel('RMSE')
plt.title('RMSE Comparison Graph')

# Show the plot
plt.show()

In [None]:
import matplotlib.pyplot as plt

# ... (Your existing code for training and prediction) ...

# Get true and predicted yield values
true_yield_dt = dt_predictions.select("yield").collect()
pred_yield_dt = dt_predictions.select("prediction").collect()

true_yield_lr = lr_predictions.select("yield").collect()
pred_yield_lr = lr_predictions.select("prediction").collect()

# Convert lists to numpy arrays
true_yield_dt = np.array([row['yield'] for row in true_yield_dt])
pred_yield_dt = np.array([row['prediction'] for row in pred_yield_dt])

true_yield_lr = np.array([row['yield'] for row in true_yield_lr])
pred_yield_lr = np.array([row['prediction'] for row in pred_yield_lr])

# Create scatter plots
plt.figure(figsize=(10, 5))

plt.subplot(1, 2, 1)
plt.scatter(true_yield_dt, pred_yield_dt, color='blue', alpha=0.5)
plt.xlabel("True Yield (Decision Tree)")
plt.ylabel("Predicted Yield (Decision Tree)")
plt.title("Decision Tree: True vs. Predicted Yield")

# plt.subplot(1, 2, 2)
# plt.scatter(true_yield_lr, pred_yield_lr, color='red', alpha=0.5)
# plt.xlabel("True Yield (Linear Regression)")
# plt.ylabel("Predicted Yield (Linear Regression)")
# plt.title("Linear Regression: True vs. Predicted Yield")

plt.legend()
plt.show()

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator



#creating object of decision tree algorithm
dt = DecisionTreeRegressor(featuresCol="scaled_train", labelCol = 'predict')
#training DT on selected train data
dt_model = dt.fit(X_train)
#perform prediction on test data
predict = dt_model.transform(X_test)
#collect original crop yield
true = predict.select('predict').collect()
#collect predicted crop yield
pred = predict.select('prediction').collect()
#calculate decision tree performance using RMSE meric 
evaluator = RegressionEvaluator(labelCol="predict", predictionCol="prediction", metricName="rmse")
rmse_error = evaluator.evaluate(predict)
print("Decision Tree RMSE = "+str(rmse_error)+"\n")
#plot graph of true and predicted crop yield
trueYield = [true[i]['predict'] * 100 for i in range(len(true))]
predictedYield = [pred[i]['prediction'] * 100 for i in range(len(pred))]

plt.plot(trueYield, color='red', label='Original Crop Yield')
plt.plot(predictedYield, color='green', label='Decision Tree Crop Yield')
plt.title('Decision Tree True & Predicted Crop Yield Graph')
plt.xlabel('Test Data')
plt.ylabel('Crop Yield')
plt.legend()
plt.show()

In [None]:
from pyspark.ml.regression import LinearRegression
#training linear regression on train features of crop yield dataset
lr = LinearRegression(featuresCol="scaled_train", labelCol = 'predict')
lr_model = lr.fit(X_train)
predict = lr_model.transform(X_test)
#collect original crop yield
true = predict.select(['predict']).collect()
#collect predicted crop yield
pred = predict.select(['prediction']).collect()
#calculate linear regression performance using RMSE meric 
evaluator = RegressionEvaluator(labelCol="predict", predictionCol="prediction", metricName="rmse")
lr_rmse_error = evaluator.evaluate(predict)
print("Linear Regression RMSE = "+str(lr_rmse_error)+"\n")
#plot graph of true and predicted crop yield
trueYield = []
predictedYield = []
for i in range(0, 100): 
    trueYield.append(true[i].predict*100)
for i in range(0, 100): 
    predictedYield.append(pred[i].prediction*100)
for i in range(0, 20):
    print("True Yield = "+str(trueYield[i])+" Linear Regression Predicted Yield = "+str(predictedYield[i]))
plt.plot(trueYield, color = 'red', label = 'Original Crop Yield')
plt.plot(predictedYield, color = 'green', label = 'Linear Regression Crop Yield')
plt.title('Linear Regression True & Predicted Crop Yield Graph')
plt.xlabel('Test Data')
plt.ylabel('Crop Yield')
plt.legend()
plt.show()      

In [None]:
#RMSE comaprison Graph
height = [rmse, lr_rmse_error]
bars = ['Decision Tree TMSE', 'Linear Regression RMSE']
y_pos = np.arange(len(bars))
plt.figure(figsize = (4, 3))
plt.bar(y_pos, height)
plt.xticks(y_pos, bars)
plt.xlabel("Algorithm Names")
plt.ylabel("RMSE")
plt.title("RMSE Comparison Graph")
plt.show()