In [1]:
#Installation de pyspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
#Importation des librairies
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import * 

In [3]:
#Creation de Sparksession
spark = SparkSession.builder.appName('Projet').getOrCreate()

In [None]:
#Verification de Spark
spark

In [4]:
#Lecture des fichiers
features = spark.read.option('header','true').csv('features.csv',inferSchema=True,nullValue="NA")
stores = spark.read.option('header','true').csv('stores.csv',inferSchema=True)
train = spark.read.option('header','true').csv('train.csv',inferSchema=True)

In [5]:
#Affichage de features
features.show()

+-----+-------------------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+---------+
|Store|               Date|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|        CPI|Unemployment|IsHoliday|
+-----+-------------------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+---------+
|    1|2010-02-05 00:00:00|      42.31|     2.572|     null|     null|     null|     null|     null|211.0963582|       8.106|    false|
|    1|2010-02-12 00:00:00|      38.51|     2.548|     null|     null|     null|     null|     null|211.2421698|       8.106|     true|
|    1|2010-02-19 00:00:00|      39.93|     2.514|     null|     null|     null|     null|     null|211.2891429|       8.106|    false|
|    1|2010-02-26 00:00:00|      46.63|     2.561|     null|     null|     null|     null|     null|211.3196429|       8.106|    false|
|    1|2010-03-05 00:00:00|       46.5|     2.62

# **Phase de Nettoyage**


In [5]:
#Changer le type des Colonnes MarkDown en float
for i in range(1,6):colname = 'MarkDown' +str(i)
nfeatures =features.withColumn(colname,features[colname].cast("double"))

In [6]:
#Changer le type des colonnes 'CPI', 'Unemployment' en float
nfeatures =nfeatures.withColumn('CPI',nfeatures['CPI'].cast("double"))
nfeatures =nfeatures.withColumn('Unemployment',nfeatures['Unemployment'].cast("double"))


In [None]:
#Afficher les null de toutes les colonnes
Dict_Null = {col:nfeatures.filter(nfeatures[col].isNull()).count() for col in nfeatures.columns}
Dict_Null

{'Store': 0,
 'Date': 0,
 'Temperature': 0,
 'Fuel_Price': 0,
 'MarkDown1': 4158,
 'MarkDown2': 5269,
 'MarkDown3': 4577,
 'MarkDown4': 4726,
 'MarkDown5': 4140,
 'CPI': 585,
 'Unemployment': 585,
 'IsHoliday': 0}

In [7]:
#Remaplcer les null par des 0
for i in range(1,6):
  colname = 'MarkDown' +str(i)
  nfeatures= nfeatures.na.fill(value=0,subset=colname)


In [9]:
#Affichage de nfeatures
nfeatures.show()

+-----+-------------------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+---------+
|Store|               Date|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|        CPI|Unemployment|IsHoliday|
+-----+-------------------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+---------+
|    1|2010-02-05 00:00:00|      42.31|     2.572|      0.0|      0.0|      0.0|      0.0|      0.0|211.0963582|       8.106|    false|
|    1|2010-02-12 00:00:00|      38.51|     2.548|      0.0|      0.0|      0.0|      0.0|      0.0|211.2421698|       8.106|     true|
|    1|2010-02-19 00:00:00|      39.93|     2.514|      0.0|      0.0|      0.0|      0.0|      0.0|211.2891429|       8.106|    false|
|    1|2010-02-26 00:00:00|      46.63|     2.561|      0.0|      0.0|      0.0|      0.0|      0.0|211.3196429|       8.106|    false|
|    1|2010-03-05 00:00:00|       46.5|     2.62

In [32]:
#Calculer la moyenne des deux colonnes 'CPI' et 'Unemployement'
nfeatures.select(mean('CPI')).show()
nfeatures.select(mean('Unemployment')).show()

+------------------+
|          avg(CPI)|
+------------------+
|172.46080919156995|
+------------------+

+-----------------+
|avg(Unemployment)|
+-----------------+
|7.826821038790301|
+-----------------+



In [8]:
#Remplacer les valeur null par la moyenne
nfeatures= nfeatures.na.fill(value=172.46080930610773,subset=['CPI'])
nfeatures= nfeatures.na.fill(value=7.826821038790305,subset=['Unemployment'])

In [12]:
#Calcule le nombre de null dans toutes les colonnes
Dict_Null = {col:nfeatures.filter(nfeatures[col].isNull()).count() for col in nfeatures.columns}
Dict_Null

