# ✈️ FlightRadar24 - ETL & Airflow Pipeline

## Objectif

Ce projet a pour but de construire un pipeline **ETL industrialisé**, tolérant aux erreurs et observable, qui récupère les données de vol en temps réel depuis l’API FlightRadar24 toutes les **2 heures**, les nettoie, les transforme, puis les stocke sous **format Parquet**.  
Ces données sont ensuite analysées via **PySpark** pour générer des **indicateurs métier** sur le trafic aérien mondial.

### Installation des dépendances pour le projet

In [1]:
!pip install --upgrade FlightRadarAPI pandas pyarrow



In [2]:
!pip install findspark



##### Lister toutes les méthodes publiques de l'instance

In [3]:
from FlightRadar24 import FlightRadar24API
import inspect, textwrap, pprint

fr_api = FlightRadar24API()

In [4]:
api_methods = sorted(
    m for m in dir(fr_api)
    if not m.startswith("_") and inspect.ismethod(getattr(fr_api, m))
)

pprint.pprint(textwrap.wrap(", ".join(api_methods), width=100))

['get_airline_logo, get_airlines, get_airport, get_airport_details, '
 'get_airport_disruptions,',
 'get_airports, get_bookmarks, get_bounds, get_bounds_by_point, '
 'get_country_flag, get_flight_details,',
 'get_flight_tracker_config, get_flights, get_history_data, get_login_data, '
 'get_most_tracked,',
 'get_volcanic_eruptions, get_zones, is_logged_in, login, logout, search, '
 'set_flight_tracker_config']


## Extarct data from flight radar api

In [5]:
####################
#                  #
# Authored BY : me #
#                  #
####################

# Extract data from flightRadar api sous format dataframe

import pandas as pd
import logging
from math import radians, sin, cos, sqrt, atan2
from FlightRadar24 import FlightRadar24API

# Setup du logger
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


def haversine(lat1, lon1, lat2, lon2):
    if None in (lat1, lon1, lat2, lon2):
        return None
    R = 6371
    dlat = radians(lat2 - lat1)
    dlon = radians(lon2 - lon1)
    a = sin(dlat/2)**2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon/2)**2
    return R * 2 * atan2(sqrt(a), sqrt(1 - a))


def get_continent(lat, lon, zones_dict):
    for continent, zone in zones_dict.items():
        if all(k in zone for k in ['tl_y', 'tl_x', 'br_y', 'br_x']):
            if zone['br_y'] <= lat <= zone['tl_y'] and zone['tl_x'] <= lon <= zone['br_x']:
                return continent
    return None


def extract_flights():
    api = FlightRadar24API()

    logger.info(" Récupération des vols en cours...")
    flights = api.get_flights()
    logger.info(f" {len(flights)} vols récupérés.")

    logger.info(" Récupération des IATA pour enrichissement...")
    iata_codes = set()
    for f in flights:
        if f.origin_airport_iata:
            iata_codes.add(f.origin_airport_iata)
        if f.destination_airport_iata:
            iata_codes.add(f.destination_airport_iata)

    logger.info(f"🔍 {len(iata_codes)} codes IATA collectés. Enrichissement des aéroports...")
    airport_info = {}
    for code in list(iata_codes):
        try:
            a = api.get_airport(code)
            airport_info[code] = {
                "iata": code,
                "latitude": a.latitude,
                "longitude": a.longitude,
                "name": a.name,
                "country": a.country,
            }
        except Exception:
            continue

    logger.info(f" {len(airport_info)} aéroports enrichis.")

    logger.info(" Chargement des zones géographiques...")
    zones = api.get_zones()

    logger.info(" Construction du DataFrame enrichi...")
    enriched_flights = []

    for f in flights:
        origin = airport_info.get(f.origin_airport_iata)
        dest = airport_info.get(f.destination_airport_iata)

        if not origin or not dest:
            continue

        origin_continent = get_continent(origin["latitude"], origin["longitude"], zones)
        dest_continent = get_continent(dest["latitude"], dest["longitude"], zones)

        distance_km = haversine(
            origin["latitude"], origin["longitude"],
            dest["latitude"], dest["longitude"]
        )

        enriched_flights.append({
            "flight_id": f.id,
            "callsign": f.callsign,
            "airline_icao": f.airline_icao,
            "origin_iata": f.origin_airport_iata,
            "dest_iata": f.destination_airport_iata,
            "origin_country": origin["country"],
            "dest_country": dest["country"],
            "origin_continent": origin_continent,
            "dest_continent": dest_continent,
            "distance_km": distance_km,
            "aircraft_code": f.aircraft_code,
            "registration": f.registration,
        })

    df = pd.DataFrame(enriched_flights)
    logger.info(f" DataFrame final contenant {len(df)} lignes.")

    return df


