In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [None]:
# Initialise findspark before pyspark is imported in the next cell.
# NOTE(review): presumably this relies on the SPARK_HOME variable set during
# environment setup -- confirm.
import findspark

findspark.init()

In [None]:
from pyspark.sql import SparkSession

# Single session object used by every later cell that reads or creates DataFrames.
spark = (
    SparkSession.builder
    .appName("WeatherPrediction")
    .getOrCreate()
)

In [None]:
# Read the weather CSV, using its first row as the header and asking Spark to
# infer column types, then preview the first 10 rows.
data = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/content/Weather Data2.csv")
)
data.show(n=10)

In [None]:
# The timestamp column is not among the model inputs assembled later, so remove it.
timestamp_column = "Date/Time"
data = data.drop(timestamp_column)
data.show()

In [None]:
# Row count per distinct Weather value.
weather_counts = data.groupBy("Weather").count()
weather_counts.show()

In [None]:
# Encode the categorical Weather strings as numeric indices in a new
# Weather_new column.
from pyspark.ml.feature import StringIndexer

stringIndex = StringIndexer(inputCols=["Weather"], outputCols=["Weather_new"])
stringIndex_model = stringIndex.fit(data)

# Append the encoded column and preview the first 100 rows.
data = stringIndex_model.transform(data)
data.show(n=100)

In [None]:
# Weather_new now carries the encoded label, so drop the original text column.
data = data.drop("Weather")
data.show(n=5)

In [None]:
# Print, for every column, how many rows hold a null in that column.
colNames = data.columns
for col in colNames:
    null_rows = data.filter(data[col].isNull())
    print(col.ljust(15), null_rows.count())

In [None]:
from pyspark import SparkFiles
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.linalg import Vectors