{'Store': 0,
 'Date': 0,
 'Temperature': 0,
 'Fuel_Price': 0,
 'MarkDown1': 0,
 'MarkDown2': 0,
 'MarkDown3': 0,
 'MarkDown4': 0,
 'MarkDown5': 0,
 'CPI': 0,
 'Unemployment': 0,
 'IsHoliday': 0}

In [None]:
#Affichage du schema de stores
stores.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Size: integer (nullable = true)



In [None]:
#Affichage du schema de train
train.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- Dept: integer (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Weekly_Sales: double (nullable = true)
 |-- IsHoliday: boolean (nullable = true)



In [13]:
#Affichage du schema de nfeatures
nfeatures.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- MarkDown1: double (nullable = false)
 |-- MarkDown2: double (nullable = false)
 |-- MarkDown3: double (nullable = false)
 |-- MarkDown4: double (nullable = false)
 |-- MarkDown5: double (nullable = false)
 |-- CPI: double (nullable = false)
 |-- Unemployment: double (nullable = false)
 |-- IsHoliday: boolean (nullable = true)



# **Jointure des tables**

In [10]:
#joindre les deux Dataframe train et stores
train = train.join(stores, how = 'inner', on = 'Store')

In [11]:
#Joindre les deux DataFrame (train ; nfeatures) et supprimer la colonne en double "IsHoliday" 
nfeatures= nfeatures.drop('IsHoliday')
train = train.join(nfeatures, how = 'inner', on = ['Store', 'Date'])

+-----+-------------------+----+------------+---------+----+------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+
|Store|               Date|Dept|Weekly_Sales|IsHoliday|Type|  Size|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|        CPI|Unemployment|
+-----+-------------------+----+------------+---------+----+------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+
|    1|2010-02-05 00:00:00|   1|     24924.5|    false|   A|151315|      42.31|     2.572|      0.0|      0.0|      0.0|      0.0|      0.0|211.0963582|       8.106|
|    1|2010-02-12 00:00:00|   1|    46039.49|     true|   A|151315|      38.51|     2.548|      0.0|      0.0|      0.0|      0.0|      0.0|211.2421698|       8.106|
|    1|2010-02-19 00:00:00|   1|    41595.55|    false|   A|151315|      39.93|     2.514|      0.0|      0.0|      0.0|      0.0|      0.0|211.2891429|       8.106|
|   

In [None]:
#Affichage de train
train.show()

+-----+-------------------+----+------------+---------+----+------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+
|Store|               Date|Dept|Weekly_Sales|IsHoliday|Type|  Size|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|        CPI|Unemployment|
+-----+-------------------+----+------------+---------+----+------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+
|    1|2010-02-05 00:00:00|   1|     24924.5|    false|   A|151315|      42.31|     2.572|      0.0|      0.0|      0.0|      0.0|      0.0|211.0963582|       8.106|
|    1|2010-02-12 00:00:00|   1|    46039.49|     true|   A|151315|      38.51|     2.548|      0.0|      0.0|      0.0|      0.0|      0.0|211.2421698|       8.106|
|    1|2010-02-19 00:00:00|   1|    41595.55|    false|   A|151315|      39.93|     2.514|      0.0|      0.0|      0.0|      0.0|      0.0|211.2891429|       8.106|
|   

# **Transformation des donnees**

In [12]:
#Importation de StringIdexer
from pyspark.ml.feature import StringIndexer

#Convertir la colonne Type en float en devenant Type_i et supprimer la colonne d'origine Type
indexer = StringIndexer(inputCols=["Type"],outputCols=["Type_i"])
train=indexer.fit(train).transform(train)
train = train.drop('Type')

In [None]:
#Affichage de train
train.show()

+-----+-------------------+----+------------+---------+------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+------+
|Store|               Date|Dept|Weekly_Sales|IsHoliday|  Size|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|        CPI|Unemployment|Type_i|
+-----+-------------------+----+------------+---------+------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+------+
|    1|2010-02-05 00:00:00|   1|     24924.5|    false|151315|      42.31|     2.572|      0.0|      0.0|      0.0|      0.0|      0.0|211.0963582|       8.106|   0.0|
|    1|2010-02-12 00:00:00|   1|    46039.49|     true|151315|      38.51|     2.548|      0.0|      0.0|      0.0|      0.0|      0.0|211.2421698|       8.106|   0.0|
|    1|2010-02-19 00:00:00|   1|    41595.55|    false|151315|      39.93|     2.514|      0.0|      0.0|      0.0|      0.0|      0.0|211.2891429|       8.106|

In [14]:
#Convertir la colonne IsHoliday en integer(0 pour false et 1 pour true)
train = train.withColumn('IsHoliday',when(train.IsHoliday == 'false', 0).otherwise(1))

In [32]:
#Affichage de train
train.show()

+-----+-------------------+----+------------+---------+------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+------+
|Store|               Date|Dept|Weekly_Sales|IsHoliday|  Size|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|        CPI|Unemployment|Type_i|
+-----+-------------------+----+------------+---------+------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+------+
|    1|2010-02-05 00:00:00|   1|     24924.5|        0|151315|      42.31|     2.572|      0.0|      0.0|      0.0|      0.0|      0.0|211.0963582|       8.106|   0.0|
|    1|2010-02-12 00:00:00|   1|    46039.49|        1|151315|      38.51|     2.548|      0.0|      0.0|      0.0|      0.0|      0.0|211.2421698|       8.106|   0.0|
|    1|2010-02-19 00:00:00|   1|    41595.55|        0|151315|      39.93|     2.514|      0.0|      0.0|      0.0|      0.0|      0.0|211.2891429|       8.106|

In [15]:
#Grouper notre DataFrame par les colonnes Store et Date,calculer la sum de Weekly_sales et la moyenne pour le reste des colonnes tout en les renommant
df =train.groupBy(['Store' , 'Date']).agg(sum('Weekly_Sales').alias('Sales'),avg('IsHoliday').alias('IsHoliday'),avg( 'Size').alias( 'Size'),
                                     avg('Temperature').alias('Temperature'),avg('Fuel_Price').alias('Fuel_Price'),avg('CPI').alias('CPI'),
                                      avg('Unemployment').alias('Unemployment'),avg('Type_i').alias('Type_i'))

In [None]:
#train.write.csv('feature_test.csv')

In [20]:
#Affichage de df
df.show()

+-----+----------+------------------+---------+--------+------------------+------------------+------------------+------------------+------+
|Store|      Date|             Sales|IsHoliday|    Size|       Temperature|        Fuel_Price|               CPI|      Unemployment|Type_i|
+-----+----------+------------------+---------+--------+------------------+------------------+------------------+------------------+------+
|    4|2010-06-25|1846651.9499999997|      0.0|205863.0| 81.10000000000005|2.6539999999999973| 126.1265999999998|             7.896|   0.0|
|   16|2010-03-12|         445393.74|      0.0| 57197.0| 28.64000000000004|2.6839999999999984|189.73720750000018| 7.038999999999992|   1.0|
|   25|2010-02-05| 677231.6299999999|      0.0|128107.0|21.099999999999987| 2.783999999999999|204.24719349999984| 8.187000000000008|   1.0|
|    1|2010-12-03|1548033.7799999993|      0.0|151315.0| 49.26999999999999| 2.707999999999999|211.60719299999985| 7.838000000000005|   0.0|
|    2|2012-08-24|  

In [19]:
#Afficher le schema de df
df.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Sales: double (nullable = true)
 |-- IsHoliday: double (nullable = true)
 |-- Size: double (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- CPI: double (nullable = true)
 |-- Unemployment: double (nullable = true)
 |-- Type_i: double (nullable = true)



In [18]:
#Changer le format de la date
df=df.withColumn("Date",to_date(col("Date"), "MM-dd-yyyy"))

In [22]:
#Importation de VectorAssembler
from pyspark.ml.feature import VectorAssembler

#Selectionner les colonnes du vecteur en les regroupant dans une seule colonne Independent_Features
featureassembler = VectorAssembler(inputCols=['Store','IsHoliday','Size','Temperature','Fuel_Price','CPI','Unemployment','Type_i'],outputCol="Independent_Features")
df1 =featureassembler.transform(df)

In [33]:
#Affichage de df1
df1.show()

+-----+----------+------------------+---------+--------+------------------+------------------+------------------+------------------+------+--------------------+
|Store|      Date|             Sales|IsHoliday|    Size|       Temperature|        Fuel_Price|               CPI|      Unemployment|Type_i|Independent_Features|
+-----+----------+------------------+---------+--------+------------------+------------------+------------------+------------------+------+--------------------+
|    4|2010-06-25|1846651.9499999997|      0.0|205863.0| 81.10000000000005|2.6539999999999973| 126.1265999999998|             7.896|   0.0|[4.0,0.0,205863.0...|
|   16|2010-03-12|         445393.74|      0.0| 57197.0| 28.64000000000004|2.6839999999999984|189.73720750000018| 7.038999999999992|   1.0|[16.0,0.0,57197.0...|
|   25|2010-02-05| 677231.6299999999|      0.0|128107.0|21.099999999999987| 2.783999999999999|204.24719349999984| 8.187000000000008|   1.0|[25.0,0.0,128107....|
|    1|2010-12-03|1548033.77999999

In [23]:
#Afficher le schema de df1
df1.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Sales: double (nullable = true)
 |-- IsHoliday: double (nullable = true)
 |-- Size: double (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- CPI: double (nullable = true)
 |-- Unemployment: double (nullable = true)
 |-- Type_i: double (nullable = true)
 |-- Independent_Features: vector (nullable = true)



In [24]:
#Selectionner le vecteur et la cible
df_final = df1.select("Independent_Features","Sales")
df_final.show()

+--------------------+------------------+
|Independent_Features|             Sales|
+--------------------+------------------+
|[4.0,0.0,205863.0...|1846651.9499999997|
|[16.0,0.0,57197.0...|         445393.74|
|[25.0,0.0,128107....| 677231.6299999999|
|[1.0,0.0,151315.0...|1548033.7799999993|
|[2.0,0.0,202307.0...|        1876788.15|
|[4.0,0.0,205863.0...|        2116475.38|
|[4.0,0.0,205863.0...|        2209835.43|
|[7.0,0.0,70713.0,...| 642748.2100000002|
|[7.0,1.0,70713.0,...|         563460.77|
|[8.0,0.0,155078.0...|1069061.6300000001|
|[10.0,0.0,126512....|1704753.0199999998|
|[13.0,0.0,219622....|2059458.2500000007|
|[15.0,0.0,123737....|         603318.89|
|[23.0,0.0,114533....|        1407191.96|
|[26.0,0.0,152513....| 919503.4000000004|
|[29.0,0.0,93638.0...| 505304.3300000001|
|[29.0,0.0,93638.0...|         598251.57|
|[3.0,0.0,37392.0,...|         352864.49|
|[19.0,0.0,203819....|1373270.0599999998|
|[21.0,0.0,140167....| 867283.2499999998|
+--------------------+------------

In [26]:
#Importation de LinearRegression
from pyspark.ml.regression import LinearRegression
#Entrainement du model
## 75% des donnees iront a train_data et 25% a test_data
train_data,test_data = df_final.randomSplit([0.70,0.30])

#Precision du vecteur 'Independent Features' et de la cible 'Sales'
regressor=LinearRegression(featuresCol='Independent_Features', labelCol='Sales')

#Application du modele a train_data
regressor = regressor.fit(train_data)

In [27]:
#Affichage du coefficient
regressor.coefficients

DenseVector([-7538.3091, 76667.8233, 7.7615, 1328.1992, -13659.0736, -1850.2813, -17613.5228, 106361.9609])

In [28]:
#Affichage de l'intercept
regressor.intercept

557894.5150144538

In [29]:
#Calculer la prediction
pred_results= regressor.evaluate(test_data)

In [35]:
#Affichage de la prediction
pred_results.predictions.show(100)

+--------------------+------------------+------------------+
|Independent_Features|             Sales|        prediction|
+--------------------+------------------+------------------+
|[1.0,0.0,151315.0...|        1316899.31|1212901.8396926043|
|[1.0,0.0,151315.0...|        1554806.68|1216862.5831081206|
|[1.0,0.0,151315.0...|        2270188.99|1201560.7570831522|
|[1.0,0.0,151315.0...|1444732.2799999996|1220729.4595800638|
|[1.0,0.0,151315.0...| 1891034.929999999| 1222584.299349755|
|[1.0,0.0,151315.0...|        1394393.84|1215560.2542985636|
|[1.0,0.0,151315.0...|1472515.7899999998|1226545.6977488296|
|[1.0,0.0,151315.0...|1636339.6500000001|1217199.6699487963|
|[1.0,0.0,151315.0...|1551659.2799999993| 1235844.249250453|
|[1.0,0.0,151315.0...|1494479.4899999998| 1236379.097583947|
|[1.0,0.0,151315.0...|1539483.7000000002| 1219967.708211728|
|[1.0,0.0,151315.0...|1594968.2799999998| 1242753.241729605|
|[1.0,0.0,151315.0...|1456800.2799999998| 1235002.652036965|
|[1.0,0.0,151315.0...|15

In [34]:
#Les valeurs de performance du modele (meanAbsoluteError,meanSquaredError)
pred_results.meanAbsoluteError,pred_results.meanSquaredError

(232082.98044284168, 97887730070.41074)