In [6]:
df = extract_flights()
df.head()

Unnamed: 0,flight_id,callsign,airline_icao,origin_iata,dest_iata,origin_country,dest_country,origin_continent,dest_continent,distance_km,aircraft_code,registration
0,3b48565c,ARG1141,ARG,FCO,EZE,Italy,Argentina,europe,southamerica,11155.832877,A332,LV-FVH
1,3b48b4e9,DAL201,DAL,JNB,ATL,South Africa,United States,africa,northamerica,13582.574536,A359,N523DN
2,3b48c263,AIC186,AIC,YVR,DEL,Canada,India,northamerica,asia,11137.280992,B77W,VT-ALO
3,3b48e840,AIC119,AIC,BOM,JFK,India,United States,asia,northamerica,12532.259844,B77W,VT-AEO
4,3b48f27a,AIC105,AIC,DEL,EWR,India,United States,asia,northamerica,11763.73079,A359,VT-JRE


|   | Indicateur                                                  | Colonnes utilisées                                                                                              | Source (fonction de l’API)                                                                                            |
| --: | ----------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------- |
| 1️⃣ | **Compagnie avec le plus de vols en cours**                 | `airline_icao`                                                                                                  | `get_flights()`                                                                                                       |
| 2️⃣ | **Par continent, compagnie avec le plus de vols régionaux** | `airline_icao`, `origin_airport_iata`, `destination_airport_iata`, `origin_continent`, `destination_continent`  | `get_flights()` + `get_airport()` (pour lat/lon) + `get_zones()` (pour continents)                                    |
| 3️⃣ | **Vol avec le trajet le plus long**                         | `origin_airport_iata`, `destination_airport_iata`, `distance_km` (calculée à partir de `latitude`, `longitude`) | `get_flights()` + `get_airport()` (pour coordonnées)                                                                  |
| 4️⃣ | **Distance moyenne par continent**                          | `origin_continent`, `distance_km`                                                                               | `get_flights()` + `get_airport()` + `get_zones()`                                                                     |
| 5️⃣ | **Constructeur avion avec le plus de vols actifs**          | `registration` → enrichissement via `aircraft.manufacturer` ou `equip`                                          | 🔴 Non exploitable : `get_flight_details()` ne fournit pas `manufacturer`, `search()` renvoie `equip` mais pas fiable |
| 6️⃣ | **Top 3 modèles d’avion par pays de la compagnie**          | `airline_icao`, `aircraft_code`, `registration` + `country` (via `get_airlines()`)                              | 🟡 Partiel : `get_flights()` (codes) + `get_airlines()` (pays) ; pas de "vrai modèle" sans mapping externe            |



--> let's do some data exploration to understand our data, so that we can do the data cleaning

### EDA( exploration data analysis )

In [7]:
print("Columns \n",df.columns)

Columns 
 Index(['flight_id', 'callsign', 'airline_icao', 'origin_iata', 'dest_iata',
       'origin_country', 'dest_country', 'origin_continent', 'dest_continent',
       'distance_km', 'aircraft_code', 'registration'],
      dtype='object')


In [8]:
print("types \n", df.dtypes)

types 
 flight_id            object
callsign             object
airline_icao         object
origin_iata          object
dest_iata            object
origin_country       object
dest_country         object
origin_continent     object
dest_continent       object
distance_km         float64
aircraft_code        object
registration         object
dtype: object


