<a href="https://colab.research.google.com/github/ab2gbl/Master2/blob/main/BDPA/TP/BDPA_tp4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#!unzip ./weatherHistory.csv.zip

In [None]:
import pandas as pd
import numpy as np

In [None]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one; otherwise create it for this notebook.
session_builder = SparkSession.builder.appName("WeatherPrediction")
spark = session_builder.getOrCreate()

file_path = "./weatherHistory.csv"

# The first CSV row holds the column names; column types are inferred by Spark.
csv_reader = spark.read.options(header=True, inferSchema=True)
df = csv_reader.csv(file_path)
df.printSchema()


In [None]:
from pyspark.ml.feature import StringIndexer

# Categorical text columns and the numeric index column produced for each one.
categorical_columns = [
    ("Summary", "SummaryIndex"),
    ("Precip Type", "PrecipTypeIndex"),
    ("Daily Summary", "DailySummaryIndex"),
]

# NOTE(review): StringIndexer uses handleInvalid="error" by default, so a null in
# any of these columns would make transform() fail — confirm the data has none.
for input_col, output_col in categorical_columns:
    # Dropping a column that does not exist is a no-op, so this only matters when
    # the cell is re-run: without it the indexer fails because the output column
    # is already present on `df`.
    df = df.drop(output_col)
    indexer = StringIndexer(inputCol=input_col, outputCol=output_col)
    df = indexer.fit(df).transform(df)

df.printSchema()


In [None]:
# Display the (column name, data type) pairs of `df` after the indexing step.
df.dtypes

In [None]:
from pyspark.sql.functions import col

# Numeric weather measurements that are converted to a single numeric type.
columns_to_cast = [
    'Temperature (C)',
    'Apparent Temperature (C)',
    'Humidity',
    'Wind Speed (km/h)',
    'Wind Bearing (degrees)',
    'Visibility (km)',
    'Loud Cover',
    'Pressure (millibars)',
]

# Build the cast expressions up front, then replace each column in place.
# NOTE(review): "float" is 32-bit; if the CSV was inferred as double this narrows
# the values — confirm that is intended.
float_expressions = {name: col(name).cast("float") for name in columns_to_cast}
for name, expression in float_expressions.items():
    df = df.withColumn(name, expression)


In [None]:
from pyspark.ml.feature import VectorAssembler

# Model inputs: numeric measurements plus the indexed categorical columns.
# 'Temperature (C)' is the label used later, so it is not a feature.
# NOTE(review): 'Apparent Temperature (C)' is also left out — presumably because
# it is nearly the target itself; confirm.
feature_columns = [
    'Humidity',
    'Wind Speed (km/h)',
    'Wind Bearing (degrees)',
    'Visibility (km)',
    'Loud Cover',
    'Pressure (millibars)',
    'SummaryIndex',
    'PrecipTypeIndex',
]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="weather_features")

# Drop any "weather_features" column left by a previous run of this cell
# (a no-op on the first run); otherwise transform() fails because the output
# column already exists.
df = assembler.transform(df.drop("weather_features"))



In [None]:
# Keep only what the models need: the assembled feature vector and the label.
model_columns = ["weather_features", "Temperature (C)"]
final_df = df.select(*model_columns)
final_df.show(n=5)


# Linear Regression

In [None]:
from pyspark.ml.regression import LinearRegression

# Seeded 70/30 partition into training and test sets.
train_data, test_data = final_df.randomSplit(weights=[0.7, 0.3], seed=42)

# Baseline: linear regression of temperature on the assembled features.
lr_settings = {"featuresCol": "weather_features", "labelCol": "Temperature (C)"}
lr = LinearRegression(**lr_settings)
model = lr.fit(train_data)

print(f"Coefficients: {model.coefficients}")
print(f"Intercept: {model.intercept}")


Coefficients: [-26.679999420390136,-0.19812815348079615,0.0021767120057714045,0.17144397880874138,0.0,-0.0010110169516249537,-0.20270512656588377,-10.584678461460065]
Intercept: 34.088073550271496


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# R² evaluator comparing the "prediction" column with the true temperature.
evaluator = RegressionEvaluator(
    metricName="r2",
    labelCol="Temperature (C)",
    predictionCol="prediction",
)

