# Spark Project


<center>
Gas Consumption in France
2023-2024
</center>

In [46]:
import pyspark
import yaml
import glob
import pyspark.sql.functions as F
from pyspark.sql.functions import year,month,weekofyear
from pyspark.ml.feature import StringIndexer,OneHotEncoder, Imputer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier


Read the configuration from the YAML file

In [47]:
# Load the project configuration (per-dataset delimiters and schemas).
# safe_load only builds plain Python objects, so the file cannot trigger code execution.
# An explicit encoding keeps parsing independent of the platform's default locale.
with open('config.yaml', encoding='utf-8') as f:
    configuration = yaml.safe_load(f)


In [48]:
# Inspect the service-file parameters: a list holding a {'delimiter': ...} dict
# followed by a {'schema': ...} dict (the DDL column string passed to spark.read.csv).
configuration['service_file_parameters']

[{'delimiter': '|'},
 {'schema': 'id_pdv int,cp int,pop string,latitude double,longitude  double,services string'}]

## 1. Create a Spark Session

In [49]:
# Start the Spark session for the "Gas" application, or reuse the one already running.
spark = (
    pyspark.sql.SparkSession.builder
    .appName("Gas")
    .getOrCreate()
)

In [50]:
# Display the active SparkSession object.
spark

## 2. Data preparation

2.1 Read the data from the csv files

Prepare the schema for the different datasets using the yaml configuration file

In [51]:
# Get the delimiter of each dataset from the configuration file.
# Each `*_file_parameters` entry is a list: [{'delimiter': ...}, {'schema': ...}].
gas_delimiter = configuration['gas_file_parameters'][0]['delimiter']
station_delimiter = configuration['station_file_parameters'][0]['delimiter']
service_delimiter = configuration['service_file_parameters'][0]['delimiter']

# Get the DDL schema string of each dataset from the configuration file.
gas_schema = configuration['gas_file_parameters'][1]['schema']
station_schema = configuration['station_file_parameters'][1]['schema']
service_schema = configuration['service_file_parameters'][1]['schema']

# Collect the data file paths of each dataset. The glob patterns are hard-coded
# here (not read from the configuration file). glob returns files in arbitrary
# order, so sort them to keep the read order deterministic across runs.
gas_files = sorted(glob.glob("data/Prix*.csv"))
station_files = sorted(glob.glob("data/Station*.csv"))
service_files = sorted(glob.glob("data/Service*.csv"))

# Fail fast with a clear message rather than handing spark.read.csv an empty path list.
assert gas_files, "No gas price files match data/Prix*.csv"
assert station_files, "No station files match data/Station*.csv"
assert service_files, "No service files match data/Service*.csv"



In [52]:
# Load each dataset with the DDL schema and delimiter taken from the configuration.
# NOTE(review): no header option is set; confirm the CSV files have no header row.
gas_ddf = spark.read.schema(gas_schema).option("sep", gas_delimiter).csv(gas_files)
station_ddf = spark.read.schema(station_schema).option("sep", station_delimiter).csv(station_files)
service_ddf = spark.read.schema(service_schema).option("sep", service_delimiter).csv(service_files)

Explore the gas dataframe

In [53]:
# Summary statistics (count, mean, stddev, min, max) for every column of the gas data.
gas_ddf.describe().show(n=20, truncate=True)



+-------+-------------------+-----------------+-------------------+-----------------+------------------+-----------------+-------------+------------------+
|summary|             id_pdv|               cp|                pop|         latitude|         longitude|     id_carburant|nom_carburant|              prix|
+-------+-------------------+-----------------+-------------------+-----------------+------------------+-----------------+-------------+------------------+
|  count|           48873023|         48873024|           48873023|         48861413|          48861720|         48834162|     48834162|          48834162|
|   mean|5.178747010046008E7|51777.70275334303|-18951.105633333333|4642316.448959652| 266430.0602396445|3.274674417470295|         NULL|1082.4760620455415|
| stddev|2.688841715853314E7|26894.29038135437| 146411.93674099928|533857.2721312031|330934.95540068485|2.054033955455618|         NULL| 573.6996826572682|
|    min|            1000001|                0|            -1739

                                                                                

In [54]:
# Number of observations (rows) in the gas DataFrame, before null rows are removed
gas_ddf.count()

                                                                                

48873026

Remove missing observations

In [55]:
# Percentage of rows that dropna(how='any') would discard, i.e. rows with at
# least one missing field.
c1 = gas_ddf.count()
r = 100 * (c1 - gas_ddf.na.drop(how='any').count()) / c1
print(f"The removal rate of null observations is {r:.2f}%")




