In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import requests


In [2]:
# Spark session used by every cell of this notebook.
spark = (
    SparkSession.builder
    .appName("Retrieve Bike Counting Data")
    .getOrCreate()
)

MONTH = '04'  # NOTE(review): not referenced by any cell visible here - confirm it is still needed
YEAR = '2023'
START_DATE = f'{YEAR}-01-01'
# END_DATE is an exclusive upper bound: the counting and temperature requests
# filter with a strict "<" on it, so it has to be the first day of the next
# year. Using December 31 would leave that whole day out of the extraction.
END_DATE = f'{int(YEAR) + 1}-01-01'

# Requête des données

In [3]:
# Read the semicolon-separated channel list (first line is the header).
channels_data = (
    spark.read
    .option("header", "true")
    .option("sep", ";")
    .csv("./CSVs/channels.csv")
)

# Bring the rows back to the driver and keep only the channel identifiers.
channels_ID = []
for channel_row in channels_data.collect():
    channels_ID.append(channel_row['channel_id'])

In [4]:
def request_counting_data(channel_id, start_date, end_date, timeout=(10, 300)):
    """Download the counting measurements of one channel from data.grandlyon.com.

    Args:
        channel_id: Value sent as the ``channel_id__eq`` filter.
        start_date: Lower bound, sent as ``start_datetime__gte`` (inclusive).
        end_date: Upper bound, sent as ``start_datetime__lt`` (exclusive).
        timeout: Forwarded to ``requests.get`` as ``(connect, read)`` seconds
            so that a stalled connection cannot block the caller forever.

    Returns:
        The list found under the ``values`` key of the JSON answer. An empty
        list is returned on every failure path (non-200 status, missing or
        null ``values``, any exception), so the result can always be iterated.
    """
    url = f'https://data.grandlyon.com/fr/datapusher/ws/timeseries/pvo_patrimoine_voirie.pvocomptagemeasure/all.json?start_datetime__gte={start_date}&start_datetime__lt={end_date}&channel_id__eq={channel_id}&maxfeatures=-1'
    try:
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            # Previously this path fell through and returned None.
            print(f"Failed to retrieve data for channel {channel_id}: HTTP {response.status_code}")
            return []
        return response.json().get('values') or []
    except Exception as e:
        # Best effort on purpose: one failing channel must not abort the others.
        print(f"Failed to retrieve data for channel {channel_id}: {e}")
        return []

def request_temp_data(start_date, end_date, timeout=(10, 300)):
    """Download the temperature measurements from data.grandlyon.com.

    Args:
        start_date: Lower bound, sent as ``horodate__gte`` (inclusive).
        end_date: Upper bound, sent as ``horodate__lt`` (exclusive).
        timeout: Forwarded to ``requests.get`` as ``(connect, read)`` seconds
            so that a stalled connection cannot block the caller forever.

    Returns:
        The list found under the ``values`` key of the JSON answer. An empty
        list is returned on every failure path (non-200 status, missing or
        null ``values``, any exception) instead of ``None``.
    """
    url = f'https://data.grandlyon.com/fr/datapusher/ws/timeseries/biotope.temperature/all.json?horodate__gte={start_date}&horodate__lt={end_date}&maxfeatures=-1'
    try:
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            # Previously this path fell through and returned None.
            print(f"Failed to retrieve temperature data: HTTP {response.status_code}")
            return []
        return response.json().get('values') or []
    except Exception as e:
        # Best effort on purpose: report the problem and hand back no records.
        print(f"Failed to retrieve temperature data: {e}")
        return []

def request_rain_data(start_date, end_date, timeout=(10, 300)):
    """Download the rainfall measurements from data.grandlyon.com.

    Args:
        start_date: Lower bound, sent as ``horodate__gte`` (inclusive).
        end_date: Upper bound, sent as ``horodate__lte`` (inclusive, unlike
            the strict ``__lt`` used by the other two request helpers).
        timeout: Forwarded to ``requests.get`` as ``(connect, read)`` seconds
            so that a stalled connection cannot block the caller forever.

    Returns:
        The list found under the ``values`` key of the JSON answer. An empty
        list is returned on every failure path (non-200 status, missing or
        null ``values``, any exception) instead of ``None``.
    """
    url = f'https://data.grandlyon.com/fr/datapusher/ws/timeseries/eau.pluviometrie_mesure/all.json?maxfeatures=-1&horodate__gte={start_date}&horodate__lte={end_date}'
    try:
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            # Previously this path fell through and returned None.
            print(f"Failed to retrieve rainfall data: HTTP {response.status_code}")
            return []
        return response.json().get('values') or []
    except Exception as e:
        # Best effort on purpose: report the problem and hand back no records.
        print(f"Failed to retrieve rainfall data: {e}")
        return []

# Traitement des données

## Traitement des données de comptage

