# Construire une simple pipeline de transformation de donnée SQL

Ce exemple de pipeline utilise le célèbre jeu de données sur les trajets de taxis de New York (NYC taxi trip dataset). Ce jeu de données public est également disponible sur Kaggle.

Ce notebook utilise l'architecture Medallion qui permet d'améliorer la qualité des données en les faisant passer à travers trois couches : Bronze (données brutes), Silver (données nettoyées) et Gold (données finales prêtes à l'utilisation).
Architecture Medallion : https://drive.google.com/file/d/1Zn0mfRxOiXjwnwQX4cd0-MrW42zHjNvl/view?usp=sharing



### Couche Bronze
Table Bronze : Ingestion des données brutes
À ce niveau, les données brutes des trajets en taxi sont ingérées, avec un contrôle basique de qualité des données pour s'assurer que les distances des trajets sont positives.

In [0]:
from pyspark.sql.functions import col, when, date_trunc, avg, round

In [0]:
%sql
--Bronze layer: Raw data ingestion
CREATE OR REPLACE TABLE taxi_raw_records AS
SELECT * 
FROM samples.nyctaxi.trips WHERE trip_distance > 0.0

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT *
FROM taxi_raw_records

### Couche Silver
La couche Silver crée deux tables :

Table Silver 1 : Trajets signalés.
Cette table identifie les trajets potentiellement suspects en fonction des critères de tarif et de distance.

In [0]:
%sql
-- Silver Table 1: Flagged rides

CREATE OR REPLACE TABLE flagged_rides AS
SELECT
  date_trunc("week", tpep_pickup_datetime) AS week,
  pickup_zip AS zip,
  fare_amount,
  trip_distance
FROM
  taxi_raw_records
WHERE ((pickup_zip = dropoff_zip AND fare_amount > 50) OR
       (trip_distance < 5 AND fare_amount > 50));

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT *
FROM flagged_rides

week,zip,fare_amount,trip_distance
2016-02-22T00:00:00Z,10115,55.0,0.18
2016-02-22T00:00:00Z,11371,52.0,4.02
2016-02-22T00:00:00Z,10017,52.0,0.12
2016-02-29T00:00:00Z,11101,52.0,2.88
2016-02-22T00:00:00Z,11109,52.0,3.34
2016-01-11T00:00:00Z,11106,52.0,2.94
2016-01-11T00:00:00Z,10003,52.0,0.03
2015-12-28T00:00:00Z,10020,52.0,15.3
2016-01-04T00:00:00Z,10035,52.0,4.7
2016-02-08T00:00:00Z,11422,52.0,0.2


Table Silver 2 : Statistiques hebdomadaires.

Cette table Silver calcule les tarifs moyens hebdomadaires et les distances moyennes des trajets

In [0]:
%sql
--Silver layer 2: Weekly statistics

CREATE OR REPLACE TABLE weekly_stats AS
SELECT
  date_trunc("week", tpep_pickup_datetime) AS week,
  AVG(fare_amount) AS avg_amount,
  AVG(trip_distance) AS avg_distance
FROM
  taxi_raw_records
GROUP BY week
ORDER BY week ASC;

num_affected_rows,num_inserted_rows


### Couche Gold
Table Gold 1 : Top N trajets. 

Les tables Silver sont intégrées pour fournir une vue complète des trois trajets avec les tarifs les plus élevés.

In [0]:
%sql
-- Gold layer: Top N rides to investigate

CREATE OR REPLACE TABLE top_n AS
SELECT
  ws.week,
  ROUND(ws.avg_amount, 2) AS avg_amount,
  ROUND(ws.avg_distance, 3) AS avg_distance,
  fr.fare_amount,
  fr.trip_distance,
  fr.zip
FROM
  flagged_rides fr
LEFT JOIN weekly_stats ws ON ws.week = fr.week
ORDER BY fr.fare_amount DESC
LIMIT 3;

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM top_n;

week,avg_amount,avg_distance,fare_amount,trip_distance,zip
2016-01-04T00:00:00Z,11.91,2.865,95.0,5.2,10009
2016-02-15T00:00:00Z,12.24,2.894,60.0,2.0,7311
2016-02-22T00:00:00Z,12.79,2.973,60.0,0.92,11422




## Planifier une tâche pour le notebook
Pour s'assurer que les tables Bronze, Silver et Gold sont régulièrement mises à jour avec des données récentes, il est recommandé de planifier l'exécution du notebook en tant que tâche périodique. Cela permettra de maintenir les tables à jour avec les dernières informations. Pour planifier une tâche pour le notebook :

- Dans le notebook, cliquez sur le bouton Planifier (Schedule) en haut à droite.
- Dans la fenêtre de dialogue Planification (Schedule), vous pouvez, si vous le souhaitez, saisir un nom pour la tâche. Par défaut, le nom est celui du notebook.
- Les exécutions planifiées vous permettent de définir une fréquence d'exécution pour la tâche. Ajustez la fréquence, l'heure et le fuseau horaire selon vos besoins.
- Laissez tous les autres paramètres tels quels, puis cliquez sur Créer (Create).
- Une fois la planification créée avec succès, cliquez sur Exécuter maintenant (Run Now) pour déclencher une exécution de la tâche pour la pipeline NYCTaxiSQL.

# Félicitations !
Vous avez maintenant créé, exécuté et analysé une pipeline de traitement des données. 

### Question unique : Reprendre les transformations en utilisant l'API Spark.



In [0]:
df_trips = spark.sql("SELECT * FROM samples.nyctaxi.trips")
df_trips.show()

+--------------------+---------------------+-------------+-----------+----------+-----------+
|tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip|
+--------------------+---------------------+-------------+-----------+----------+-----------+
| 2016-02-14 16:52:13|  2016-02-14 17:16:04|         4.94|       19.0|     10282|      10171|
| 2016-02-04 18:44:19|  2016-02-04 18:46:00|         0.28|        3.5|     10110|      10110|
| 2016-02-17 17:13:57|  2016-02-17 17:17:55|          0.7|        5.0|     10103|      10023|
| 2016-02-18 10:36:07|  2016-02-18 10:41:45|          0.8|        6.0|     10022|      10017|
| 2016-02-22 14:14:41|  2016-02-22 14:31:52|         4.51|       17.0|     10110|      10282|
| 2016-02-05 06:45:02|  2016-02-05 06:50:26|          1.8|        7.0|     10009|      10065|
| 2016-02-15 15:03:28|  2016-02-15 15:18:45|         2.58|       12.0|     10153|      10199|
| 2016-02-25 19:09:26|  2016-02-25 19:24:50|          1.4|  

In [0]:
df_taxi_raw_records = df_trips.filter(df_trips.trip_distance > 0.0)
df_taxi_raw_records.show()

+--------------------+---------------------+-------------+-----------+----------+-----------+
|tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip|
+--------------------+---------------------+-------------+-----------+----------+-----------+
| 2016-02-16 22:40:45|  2016-02-16 22:59:25|         5.35|       18.5|     10003|      11238|
| 2016-02-05 16:06:44|  2016-02-05 16:26:03|          6.5|       21.5|     10282|      10001|
| 2016-02-08 07:39:25|  2016-02-08 07:44:14|          0.9|        5.5|     10119|      10003|
| 2016-02-29 22:25:33|  2016-02-29 22:38:09|          3.5|       13.5|     10001|      11222|
| 2016-02-03 17:21:02|  2016-02-03 17:23:24|          0.3|        3.5|     10028|      10028|
| 2016-02-19 03:24:25|  2016-02-19 03:44:56|         6.57|       21.5|     10001|      11377|
| 2016-02-02 14:05:23|  2016-02-02 14:23:07|         1.08|       11.5|     10103|      10167|
| 2016-02-20 15:42:20|  2016-02-20 15:50:40|          0.8|  

In [0]:
df_flagged_rides = (
    df_taxi_raw_records
    .select(
        date_trunc("week", col("tpep_pickup_datetime")).alias("week"), 
        col("pickup_zip").alias("zip"),
        col("fare_amount"),
        col("trip_distance")
    )
    .filter(
        (
            (col("pickup_zip") == col("dropoff_zip")) & (col("fare_amount") > 50)
        ) | (
            (col("trip_distance") < 5) & (col("fare_amount") > 50)
        )
    )
)
df_flagged_rides.show()

+-------------------+-----+-----------+-------------+
|               week|  zip|fare_amount|trip_distance|
+-------------------+-----+-----------+-------------+
|2016-02-08 00:00:00|11422|       52.0|          0.2|
|2016-01-11 00:00:00|11422|       52.0|          8.7|
|2015-12-28 00:00:00|10023|       52.0|          0.3|
|2016-01-18 00:00:00|10020|       52.0|          0.1|
|2016-02-22 00:00:00|10017|       52.0|         0.12|
|2016-02-29 00:00:00|11101|       52.0|         2.88|
|2016-02-22 00:00:00|11109|       52.0|         3.34|
|2016-01-11 00:00:00|11106|       52.0|         2.94|
|2016-01-11 00:00:00|10003|       52.0|         0.03|
|2015-12-28 00:00:00|10020|       52.0|         15.3|
|2016-01-04 00:00:00|10035|       52.0|          4.7|
|2016-01-25 00:00:00|11109|       52.0|          3.0|
|2016-02-22 00:00:00|10115|       55.0|         0.18|
|2016-02-22 00:00:00|11371|       52.0|         4.02|
|2016-02-22 00:00:00|11422|       60.0|         0.92|
|2016-01-11 00:00:00|11109| 

In [0]:
df_weekly_stats = (
    df_taxi_raw_records
    .groupBy(date_trunc("week", col("tpep_pickup_datetime")).alias("week"))
    .agg(
        avg("fare_amount").alias("avg_amount"),
        avg("trip_distance").alias("avg_distance")
    )
    .orderBy("week", ascending=True)
)
df_weekly_stats.show()

+-------------------+------------------+------------------+
|               week|        avg_amount|      avg_distance|
+-------------------+------------------+------------------+
|2015-12-28 00:00:00|12.178038379530918|   3.1040618336887|
|2016-01-04 00:00:00|11.907765076862436|2.8646038628301174|
|2016-01-11 00:00:00|12.332039911308204|2.9312638580931263|
|2016-01-18 00:00:00|11.966793403573066| 2.742175904718279|
|2016-01-25 00:00:00|12.981361426256077|2.8746961102106967|
|2016-02-01 00:00:00|11.990339116719243| 2.746336750788645|
|2016-02-08 00:00:00| 12.20651356238698|2.7510813743218807|
|2016-02-15 00:00:00|12.244146522870956| 2.894492376348084|
|2016-02-22 00:00:00| 12.79211403184006|2.9734727878563505|
|2016-02-29 00:00:00| 12.60960960960961|2.9733633633633625|
+-------------------+------------------+------------------+



In [0]:
df_top_n = (
    df_flagged_rides
    .join(df_weekly_stats, df_flagged_rides.week == df_weekly_stats.week, "left")
    .select(
        df_weekly_stats["week"],
        round(df_weekly_stats["avg_amount"], 2).alias("avg_amount"),
        round(df_weekly_stats["avg_distance"], 3).alias("avg_distance"),
        df_flagged_rides["fare_amount"],
        df_flagged_rides["trip_distance"],
        df_flagged_rides["zip"]
    )
    .orderBy(col("fare_amount").desc())
    .limit(3)
)
df_top_n.show()

+-------------------+----------+------------+-----------+-------------+-----+
|               week|avg_amount|avg_distance|fare_amount|trip_distance|  zip|
+-------------------+----------+------------+-----------+-------------+-----+
|2016-01-04 00:00:00|     11.91|       2.865|       95.0|          5.2|10009|
|2016-02-22 00:00:00|     12.79|       2.973|       60.0|         0.92|11422|
|2016-02-15 00:00:00|     12.24|       2.894|       60.0|          2.0| 7311|
+-------------------+----------+------------+-----------+-------------+-----+

