# Séance 1 - Objectifs, chargement, traitement

## Étape 1 - Choix de la source de données

In [None]:
import org.apache.spark.sql.SparkSession

spark = org.apache.spark.sql.SparkSession@34440609


org.apache.spark.sql.SparkSession@34440609

In [2]:
val lignes = sc.textFile("data/Trajets_train.csv")
lignes.take(10).foreach(println)

id,ville_depart,ville_arrivee,distance_km,duree_min
1,Nice,Lyon,470,188
2,Dijon,Tours,290,116
3,Marseille,Lyon,315,126
4,Nantes,Bordeaux,335,134
5,Grenoble,Dijon,309,124
6,Nantes,Dijon,323,129
7,Lille,Dijon,468,187
8,Dijon,Strasbourg,310,124
9,Nantes,Lyon,516,206


lignes = data/Trajets_train.csv MapPartitionsRDD[1] at textFile at <console>:25


data/Trajets_train.csv MapPartitionsRDD[1] at textFile at <console>:25

## Étape 2 – Persona

**Type d’utilisateur :**  
Agent de planification des transports (exemple : agent SNCF)

**Persona :**  
- **Nom :** Julien Dubois  
- **Métier :** Agent SNCF  
- **Objectif :** Améliorer les trajets entre les grandes villes

**Indicateurs pertinents :**
- Temps de trajet le plus court par ville de départ.
- Temps de trajet le plus long par ville de départ.
- Vitesse moyenne (km/h) par liaison.
- Les trajets qui prennent le plus de temps par rapport à la distance.
- Nombre de départs par ville.
- Nombre d'arrivées par ville.

## Étape 3 - Chargement, pré-traitement

In [3]:

// Définition de la structure d’un trajet avec une case class
case class Trajet(
  id: Int,                 // Identifiant unique du trajet
  ville_depart: String,    // Ville de départ
  ville_arrivee: String,   // Ville d’arrivée
  distance_km: Int,        // Distance en kilomètres
  duree_min: Int           // Durée du trajet en minutes
)

val lignesSansEntete = lignes.filter(!_.startsWith("id"))

// Transformation de chaque ligne du CSV en un objet Trajet
val trajets = lignesSansEntete.map { ligne =>
  val champs = ligne.split(",")  // Découper la ligne en colonnes (tableau de chaînes)
  Trajet(
    champs(0).toInt,   // id
    champs(1),         // ville_depart
    champs(2),         // ville_arrivee
    champs(3).toInt,   // distance_km
    champs(4).toInt    // duree_min
  )
}

trajets.take(5).foreach(println)


Trajet(1,Nice,Lyon,470,188)
Trajet(2,Dijon,Tours,290,116)
Trajet(3,Marseille,Lyon,315,126)
Trajet(4,Nantes,Bordeaux,335,134)
Trajet(5,Grenoble,Dijon,309,124)


defined class Trajet
lignesSansEntete = MapPartitionsRDD[2] at filter at <console>:34
trajets = MapPartitionsRDD[3] at map at <console>:37


MapPartitionsRDD[3] at map at <console>:37

# Séance 2 - Ingestion, SparkSQL

## Étape 1 : Transformer le RDD en DataFrame SparkSQL

In [5]:
import spark.implicits._

// Transformer le RDD en DataFrame
val dfTrajets = trajets.toDF()

println("Nombre de trajets : " + dfTrajets.count())

dfTrajets.show()

dfTrajets.printSchema()

// Statistiques résumées sur toutes les colonnes numériques
dfTrajets.summary().show()

Nombre de trajets : 100
+---+------------+-------------+-----------+---------+
| id|ville_depart|ville_arrivee|distance_km|duree_min|
+---+------------+-------------+-----------+---------+
|  1|        Nice|         Lyon|        470|      188|
|  2|       Dijon|        Tours|        290|      116|
|  3|   Marseille|         Lyon|        315|      126|
|  4|      Nantes|     Bordeaux|        335|      134|
|  5|    Grenoble|        Dijon|        309|      124|
|  6|      Nantes|        Dijon|        323|      129|
|  7|       Lille|        Dijon|        468|      187|
|  8|       Dijon|   Strasbourg|        310|      124|
|  9|      Nantes|         Lyon|        516|      206|
| 10|       Rouen|        Dijon|        157|       63|
| 11|       Dijon|        Tours|        515|      206|
| 12|  Strasbourg|        Tours|        562|      225|
| 13|        Lyon|         Nice|        290|      116|
| 14|       Lille|        Tours|        263|      105|
| 15|      Angers|       Rennes|        1