In [5]:
def traitement_count_data(data):
    """Sum the counting measurements per hour, all channels together.

    Args:
        data: Spark DataFrame holding at least the ``start_datetime`` and
            ``count`` columns.

    Returns:
        A DataFrame with two columns, sorted by hour: ``horodate_heure``
        (``start_datetime`` truncated to the hour) and ``total_count`` (sum
        of ``count`` over every row falling in that hour).
    """
    hourly = data.withColumn("start_datetime", F.date_trunc("hour", F.col("start_datetime")))
    # groupBy/agg keeps only the grouping key and the aggregate, so there is
    # no leftover column (such as end_datetime) to drop afterwards.
    return (
        hourly.groupBy("start_datetime")
        .agg(F.sum("count").alias("total_count"))
        .orderBy("start_datetime")
        .withColumnRenamed("start_datetime", "horodate_heure")
    )

## Traitement des données de température

In [6]:
def traitement_temp_data(data):
    """Average the temperature measurements per hour.

    The average is taken over every row sharing the same hour; no
    per-station grouping is applied.

    Args:
        data: Spark DataFrame holding at least the ``horodate`` and
            ``degre_celsius`` columns.

    Returns:
        A DataFrame with two columns, sorted by hour: ``horodate_heure``
        (``horodate`` truncated to the hour) and ``avg_temp_per_hour``.
    """
    hourly = data.withColumn("horodate_heure", F.date_trunc("hour", F.col("horodate")))
    # groupBy/agg keeps only the grouping key and the aggregate, so the
    # original horodate column is already gone and needs no explicit drop.
    return (
        hourly.groupBy("horodate_heure")
        .agg(F.avg("degre_celsius").alias("avg_temp_per_hour"))
        .orderBy("horodate_heure")
    )

## Traitement des données de précipitations

In [7]:
def traitement_rainfall(data):
    """Compute the hourly rainfall averaged over the measuring stations.

    The rows are first summed per station (``identifiant``) and per hour,
    then those per-station totals are averaged per hour.

    Args:
        data: Spark DataFrame holding at least the ``horodate``,
            ``identifiant`` and ``pluie_mm`` columns.

    Returns:
        A DataFrame with two columns, sorted by hour: ``horodate_heure``
        (``horodate`` truncated to the hour) and ``avg_pluie_per_hour``.
    """
    hourly = data.withColumn("horodate_heure", F.date_trunc("hour", F.col("horodate")))
    # Step 1: total rainfall of each station within each hour (a sum, hence
    # the "total_" name of this intermediate column).
    per_station = hourly.groupBy("horodate_heure", "identifiant") \
        .agg(F.sum("pluie_mm").alias("total_pluie_per_station_per_hour"))
    # Step 2: average of the station totals for each hour. Only the grouping
    # key and the aggregate survive, so horodate needs no explicit drop.
    return (
        per_station.groupBy("horodate_heure")
        .agg(F.avg("total_pluie_per_station_per_hour").alias("avg_pluie_per_hour"))
        .orderBy("horodate_heure")
    )


# Exécution du traitement

In [9]:
# Distribute the channel ids, then let each task download the measurements
# of its channels; flatMap merges the per-channel lists into one RDD.
count_rdd_ids = spark.sparkContext.parallelize(channels_ID)
count_records_rdd = count_rdd_ids.flatMap(
    lambda cid: request_counting_data(cid, START_DATE, END_DATE)
)

# Build a DataFrame from the downloaded records and aggregate it per hour.
count_data_spark = spark.createDataFrame(count_records_rdd)
aggregated_count_data = traitement_count_data(count_data_spark)
aggregated_count_data.show()

+-------------------+-----------+
|     horodate_heure|total_count|
+-------------------+-----------+
|2023-01-01 00:00:00|       1888|
|2023-01-01 01:00:00|       2283|
|2023-01-01 02:00:00|       2408|
|2023-01-01 03:00:00|       2694|
|2023-01-01 04:00:00|       1998|
|2023-01-01 05:00:00|       1501|
|2023-01-01 06:00:00|       1041|
|2023-01-01 07:00:00|        744|
|2023-01-01 08:00:00|       1052|
|2023-01-01 09:00:00|       1524|
|2023-01-01 10:00:00|       2652|
|2023-01-01 11:00:00|       4311|
|2023-01-01 12:00:00|       5022|
|2023-01-01 13:00:00|       4816|
|2023-01-01 14:00:00|       6020|
|2023-01-01 15:00:00|       7214|
|2023-01-01 16:00:00|       6399|
|2023-01-01 17:00:00|       5123|
|2023-01-01 18:00:00|       4599|
|2023-01-01 19:00:00|       4552|
+-------------------+-----------+
only showing top 20 rows


In [10]:
# Download the temperature records on the driver and load them into Spark.
temp_records = request_temp_data(START_DATE, END_DATE)
temp_data_spark = spark.createDataFrame(temp_records)

