# Data Jointure V2

## Configuration

In [1]:
%AddJar file:///home/jovyan/work/apps/Emiasd-Flight-Data-Analysis.jar

Starting download from file:///home/jovyan/work/apps/Emiasd-Flight-Data-Analysis.jar
Finished download of Emiasd-Flight-Data-Analysis.jar
Using cached version of Emiasd-Flight-Data-Analysis.jar


In [17]:
import org.apache.spark.sql.SparkSession
import com.flightdelay.config.{AppConfiguration, ConfigurationLoader, ExperimentConfig}
import com.flightdelay.data.loaders.FlightDataLoader

// Env Configuration
val args: Array[String] = Array("jupyter")
implicit val configuration: AppConfiguration = ConfigurationLoader.loadConfiguration(args)
implicit val experiment: ExperimentConfig = configuration.experiments(0)

val spark = SparkSession.builder()
  .config(sc.getConf)
  .config("spark.eventLog.enabled", "true")
  .config("spark.eventLog.dir", s"${configuration.common.output.basePath}/spark-events")  // ex: "file:/tmp/spark-events" ou "hdfs:///spark-events"
  .getOrCreate()

// Rendre la session Spark implicite
implicit val session = spark



args = Array(jupyter)
configuration = AppConfiguration(local,CommonConfig(42,DataConfig(/home/jovyan/work/data,FileConfig(/home/jovyan/work/data/FLIGHT-3Y/Flights/201201*.csv),FileConfig(/home/jovyan/work/data/FLIGHT-3Y/Weather/20101*.txt),FileConfig(/home/jovyan/work/data/FLIGHT-3Y/wban_airport_timezone.csv)),OutputConfig(/home/jovyan/work/output,FileConfig(/home/jovyan/work/output/data),FileConfig(/home/jovyan/work/output/model)),MLFlowConfig(false,http://localhost:5555)),Stream(ExperimentConfig(Experience-jupyter,Baseline Random Forest with hyperpara...


AppConfiguration(local,CommonConfig(42,DataConfig(/home/jovyan/work/data,FileConfig(/home/jovyan/work/data/FLIGHT-3Y/Flights/201201*.csv),FileConfig(/home/jovyan/work/data/FLIGHT-3Y/Weather/20101*.txt),FileConfig(/home/jovyan/work/data/FLIGHT-3Y/wban_airport_timezone.csv)),OutputConfig(/home/jovyan/work/output,FileConfig(/home/jovyan/work/output/data),FileConfig(/home/jovyan/work/output/model)),MLFlowConfig(false,http://localhost:5555)),Stream(ExperimentConfig(Experience-jupyter,Baseline Random Forest with hyperpara...

## Chargement des données

In [18]:
val flightDFPath = s"${configuration.common.output.basePath}/common/data/processed_flights.parquet"
val flightDF = spark.read.parquet(flightDFPath)

println("Flight DF Count: ", flightDF.count())

(Flight DF Count: ,17310299)


flightDFPath = /home/jovyan/work/output/common/data/processed_flights.parquet
flightDF = [FL_DATE: date, OP_CARRIER_AIRLINE_ID: int ... 135 more fields]


[FL_DATE: date, OP_CARRIER_AIRLINE_ID: int ... 135 more fields]

In [19]:
val weatherDFPath = s"${configuration.common.output.basePath}/common/data/processed_weather.parquet"
val weatherDF = spark.read.parquet(weatherDFPath)

println("Weather DF Count: ", weatherDF.count())

(Weather DF Count: ,11236398)


weatherDFPath = /home/jovyan/work/output/common/data/processed_weather.parquet
weatherDF = [WBAN: string, Date: date ... 92 more fields]


[WBAN: string, Date: date ... 92 more fields]

In [23]:
import org.apache.spark.sql.functions._

// ------------------------
// 0) Helpers métriques
// ------------------------
object Metrics {
  def withJob[T](id: String, desc: String)(body: => T): T = {
    val sc = spark.sparkContext
    sc.setJobGroup(id, desc, interruptOnCancel = false)
    val t0 = System.nanoTime()
    val res = body
    val dtMs = (System.nanoTime() - t0) / 1e6
    println(f"[METRIC][$id] $desc took ${dtMs}%.2f ms")
    sc.clearJobGroup()
    res
  }

  def withUiLabels[T](groupId: String, desc: String, tags: String = "")(body: => T): T = {
      val sc = spark.sparkContext
      sc.setJobGroup(groupId, desc, interruptOnCancel = true)
      sc.setLocalProperty("spark.job.description", desc)
      sc.setLocalProperty("spark.jobGroup.id", groupId)
      sc.setLocalProperty("spark.job.tags", tags)
      try body
      finally {
        sc.clearJobGroup()
        sc.setLocalProperty("spark.job.description", null)
        sc.setLocalProperty("spark.jobGroup.id", null)
        sc.setLocalProperty("spark.job.tags", null)
      }
    }    
}

  

defined object Metrics


## Nettoyage

### On garde les vols pour lesquels nous avons de la météo à l'aéroport de départ et d'arrivée 

In [21]:
import org.apache.spark.sql.functions._

flightDF.select(
  length(trim(col("ORIGIN_WBAN"))).as("len_origin"),
  length(trim(col("DEST_WBAN"))).as("len_dest")
)
.agg(
  min(col("len_origin")).as("min_len_origin"),
  max(col("len_origin")).as("max_len_origin"),
  min(col("len_dest")).as("min_len_dest"),
  max(col("len_dest")).as("max_len_dest")
)
.show(false)

+--------------+--------------+------------+------------+
|min_len_origin|max_len_origin|min_len_dest|max_len_dest|
+--------------+--------------+------------+------------+
|5             |5             |5           |5           |
+--------------+--------------+------------+------------+



In [22]:
// Spark 3.5.3 / Scala 2.12
import org.apache.spark.sql.functions._

val flightDF_filtered =
  withUiLabels(
    groupId = "Filter-Flights-From-NonExistingWeatherWBAN",
    desc    = "Remove Flights If ORIGIN_WBAN, DEST_WBAN does n ot exists in Weather",
    tags    = "prep,semi-join,wban"
  ) {
    
    println("-> Filter-Flights-From-NonExistingWeatherWBAN ...")
    /** 1) WBAN valides côté météo (distinct, non nuls, nettoyés) */
    val weatherStations = weatherDF
      .select(trim(col("WBAN")).as("WBAN"))
      .where(col("WBAN").isNotNull && length(col("WBAN")) > 0)
      .distinct()
      .repartition(200)        // optionnel: ajuste selon cluster
      .cache()
    
    /** 2) Prépare les colonnes WBAN côté vols (nettoyage basique) */
    val flightsWBAN = flightDF
      .withColumn("ORIGIN_WBAN", trim(col("ORIGIN_WBAN")))
      .withColumn("DEST_WBAN",   trim(col("DEST_WBAN")))
    
    /** 3) Comptage avant filtrage (optionnel) */
    val countBefore = flightsWBAN.count()
    
    /** 4) Garde uniquement les vols dont ORIGIN_WBAN existe dans la météo */
    val originStations = weatherStations
      .select(col("WBAN").as("ORIGIN_WBAN"))
    
    val flightsHasOrigin = flightsWBAN
      .join(originStations, Seq("ORIGIN_WBAN"), "left_semi")
    
    /** 5) Puis garde uniquement ceux dont DEST_WBAN existe aussi */
    val destStations = weatherStations
      .select(col("WBAN").as("DEST_WBAN"))
    
    val flightDF_filtered = flightsHasOrigin
      .join(destStations, Seq("DEST_WBAN"), "left_semi")
      .cache()
    
    /** 6) Comptage après filtrage et petit bilan */
    val countAfter  = flightDF_filtered.count()
    println(s"[WBAN filter] Flights before: $countBefore, after: $countAfter, removed: ${countBefore - countAfter}")

    flightDF_filtered
}    

-> Filter-Flights-From-NonExistingWeatherWBAN ...
[WBAN filter] Flights before: 17310299, after: 17288302, removed: 21997


flightDF_filtered = [DEST_WBAN: string, ORIGIN_WBAN: string ... 135 more fields]


[DEST_WBAN: string, ORIGIN_WBAN: string ... 135 more fields]

### On garde la météo qui est associé à un aéroport

In [8]:
// Spark 3.5.3 / Scala 2.12
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame

/** 1) WBAN référencés par au moins un vol (origine ou destination) */
val flightWBANs: DataFrame =
  flightDF
    .select(trim(col("ORIGIN_WBAN")).as("WBAN"))
    .where(col("WBAN").isNotNull && length(col("WBAN")) > 0)
    .unionByName(
      flightDF
        .select(trim(col("DEST_WBAN")).as("WBAN"))
        .where(col("WBAN").isNotNull && length(col("WBAN")) > 0)
    )
    .distinct()
    .repartition(200) // optionnel selon cluster/volume
    .cache()

/** 2) Comptage avant (optionnel) */
val weatherBefore = weatherDF.count()

/** 3) Filtrer la météo: garder uniquement les WBAN utilisés par les vols */
val weatherDF_pruned =
  weatherDF
    .withColumn("WBAN", trim(col("WBAN")))
    .where(col("WBAN").isNotNull && length(col("WBAN")) > 0)
    .join(flightWBANs, Seq("WBAN"), "left_semi")
    .cache()

/** 4) Comptage après et bilan */
val weatherAfter = weatherDF_pruned.count()
println(s"[Weather prune] Weather rows before: $weatherBefore, after: $weatherAfter, removed: ${weatherBefore - weatherAfter}")

/** 5) (Optionnel) Sauvegarde */
val outPathWeather = s"${configuration.common.output.basePath}/common/data/weather_pruned_by_flights.parquet"
weatherDF_pruned
  .repartition(400) // optionnel
  .write.mode("overwrite")
  .parquet(outPathWeather)
println(s"Saved pruned weather to: $outPathWeather")

[Weather prune] Weather rows before: 11236398, after: 1560981, removed: 9675417
Saved pruned weather to: /home/jovyan/work/output/common/data/weather_pruned_by_flights.parquet


flightWBANs = [WBAN: string]
weatherBefore = 11236398
weatherDF_pruned = [WBAN: string, Date: date ... 92 more fields]
weatherAfter = 1560981
outPathWeather = /home/jovyan/work/output/common/data/weather_pruned_by_flights.parquet


/home/jovyan/work/output/common/data/weather_pruned_by_flights.parquet

## Restreindre les vols aux mois couverts par la météo

In [9]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val flightsCoveredMonths =
  flightDF_filtered
    .withColumn("month_utc", date_format(col("UTC_FL_DATE"), "yyyy-MM"))

val weatherMonths =
  weatherDF_pruned
    .withColumn("month_utc", date_format(col("Date"), "yyyy-MM"))
    .select("month_utc").distinct()

val flightDF_mCovered =
  flightsCoveredMonths.join(weatherMonths, Seq("month_utc"), "left_semi")

val kept = flightDF_mCovered.count()
val total = flightDF_filtered.count()
println(f"[Step1] Flights in covered months: $kept / $total (${kept.toDouble * 100.0 / total}%.2f%%)")

[Step1] Flights in covered months: 3908461 / 17288302 (22.61%)


flightsCoveredMonths = [DEST_WBAN: string, ORIGIN_WBAN: string ... 136 more fields]
weatherMonths = [month_utc: string]
flightDF_mCovered = [month_utc: string, DEST_WBAN: string ... 136 more fields]
kept = 3908461
total = 17288302


17288302

## Metrics

In [10]:
// ======================================================================
// Analyse post-filtrage Flight / Weather
// ======================================================================

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Relis les DF filtrés / prunés
val flightDF_filtered = spark.read.parquet("/home/jovyan/work/output/common/data/flight_filtered_has_weather.parquet")
val weatherDF_pruned  = spark.read.parquet("/home/jovyan/work/output/common/data/weather_pruned_by_flights.parquet")

def cleanWBAN(c: String) = trim(col(c))
def nn(c: String) = col(c).isNotNull && length(col(c)) > 0

// ----------------------------------------------------------------------
// 1) WBAN distincts après filtrage
// ----------------------------------------------------------------------
val nbFlightWBAN_after = flightDF_filtered
  .select(cleanWBAN("ORIGIN_WBAN").as("WBAN")).where(nn("WBAN"))
  .unionByName(flightDF_filtered.select(cleanWBAN("DEST_WBAN").as("WBAN")).where(nn("WBAN")))
  .distinct()
  .count()

val nbWeatherWBAN_after = weatherDF_pruned
  .select(cleanWBAN("WBAN").as("WBAN"))
  .where(nn("WBAN"))
  .distinct()
  .count()

println(s"[WBAN distincts] flights(after) = $nbFlightWBAN_after, weather(after) = $nbWeatherWBAN_after")

// ----------------------------------------------------------------------
// 2) Vols / jour (moyenne et quantiles)
// ----------------------------------------------------------------------
val flightsPerDay = flightDF_filtered
  .groupBy(col("UTC_FL_DATE").cast(DateType).as("day"))
  .count()

val avgFlightsPerDay = flightsPerDay.agg(avg(col("count"))).first().getDouble(0)
val quantilesF = flightsPerDay.select(col("count").cast(DoubleType))
  .stat.approxQuantile("count", Array(0.0, 0.5, 0.95, 1.0), 0.01)

println(f"[Vols/jour] avg = $avgFlightsPerDay%.2f | min = ${quantilesF(0)}%.0f | p50 = ${quantilesF(1)}%.0f | p95 = ${quantilesF(2)}%.0f | max = ${quantilesF(3)}%.0f")

// ----------------------------------------------------------------------
// 3) Observations météo par (WBAN, Date)
// ----------------------------------------------------------------------
val obsPerWBANPerDay = weatherDF_pruned
  .select(cleanWBAN("WBAN").as("WBAN"), col("Date").cast(DateType).as("WDATE"))
  .where(nn("WBAN") && col("WDATE").isNotNull)
  .groupBy(col("WBAN"), col("WDATE"))
  .count()

val avgObsPerWBANPerDay = obsPerWBANPerDay.agg(avg(col("count"))).first().getDouble(0)
val quantilesW = obsPerWBANPerDay.select(col("count").cast(DoubleType))
  .stat.approxQuantile("count", Array(0.0, 0.5, 0.95, 1.0), 0.01)

println(f"[Météo/WBAN/jour] avg = $avgObsPerWBANPerDay%.2f | min = ${quantilesW(0)}%.0f | p50 = ${quantilesW(1)}%.0f | p95 = ${quantilesW(2)}%.0f | max = ${quantilesW(3)}%.0f")

// ----------------------------------------------------------------------
// 4) Couverture par mois (vols avec météo dispo sur ce mois)
// ----------------------------------------------------------------------
val flightsByMonth = flightDF_filtered
  .withColumn("month_utc", date_format(col("UTC_FL_DATE"), "yyyy-MM"))
  .groupBy("month_utc")
  .count()
  .orderBy("month_utc")

val weatherMonths = weatherDF_pruned
  .withColumn("month_utc", date_format(col("Date"), "yyyy-MM"))
  .select("month_utc")
  .distinct()

val coverageByMonth = flightsByMonth
  .join(weatherMonths, Seq("month_utc"), "left_semi")
  .withColumnRenamed("count", "flights_in_covered_month")

val totalFlights   = flightDF_filtered.count()
val coveredFlights = coverageByMonth.agg(sum("flights_in_covered_month")).first().getLong(0)
val pctCovered     = coveredFlights.toDouble * 100.0 / totalFlights.toDouble

println(f"[Couverture mois] vols couverts = $coveredFlights / $totalFlights (${pctCovered}%.2f%%)")

// ======================================================================
// Fin du script
// ======================================================================

[WBAN distincts] flights(after) = 270, weather(after) = 270
[Vols/jour] avg = 15759.62 | min = 2122 | p50 = 15905 | p95 = 17634 | max = 18268
[Météo/WBAN/jour] avg = 23.86 | min = 1 | p50 = 24 | p95 = 24 | max = 24
[Couverture mois] vols couverts = 3908461 / 17288302 (22.61%)


flightDF_filtered = [DEST_WBAN: string, ORIGIN_WBAN: string ... 135 more fields]
weatherDF_pruned = [WBAN: string, Date: date ... 92 more fields]
nbFlightWBAN_after = 270
nbWeatherWBAN_after = 270
flightsPerDay = [day: date, count: bigint]
avgFlightsPerDay = 15759.618960802189
quantilesF = Array(2122.0, 15905.0, 17634.0, 18268.0)
obsPerWBANPerDay = [WBAN: string, WDATE: date ... 1 more field]
avgObsPerWBANPerDay = 23.859819940999344


cleanWBAN: (c: String)org.apache.spark.sql.Column
nn: (c: String)org.apache.spark.sql.Column
quantilesW: Array[Do...


23.859819940999344

## Jointure

In [13]:
val flightData = flightDF_mCovered
val weatherData = weatherDF_pruned

println("Flight for join count ->", flightData.count())
println("Weather for join count ->", weatherData.count())

(Flight for join count ->,3908461)
(Weather for join count ->,1560981)


flightData = [month_utc: string, DEST_WBAN: string ... 136 more fields]
weatherData = [WBAN: string, Date: date ... 92 more fields]


[WBAN: string, Date: date ... 92 more fields]

In [14]:
flightData.show(1,1000, true)

-RECORD 0---------------------------------------------------------------
 month_utc                          | 2013-06                           
 DEST_WBAN                          | 14820                             
 ORIGIN_WBAN                        | 14735                             
 FL_DATE                            | 2013-06-28                        
 OP_CARRIER_AIRLINE_ID              | 20366                             
 OP_CARRIER_FL_NUM                  | 4349                              
 ORIGIN_AIRPORT_ID                  | 10257                             
 DEST_AIRPORT_ID                    | 11042                             
 CRS_DEP_TIME                       | 605                               
 ARR_DELAY_NEW                      | 14.0                              
 CRS_ELAPSED_TIME                   | 92.0                              
 WEATHER_DELAY                      | NULL                              
 NAS_DELAY                          | NULL         

In [15]:
weatherData.show(1,1000, true)

-RECORD 0-------------------------------------------------
 WBAN                              | 13865                
 Date                              | 2012-01-10           
 Time                              | 0300                 
 StationType                       | 11                   
 SkyCondition                      | OVC004               
 SkyConditionFlag                  |                      
 Visibility                        | 1.75                 
 VisibilityFlag                    |                      
 WeatherType                       | +RA BR               
 WeatherTypeFlag                   |                      
 DryBulbFarenheit                  | 61.0                 
 DryBulbFarenheitFlag              |                      
 DryBulbCelsius                    | 16.0                 
 DryBulbCelsiusFlag                |                      
 WetBulbFarenheit                  | 61.0                 
 WetBulbFarenheitFlag              |                    

### RDD Join

In [13]:
// ================================================================
// Double jointure "improved repartition join" (version tolérante)
// + METRICS instrumentation
// Spark 3.5.3 / Scala 2.12
// ================================================================
import org.apache.spark.sql.{DataFrame, Row, SparkSession, Encoder, Encoders}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.{ArrayBuffer, HashMap}

// ------------------------
// 1) Sélection colonnes
// ------------------------
val flights = flightDF_mCovered.select(
  col("feature_flight_unique_id").as("flightId"),
  col("ORIGIN_WBAN"), col("DEST_WBAN"),
  col("UTC_FL_DATE").cast(DateType).as("DEP_DATE"),
  col("UTC_ARR_DATE").cast(DateType).as("ARR_DATE"),
  col("UTC_CRS_DEP_TIME").as("DEP_TIME_HHMM"),
  col("UTC_ARR_TIME").as("ARR_TIME_HHMM"),
  col("*")
)

val weather = weatherDF_pruned.select(
  trim(col("WBAN")).as("WBAN"),
  col("Date").cast(DateType).as("WDATE"),
  col("Time").as("WTIME_HHMM"),
  col("Visibility").as("vis"),
  col("WindSpeed").as("ws"),
  col("WindDirection").as("wd"),
  col("DryBulbCelsius").as("tempC"),
  col("SeaLevelPressure").as("slp"),
  col("HourlyPrecip").as("precip"),
  col("feature_weather_severity_index"),
  col("feature_flight_category_ordinal")
).where(col("WBAN").isNotNull && length(col("WBAN")) > 0 && col("WDATE").isNotNull)

// ------------------------
// 2) Utils / modèles
// ------------------------
def hhmmToHourInt(hhmm: String): Option[Int] = {
  if (hhmm == null) None
  else {
    val s = hhmm.replace(":", "")
    if (s.length >= 2 && s.forall(_.isDigit)) Some(s.substring(0,2).toInt)
    else None
  }
}

// hour devient relHour (peut être négatif pour la veille)
case class WeatherSlim(
  relHour: Int, // -24..23 (J=0..23 ; J+1=-24..-1)
  WBAN: String, WDATE: java.sql.Date, WTIME_HHMM: String,
  vis: java.lang.Double, ws: java.lang.Double, wd: java.lang.Double,
  tempC: java.lang.Double, slp: java.lang.Double, precip: java.lang.Double,
  feature_weather_severity_index: java.lang.Double,
  feature_flight_category_ordinal: java.lang.Integer
)

def plusOne(d: java.sql.Date): java.sql.Date =
  java.sql.Date.valueOf(d.toLocalDate.plusDays(1))

def pick12Prev(
  depHour: Int,                           // 0..23
  weatherByRelHour: Map[Int, WeatherSlim] // relHour -> obs
): Array[WeatherSlim] = {
  val buff = new Array[WeatherSlim](12)
  var i = 0
  while (i < 12) {
    val rel = depHour - (i + 1)           // 1h avant = depHour-1…
    buff(i) = weatherByRelHour.getOrElse(rel, null)
    i += 1
  }
  buff
}

// ------------------------
// 3) JOIN #1 — ORIGIN
// ------------------------
val cores = spark.sparkContext.defaultParallelism  // ≈ 12 avec local[12]

def pickParts(mult: Double, minAbs: Int, maxAbs: Int): Int =
  math.min(maxAbs, math.max(minAbs, math.round(cores * mult).toInt))

val numPartsOrigin = pickParts(mult = 3.3, minAbs = 32, maxAbs = 128) // ≈ 40
val numPartsDest   = pickParts(mult = 5.2, minAbs = 48, maxAbs = 192) // ≈ 62 → arrondis à 64

val partitionerOrigin = new HashPartitioner(numPartsOrigin)
val partitionerDest   = new HashPartitioner(numPartsDest)

spark.conf.set("spark.sql.shuffle.partitions", numPartsDest) // 64

// Map météo ORIGIN — double émission (J et J+1)
val weatherOriginPairs: RDD[((String, java.sql.Date), (String, Any))] =
  Metrics.withJob("map-origin-weather-build", "Build weatherOriginPairs RDD") {
    weather.rdd.flatMap { r =>
      val wban  = r.getAs[String]("WBAN")
      val wdate = r.getAs[java.sql.Date]("WDATE")
      val wtime = r.getAs[String]("WTIME_HHMM")
      val hour  = hhmmToHourInt(wtime).getOrElse(-1)

      val base = WeatherSlim(
        hour, wban, wdate, wtime,
        r.getAs[java.lang.Double]("vis"),
        r.getAs[java.lang.Double]("ws"),
        r.getAs[java.lang.Double]("wd"),
        r.getAs[java.lang.Double]("tempC"),
        r.getAs[java.lang.Double]("slp"),
        r.getAs[java.lang.Double]("precip"),
        r.getAs[java.lang.Double]("feature_weather_severity_index"),
        r.getAs[java.lang.Integer]("feature_flight_category_ordinal")
      )

      val sameDay = ((wban, wdate), ("OT", base))                                   // rel = hour
      val nextDay = ((wban, plusOne(wdate)), ("OT", base.copy(relHour = hour - 24))) // rel = hour-24
      Iterator(sameDay, nextDay)
    }
  }
Metrics.persistCount("map-origin-weather", "weatherOriginPairs materialize", weatherOriginPairs)

// Map vols ORIGIN
val flightsOriginPairs: RDD[((String, java.sql.Date), (String, Any))] =
  Metrics.withJob("map-origin-flights-build", "Build flightsOriginPairs RDD") {
    flights.rdd.map { r =>
      val keyWBAN = r.getAs[String]("ORIGIN_WBAN")
      val depDate = r.getAs[java.sql.Date]("DEP_DATE")
      ((keyWBAN, depDate), ("FT", r))
    }
  }
Metrics.persistCount("map-origin-flights", "flightsOriginPairs materialize", flightsOriginPairs)

// Union + partition
val originTagged: RDD[((String, java.sql.Date), (String, Any))] =
  Metrics.withJob("union-origin-build", "Build originTagged (union+partitionBy)") {
    weatherOriginPairs.union(flightsOriginPairs).partitionBy(partitionerOrigin)
  }
Metrics.persistCount("union-origin", "originTagged materialize", originTagged)

// Reduce ORIGIN
val originJoined: RDD[Row] =
  Metrics.withJob("reduce-origin-build", "Build originJoined (combineByKey + enrich)") {
    originTagged.combineByKey(
      (v: (String, Any)) => {
        val ws = new ArrayBuffer[WeatherSlim]()
        val fs = new ArrayBuffer[Row]()
        if (v._1 == "OT") ws += v._2.asInstanceOf[WeatherSlim] else fs += v._2.asInstanceOf[Row]
        (ws, fs)
      },
      (comb: (ArrayBuffer[WeatherSlim], ArrayBuffer[Row]), v: (String, Any)) => {
        if (v._1 == "OT") comb._1 += v._2.asInstanceOf[WeatherSlim]
        else               comb._2 += v._2.asInstanceOf[Row]
        comb
      },
      (c1: (ArrayBuffer[WeatherSlim], ArrayBuffer[Row]), c2: (ArrayBuffer[WeatherSlim], ArrayBuffer[Row])) => {
        c1._1 ++= c2._1; c1._2 ++= c2._2; c1
      }
    ).values.flatMap { case (weatherBuf, flightBuf) =>
      val weatherByRel = weatherBuf
        .filter(w => w.relHour >= -24 && w.relHour <= 23)
        .map(w => w.relHour -> w).toMap

      flightBuf.iterator.map { fr =>
        val depHH = hhmmToHourInt(fr.getAs[String]("DEP_TIME_HHMM")).getOrElse(0)
        val WoArr = pick12Prev(depHH, weatherByRel)
        val woStructs: Array[Any] = WoArr.map { w =>
          if (w == null) null
          else Row(
            w.relHour: java.lang.Integer,  // champ "hour" = relHour
            w.WBAN, w.WDATE, w.WTIME_HHMM, w.vis, w.ws, w.wd, w.tempC, w.slp, w.precip,
            w.feature_weather_severity_index, w.feature_flight_category_ordinal
          )
        }
        Row.fromSeq(fr.toSeq ++ woStructs)
      }
    }
  }
Metrics.persistCount("reduce-origin", "originJoined materialize", originJoined)

// Schéma ORIGIN
val woStructType = StructType(Seq(
  StructField("hour", IntegerType, true),      // contient relHour (peut être négatif)
  StructField("WBAN", StringType, true),
  StructField("WDATE", DateType, true),
  StructField("WTIME_HHMM", StringType, true),
  StructField("vis", DoubleType, true),
  StructField("ws", DoubleType, true),
  StructField("wd", DoubleType, true),
  StructField("tempC", DoubleType, true),
  StructField("slp", DoubleType, true),
  StructField("precip", DoubleType, true),
  StructField("feature_weather_severity_index", DoubleType, true),
  StructField("feature_flight_category_ordinal", IntegerType, true)
))
val originSchema =
  StructType(flights.schema.fields ++ (1 to 12).map(i => StructField(s"Wo_h$i", woStructType, true)))

val originDF: DataFrame =
  Metrics.withJob("df-origin-build", "createDataFrame(originJoined)") {
    spark.createDataFrame(originJoined, originSchema)
  }
Metrics.persistCountDF("df-origin", "originDF materialize+persist", originDF, StorageLevel.MEMORY_AND_DISK)

// ------------------------
// 4) JOIN #2 — DESTINATION
// ------------------------
val flights2 = originDF.selectExpr("*")

// Map météo DEST — double émission (J et J+1)
val weatherDestPairs: RDD[((String, java.sql.Date), (String, Any))] =
  Metrics.withJob("map-dest-weather-build", "Build weatherDestPairs RDD") {
    weather.rdd.flatMap { r =>
      val wban  = r.getAs[String]("WBAN")
      val wdate = r.getAs[java.sql.Date]("WDATE")
      val wtime = r.getAs[String]("WTIME_HHMM")
      val hour  = hhmmToHourInt(wtime).getOrElse(-1)

      val base = WeatherSlim(
        hour, wban, wdate, wtime,
        r.getAs[java.lang.Double]("vis"),
        r.getAs[java.lang.Double]("ws"),
        r.getAs[java.lang.Double]("wd"),
        r.getAs[java.lang.Double]("tempC"),
        r.getAs[java.lang.Double]("slp"),
        r.getAs[java.lang.Double]("precip"),
        r.getAs[java.lang.Double]("feature_weather_severity_index"),
        r.getAs[java.lang.Integer]("feature_flight_category_ordinal")
      )

      val sameDay = ((wban, wdate), ("OT", base))
      val nextDay = ((wban, plusOne(wdate)), ("OT", base.copy(relHour = hour - 24)))
      Iterator(sameDay, nextDay)
    }
  }
Metrics.persistCount("map-dest-weather", "weatherDestPairs materialize", weatherDestPairs)

// Map vols DEST
val flightsDestPairs: RDD[((String, java.sql.Date), (String, Any))] =
  Metrics.withJob("map-dest-flights-build", "Build flightsDestPairs RDD") {
    flights2.rdd.map { r =>
      val keyWBAN = r.getAs[String]("DEST_WBAN")
      val arrDate = r.getAs[java.sql.Date]("ARR_DATE")
      ((keyWBAN, arrDate), ("FT", r))
    }
  }
Metrics.persistCount("map-dest-flights", "flightsDestPairs materialize", flightsDestPairs)

// Union + partition DEST
val destTagged: RDD[((String, java.sql.Date), (String, Any))] =
  Metrics.withJob("union-dest-build", "Build destTagged (union+partitionBy)") {
    weatherDestPairs.union(flightsDestPairs).partitionBy(partitionerDest)
  }
Metrics.persistCount("union-dest", "destTagged materialize", destTagged)

// Reduce DEST
val destJoined: RDD[Row] =
  Metrics.withJob("reduce-dest-build", "Build destJoined (combineByKey + enrich)") {
    destTagged.combineByKey(
      (v: (String, Any)) => {
        val ws = new ArrayBuffer[WeatherSlim]()
        val fs = new ArrayBuffer[Row]()
        if (v._1 == "OT") ws += v._2.asInstanceOf[WeatherSlim] else fs += v._2.asInstanceOf[Row]
        (ws, fs)
      },
      (comb: (ArrayBuffer[WeatherSlim], ArrayBuffer[Row]), v: (String, Any)) => {
        if (v._1 == "OT") comb._1 += v._2.asInstanceOf[WeatherSlim]
        else               comb._2 += v._2.asInstanceOf[Row]
        comb
      },
      (c1: (ArrayBuffer[WeatherSlim], ArrayBuffer[Row]), c2: (ArrayBuffer[WeatherSlim], ArrayBuffer[Row])) => {
        c1._1 ++= c2._1; c1._2 ++= c2._2; c1
      }
    ).values.flatMap { case (weatherBuf, flightBuf) =>
      val weatherByRel = weatherBuf
        .filter(w => w.relHour >= -24 && w.relHour <= 23)
        .map(w => w.relHour -> w).toMap

      flightBuf.iterator.map { fr =>
        val arrHH = hhmmToHourInt(fr.getAs[String]("ARR_TIME_HHMM")).getOrElse(0)
        val WdArr = pick12Prev(arrHH, weatherByRel)
        val wdStructs: Array[Any] = WdArr.map { w =>
          if (w == null) null
          else Row(
            w.relHour: java.lang.Integer,
            w.WBAN, w.WDATE, w.WTIME_HHMM, w.vis, w.ws, w.wd, w.tempC, w.slp, w.precip,
            w.feature_weather_severity_index, w.feature_flight_category_ordinal
          )
        }
        Row.fromSeq(fr.toSeq ++ wdStructs)
      }
    }
  }
Metrics.persistCount("reduce-dest", "destJoined materialize", destJoined)

// Schéma final
val finalSchema =
  StructType(originDF.schema.fields ++ (1 to 12).map(i => StructField(s"Wd_h$i", woStructType, true)))

val joinedDF: DataFrame =
  Metrics.withJob("df-final-build", "createDataFrame(destJoined)") {
    spark.createDataFrame(destJoined, finalSchema)
  }
Metrics.persistCountDF("df-final", "joinedDF materialize+persist", joinedDF, StorageLevel.MEMORY_AND_DISK)

// Récap global
println(s"[RESULT] Origin-joined rows: ${originDF.count()}, Final joined rows: ${joinedDF.count()}")

// Optionnel: unpersist pour libérer la RAM (laisse joinedDF persister si réutilisé)
weatherOriginPairs.unpersist(false)
flightsOriginPairs.unpersist(false)
originTagged.unpersist(false)
originJoined.unpersist(false)
originDF.unpersist(false)

weatherDestPairs.unpersist(false)
flightsDestPairs.unpersist(false)
destTagged.unpersist(false)
destJoined.unpersist(false)
// joinedDF: garde persist si tu l'utilises après

[METRIC][map-origin-weather-build] Build weatherOriginPairs RDD took 27.56 ms
[METRIC][map-origin-weather] weatherOriginPairs materialize took 1063.90 ms
[METRIC][map-origin-weather] records=3121962 partitions=14
[METRIC][map-origin-flights-build] Build flightsOriginPairs RDD took 415.45 ms
[METRIC][map-origin-flights] flightsOriginPairs materialize took 20304.78 ms
[METRIC][map-origin-flights] records=3908461 partitions=29
[METRIC][union-origin-build] Build originTagged (union+partitionBy) took 0.45 ms
[METRIC][union-origin] originTagged materialize took 44959.30 ms
[METRIC][union-origin] records=7030423 partitions=46
[METRIC][reduce-origin-build] Build originJoined (combineByKey + enrich) took 8.57 ms
[METRIC][reduce-origin] originJoined materialize took 270158.61 ms
[METRIC][reduce-origin] records=3908461 partitions=46
[METRIC][df-origin-build] createDataFrame(originJoined) took 11.29 ms
[METRIC][df-origin] originDF materialize+persist took 300277.36 ms
[METRIC][df-origin] records=3

defined object Metrics
flights = [flightId: string, ORIGIN_WBAN: string ... 143 more fields]
weather = [WBAN: string, WDATE: date ... 9 more fields]
defined class WeatherSlim
cores = 14


hhmmToHourInt: (hhmm: String)Option[Int]
plusOne: (d: java.sql.Date)java.sql.Date
pick12Prev: (depHour: Int, weatherByRelHour: Map[Int,WeatherSlim])Array[WeatherSlim]
pickParts: (mult: Double,...


14

In [16]:
joinedDF.printSchema

root
 |-- flightId: string (nullable = true)
 |-- ORIGIN_WBAN: string (nullable = true)
 |-- DEST_WBAN: string (nullable = true)
 |-- DEP_DATE: date (nullable = true)
 |-- ARR_DATE: date (nullable = true)
 |-- DEP_TIME_HHMM: string (nullable = true)
 |-- ARR_TIME_HHMM: string (nullable = true)
 |-- month_utc: string (nullable = true)
 |-- DEST_WBAN: string (nullable = true)
 |-- ORIGIN_WBAN: string (nullable = true)
 |-- FL_DATE: date (nullable = true)
 |-- OP_CARRIER_AIRLINE_ID: integer (nullable = true)
 |-- OP_CARRIER_FL_NUM: integer (nullable = true)
 |-- ORIGIN_AIRPORT_ID: integer (nullable = true)
 |-- DEST_AIRPORT_ID: integer (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- ARR_DELAY_NEW: double (nullable = true)
 |-- CRS_ELAPSED_TIME: double (nullable = true)
 |-- WEATHER_DELAY: double (nullable = true)
 |-- NAS_DELAY: double (nullable = true)
 |-- D4: integer (nullable = true)
 |-- D3: integer (nullable = true)
 |-- D1: integer (nullable = true)
 |-- D2_15: 

In [17]:
joinedDF.show(1, 1000, true)

-RECORD 0------------------------------------------------------------------------------------------------------------------------
 flightId                           | 2013-06-29_20304_5407_13930_14952                                                          
 ORIGIN_WBAN                        | 94846                                                                                      
 DEST_WBAN                          | 93822                                                                                      
 DEP_DATE                           | 2013-06-29                                                                                 
 ARR_DATE                           | 2013-06-29                                                                                 
 DEP_TIME_HHMM                      | 0326                                                                                       
 ARR_TIME_HHMM                      | 0435                                                

### DataFrame Join

In [11]:
// =====================================================================
// Flight × Weather en DataFrame (Map → Hash partition → Reduce)
// - Fenêtre 12h avant départ (Wo_*) et 12h avant arrivée (Wd_*)
// - Gestion veille via relHour (duplication J et J+1)
// - Un seul job global avec Metrics.withJob
// =====================================================================
import org.apache.spark.sql.{DataFrame, Row, Column}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// ------------------------
// 0) Objet Metrics (inchangé)
// ------------------------
object Metrics {
  def withJob[T](id: String, desc: String)(body: => T): T = {
    val sc = spark.sparkContext
    sc.setJobGroup(id, desc, interruptOnCancel = false)
    val t0 = System.nanoTime()
    val res = body
    val dtMs = (System.nanoTime() - t0) / 1e6
    println(f"[METRIC][$id] $desc took ${dtMs}%.2f ms")
    sc.clearJobGroup()
    res
  }
  import org.apache.spark.storage.StorageLevel
  def persistCount(id: String, desc: String, rdd: org.apache.spark.rdd.RDD[_], lvl: StorageLevel = StorageLevel.MEMORY_ONLY): Long = {
    rdd.persist(lvl); val n = withJob(id, desc) { rdd.count() }
    println(s"[METRIC][$id] records=$n partitions=${rdd.getNumPartitions}"); n
  }
  def persistCountDF(id: String, desc: String, df: DataFrame, lvl: StorageLevel = StorageLevel.MEMORY_ONLY): Long = {
    df.persist(lvl); val n = withJob(id, desc) { df.count() }
    println(s"[METRIC][$id] records=$n partitions=${df.rdd.getNumPartitions}"); n
  }
}

// =====================================================================
// 1) Jointure complète exécutée sous un SEUL job
// =====================================================================


// ------------------------
// Paramètres de partitions "reducers"
// ------------------------

Metrics.withJob("flight-joins-df", "Build flights joins DataFrame") {
    val cores = spark.sparkContext.defaultParallelism
    def pickParts(mult: Double, minAbs: Int, maxAbs: Int): Int =
    math.min(maxAbs, math.max(minAbs, math.round(cores * mult).toInt))
    
    val numPartsOrigin = pickParts(3.3, 32, 128) // ≈ 40
    val numPartsDest   = pickParts(5.2, 48, 192) // ≈ 64
    spark.conf.set("spark.sql.shuffle.partitions", numPartsDest) // borne haute DF
    
    // ------------------------
    // Sélection des colonnes utiles (Map)
    // ------------------------
    //flightDF_filtered
    val flights = flightDF_mCovered.select(
    col("feature_flight_unique_id").as("flightId"),
    col("ORIGIN_WBAN"), col("DEST_WBAN"),
    col("UTC_FL_DATE").cast(DateType).as("DEP_DATE"),
    col("UTC_ARR_DATE").cast(DateType).as("ARR_DATE"),
    col("UTC_CRS_DEP_TIME").as("DEP_TIME_HHMM"),
    col("UTC_ARR_TIME").as("ARR_TIME_HHMM"),
    col("*")
    )
    
    val weather = weatherDF_pruned.select(
    trim(col("WBAN")).as("WBAN"),
    col("Date").cast(DateType).as("WDATE"),
    col("Time").as("WTIME_HHMM"),
    col("Visibility").as("vis"),
    col("WindSpeed").as("ws"),
    col("WindDirection").as("wd"),
    col("DryBulbCelsius").as("tempC"),
    col("SeaLevelPressure").as("slp"),
    col("HourlyPrecip").as("precip"),
    col("feature_weather_severity_index"),
    col("feature_flight_category_ordinal")
    ).where(col("WBAN").isNotNull && length(col("WBAN")) > 0 && col("WDATE").isNotNull)
    
    // ------------------------
    // Utilitaire HHMM -> hour [0..23] (sans UDF) (Map)
    // ------------------------
    def hhmmHourCol(c: Column): Column = {
    val s  = regexp_replace(c.cast("string"), ":", "")
    val p4 = lpad(s, 4, "0")
    (substring(p4, 1, 2).cast("int") % 24)
    }
    
    // ------------------------
    // Préparation météo avec relHour + duplication J/J+1 (Map)
    // ------------------------
    val weatherWithHour = weather
    .withColumn("hour", hhmmHourCol(col("WTIME_HHMM")))
    .na.fill(Map("hour" -> -1))
    
    val meteoSameDay = weatherWithHour
    .withColumn("relHour", col("hour"))
    .withColumn("DATE", col("WDATE"))
    
    val meteoNextDay = weatherWithHour
    .withColumn("relHour", col("hour") - lit(24))
    .withColumn("DATE", date_add(col("WDATE"), 1))
    
    val weatherRel = meteoSameDay.unionByName(meteoNextDay)
    .filter(col("relHour").between(-24, 23))
    
    // ------------------------
    // Reduce météo par clé → Map relHour -> struct (Reduce)
    // ------------------------
    val weatherStruct =
    struct(
      col("relHour").as("hour"),
      col("WBAN"), col("WDATE"), col("WTIME_HHMM"),
      col("vis"), col("ws"), col("wd"),
      col("tempC"), col("slp"), col("precip"),
      col("feature_weather_severity_index"),
      col("feature_flight_category_ordinal")
    )
    
    val weatherByKey: DataFrame =
    weatherRel
      .groupBy(col("WBAN"), col("DATE"))
      .agg(
        map_from_entries(
          collect_list(struct(col("relHour"), weatherStruct))
        ).as("wmap")
      )
    // (WBAN, DATE, wmap: map<int, struct{hour,WBAN,WDATE,WTIME_HHMM,...}>)
    
    // ------------------------
    // JOIN #1 — ORIGIN (Partition = hash(ORIGIN_WBAN, DEP_DATE))
    // ------------------------
    val flightsDep = flights
    .withColumn("depHour", coalesce(hhmmHourCol(col("DEP_TIME_HHMM")), lit(0)))
    
    val originPre = flightsDep
    .repartition(numPartsOrigin, col("ORIGIN_WBAN"), col("DEP_DATE")) // <-- Hash partition explicite
    .join(
      weatherByKey.hint("shuffle_hash"),
      col("ORIGIN_WBAN") === weatherByKey("WBAN") &&
      col("DEP_DATE")    === weatherByKey("DATE"),
      "left"
    )
    .drop(weatherByKey("WBAN")).drop(weatherByKey("DATE"))
    
    val originWithWoArr = originPre
    .withColumn("Wo", expr("transform(sequence(1, 12), i -> element_at(wmap, depHour - i))"))
    .drop("wmap")
    
    val woCols = (0 until 12).map(i => col("Wo").getItem(i).as(s"Wo_h${i+1}"))
    val originDF = originWithWoArr
    .select(col("*") +: woCols: _*)
    .drop("Wo")
    .persist()
    
    // ------------------------
    // JOIN #2 — DEST (Partition = hash(DEST_WBAN, ARR_DATE))
    // ------------------------
    val flightsArr = originDF
    .withColumn("arrHour", coalesce(hhmmHourCol(col("ARR_TIME_HHMM")), lit(0)))
    
    val destPre = flightsArr
    .repartition(numPartsDest, col("DEST_WBAN"), col("ARR_DATE"))     // <-- Hash partition explicite
    .join(
      weatherByKey.hint("shuffle_hash"),
      col("DEST_WBAN") === weatherByKey("WBAN") &&
      col("ARR_DATE")  === weatherByKey("DATE"),
      "left"
    )
    .drop(weatherByKey("WBAN")).drop(weatherByKey("DATE"))
    
    val destWithWdArr = destPre
    .withColumn("Wd", expr("transform(sequence(1, 12), i -> element_at(wmap, arrHour - i))"))
    .drop("wmap")
    
    val wdCols = (0 until 12).map(i => col("Wd").getItem(i).as(s"Wd_h${i+1}"))
    val joinedDF = destWithWdArr
    .select(col("*") +: wdCols: _*)
    .drop("Wd")
    .persist()
    
    // ------------------------
    // Action finale (déclenche l'exécution)
    // ------------------------
    println(s"Rows after ORIGIN join: ${originDF.count()}, rows after DEST join: ${joinedDF.count()}")
    
}

defined object Metrics


Rows after ORIGIN join: 3908461, rows after DEST join: 3908461
[METRIC][flight-joins-df] Build flights joins DataFrame took 7763.13 ms


In [13]:
joinedDF.show(1, 1000, true)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------
 flightId                           | 2013-06-18_19393_3747_12191_12892                                                           
 ORIGIN_WBAN                        | 12918                                                                                       
 DEST_WBAN                          | 23174                                                                                       
 DEP_DATE                           | 2013-06-18                                                                                  
 ARR_DATE                           | 2013-06-19                                                                                  
 DEP_TIME_HHMM                      | 2110                                                                                        
 ARR_TIME_HHMM                      | 0036                                         