In [None]:
# Pack the six numeric weather measurements into one vector column, "features_".
feature_columns = [
    "Temp_C",
    "Dew Point Temp_C",
    "Rel Hum_%",
    "Wind Speed_km/h",
    "Visibility_km",
    "Press_kPa",
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features_")

data_for_lin_reg = assembler.transform(data)

# Keep only the feature vector and the encoded label, then split 80/20 with a
# fixed seed.
final_data = data_for_lin_reg.select("features_", "Weather_new")
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Fit a linear regression of the encoded weather label on the assembled features.
lr = LinearRegression(
    featuresCol="features_",
    labelCol="Weather_new",
    predictionCol="predicted_",
)
lr_model = lr.fit(train_data)

# The R^2 printed here is computed on the training split itself.
result = lr_model.evaluate(train_data)
print(result.r2)

In [None]:
from pyspark import SparkFiles
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Expose the encoded label under the name "Weather_new-1", which the following
# cells use as their label column.
df = data.withColumnRenamed(existing="Weather_new", new="Weather_new-1")

In [None]:
# Re-assemble the six numeric measurements, this time into a "features" column.
assembler = VectorAssembler(
    inputCols=["Temp_C", "Dew Point Temp_C", "Rel Hum_%", "Wind Speed_km/h", "Visibility_km", "Press_kPa"],
    outputCol="features")

# Keep the assembled frame under its own name instead of overwriting `data`:
# once `data` already carries a "features" column, re-running this cell (or the
# earlier assembler cell) fails because the output column already exists.
assembled_data = assembler.transform(df)

# Keep only the feature vector and the label, and split *that* frame 80/20 with
# a fixed seed (previously `final_data` was built but the split was taken from
# the full frame, leaving `final_data` unused).
final_data = assembled_data.select("features", "Weather_new-1")
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Size, preview and per-label row counts of the training split.
train_row_count = train_data.count()
print('Total train data - ', train_row_count)
train_data.show(n=5)

train_label_counts = train_data.groupBy("Weather_new-1").count()
train_label_counts.show()

In [None]:
# Size, preview and per-label row counts of the test split.
test_row_count = test_data.count()
print('Total test data - ', test_row_count)
test_data.show(n=5)

test_label_counts = test_data.groupBy("Weather_new-1").count()
test_label_counts.show()

In [None]:
# Fit a linear regression on the "features" vector against the renamed label.
lr = LinearRegression(
    featuresCol="features",
    labelCol="Weather_new-1",
)
lr_model = lr.fit(train_data)

# The R^2 printed here is computed on the training split itself.
result = lr_model.evaluate(train_data)
print(result.r2)

In [None]:
# Test rows reduced to the feature vector only (label column left out).
unleveled_data = test_data.select("features")
unleveled_data.show(n=5)

In [None]:
# Run the fitted linear model over the feature-only test rows and display the result.
predictions = lr_model.transform(unleveled_data)
predictions.show()

In [None]:
# Score the linear model on the full test split (features plus label).
predictions = lr_model.transform(test_data)

# Two evaluators over the same label column: one for RMSE, one for MSE.
evaluator = RegressionEvaluator(labelCol="Weather_new-1", metricName="rmse")
evaluator_mse = RegressionEvaluator(labelCol="Weather_new-1", metricName="mse")

rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data: {:.3f}".format(rmse))

mse = evaluator_mse.evaluate(predictions)
print("MSE on test data: {:.3f}".format(mse))

In [None]:
# Report the fitted linear model's coefficients and intercept.
coefficients, intercept = lr_model.coefficients, lr_model.intercept

print("Coefficients: ", coefficients)
print("Intercept: {:.3f}".format(intercept))

Logistic Regression

In [None]:
#from pyspark.ml.classification import LogisticRegression
#log_reg = LogisticRegression(labelCol='Weather_new-1').fit(train_data)

In [None]:
#result = log_reg.evaluate(test_data).predictions
#result.show(40)

In [None]:
#result.select('Weather_new-1','prediction').show(40)

DecisionTreeClassifier

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Train a decision-tree classifier on the assembled features and encoded label.
dt = DecisionTreeClassifier(featuresCol="features", labelCol="Weather_new-1")
dt_model = dt.fit(train_data)

# Predict on the held-out split.
pred = dt_model.transform(test_data)

# Score the classifier with a classification metric. The previous version reused
# the RMSE RegressionEvaluator created in an earlier cell, which treats the class
# indices as numeric magnitudes and depends on that cell having been run.
dt_evaluator = MulticlassClassificationEvaluator(
    labelCol="Weather_new-1",
    predictionCol="prediction",
    metricName="accuracy",
)
dt_evaluator.evaluate(pred)

In [None]:
# Interactively collect one observation; each value is parsed as a float
# (float() raises ValueError on non-numeric input).
Temp = float(input("Temp_C: "))
Dew_Point = float(input("Dew Point Temp_C: "))
Relative_Humidity = float(input("Rel Hum_%: "))
Wind_Speed = float(input("Wind Speed_km/h: "))
Visibility = float(input("Visibility_km: "))
Pressure = float(input("Press_kPa: "))
# Example inputs:
# -22.8,  -28.0,  62,  9, 25.0, 102.37
# -21.2,   -26.8,  61, 11, 25.0, 101.81
single_row_data = [(Temp,Dew_Point,Relative_Humidity,Wind_Speed,Visibility,Pressure)]
single_row_df = spark.createDataFrame(single_row_data, ["Temp_C", "Dew Point Temp_C", "Rel Hum_%", "Wind Speed_km/h", "Visibility_km", "Press_kPa"])

# Use the same VectorAssembler to transform the single row
single_row_df = assembler.transform(single_row_df)

# Make a prediction on the single row with the fitted decision-tree classifier.
# The previous code called `log_reg`, which is never defined (the logistic
# regression cell is commented out), so this cell raised NameError.
prediction = dt_model.transform(single_row_df)

# Show the prediction; the "prediction" value is the numeric index produced by
# the StringIndexer encoding of Weather, not the original weather text.
prediction.select("features", "prediction").show()