In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

In [None]:
# Créer une session Spark
spark = SparkSession.builder.appName("Projet2_RegressionLineaire").getOrCreate()

In [None]:
# Charger les données du fichier csv dans un dataframe
df = spark.read.csv("gold1.csv", header=True, inferSchema=True)

In [None]:
# Afficher le schéma du dataframe
df.printSchema()

# Afficher un résumé statistique des colonnes du dataframe
df.describe().show()
df.count()

# Afficher les 5 premières lignes du dataframe
df.show(5)

# Afficher le nombre de valeurs manquantes dans chaque colonne
df.select([sum(f.col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()
#df = df.na.drop() # supprime les lignes avec des valeurs manquantes dans le dataframe (pas besoin pour notre dataset)

# Afficher le nombre duplicatas dans le dataframe
lignes_dupliquees = df.groupby(df.columns).agg(f.count("*").alias("compte")).filter("compte > 1") # récupère toutes les lignes dupliquées
lignes_dupliquees = lignes_dupliquees.withColumn('nb_duplicatas', f.col("compte") - 1) # ajoute une colonne qui compte le nombre de duplicatas pour chaque ligne (on soustrait 1 car il y a une ligne qui faudra garder pour chaque groupe de duplicatas)
lignes_dupliquees.select(f.sum("nb_duplicatas").alias("nb_total_duplicatas")).show()
#df = df.dropDuplicates() # supprime les duplicatas dans le dataframe (pas besoin pour notre dataset)

# Convertir la colonne Date de type date en un format numérique (timestamp)
df = df.withColumn("Date_Timestamp",df["Date"].cast("Date"))
df = df.withColumn("Date_Timestamp",f.unix_timestamp(df["Date_Timestamp"]))
df = df.withColumn("Close",df["Close"])

In [None]:
# Convertir le dataframe pyspark en dataframe pandas pour la visualisation
df_pandas = df.toPandas()

# Créer une matrice des corrélations entre les variables
df_pandas_donnees_numeriques = df_pandas.select_dtypes(include=["number"])
matrice_de_correlations = df_pandas_donnees_numeriques.corr()
sns.heatmap(matrice_de_correlations, annot=True, cmap='coolwarm')
plt.title('Corrélations entre les variables')
plt.show()

# Créer un graphique Prix selon Date (lineplot)
plt.figure(figsize=(10,6))
plt.plot(df_pandas["Date_Timestamp"], df_pandas["Close"])
plt.xlabel("Date")
plt.ylabel("Prix")
plt.show()

In [None]:
# Préparation des données pour le modèle
assembler = VectorAssembler(inputCols=["Date_Timestamp"], outputCol="features")
data_ready = assembler.transform(df).select("features", "Close").withColumnRenamed("Close", "label")

# Diviser les données du dataframe en données d'entrainement (80%) et de test (20%)
train_data, test_data = data_ready.randomSplit([0.8,0.2],seed=5)

# Créer un modèle de régression linéaire et l'entrainer avec les données d'entrainement
lr = LinearRegression(featuresCol="features",labelCol="label")
lr_model = lr.fit(train_data)

# Effectuer des prédictions avec les données de test 
test_predictions = lr_model.transform(test_data)
test_predictions.select("features","label","prediction").show()

In [None]:
# Calculer les métriques d'évaluation du modèle
test_evaluation = lr_model.evaluate(test_data)
print(".")
print("Erreur quadratique moyenne (MSE)",test_evaluation.meanSquaredError)
print("Erreur absolue moyenne (MAE)", test_evaluation.meanAbsoluteError)
print("Coefficient de Détermination (R2)", test_evaluation.r2)

In [None]:
# Transformer les données d'entrainement et de test sous format panda
resultats_entrainement_pandas = train_data.select("features","label").toPandas()
resultats_test_pandas = test_predictions.select("features","label", "prediction").toPandas()

# Extraire les dates (timestamp)
resultats_entrainement_pandas["Date_Timestamp"] = resultats_entrainement_pandas["features"].apply(lambda x: x[0])
resultats_test_pandas["Date_Timestamp"] = resultats_test_pandas["features"].apply(lambda x: x[0])

# Créer le graphique pour visualiser les résultats
plt.figure(figsize=(10,6))

plt.scatter(resultats_entrainement_pandas["Date_Timestamp"], resultats_entrainement_pandas["label"], color='blue', label = "Données d'entrainement")
plt.scatter(resultats_test_pandas["Date_Timestamp"],resultats_test_pandas["label"], color="green", label = "Données de test réelles")
plt.scatter(resultats_test_pandas["Date_Timestamp"], resultats_test_pandas["prediction"], color="orange", marker="x", label="Prédictions")

x_line = np.linspace(df.select("Date_Timestamp").rdd.min()[0],df.select("Date_Timestamp").rdd.max()[0], 100)
y_line = lr_model.coefficients[0] * x_line + lr_model.intercept

plt.plot(x_line,y_line, color = "red",label= "Droite de régression")
plt.xlabel("Date")
plt.ylabel("Prix")
plt.title("Régression linéaire du Prix en fonction de la Date")
plt.legend()
plt.show() 