# Hourly average, rounded to two decimals.
aggregated_temp_data = traitement_temp_data(temp_data_spark).withColumn(
    "avg_temp_per_hour",
    F.round(F.col("avg_temp_per_hour"), 2),
)
aggregated_temp_data.show()

+-------------------+-----------------+
|     horodate_heure|avg_temp_per_hour|
+-------------------+-----------------+
|2023-01-01 00:00:00|            15.14|
|2023-01-01 01:00:00|            14.91|
|2023-01-01 02:00:00|            14.72|
|2023-01-01 03:00:00|            14.88|
|2023-01-01 04:00:00|            14.89|
|2023-01-01 05:00:00|            14.78|
|2023-01-01 06:00:00|            14.59|
|2023-01-01 07:00:00|            14.43|
|2023-01-01 08:00:00|             14.4|
|2023-01-01 09:00:00|            14.83|
|2023-01-01 10:00:00|            15.71|
|2023-01-01 11:00:00|            16.58|
|2023-01-01 12:00:00|            16.87|
|2023-01-01 13:00:00|            17.04|
|2023-01-01 14:00:00|            16.85|
|2023-01-01 15:00:00|            16.34|
|2023-01-01 16:00:00|            16.01|
|2023-01-01 17:00:00|            15.67|
|2023-01-01 18:00:00|            15.63|
|2023-01-01 19:00:00|            15.46|
+-------------------+-----------------+
only showing top 20 rows


In [11]:
# Weather part: download the rainfall records on the driver and load them into Spark.
rain_records = request_rain_data(START_DATE, END_DATE)
rain_data_spark = spark.createDataFrame(rain_records)

# Hourly rainfall averaged over the stations, rounded to two decimals.
aggregated_rain_data = traitement_rainfall(rain_data_spark).withColumn(
    "avg_pluie_per_hour",
    F.round(F.col("avg_pluie_per_hour"), 2),
)
aggregated_rain_data.show()

+-------------------+------------------+
|     horodate_heure|avg_pluie_per_hour|
+-------------------+------------------+
|2023-01-01 08:00:00|               0.3|
|2023-01-01 09:00:00|               0.1|
|2023-01-01 14:00:00|               0.1|
|2023-01-01 19:00:00|               0.1|
|2023-01-02 04:00:00|               0.1|
|2023-01-02 05:00:00|               0.1|
|2023-01-02 13:00:00|              0.17|
|2023-01-02 14:00:00|               0.5|
|2023-01-02 15:00:00|               0.2|
|2023-01-02 16:00:00|              0.15|
|2023-01-02 17:00:00|              0.88|
|2023-01-02 18:00:00|              2.02|
|2023-01-02 19:00:00|              1.89|
|2023-01-02 20:00:00|              1.09|
|2023-01-02 21:00:00|              0.51|
|2023-01-02 22:00:00|               0.2|
|2023-01-02 23:00:00|              0.87|
|2023-01-03 00:00:00|               0.6|
|2023-01-03 01:00:00|              0.23|
|2023-01-03 02:00:00|              0.27|
+-------------------+------------------+
only showing top

In [12]:
# Keep every counting hour and attach the matching temperature and rainfall
# values when they exist (left joins on the shared hourly timestamp).
final_data = (
    aggregated_count_data
    .join(aggregated_temp_data, "horodate_heure", "left")
    .join(aggregated_rain_data, "horodate_heure", "left")
    .withColumnRenamed("horodate_heure", "datetime")
    .orderBy("datetime")
    # Hours without a match get 0 in the numeric columns.
    # NOTE(review): this also turns a missing temperature into 0 - confirm
    # that is intended, as it is indistinguishable from a real 0 reading.
    .fillna(0)
)
final_data.show(20)

+-------------------+-----------+-----------------+------------------+
|           datetime|total_count|avg_temp_per_hour|avg_pluie_per_hour|
+-------------------+-----------+-----------------+------------------+
|2023-01-01 00:00:00|       1888|            15.14|               0.0|
|2023-01-01 01:00:00|       2283|            14.91|               0.0|
|2023-01-01 02:00:00|       2408|            14.72|               0.0|
|2023-01-01 03:00:00|       2694|            14.88|               0.0|
|2023-01-01 04:00:00|       1998|            14.89|               0.0|
|2023-01-01 05:00:00|       1501|            14.78|               0.0|
|2023-01-01 06:00:00|       1041|            14.59|               0.0|
|2023-01-01 07:00:00|        744|            14.43|               0.0|
|2023-01-01 08:00:00|       1052|             14.4|               0.3|
|2023-01-01 09:00:00|       1524|            14.83|               0.1|
|2023-01-01 10:00:00|       2652|            15.71|               0.0|
|2023-

In [14]:
# Export the joined dataset as semicolon-separated CSV with a header line,
# replacing whatever a previous run wrote to the same folder.
(
    final_data.write
    .mode("overwrite")
    .option("header", "true")
    .option("sep", ";")
    .csv(f"./CSVs/spark/counting_data{YEAR}")
)