In [9]:
# shape
df.shape

(1299, 12)

In [10]:
# Valeurs uniques
print("Valeurs unique \n",df.nunique().sort_values(ascending=False))

Valeurs unique 
 flight_id           1299
callsign            1299
registration        1299
distance_km          994
dest_iata            253
origin_iata          244
airline_icao         182
dest_country         105
origin_country        87
aircraft_code         52
dest_continent         8
origin_continent       7
dtype: int64


In [11]:
# check for Duplicates for id

print("duplicates \n",df.duplicated(subset=['flight_id']).sum())

duplicates 
 0


In [12]:
# Statistiques descriptives

print("Statistiques descriptives \n", df.describe(include='all'))

Statistiques descriptives 
        flight_id callsign airline_icao origin_iata dest_iata origin_country  \
count       1299     1299         1299        1299      1299           1299   
unique      1299     1299          182         244       253             87   
top     3b4a8903   AVA229          QTR         DOH       LHR  United States   
freq           1        1           99          77        77            373   
mean         NaN      NaN          NaN         NaN       NaN            NaN   
std          NaN      NaN          NaN         NaN       NaN            NaN   
min          NaN      NaN          NaN         NaN       NaN            NaN   
25%          NaN      NaN          NaN         NaN       NaN            NaN   
50%          NaN      NaN          NaN         NaN       NaN            NaN   
75%          NaN      NaN          NaN         NaN       NaN            NaN   
max          NaN      NaN          NaN         NaN       NaN            NaN   

         dest_country o

--> IL faut commentez

In [13]:
## Répartition des compagnies

In [14]:
print("Répartition des compagnies   \n", df["airline_icao"].value_counts().head(10))

Répartition des compagnies   
 airline_icao
QTR    99
UAE    82
UAL    65
AAL    62
DAL    44
THY    40
BAW    26
CES    26
KAL    25
AFR    25
Name: count, dtype: int64


In [15]:
# Répartition des origines :

In [16]:
df["dest_country"].value_counts().head(10)

print("Répartition des origines   \n", df["origin_country"].value_counts().head(10))

Répartition des origines   
 origin_country
United States           373
China                    93
United Arab Emirates     93
Qatar                    78
Australia                57
Japan                    50
South Korea              45
Brazil                   33
Hong Kong                24
Russia                   24
Name: count, dtype: int64


In [17]:
# Répartition des destinations :

print("Répartition des destinations   \n", df["dest_iata"].value_counts().head(10))

Répartition des destinations   
 dest_iata
LHR    77
JFK    45
LAX    39
CDG    38
FRA    34
DOH    29
ICN    28
SVO    27
IST    26
MAD    26
Name: count, dtype: int64


--> To sum Up about our Data :

    # callsign, airline_iata, airline_icao : pour identifier les compagnies

    # aircraft_code, registration, squawk : pour identifier les avions

    # latitude, longitude, altitude : position

    # origin, destination : codes IATA

    # ground_speed, heading, vertical_speed, on_ground : pour l'état du vol

    # time : timestamp (en secondes POSIX, à convertir si besoin)

## Transform Data

### Data Cleaning

In [18]:
# Valeurs manquantes
missing_values = (df.isna().sum() + (df == "").sum()).sort_values(ascending=False)
print("Valeurs manquantes (en nombre) \n", missing_values)

Valeurs manquantes (en nombre) 
 airline_icao        24
origin_continent     3
dest_continent       2
callsign             1
registration         1
flight_id            0
origin_iata          0
dest_iata            0
dest_country         0
origin_country       0
distance_km          0
aircraft_code        0
dtype: int64


In [19]:
# Pourcentage de valeurs manquantes, correctement affiché en %
missing_percent = ((df.isna().sum() + (df == "").sum()) / len(df) * 100).sort_values(ascending=False)
print("Valeurs manquantes en % :\n", missing_percent.round(2))

Valeurs manquantes en % :
 airline_icao        1.85