dfTrajets = [id: int, ville_depart: string ... 3 more fields]


[id: int, ville_depart: string ... 3 more fields]

## Étape 2 - Extraction de dimensions

In [10]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.monotonically_increasing_id
val spark = SparkSession.builder().appName("Atelier Spark Scala").master("local[*]").getOrCreate()
import spark.implicits._

val dfTrajets = spark.read.option("header", true).option("inferSchema", true).csv("data/Trajets_train.csv")

// Dimension : villes de départ
val dfVillesDepart = dfTrajets.select("ville_depart").distinct()
  .withColumn("id_ville_depart", monotonically_increasing_id())

// Dimension : villes d’arrivée
val dfVillesArrivee = dfTrajets.select("ville_arrivee").distinct()
  .withColumn("id_ville_arrivee", monotonically_increasing_id())

// Ajouter l’ID de la ville de départ
val dfTrajetsJoin1 = dfTrajets.join(dfVillesDepart, Seq("ville_depart"))

// Ajouter l’ID de la ville d’arrivée
val dfTrajetsFinal = dfTrajetsJoin1.join(dfVillesArrivee, Seq("ville_arrivee"))

// Nettoyer les colonnes littérales devenues inutiles
val dfTrajetsClean = dfTrajetsFinal
  .drop("ville_depart")
  .drop("ville_arrivee")

dfTrajetsClean.show()

+---+-----------+---------+---------------+----------------+
| id|distance_km|duree_min|id_ville_depart|id_ville_arrivee|
+---+-----------+---------+---------------+----------------+
|  1|        470|      188|              0|               8|
|  2|        290|      116|              1|              13|
|  3|        315|      126|              6|               8|
|  4|        335|      134|              5|               9|
|  5|        309|      124|              2|               1|
|  6|        323|      129|              5|               1|
|  7|        468|      187|              4|               1|
|  8|        310|      124|              1|              10|
|  9|        516|      206|              5|               8|
| 10|        157|       63|             14|               1|
| 11|        515|      206|              1|              13|
| 12|        562|      225|             10|              13|
| 13|        290|      116|              8|               0|
| 14|        263|      1