The removal rate of null observations is 0.10%


                                                                                

Keep only the observations without null values

In [56]:
# Keep only the rows that have no missing value in any column.
gas_ddf = gas_ddf.na.drop(how='any')

2.2 Preprocessing the Gas data <br>
We will do the following:<br>
* a) Sort the data by the date column
* b) Split the date into year, month and week of the year
* c) Prepare latitude & longitude for mapping (divide by the right power of 10)
* d) Create a Table associated with gas data

To make it easier we will use a preprocessing pipeline


2.2.a) Sort the data by the date column

In [57]:
# Order all observations chronologically, oldest first.
gas_ddf = gas_ddf.orderBy(F.col("date").asc())

In [58]:
# Preview the first five rows of the date-sorted data.
gas_ddf.show(n=5)



+--------+-----+---+---------------+---------------+-------------------+------------+-------------+------+
|  id_pdv|   cp|pop|       latitude|      longitude|               date|id_carburant|nom_carburant|  prix|
+--------+-----+---+---------------+---------------+-------------------+------------+-------------+------+
|76150002|76150|  A|4949456.8714207|99803.232277829|2008-01-01 00:00:00|           1|       Gazole|1190.0|
|54300005|54300|  R|  4859064.54138|  642599.690486|2008-01-01 00:00:00|           1|       Gazole|1239.0|
| 6250005| 6250|  R|      4358849.0|       703322.0|2008-01-01 00:00:00|           2|         SP95|1374.0|
|54630001|54630|  A|      4860743.0|       617385.0|2008-01-01 00:00:00|           1|       Gazole|1239.0|
|11700003|11700|  A|      4317816.0|       254421.0|2008-01-01 00:00:00|           2|         SP95|1469.0|
+--------+-----+---+---------------+---------------+-------------------+------------+-------------+------+
only showing top 5 rows



                                                                                

2.2.b) Split the date into year, month and week of the year

In [59]:
# Derive calendar parts of the date as new columns appended at the end.
# NOTE(review): weekofyear uses ISO-8601 weeks, so dates around 1 January can fall
# in week 52/53 of the previous year or week 1 of the next. Pairing it with
# year(date) may merge different weeks at year boundaries; confirm before
# grouping by (year, weekofyear).
gas_ddf = (
    gas_ddf
    .withColumn("year", year(F.col("date")))
    .withColumn("month", month(F.col("date")))
    .withColumn("weekofyear", weekofyear(F.col("date")))
)
gas_ddf.show(5)



+--------+-----+---+---------------+---------------+-------------------+------------+-------------+------+----+-----+----------+
|  id_pdv|   cp|pop|       latitude|      longitude|               date|id_carburant|nom_carburant|  prix|year|month|weekofyear|
+--------+-----+---+---------------+---------------+-------------------+------------+-------------+------+----+-----+----------+
|76150002|76150|  A|4949456.8714207|99803.232277829|2008-01-01 00:00:00|           1|       Gazole|1190.0|2008|    1|         1|
|54300005|54300|  R|  4859064.54138|  642599.690486|2008-01-01 00:00:00|           1|       Gazole|1239.0|2008|    1|         1|
| 6250005| 6250|  R|      4358849.0|       703322.0|2008-01-01 00:00:00|           2|         SP95|1374.0|2008|    1|         1|
|54630001|54630|  A|      4860743.0|       617385.0|2008-01-01 00:00:00|           1|       Gazole|1239.0|2008|    1|         1|
|11700003|11700|  A|      4317816.0|       254421.0|2008-01-01 00:00:00|           2|         SP9

                                                                                

2.2.c) Prepare latitude & longitude for mapping (divide by the right power of 10)

In [60]:
# The raw coordinates are stored as degrees multiplied by 100 000
# (e.g. 4949456.87 -> 49.49); rescale them to decimal degrees for mapping.
gas_ddf = (
    gas_ddf
    .withColumn("latitude", F.col("latitude") / 100_000)
    .withColumn("longitude", F.col("longitude") / 100_000)
)
gas_ddf.show(5)