origin_continent    0.23
dest_continent      0.15
callsign            0.08
registration        0.08
flight_id           0.00
origin_iata         0.00
dest_iata           0.00
dest_country        0.00
origin_country      0.00
distance_km         0.00
aircraft_code       0.00
dtype: float64


In [20]:
import pandas as pd
import logging

logger = logging.getLogger(__name__)

def clean_flights_data(df: pd.DataFrame) -> pd.DataFrame:
    logger.info("Nettoyage des données de vol...")

    # Étape 1 : Supprimer les colonnes avec + de 50% de valeurs manquantes
    missing_ratio = (df.isna().sum() + (df == "").sum()) / len(df)
    cols_to_drop = missing_ratio[missing_ratio > 0.5].index.tolist()
    if cols_to_drop:
        logger.info(f"Colonnes supprimées (plus de 50% de valeurs manquantes) : {cols_to_drop}")
        df = df.drop(columns=cols_to_drop)

    # Étape 2 : Supprimer les lignes avec au moins une valeur manquante ou vide
    initial_row_count = len(df)
    df = df[~(df.isna() | (df == "")).any(axis=1)]
    removed_rows = initial_row_count - len(df)
    logger.info(f"Lignes supprimées pour valeurs manquantes : {removed_rows}")

    logger.info("Nettoyage terminé.")
    return df


In [21]:
df_cleaned = clean_flights_data(df)

In [22]:
# Pourcentage de valeurs manquantes, correctement affiché en %
missing_percent = ((df_cleaned.isna().sum() + (df_cleaned == "").sum()) / len(df_cleaned) * 100).sort_values(ascending=False)
print("Valeurs manquantes en % après le cleaning :\n", missing_percent.round(2))

Valeurs manquantes en % après le cleaning :
 flight_id           0.0
callsign            0.0
airline_icao        0.0
origin_iata         0.0
dest_iata           0.0
origin_country      0.0
dest_country        0.0
origin_continent    0.0
dest_continent      0.0
distance_km         0.0
aircraft_code       0.0
registration        0.0
dtype: float64


--> No missing data

## Load Data into CSV format ( Or parquet )

In [23]:
import os
from datetime import datetime, timezone
import pandas as pd

def save_to_csv(df: pd.DataFrame, base_path="Flights/rawzone") -> str:
    """
    Sauvegarde le DataFrame au format CSV avec une nomenclature horodatée.
    Exemple de chemin : Flights/rawzone/tech_year=2025/tech_month=2025-07/tech_day=2025-07-13/flights_20250713124500.csv
    """
    now = datetime.now(timezone.utc)

    # Création du chemin horodaté
    path = os.path.join(
        base_path,
        f"tech_year={now.year}",
        f"tech_month={now.strftime('%Y-%m')}",
        f"tech_day={now.strftime('%Y-%m-%d')}"
    )
    os.makedirs(path, exist_ok=True)

    # Nom du fichier CSV
    filename = f"flights_{now.strftime('%Y%m%d%H%M%S')}.csv"
    full_path = os.path.join(path, filename)

    # Sauvegarde en CSV
    df.to_csv(full_path, index=False)
    print(f"[INFO] Fichier CSV sauvegardé : {full_path}")

    return full_path


In [24]:
df_cleaned = clean_flights_data(df)
save_to_csv(df_cleaned)

[INFO] Fichier CSV sauvegardé : Flights/rawzone/tech_year=2025/tech_month=2025-07/tech_day=2025-07-17/flights_20250717112538.csv


'Flights/rawzone/tech_year=2025/tech_month=2025-07/tech_day=2025-07-17/flights_20250717112538.csv'

## Spark Analysis


In [25]:
def get_latest_csv_path(base_path="Flights/rawzone") -> str:
    """
    Parcourt récursivement les sous-dossiers pour trouver le fichier CSV le plus récent.
    """
    latest_time = None
    latest_file = None

    for root, _, files in os.walk(base_path):
        for file in files:
            if file.endswith(".csv"):
                full_path = os.path.join(root, file)
                file_time = os.path.getmtime(full_path)
                if not latest_time or file_time > latest_time:
                    latest_time = file_time
                    latest_file = full_path

    if latest_file:
        print(f"[INFO] Dernier fichier CSV trouvé : {latest_file}")
        return latest_file
    else:
        raise FileNotFoundError("Aucun fichier CSV trouvé dans le répertoire.")


