<a href="https://colab.research.google.com/github/RobDrie/IT-Tools-Spark/blob/main/Spark_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=62263b7c76d76521675bec785e48b498ebc198f2b921b241dbc66c86b760176d
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
import pyspark
from pyspark.sql.functions import year, month, dayofweek, col, split
from pyspark.sql.functions import weekofyear

In [None]:
spark = pyspark.sql.SparkSession.builder.appName('Session1').getOrCreate()

## Load & Process gas price data with Spark
* Download 2010 to 2023 gas price data
* Download gas Stations file & Services file (2022 versions)

In [None]:
!git clone https://github.com/rvm-courses/GasPrices

Cloning into 'GasPrices'...
remote: Enumerating objects: 143, done.[K
remote: Counting objects: 100% (19/19), done.[K
remote: Compressing objects: 100% (19/19), done.[K
remote: Total 143 (delta 3), reused 0 (delta 0), pack-reused 124[K
Receiving objects: 100% (143/143), 619.51 MiB | 29.93 MiB/s, done.
Resolving deltas: 100% (33/33), done.
Updating files: 100% (40/40), done.


## Data Preparation - step 1
* Read and merge all gas files
* Split date in year, month, week of the year
* Prepare latitude & longitude for mapping (divide by the right power of 10)
* Make data available as a table in order to be able to use Spark S

In [None]:
def price_download(year_list):
  prices_df = None

  for year in year_list:
    file_path = f'GasPrices/Prix{year}.csv.gz'
    current_df = spark.read.option("header", "False").option('delimiter', ';').csv(file_path)

    if prices_df is None:
      prices_df = current_df
    else:
      prices_df = prices_df.union(current_df)

  return prices_df

In [None]:
year_list = ['2019', '2020', '2021', '2022S1', '2022S2']
prices_df = price_download(year_list)

file_path = 'GasPrices/Stations2022.csv.gz'
stations_df = spark.read.option("header", "False").option('delimiter', '|').csv(file_path)

file_path = 'GasPrices/Services2022.csv.gz'
services_df = spark.read.option("header", "False").option('delimiter', '|').csv(file_path)

### Variable definitions
* id_pdv = points of sales
* cp = zip code
* pop = type of population
* latitude
* longitude
* date
* id carburant = gas id
* nom carburant = gas label
* prix = price in millieuros

In [None]:
# # Set the column headers for price dataframe
# old_names_prices = prices_df.columns
# old_names_stations = station_df.columns
# old_names_services = services_df.columns

# new_names_prices = ['id_pdv', 'cp', 'pop', 'latitude', 'longitude',
#              'date', 'id_carburant', 'nom_carburant', 'prix']

# new_names_stations = ['id_pdv', 'cp', 'pop', 'latitude', 'longitude',
#              'adresse', 'vile']

# new_names_services = ['id_pdv', 'cp', 'pop', 'latitude', 'longitude',
#              'services']

# for new_name, old_name in zip(new_names, old_names):
#   prices_df = prices_df.withColumnRenamed(f'{old_name}', f'{new_name}')

In [None]:
# Set the column headers for price dataframe
columns_mapping = {
    'prices_df': {'old': prices_df.columns, 'new': ['id_pdv', 'cp', 'pop', 'latitude', 'longitude', 'date', 'id_carburant', 'nom_carburant', 'prix']},
    'stations_df': {'old': stations_df.columns, 'new': ['id_pdv', 'cp', 'pop', 'latitude', 'longitude', 'adresse', 'vile']},
    'services_df': {'old': services_df.columns, 'new': ['id_pdv', 'cp', 'pop', 'latitude', 'longitude', 'services']}
}

for df_name, mapping in columns_mapping.items():
    for new_name, old_name in zip(mapping['new'], mapping['old']):
        globals()[df_name] = globals()[df_name].withColumnRenamed(f'{old_name}', f'{new_name}')


### Split date in year, month, week of the year

In [None]:
split_datetime = split(prices_df['date'], 'T')
prices_df = prices_df.withColumn('Date', split_datetime.getItem(0))
split_date = split(prices_df['Date'], '-')

prices_df = prices_df.withColumn('Year', split_date.getItem(0))
prices_df = prices_df.withColumn('Month', split_date.getItem(1))
prices_df = prices_df.withColumn('Day', split_date.getItem(2))
prices_df = prices_df.withColumn('WeekOfYear', weekofyear(prices_df['date']))

In [None]:
prices_df.show(10)
stations_df.show(10)
services_df.show(10)

+-------+-----+---+--------+---------+----------+------------+-------------+----+----+-----+---+----------+
| id_pdv|   cp|pop|latitude|longitude|      Date|id_carburant|nom_carburant|prix|Year|Month|Day|WeekOfYear|
+-------+-----+---+--------+---------+----------+------------+-------------+----+----+-----+---+----------+
|1000001|01000|  R| 4620114|   519791|2019-01-04|           1|       Gazole|1328|2019|   01| 04|         1|
|1000001|01000|  R| 4620114|   519791|2019-01-07|           1|       Gazole|1348|2019|   01| 07|         2|
|1000001|01000|  R| 4620114|   519791|2019-01-10|           1|       Gazole|1374|2019|   01| 10|         2|
|1000001|01000|  R| 4620114|   519791|2019-01-11|           1|       Gazole|1387|2019|   01| 11|         2|
|1000001|01000|  R| 4620114|   519791|2019-01-14|           1|       Gazole|1394|2019|   01| 14|         3|
|1000001|01000|  R| 4620114|   519791|2019-01-16|           1|       Gazole|1394|2019|   01| 16|         3|
|1000001|01000|  R| 4620114|

### Prepare latitude & longitude for mapping (Divide by the right power of 10) to map the latitude & longitude on a [0,1] scale, we identity two different constants

In [None]:
division_constant_latitude = 10**7
division_constant_longitude = 10**6

dataframes = [prices_df, stations_df, services_df]

for i in range(len(dataframes)):
    dataframes[i] = dataframes[i].withColumn('latitude_adj', col('latitude') / division_constant_latitude)
    dataframes[i] = dataframes[i].withColumn('longitude_adj', col('longitude') / division_constant_longitude)

prices_df, stations_df, services_df = dataframes

### Make data available as a table in order to be able to use Spark SQL

In [None]:
prices_df.createOrReplaceTempView("Gas_prices")
stations_df.createOrReplaceTempView("Stations")
services_df.createOrReplaceTempView("Services")

### Through basic statistics, consider which gas types have some interest for the rest of the project


In [None]:
# Idenitify the different gas types
spark.sql("""
  SELECT DISTINCT nom_carburant
  FROM Gas_prices
""").show()

+-------------+
|nom_carburant|
+-------------+
|          E10|
|         SP98|
|          E85|
|       Gazole|
|         SP95|
|         GPLc|
|         NULL|
+-------------+



In [None]:
# Inspect summary statistics for different gas types
spark.sql("""
    SELECT
        nom_carburant,
        COUNT(*) as count,
        AVG(prix) as mean,
        STDDEV(prix) as stddev,
        MIN(prix) as min,
        MAX(prix) as max
    FROM
        Gas_prices
    GROUP BY
        nom_carburant
""").show()

+-------------+-------+------------------+------------------+-----+-----+
|nom_carburant|  count|              mean|            stddev|  min|  max|
+-------------+-------+------------------+------------------+-----+-----+
|         NULL|  14566|              NULL|              NULL| NULL| NULL|
|          E10|4357845|1061.6209274556575|  668.596370536261|0.001|  959|
|          E85|1141058|443.79118290393654|342.67485665722415|0.001|  999|
|         GPLc| 753286| 641.0837959314258| 392.8441353639444|0.019|  999|
|       Gazole|5273314| 999.6524478986081| 642.1946477857184|0.001|  999|
|         SP95|1327084|1071.6206721887988| 671.9647156310942|0.004|9.999|
|         SP98|4433584|1135.2852601017596| 702.3608904771925|0.001|  969|
+-------------+-------+------------------+------------------+-----+-----+



## Data Preparation - step 2
* Compute price index for each station per week:


In [None]:
# Idenitify the different stations
result_df = spark.sql("""
  SELECT DISTINCT id_pdv
  FROM Stations
""")
count = result_df.count()
print(count)

13637


In [None]:
from pyspark.sql import functions as F
# Compute price index

# Average week price for each gas type
average_price_gas_type = (
    prices_df
    .groupBy("id_carburant", "WeekOfYear")
    .agg(F.avg("prix")
    .alias("avg_price"))
)

In [None]:
# Average week price across all stations


In [None]:
average_price_gas_type.show()

+------------+----------+------------------+
|id_carburant|WeekOfYear|         avg_price|
+------------+----------+------------------+
|           5|        15|1026.8279417468495|
|           1|        23| 967.6790876084692|
|           6|         6|1131.3485029268845|
|           4|         1| 625.1007167437825|
|           5|        40| 1079.047403827629|
|           3|        41| 479.9917641923257|
|           2|        42|1166.5154194870593|
|           2|        51|1268.2314869539136|
|           6|        49|1157.8346213971126|
|           2|        39| 1138.549206085533|
|           5|        32|1030.8693405235476|
|           5|        41|1143.5995631037706|
|           5|        50|1084.8844499713553|
|           3|        32|421.52906523526724|
|           3|        44|471.49127773253707|
|           3|        13| 399.8121020398829|
|           5|        38|  1094.80155703838|
|           3|         8|426.81525787281834|
|           1|        44|1045.5431524325854|
|         

In [None]:
average_price_gas_type.count()

319

In [88]:
prices_stations_df = prices_df.join(stations_df, "id_pdv")

average_week_price_per_station_gas_type = (
    prices_stations_df
    .groupBy("id_carburant", "id_pdv", "WeekOfYear")
    .agg(F.avg("prix").alias("avg_week_price"))
    )

average_week_price_per_station_gas_type.show()


+------------+-------+----------+------------------+
|id_carburant| id_pdv|WeekOfYear|    avg_week_price|
+------------+-------+----------+------------------+
|           1|1000001|        11|1055.0000714285713|
|           2|1000007|        31|  642.233090909091|
|           6|1000007|        38|490.00977777777774|
|           1|1000008|        38|1354.2307692307693|
|           1|1000009|        29|            1406.8|
|           1|1000009|        44|1445.5714285714287|
|           6|1000012|        34|1207.1395357142858|
|           1|1000013|         5|1034.4170357142857|
|           2|1100001|        11|1087.6548181818182|
|           4|1100001|        19|402.58050000000003|
|           5|1100001|        10|           945.732|
|           2|1100006|        33|            1459.0|
|           3|1100007|        42|             729.0|
|           2|1110001|        28|            1514.0|
|           5|1120004|         3|          844.1653|
|           6|1120004|        19|           14

In [86]:
prices_stations_df.show(5)

+-------+-----+---+--------+---------+----------+------------+-------------+----+----+-----+---+----------+------------+-------------+-----+---+--------+---------+--------------------+--------------------+------------+-------------+
| id_pdv|   cp|pop|latitude|longitude|      Date|id_carburant|nom_carburant|prix|Year|Month|Day|WeekOfYear|latitude_adj|longitude_adj|   cp|pop|latitude|longitude|             adresse|                vile|latitude_adj|longitude_adj|
+-------+-----+---+--------+---------+----------+------------+-------------+----+----+-----+---+----------+------------+-------------+-----+---+--------+---------+--------------------+--------------------+------------+-------------+
|1000001|01000|  R| 4620114|   519791|2019-01-04|           1|       Gazole|1328|2019|   01| 04|         1|   0.4620114|     0.519791|01000|  R| 4620100|   519800|596 AVENUE DE TRE...|SAINT-DENIS-LèS-B...|     0.46201|       0.5198|
|1000001|01000|  R| 4620114|   519791|2019-01-07|           1|      