In [0]:
# -------------------------------------------------------------------------
# New November dataset: additional or updated trains
# -------------------------------------------------------------------------
# Everything between the triple quotes is raw CSV payload, so annotations must
# NOT be written inside it: a trailing "# ..." would become part of the last
# field (and any comma in it would add an extra column), making DelayMinutes
# non-numeric for that row.
#
# Row note (from the original author): IC202 reuses a TrainID seen before,
# with an updated delay (25 minutes).
data_nov = """TrainID,TrainType,DepartureStation,ArrivalStation,DepartureTime,ArrivalTime,DelayMinutes
TGV010,TGV,Paris,Nice,2024-11-02 07:00:00,2024-11-02 11:20:00,0
TGV011,TGV,Nice,Paris,2024-11-02 13:00:00,2024-11-02 17:30:00,5
TER108,TER,Paris,Lille,2024-11-02 08:30:00,2024-11-02 10:50:00,12
TER109,TER,Reims,Paris,2024-11-02 09:15:00,2024-11-02 11:00:00,0
IC205,Intercités,Nantes,Bordeaux,2024-11-02 10:00:00,2024-11-02 13:10:00,3
IC202,Intercités,Bordeaux,Nantes,2024-11-02 09:00:00,2024-11-02 12:15:00,25
"""

# Target location of the November CSV; reused by the cell that reads it back.
path_nov = "/dbfs/spark_lab/train_schedule_2024_11.csv"

# Write the CSV text to the file system, replacing any file left by a previous run.
dbutils.fs.put(
    path_nov,
    data_nov,
    overwrite=True
)

print(f"✅ Dataset créé : {path_nov}")


In [0]:
# Load the November CSV into df_nov, treating the first line as column names.
# No schema is supplied and inference is not enabled here.
nov_reader = spark.read.option("header", True)
df_nov = nov_reader.csv(path_nov)

display(df_nov)

In [0]:
# Restore the Delta table to the version just before its latest one (step n-1).
# The latest version number is looked up every time this cell runs, so the
# restore target is always relative to the table's current history.
last_version = spark.sql("SELECT max(version) as v FROM (DESCRIBE HISTORY sncf_analytics.raw.train_schedule)").collect()[0]['v']

# Version 0 is the first commit: there is nothing earlier to go back to, and
# "VERSION AS OF -1" would only fail with an obscure engine error.
if last_version is None or last_version < 1:
    raise ValueError(
        "Cannot restore sncf_analytics.raw.train_schedule: no version earlier than "
        f"{last_version} exists in its history."
    )

spark.sql(f"RESTORE TABLE sncf_analytics.raw.train_schedule TO VERSION AS OF {last_version - 1}")

In [0]:
from pyspark.sql.functions import concat_ws, date_format, col

# Per-journey key: TrainID and the departure time rendered as yyyyMMddHHmm,
# joined with an underscore.
departure_stamp = date_format(col("DepartureTime"), "yyyyMMddHHmm")
service_uid = concat_ws("_", col("TrainID"), departure_stamp)

df_nov = df_nov.withColumn("ServiceUID", service_uid)


In [0]:
# Imported here as well so this cell does not depend on an earlier cell's imports.
from pyspark.sql.functions import col, concat_ws, date_format

df_raw = spark.table("sncf_analytics.raw.train_schedule")

# Add the ServiceUID key to the raw table only when it is missing.
# The overwrite lives inside the guard so that re-running the cell is a no-op
# once the column exists, instead of rewriting the whole table every time.
if "ServiceUID" not in df_raw.columns:
    df_raw = df_raw.withColumn(
        "ServiceUID",
        concat_ws("_", col("TrainID"), date_format(col("DepartureTime"), "yyyyMMddHHmm"))
    )

    # One-off rewrite; overwriteSchema lets the new column be added to the table schema.
    df_raw.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .saveAsTable("sncf_analytics.raw.train_schedule")


In [0]:
from delta.tables import DeltaTable

# Upsert the November rows into the raw Delta table, matching on ServiceUID.
delta_table = DeltaTable.forName(spark, "sncf_analytics.raw.train_schedule")

merge_builder = delta_table.alias("t").merge(df_nov.alias("s"), "t.ServiceUID = s.ServiceUID")
# Matching key: overwrite the target row with the source values (e.g. a corrected delay).
merge_builder = merge_builder.whenMatchedUpdateAll()
# No matching key: insert the source row as a new journey.
merge_builder = merge_builder.whenNotMatchedInsertAll()
merge_builder.execute()

print("✅ MERGE Delta historisé effectué avec succès (clé ServiceUID utilisée).")


In [0]:
%sql
-- List train, departure time and delay from the raw table, ordered by train then departure.
SELECT
  TrainID,
  DepartureTime,
  DelayMinutes
FROM sncf_analytics.raw.train_schedule
ORDER BY
  TrainID,
  DepartureTime;


In [0]:
# `col` is imported here too, so this cell no longer relies on an earlier cell's import.
# NOTE: importing `round` from pyspark.sql.functions shadows Python's builtin
# round() for the rest of the notebook; kept as-is in case later cells rely on it.
from pyspark.sql.functions import col, to_timestamp, when, round

df = spark.table("sncf_analytics.raw.train_schedule")

# Build the curated view of the raw table:
#   - DepartureTime / ArrivalTime converted to timestamps
#   - DelayMinutes cast to int
#   - Status: "Late" when DelayMinutes > 10, otherwise "On Time"
#     (rows whose DelayMinutes is null also end up in the "On Time" branch)
#   - TravelDurationMinutes: arrival minus departure, in minutes, rounded to 2 decimals
df_transformed = (
    df
    .withColumn("DepartureTime", to_timestamp(col("DepartureTime")))
    .withColumn("ArrivalTime", to_timestamp(col("ArrivalTime")))
    .withColumn("DelayMinutes", col("DelayMinutes").cast("int"))
    .withColumn("Status", when(col("DelayMinutes") > 10, "Late").otherwise("On Time"))
    .withColumn("TravelDurationMinutes",
                # Timestamps cast to long are epoch seconds; divide by 60 for minutes.
                round((col("ArrivalTime").cast("long") - col("DepartureTime").cast("long")) / 60, 2))
)

# Replace the curated table (and its schema) with the freshly computed result.
df_transformed.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("sncf_analytics.curated.train_status")

print("✅ Couche CURATED mise à jour avec historique et statut actualisé.")


In [0]:
%sql
-- Per train type: number of journeys, average delay (2 decimals) and
-- percentage of journeys whose Status is 'Late'.
SELECT
  TrainType,
  COUNT(*) AS Nb_Voyages,
  ROUND(AVG(DelayMinutes), 2) AS AvgDelay,
  SUM(
    CASE
      WHEN Status = 'Late' THEN 1
      ELSE 0
    END
  ) * 100 / COUNT(*) AS LateRate
FROM sncf_analytics.curated.train_status
GROUP BY
  TrainType;