In [35]:
# spark_analysis.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, desc, row_number, when
from pyspark.sql.window import Window

def run_spark_analysis():
    spark = SparkSession.builder.appName("FlightRadar24 Analysis").getOrCreate()

    csv_path = get_latest_csv_path()
    df = spark.read.option("header", True).option("inferSchema", True).csv(csv_path)
    df.cache()

    print(" Début de l’analyse Spark...\n")

    # -------------------------------
    # 1. Compagnie avec le plus de vols en cours
    # -------------------------------
    df1 = df.groupBy("airline_icao").agg(count("*").alias("nb_vols")).orderBy(desc("nb_vols"))
    top_airline = df1.limit(1)
    print("1. Compagnie avec le plus de vols en cours :")
    top_airline.show(truncate=False)

    # -------------------------------
    # 2. Compagnie avec le plus de vols régionaux par continent
    # -------------------------------
    regional_df = df.filter(
        (col("origin_continent").isNotNull()) &
        (col("dest_continent").isNotNull()) &
        (col("origin_continent") == col("dest_continent")) &
        (col("airline_icao").isNotNull()) &
        (col("airline_icao") != "")
    )

    regional_counts = regional_df.groupBy("origin_continent", "airline_icao") \
                                 .agg(count("*").alias("nb_vols"))

    region_window = Window.partitionBy("origin_continent").orderBy(desc("nb_vols"))
    top_regionals = regional_counts.withColumn("rank", row_number().over(region_window)) \
                                   .filter(col("rank") == 1) \
                                   .drop("rank")

    print("2. Compagnie avec le plus de vols régionaux par continent :")
    top_regionals.show(truncate=False)

    # -------------------------------
    # 3. Vol avec le trajet le plus long
    # -------------------------------
    longest_flight = df.filter(col("distance_km").isNotNull()) \
                       .orderBy(desc("distance_km")) \
                       .limit(1)

    print("3. Vol avec le trajet le plus long :")
    longest_flight.show(truncate=False)

    # -------------------------------
    # 4. Distance moyenne par continent
    # -------------------------------
    avg_distance = df.filter(col("distance_km").isNotNull()) \
                     .groupBy("origin_continent") \
                     .agg(avg("distance_km").alias("avg_distance_km")) \
                     .orderBy("origin_continent")

    print("4. Distance moyenne des vols par continent :")
    avg_distance.show(truncate=False)

    # -------------------------------
    # 5. Constructeur avec le plus de vols actifs
    # -------------------------------
    constructor_map = {
        "A3": "Airbus", "A2": "Airbus",
        "B7": "Boeing", "B8": "Boeing",
        "E1": "Embraer", "E2": "Embraer", "EMB": "Embraer",
        "CRJ": "Bombardier", "C": "Bombardier", "DH": "Bombardier",
        "AT": "ATR",
        "DC": "McDonnell Douglas", "MD": "McDonnell Douglas",
        "G": "Gulfstream", "F": "Dassault",
        "C5": "Cessna", "C6": "Cessna",
        "SU": "Sukhoi", "C9": "COMAC", "MRJ": "Mitsubishi",
        "TU": "Tupolev", "IL": "Ilyushin", "AN": "Antonov"
    }

    constructor_expr = None
    for prefix, name in constructor_map.items():
        condition = col("aircraft_code").startswith(prefix)
        constructor_expr = when(condition, name) if constructor_expr is None else constructor_expr.when(condition, name)
    constructor_expr = constructor_expr.otherwise("Inconnu")

    df = df.withColumn("constructeur", constructor_expr)

    constructeur_top = df.groupBy("constructeur").count().orderBy(col("count").desc()).limit(1)

    print("5. Constructeur avec le plus de vols actifs :")
    constructeur_top.show(truncate=False)

    # -------------------------------
    # 6. Top 3 modèles d’avion par pays de la compagnie
    # -------------------------------
    if "origin_country" in df.columns and "aircraft_code" in df.columns:
        model_counts = df.filter(
            col("origin_country").isNotNull() & col("aircraft_code").isNotNull()
        ).groupBy("origin_country", "aircraft_code") \
         .agg(count("*").alias("nb_vols"))

        window = Window.partitionBy("origin_country").orderBy(desc("nb_vols"))
        top_models = model_counts.withColumn("rank", row_number().over(window)) \
                                 .filter(col("rank") <= 3) \
                                 .select("origin_country", "aircraft_code", "nb_vols", "rank") \
                                 .orderBy("origin_country", "rank")

        print("6. Top 3 modèles d’avion en usage par pays de compagnie :")
        top_models.show(100, truncate=False)
    else:
        print(" Colonnes nécessaires absentes pour KPI 6.")

    spark.stop()