+--------+-----+---+------------------+-----------------+-------------------+------------+-------------+------+----+-----+----------+
|  id_pdv|   cp|pop|          latitude|        longitude|               date|id_carburant|nom_carburant|  prix|year|month|weekofyear|
+--------+-----+---+------------------+-----------------+-------------------+------------+-------------+------+----+-----+----------+
|76150002|76150|  A|   49.494568714207| 0.99803232277829|2008-01-01 00:00:00|           1|       Gazole|1190.0|2008|    1|         1|
|54300005|54300|  R|48.590645413800004|6.425996904860001|2008-01-01 00:00:00|           1|       Gazole|1239.0|2008|    1|         1|
| 6250005| 6250|  R|          43.58849|          7.03322|2008-01-01 00:00:00|           2|         SP95|1374.0|2008|    1|         1|
|54630001|54630|  A|          48.60743|          6.17385|2008-01-01 00:00:00|           1|       Gazole|1239.0|2008|    1|         1|
|11700003|11700|  A|          43.17816|          2.54421|2008-

                                                                                

2.2.d) Create a Table associated with gas data

In [61]:
# Expose the DataFrame as the temporary SQL view `gas`, then read it back through the catalog.
gas_ddf.createOrReplaceTempView("gas")
spark.table("gas").show(5)



+--------+-----+---+------------------+-----------------+-------------------+------------+-------------+------+----+-----+----------+
|  id_pdv|   cp|pop|          latitude|        longitude|               date|id_carburant|nom_carburant|  prix|year|month|weekofyear|
+--------+-----+---+------------------+-----------------+-------------------+------------+-------------+------+----+-----+----------+
|76150002|76150|  A|   49.494568714207| 0.99803232277829|2008-01-01 00:00:00|           1|       Gazole|1190.0|2008|    1|         1|
|54300005|54300|  R|48.590645413800004|6.425996904860001|2008-01-01 00:00:00|           1|       Gazole|1239.0|2008|    1|         1|
| 6250005| 6250|  R|          43.58849|          7.03322|2008-01-01 00:00:00|           2|         SP95|1374.0|2008|    1|         1|
|54630001|54630|  A|          48.60743|          6.17385|2008-01-01 00:00:00|           1|       Gazole|1239.0|2008|    1|         1|
|11700003|11700|  A|          43.17816|          2.54421|2008-

                                                                                

Which gas types are relevant for the rest of the project?

In [62]:
# Number and share (in %) of observations per fuel type, most frequent first.
# The total is taken with a window over the grouped rows instead of a scalar subquery.
spark.sql(
    """
    SELECT
        nom_carburant,
        count(*) AS count,
        round(100 * count(*) / sum(count(*)) OVER (), 2) AS `ratio(%)`
    FROM gas
    GROUP BY nom_carburant
    ORDER BY count DESC
    """
).show()



+-------------+--------+--------+
|nom_carburant|   count|ratio(%)|
+-------------+--------+--------+
|       Gazole|16633159|   34.07|
|          E10|10515716|   21.54|
|         SP98|10197268|   20.89|
|         SP95| 7122418|   14.59|
|         GPLc| 2181898|    4.47|
|          E85| 2173137|    4.45|
+-------------+--------+--------+



                                                                                

In our dataset, the gas types **E85 and GPLc** are notably under-represented. <br> Each accounts for only about 4.5% of the observations.
We will drop these gas types for the rest of the project.


Drop gas types **E85 and GPLc**

In [63]:
# Drop the two under-represented fuel types (GPLc and E85, about 4.5% of rows each).
# NOT IN is equivalent to `!= 'GPLc' AND != 'E85'`. An OR of the two inequalities
# is true for every non-null value and would not remove any row.
gas_ddf = gas_ddf.filter(
    "nom_carburant NOT IN ('GPLc', 'E85')")


2.3 Compute the price index for each station per week:<br>
Compute a new variable called “Price Index” for each gas type sold in
a station, defined as:<br>

$\text{Price Index} = 100 \times \left(\frac{\text{Day Price in station} - \text{Average Day Price in France}}{\text{Average Day Price in France}} + 1\right)$

In [64]:
# Preview the first two rows of the filtered gas data.
gas_ddf.show(n=2)



+--------+-----+---+--------+---------+-------------------+------------+-------------+------+----+-----+----------+
|  id_pdv|   cp|pop|latitude|longitude|               date|id_carburant|nom_carburant|  prix|year|month|weekofyear|
+--------+-----+---+--------+---------+-------------------+------------+-------------+------+----+-----+----------+
| 6250005| 6250|  R|43.58849|  7.03322|2008-01-01 00:00:00|           1|       Gazole|1239.0|2008|    1|         1|
|11700003|11700|  A|43.17816|  2.54421|2008-01-01 00:00:00|           2|         SP95|1469.0|2008|    1|         1|
+--------+-----+---+--------+---------+-------------------+------------+-------------+------+----+-----+----------+
only showing top 2 rows



                                                                                