# Score the held-out test split with the fitted linear model.
predictions = model.transform(test_data)
r2 = evaluator.evaluate(predictions)

print(f"R2 Metric: {r2}")


R2 Metric: 0.5828526417727735


# Optimisation

In [None]:
from pyspark.ml.regression import RandomForestRegressor, GBTRegressor

# Fit a random forest on the training split to see which features matter most.
# The seed is fixed so the forest, and therefore the printed importances, is the
# same on every run.
rf = RandomForestRegressor(
    featuresCol="weather_features",
    labelCol="Temperature (C)",
    seed=42,
)
rf_model = rf.fit(train_data)

# Sparse vector of per-feature importances, in the order of the assembled features.
print(rf_model.featureImportances)


(8,[0,1,2,3,5,6,7],[0.43153404868943895,0.012017932554312409,0.0007169956648431816,0.13930405072834864,0.07473878217102092,0.04444931249705128,0.2972388776949844])


In [None]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Split BEFORE scaling so that the scaler's mean/std come from the training rows
# only; fitting it on the full dataset would leak test-set statistics into training.
train_raw, test_raw = final_df.randomSplit([0.7, 0.3], seed=42)

scaler = StandardScaler(inputCol="weather_features", outputCol="scaled_features", withStd=True, withMean=True)
scaler_model = scaler.fit(train_raw)

# Apply the training-set statistics to both splits.
train_data = scaler_model.transform(train_raw)
test_data = scaler_model.transform(test_raw)

# Kept so that any cell still referring to `scaled_df` keeps working; the
# transformation is lazy, so nothing is computed unless it is actually used.
scaled_df = scaler_model.transform(final_df)

# Gradient-boosted trees on the scaled features; seeded for repeatable results.
gbt = GBTRegressor(featuresCol="scaled_features", labelCol="Temperature (C)", maxIter=50, seed=42)
gbt_model = gbt.fit(train_data)

test_predictions = gbt_model.transform(test_data)

evaluator = RegressionEvaluator(labelCol="Temperature (C)", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(test_predictions)

print(f"R2 after scaling and using GBT: {r2}")


R2 after scaling and using GBT: 0.7375447236877823


In [None]:

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StandardScaler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# 1. Split data
# `final_df` is deliberately NOT overwritten: re-assigning it to the scaled frame
# made this cell fail on a second run (the "scaled_features" column already existed).
train_raw, test_raw = final_df.randomSplit([0.7, 0.3], seed=42)

# 2. Scale features
# The scaler is fitted on the training split only, so no test-set statistics
# leak into training; the same fitted scaler is then applied to both splits.
scaler = StandardScaler(inputCol="weather_features", outputCol="scaled_features", withStd=True, withMean=True)
scaler_model = scaler.fit(train_raw)
train_data = scaler_model.transform(train_raw)
test_data = scaler_model.transform(test_raw)

# 3. Model training with hyperparameter tuning
# maxIter is not set here because the grid below supplies it for every candidate.
gbt = GBTRegressor(featuresCol="scaled_features", labelCol="Temperature (C)", seed=42)

# Parameter grid: 3 depths x 2 iteration counts x 2 step sizes = 12 candidates.
param_grid = (
    ParamGridBuilder()
    .addGrid(gbt.maxDepth, [5, 10, 15])
    .addGrid(gbt.maxIter, [50, 100])
    .addGrid(gbt.stepSize, [0.05, 0.1])
    .build()
)

# Cross-validation: 5 folds per candidate, scored by R²; seeded so the fold
# assignment (and hence the selected model) is repeatable.
evaluator = RegressionEvaluator(labelCol="Temperature (C)", predictionCol="prediction", metricName="r2")
crossval = CrossValidator(
    estimator=gbt,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    seed=42,
)

# Train the model
cv_model = crossval.fit(train_data)

# 4. Model evaluation on the held-out test split
best_model = cv_model.bestModel
test_predictions = best_model.transform(test_data)
r2 = evaluator.evaluate(test_predictions)

# Results
print(f"Optimized R² on test data: {r2}")