In [36]:
run_spark_analysis()

[INFO] Dernier fichier CSV trouvé : Flights/rawzone/tech_year=2025/tech_month=2025-07/tech_day=2025-07-17/flights_20250717112538.csv
 Début de l’analyse Spark...

1. Compagnie avec le plus de vols en cours :
+------------+-------+
|airline_icao|nb_vols|
+------------+-------+
|QTR         |99     |
+------------+-------+

2. Compagnie avec le plus de vols régionaux par continent :
+----------------+------------+-------+
|origin_continent|airline_icao|nb_vols|
+----------------+------------+-------+
|africa          |ETH         |7      |
|asia            |UAE         |6      |
|europe          |EZY         |3      |
|northamerica    |AAL         |38     |
|oceania         |JST         |10     |
|southamerica    |LAN         |2      |
+----------------+------------+-------+

3. Vol avec le trajet le plus long :
+---------+--------+------------+-----------+---------+--------------+------------+----------------+--------------+------------------+-------------+------------+
|flight_id|calls

In [37]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, desc, abs

spark = SparkSession.builder.appName("FlightRadar24 Analysis").getOrCreate()

In [38]:
# Adapter le path selon vtre notebook
df = spark.read.option("header", True).option("inferSchema", True).csv("/content/Flights/rawzone/tech_year=2025/tech_month=2025-07/tech_day=2025-07-17/flights_20250717104706.csv")

# -------------------------------
# BONUS : Aéroport avec le plus d’écart entre vols sortants et entrants
# -------------------------------
if "origin_iata" in df.columns and "dest_iata" in df.columns:

    outgoing = df.groupBy("origin_iata") \
                 .agg(count("*").alias("nb_sortants")) \
                 .withColumnRenamed("origin_iata", "iata")

    incoming = df.groupBy("dest_iata") \
                 .agg(count("*").alias("nb_entrants")) \
                 .withColumnRenamed("dest_iata", "iata")

    airport_diff = outgoing.join(incoming, on="iata", how="outer") \
                           .fillna(0) \
                           .withColumn("diff_abs", abs(col("nb_sortants") - col("nb_entrants")))

    top_diff = airport_diff.orderBy(desc("diff_abs")).limit(1)

    print(" BONUS : Aéroport avec la plus grande différence entre vols sortants et entrants :")
    top_diff.show(truncate=False)

else:
    print(" Colonnes 'origin_iata' ou 'dest_iata' manquantes dans le DataFrame.")


 BONUS : Aéroport avec la plus grande différence entre vols sortants et entrants :
+----+-----------+-----------+--------+
|iata|nb_sortants|nb_entrants|diff_abs|
+----+-----------+-----------+--------+
|LHR |1          |82         |81      |
+----+-----------+-----------+--------+



## Pipeline ETL

In [39]:
import logging
from datetime import datetime
import traceback