lastException = null
spark = org.apache.spark.sql.SparkSession@34440609
dfTrajets = [id: int, ville_depart: string ... 3 more fields]
dfVillesDepart = [ville_depart: string, id_ville_depart: bigint]
dfVillesArrivee = [ville_arrivee: string, id_ville_arrivee: bigint]
dfTrajetsJoin1 = [ville_depart: string, id: int ... 4 more fields]
dfTrajetsFinal = [ville_arrivee: string, ville_depart: string ... 5 more fields]
dfTrajetsClean = [id: int, distance_km: int ...


[id: int, distance_km: int ...

## Étape 3 - Tables Hive, SQL

In [11]:
import org.apache.spark.sql.hive.HiveContext
val hc = new HiveContext(sc)

// Supprimer les tables existantes si nécessaire
hc.sql("DROP TABLE IF EXISTS trajets")
hc.sql("DROP TABLE IF EXISTS villes_depart")
hc.sql("DROP TABLE IF EXISTS villes_arrivee")

// Enregistrer les DataFrames en tant que tables Hive
dfTrajetsClean.write.mode("overwrite").saveAsTable("trajets")
dfVillesDepart.write.mode("overwrite").saveAsTable("villes_depart")
dfVillesArrivee.write.mode("overwrite").saveAsTable("villes_arrivee")

// Vérifier les résultats avec quelques requêtes SQL
hc.sql("SELECT * FROM trajets LIMIT 5").show()
hc.sql("SELECT * FROM villes_depart LIMIT 5").show()
hc.sql("SELECT COUNT(*) as nb_trajets FROM trajets").show()


+---+-----------+---------+---------------+----------------+
| id|distance_km|duree_min|id_ville_depart|id_ville_arrivee|
+---+-----------+---------+---------------+----------------+
|  1|        470|      188|              0|               8|
|  2|        290|      116|              1|              13|
|  3|        315|      126|              6|               8|
|  4|        335|      134|              5|               9|
|  5|        309|      124|              2|               1|
+---+-----------+---------+---------------+----------------+

+------------+---------------+
|ville_depart|id_ville_depart|
+------------+---------------+
|        Nice|              0|
|       Dijon|              1|
|    Grenoble|              2|
|      Angers|              3|
|       Lille|              4|
+------------+---------------+

+----------+
|nb_trajets|
+----------+
|       100|
+----------+



hc = org.apache.spark.sql.hive.HiveContext@43c59bb4




org.apache.spark.sql.hive.HiveContext@43c59bb4

# Séance 3 - Spark SQL (suite), Stats

## Étape 1 - Agrégations

### 1. Temps de trajet le plus court par ville de départ.

In [15]:
// On recrée un DataFrame avec la vitesse ajoutée (km / (min / 60) = km/h)
val dfTrajetsAvecVitesse = dfTrajetsClean
  .withColumn("vitesse_kmh", $"distance_km" / ($"duree_min" / 60))

dfTrajetsAvecVitesse.createOrReplaceTempView("trajets_vitesse")

hc.sql("""
  SELECT vd.ville_depart, MIN(t.duree_min) AS duree_minimum
  FROM trajets_vitesse t
  JOIN villes_depart vd ON t.id_ville_depart = vd.id_ville_depart
  GROUP BY vd.ville_depart 
""").show()



+------------+-------------+
|ville_depart|duree_minimum|
+------------+-------------+
|        Nice|           68|
|       Dijon|           22|
|    Grenoble|           33|
|      Angers|           28|
|       Lille|           70|
|      Nantes|          129|
|   Marseille|           45|
|       Paris|           49|
|        Lyon|           12|
|    Bordeaux|           17|
|  Strasbourg|           54|
|    Toulouse|           57|
|      Rennes|           57|
|       Tours|           74|
|       Rouen|           32|
+------------+-------------+



lastException = null
dfTrajetsAvecVitesse = [id: int, distance_km: int ... 4 more fields]


[id: int, distance_km: int ... 4 more fields]

### 2. Temps de trajet le plus long par ville de départ

In [16]:
hc.sql("""
  SELECT vd.ville_depart, MAX(t.duree_min) AS duree_maximum
  FROM trajets_vitesse t
  JOIN villes_depart vd ON t.id_ville_depart = vd.id_ville_depart
  GROUP BY vd.ville_depart
""").show()


+------------+-------------+
|ville_depart|duree_maximum|
+------------+-------------+
|        Nice|          227|
|       Dijon|          258|
|    Grenoble|          320|
|      Angers|          237|
|       Lille|          261|
|      Nantes|          230|
|   Marseille|          126|
|       Paris|          300|
|        Lyon|          276|
|    Bordeaux|          224|
|  Strasbourg|          328|
|    Toulouse|          228|
|      Rennes|          276|
|       Tours|          252|
|       Rouen|          266|
+------------+-------------+



### 3. Vitesse moyenne (km/h) par trajet

In [32]:
val dfVitesseMoyenne = hc.sql("""
  SELECT 
    vd.ville_depart, 
    va.ville_arrivee, 
    ROUND(AVG(t.vitesse_kmh), 1) AS vitesse_moyenne
  FROM trajets_vitesse t
  JOIN villes_depart vd ON t.id_ville_depart = vd.id_ville_depart
  JOIN villes_arrivee va ON t.id_ville_arrivee = va.id_ville_arrivee
  GROUP BY vd.ville_depart, va.ville_arrivee
""")

dfVitesseMoyenne
  .coalesce(1)
  .write
  .option("header", "true")
  .mode("overwrite")
  .csv("export/vitesse_moyenne_par_trajet")

  dfVitesseMoyenne.show()



+------------+-------------+---------------+
|ville_depart|ville_arrivee|vitesse_moyenne|
+------------+-------------+---------------+
|        Nice|     Bordeaux|          150.3|
|      Angers|        Lille|          147.9|
|    Toulouse|    Marseille|          150.3|
|    Bordeaux|       Rennes|          150.0|
|      Nantes|         Lyon|          150.3|
|       Lille|   Strasbourg|          149.9|
|       Lille|        Dijon|          150.2|
|    Grenoble|     Toulouse|          149.9|
|       Rouen|        Tours|          150.2|
|    Grenoble|        Dijon|          149.5|
|  Strasbourg|        Paris|          150.0|
|        Lyon|   Strasbourg|          150.2|
|      Angers|     Bordeaux|          150.5|
|      Nantes|       Angers|          150.0|
|      Nantes|        Dijon|          150.2|
|    Grenoble|       Rennes|          150.2|
|      Angers|         Nice|          150.1|
|       Lille|    Marseille|          150.1|
|       Rouen|        Dijon|          149.5|
|       To

dfVitesseMoyenne = [ville_depart: string, ville_arrivee: string ... 1 more field]


[ville_depart: string, ville_arrivee: string ... 1 more field]

### 4. Top 5 des trajets les plus lents (en km/h)


In [34]:
val dfLents = hc.sql("""
  SELECT 
    vd.ville_depart, 
    va.ville_arrivee, 
    ROUND(t.distance_km / (t.duree_min / 60.0), 1) AS vitesse_kmh 
  FROM trajets t
  JOIN villes_depart vd ON t.id_ville_depart = vd.id_ville_depart
  JOIN villes_arrivee va ON t.id_ville_arrivee = va.id_ville_arrivee
  ORDER BY vitesse_kmh ASC 
  LIMIT 5
""")

dfLents
  .coalesce(1)
  .write
  .option("header", "true")
  .mode("overwrite")
  .csv("export/top5_trajets_lents")

dfLents.show()

+------------+-------------+-----------+
|ville_depart|ville_arrivee|vitesse_kmh|
+------------+-------------+-----------+
|      Angers|       Rennes|      144.9|
|      Angers|        Lille|      147.9|
|    Bordeaux|        Lille|      148.2|
|        Lyon|       Rennes|      148.9|
|    Toulouse|        Rouen|      149.1|
+------------+-------------+-----------+



dfLents = [ville_depart: string, ville_arrivee: string ... 1 more field]


[ville_depart: string, ville_arrivee: string ... 1 more field]

## 5. Nombre de départs par ville

In [35]:
val dfNbDepart = hc.sql("""
  SELECT vd.ville_depart, COUNT(*) AS nb_depart
  FROM trajets t
  JOIN villes_depart vd ON t.id_ville_depart = vd.id_ville_depart
  GROUP BY vd.ville_depart
  ORDER BY nb_depart DESC
""")

dfNbDepart
  .coalesce(1)
  .write
  .option("header", "true")
  .mode("overwrite")
  .csv("export/nb_depart_par_ville")

  dfNbDepart.show()


+------------+---------+
|ville_depart|nb_depart|
+------------+---------+
|  Strasbourg|       11|
|       Dijon|       10|
|        Lyon|       10|
|       Lille|        9|
|    Grenoble|        8|
|       Paris|        7|
|    Bordeaux|        7|
|       Rouen|        7|
|      Nantes|        6|
|        Nice|        5|
|      Angers|        5|
|      Rennes|        5|
|    Toulouse|        4|
|   Marseille|        3|
|       Tours|        3|
+------------+---------+



dfNbDepart = [ville_depart: string, nb_depart: bigint]


[ville_depart: string, nb_depart: bigint]

## 6. Nombre d'arrivées par ville

In [36]:
val dfNbArrivee = hc.sql("""
  SELECT va.ville_arrivee, COUNT(*) AS nb_arrivee
  FROM trajets t
  JOIN villes_arrivee va ON t.id_ville_arrivee = va.id_ville_arrivee
  GROUP BY va.ville_arrivee
  ORDER BY nb_arrivee DESC
""")

dfNbArrivee
  .coalesce(1)
  .write
  .option("header", "true")
  .mode("overwrite")
  .csv("export/nb_arrivee_par_ville")

  dfNbArrivee.show()


+-------------+----------+
|ville_arrivee|nb_arrivee|
+-------------+----------+
|       Rennes|        13|
|        Tours|        11|
|        Lille|        10|
|    Marseille|         9|
|       Angers|         7|
|   Strasbourg|         7|
|        Dijon|         6|
|       Nantes|         6|
|     Bordeaux|         6|
|         Nice|         5|
|     Grenoble|         5|
|     Toulouse|         5|
|        Paris|         4|
|         Lyon|         4|
|        Rouen|         2|
+-------------+----------+



dfNbArrivee = [ville_arrivee: string, nb_arrivee: bigint]


[ville_arrivee: string, nb_arrivee: bigint]