def run_pipeline():
    logger = logging.getLogger("FlightRadarETL")
    logging.basicConfig(level=logging.INFO)

    try:
        logger.info("🛫 Lancement du pipeline ETL FlightRadar24")

        # 1. Extraction

        logger.info("Extraction des données")

        df = extract_flights()

        # 2. Nettoyage
        logger.info(" Transformation des données en cours...")

        df_cleaned = clean_flights_data(df)

        # 3. Sauvegarde (CSV ou Parquet)
        path_saved = save_to_csv(df_cleaned)

        logger.info(f" Données sauvegardées dans : {path_saved}")

        # 4. Déclenchement de l’analyse Spark
        run_spark_analysis()

    except Exception as e:
        logger.error(f" Une erreur est survenue : {e}")
        traceback.print_exc()


### Tester de la pipeline

In [40]:
run_pipeline()

[INFO] Fichier CSV sauvegardé : Flights/rawzone/tech_year=2025/tech_month=2025-07/tech_day=2025-07-17/flights_20250717113353.csv
[INFO] Dernier fichier CSV trouvé : Flights/rawzone/tech_year=2025/tech_month=2025-07/tech_day=2025-07-17/flights_20250717113353.csv
 Début de l’analyse Spark...

1. Compagnie avec le plus de vols en cours :
+------------+-------+
|airline_icao|nb_vols|
+------------+-------+
|QTR         |99     |
+------------+-------+

2. Compagnie avec le plus de vols régionaux par continent :
+----------------+------------+-------+
|origin_continent|airline_icao|nb_vols|
+----------------+------------+-------+
|africa          |ETH         |6      |
|asia            |UAE         |7      |
|europe          |THY         |4      |
|northamerica    |AAL         |35     |
|oceania         |JST         |11     |
|southamerica    |LAN         |2      |
+----------------+------------+-------+

3. Vol avec le trajet le plus long :
+---------+--------+------------+-----------+----

## Lancement du Cronjonb à fréquence de 2 min pour tester

In [41]:
import time
import logging
import traceback
from datetime import datetime

log_file = "pipeline.log"
logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("FlightRadarETL")

def run_pipeline_job():
    logger.info(" Lancement du job ETL")
    print("\n Lancement du job ETL...")

    try:
        run_pipeline()
        logger.info("Pipeline terminé avec succès.")
        print(" Pipeline terminé avec succès.")
    except Exception as e:
        logger.error(f" Erreur dans le pipeline : {e}")
        logger.error(traceback.format_exc())
        print(f" Erreur dans le pipeline : {e}")

# Boucle de test : exécuter toutes les 2 minutes
for i in range(3):
    print(f"\n=========== 🔄 Lancement #{i+1} à {datetime.now().strftime('%H:%M:%S')} ===========")
    run_pipeline_job()
    print("\n ")
    print(f"[🕒] Prochain lancement dans 2 minutes...\n")
    time.sleep(2 * 60)




 Lancement du job ETL...
[INFO] Fichier CSV sauvegardé : Flights/rawzone/tech_year=2025/tech_month=2025-07/tech_day=2025-07-17/flights_20250717113619.csv
[INFO] Dernier fichier CSV trouvé : Flights/rawzone/tech_year=2025/tech_month=2025-07/tech_day=2025-07-17/flights_20250717113619.csv
 Début de l’analyse Spark...

1. Compagnie avec le plus de vols en cours :
+------------+-------+
|airline_icao|nb_vols|
+------------+-------+
|QTR         |99     |
+------------+-------+

2. Compagnie avec le plus de vols régionaux par continent :
+----------------+------------+-------+
|origin_continent|airline_icao|nb_vols|
+----------------+------------+-------+
|africa          |ETH         |5      |
|asia            |UAE         |7      |
|europe          |THY         |3      |
|northamerica    |AAL         |33     |
|oceania         |JST         |10     |
|southamerica    |LAN         |2      |
+----------------+------------+-------+

3. Vol avec le trajet le plus long :
+---------+--------+--

### Lancement d'un Job chaque 2 heures ( `while True:` pour exécution infinie )

In [None]:
import time

while True:
    try:
        run_pipeline()
        print(" Pipeline exécutée avec succès.")
    except Exception as e:
        print(f" Erreur : {e}")
    print("\n ")
    print("⏳ En attente de 2h...")
    print("\n ")
    time.sleep(2 * 60 * 60)
