## Flight Radar 24 Kata Solver

In [69]:
from FlightRadar24.api import FlightRadar24API
# Module-level API client used by the exploratory cells below (e.g. the zones
# preview); FlightRadar24ETL creates its own client on every run.
fr_api = FlightRadar24API()

In [70]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from datetime import datetime

In [90]:
# Getting flights list 
def get_flights_dataframe(fr_api, spark):
    """Fetch the live flights from FlightRadar24 and load them into a Spark DataFrame.

    Args:
        fr_api: FlightRadar24API client; the objects returned by its
            ``get_flights()`` are read attribute by attribute.
        spark: active SparkSession used to build the DataFrame.

    Returns:
        A DataFrame with one row per flight and one column per attribute in
        ``columns`` below. The placeholder strings "N/A", "NaN" and "" are
        replaced by nulls, then every row containing a null is dropped.
    """
    columns = (
        "id",
        "icao_24bit",
        "latitude",
        "longitude",
        "heading",
        "altitude",
        "ground_speed",
        "aircraft_code",
        "registration",
        "time",
        "origin_airport_iata",
        "destination_airport_iata",
        "number",
        "airline_iata",
        "on_ground",
        "vertical_speed",
        "callsign",
        "airline_icao",
    )
    live_flights = fr_api.get_flights()
    # One plain dict per flight, keys in the same order as `columns`.
    records = [
        {name: getattr(flight, name) for name in columns}
        for flight in live_flights
    ]
    raw_df = spark.createDataFrame(data=records)
    # Turn placeholders into real nulls so dropna() discards incomplete rows.
    return raw_df.na.replace(["N/A", "NaN", ""], None).dropna()


In [72]:
# Getting airlines list
def get_airlines_dataframe(fr_api, spark):
    """Fetch the airline list from FlightRadar24 as a cleaned Spark DataFrame.

    Args:
        fr_api: FlightRadar24API client; ``get_airlines()`` supplies the rows.
        spark: active SparkSession used to build the DataFrame.

    Returns:
        The airlines DataFrame with "N/A", "NaN" and "" replaced by nulls and
        every row that contains a null removed.
    """
    raw_airlines_df = spark.createDataFrame(data=fr_api.get_airlines())
    cleaned_df = raw_airlines_df.na.replace(["N/A", "NaN", ""], None)
    return cleaned_df.dropna()

In [None]:
# Exploratory preview: fetch FlightRadar24's zone definitions and display them.
# get_continent() reads "tl_x", "br_x", "tl_y" and "br_y" bounds from each zone
# value. FlightRadar24ETL fetches its own copy of the zones, so this variable
# is only for inspection.
zones=fr_api.get_zones()
zones

In [74]:
def dataframe_to_csv(dataframe, base_dir="Flights/rawzone", prefix="flights"):
    """Write ``dataframe`` as CSV (with header) into a timestamped, partitioned folder.

    The output path has the form
    ``{base_dir}/tech_year=YYYY/tech_month=YYYYMM/tech_day=YYYYMMDD/{prefix}YYYYMMDDHHMMSSffffff``.
    Spark writes a directory of part files at that path; ``mode="overwrite"``
    replaces it if it already exists.

    Args:
        dataframe: Spark DataFrame to persist.
        base_dir: root of the raw zone (defaults to "Flights/rawzone").
        prefix: leaf-name prefix, e.g. one per task so that outputs of
            different tasks can be told apart (defaults to "flights").

    Returns:
        The path that was written.
    """
    # Read the clock once so every partition level agrees with the leaf name.
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")
    output_path = (
        f"{base_dir}/tech_year={timestamp[:4]}/tech_month={timestamp[:6]}"
        f"/tech_day={timestamp[:8]}/{prefix}{timestamp}"
    )
    dataframe.write.csv(output_path, header=True, mode="overwrite")
    return output_path

### 1. Compagnie avec le plus de vols en cours

In [75]:
def task_1(flights_df):
    """Find the airline(s) with the most flights currently in the air.

    Args:
        flights_df: flights DataFrame with "on_ground", "airline_icao" and "id" columns.

    Returns:
        The collected rows ("airline_icao", "count") of every airline tied for
        the highest number of airborne flights. The same rows are written to CSV.
    """
    airborne_counts = (
        flights_df
        .filter("on_ground==0")
        .groupBy("airline_icao")
        .agg(F.count("id").alias("count"))
    )
    # Largest per-airline count, kept as a scalar so that ties are all returned.
    # With no airborne flight it is None and the filter below keeps no rows.
    top_count = airborne_counts.agg(F.max("count")).collect()[0][0]
    leaders = airborne_counts.filter(F.col("count") == top_count)

    dataframe_to_csv(leaders)
    return leaders.collect()


### 2. Pour chaque continent, la compagnie avec le plus de vols régionaux actifs

In [76]:

def task_2(flights_df, udf_get_continent):
    """For each continent, find the airline with the most active regional flights.

    Args:
        flights_df: flights DataFrame with "origin_airport_iata",
            "destination_airport_iata", "on_ground", "latitude", "longitude",
            "airline_icao" and "id" columns.
        udf_get_continent: Spark UDF taking (latitude, longitude) columns and
            returning a continent/zone name.

    Returns:
        Collected rows ("continent", "airline_icao", "count"), one per
        continent. The same rows are written to CSV.
    """
    # Both predicates live in one SQL expression; joining two Python strings
    # with `and` would evaluate to the second string only and drop the first.
    # TODO(review): a "regional" flight is more naturally one whose origin and
    # destination airports are on the same continent; that needs airport
    # coordinates, which flights_df does not carry. Equal IATA codes only match
    # round trips.
    regional_counts = (
        flights_df
        .filter("origin_airport_iata == destination_airport_iata AND on_ground == 0")
        .withColumn("continent", udf_get_continent(F.col("latitude"), F.col("longitude")))
        .groupBy("continent", "airline_icao")
        .agg(F.count("id").alias("count"))
    )

    # Rank airlines within each continent; airline_icao breaks count ties so
    # the selected airline is deterministic.
    by_continent = Window.partitionBy("continent").orderBy(
        F.col("count").desc(), F.col("airline_icao")
    )
    top_airline_per_continent = (
        regional_counts
        .withColumn("row_number", F.row_number().over(by_continent))
        .filter(F.col("row_number") == 1)
        .drop("row_number")
    )

    dataframe_to_csv(top_airline_per_continent)
    return top_airline_per_continent.collect()


### 3. Le vol en cours avec le trajet le plus long

In [77]:
def task_3(flights_df, spark):
    """Return the in-progress flight ranked first by the "time" column.

    Only airborne flights (on_ground == 0) are considered, like the other
    active-flight tasks. The selected row is also written to CSV.

    Args:
        flights_df: flights DataFrame with "on_ground" and "time" columns.
        spark: SparkSession used to turn the single row back into a DataFrame.

    Returns:
        The selected Row, or None when no flight is airborne (nothing is
        written in that case).
    """
    # TODO(review): "time" presumably holds the timestamp of the flight's last
    # update rather than its trip length, so this picks the most recently
    # updated flight. The true longest trip needs origin/destination airport
    # coordinates (e.g. great-circle distance), which flights_df lacks.
    longest_flight_trip = (
        flights_df
        .filter("on_ground==0")
        .orderBy(F.col("time").desc())
        .first()
    )
    if longest_flight_trip is None:
        # createDataFrame([None]) would fail, so there is nothing to persist.
        return None
    dataframe_to_csv(spark.createDataFrame(data=[longest_flight_trip]))
    return longest_flight_trip


### 4. Pour chaque continent, la longueur de vol moyenne

In [78]:
def task_4(flights_df, udf_get_continent):
    """Average the "time" column per continent.

    Args:
        flights_df: flights DataFrame with "latitude", "longitude" and "time" columns.
        udf_get_continent: Spark UDF taking (latitude, longitude) columns and
            returning a continent/zone name.

    Returns:
        Collected rows ("continent", "average_time"). The same rows are
        written to CSV.
    """
    # NOTE(review): the section asks for the average flight length, but "time"
    # looks like a per-flight timestamp rather than a duration; TODO confirm
    # before trusting this figure.
    continent_col = udf_get_continent(F.col("latitude"), F.col("longitude"))
    with_continent = flights_df.withColumn("continent", continent_col)
    averages = with_continent.groupBy("continent").avg("time")
    per_continent = averages.withColumnRenamed("avg(time)", "average_time")

    dataframe_to_csv(per_continent)
    return per_continent.collect()

### 5. Le constructeur d'avions avec le plus de vols actifs

In [79]:

def task_5(flights_df, spark):
    """Return the aircraft code with the most airborne flights.

    Args:
        flights_df: flights DataFrame with "on_ground", "aircraft_code" and "id" columns.
        spark: SparkSession used to turn the winning row back into a DataFrame.

    Returns:
        Row("aircraft_code", "count") for the busiest code, also written to CSV,
        or None (and nothing written) when no flight is airborne.
    """
    # TODO: the section asks for the aircraft *manufacturer*; aircraft_code is
    # not the manufacturer itself, so join with a planes/models reference table
    # mapping codes to manufacturers and aggregate on that instead.
    most_active_aircraft = (
        flights_df
        .filter("on_ground==0")
        .groupBy("aircraft_code")
        .agg(F.count("id").alias("count"))
        # aircraft_code breaks ties so the result is deterministic.
        .orderBy(F.col("count").desc(), F.col("aircraft_code"))
        .first()
    )
    if most_active_aircraft is None:
        # createDataFrame([None]) would fail, so there is nothing to persist.
        return None
    dataframe_to_csv(spark.createDataFrame(data=[most_active_aircraft]))
    return most_active_aircraft

### 6. Pour chaque pays de compagnie aérienne, le top 3 des modèles d'avion en usage

In [80]:
def task_6(flights_df):
    """Return the top 3 aircraft codes by flight count for each airline_iata value.

    Args:
        flights_df: flights DataFrame with "airline_iata", "aircraft_code" and "id" columns.

    Returns:
        Collected rows ("airline_iata", "aircraft_code", "count"), at most three
        per airline. The same rows are written to CSV.
    """
    # TODO(review): the section asks for the top 3 per airline *country*; that
    # needs an airline -> country mapping (absent from flights_df), so this
    # groups by airline_iata instead.
    counts = (
        flights_df
        .groupBy("airline_iata", "aircraft_code")
        .agg(F.count("id").alias("count"))
    )
    # The window orders rows inside each airline, so no global sort is needed
    # beforehand; aircraft_code breaks count ties deterministically.
    by_airline = Window.partitionBy("airline_iata").orderBy(
        F.col("count").desc(), F.col("aircraft_code")
    )
    top_3_per_airline = (
        counts
        .withColumn("row_number", F.row_number().over(by_airline))
        .filter(F.col("row_number") <= 3)
        .drop("row_number")
    )

    dataframe_to_csv(top_3_per_airline)
    return top_3_per_airline.collect()


In [81]:
def get_continent(latitude, longitude, zones):
    """Return the name of the first zone whose bounding box contains the point.

    Args:
        latitude: point latitude in degrees.
        longitude: point longitude in degrees.
        zones: mapping of zone name -> bounds dict with "tl_x"/"br_x" (west/east
            longitude) and "tl_y"/"br_y" (north/south latitude). This is assumed
            to be the format of ``FlightRadar24API.get_zones()``, e.g. europe:
            tl_y=72.57, tl_x=-16.96, br_y=33.57, br_x=53.05; TODO confirm if
            another zone source is used.

    Returns:
        The matching zone name, or 'unknown' when no zone contains the point or
        a coordinate is missing. Zones are checked in dict order, so with
        overlapping boxes the first match wins.
    """
    if latitude is None or longitude is None:
        return 'unknown'
    for continent, bounds in zones.items():
        # x bounds are longitudes (west -> east), y bounds are latitudes
        # (south -> north); mixing them up would place e.g. Paris outside Europe.
        if (bounds["tl_x"] <= longitude <= bounds["br_x"]
                and bounds["br_y"] <= latitude <= bounds["tl_y"]):
            return continent
    return 'unknown'

In [82]:
def FlightRadar24ETL():
    """Run one extraction: fetch live flights and zones, then compute tasks 1-6.

    The tasks persist their own results through dataframe_to_csv.
    """
    fr_api = FlightRadar24API()
    spark = SparkSession.builder.getOrCreate()
    flights_df = get_flights_dataframe(fr_api, spark)
    zones = fr_api.get_zones()

    # get_continent takes the zone table as a third argument, but the tasks
    # call the UDF with (latitude, longitude) columns only. Bind `zones` in a
    # closure; registering get_continent directly fails at run time with a
    # missing positional argument.
    udf_get_continent = F.udf(
        lambda latitude, longitude: get_continent(latitude, longitude, zones)
    )

    task_1(flights_df)
    task_2(flights_df, udf_get_continent)
    task_3(flights_df, spark)
    task_4(flights_df, udf_get_continent)
    task_5(flights_df, spark)
    task_6(flights_df)

In [83]:
from apscheduler.schedulers.blocking import BlockingScheduler

# Run the ETL pipeline every 2 hours. A seconds-level interval would hit the
# FlightRadar24 API and rewrite all task outputs far more often than intended.
ETL_INTERVAL_HOURS = 2

scheduler = BlockingScheduler()
scheduler.add_job(FlightRadar24ETL, 'interval', hours=ETL_INTERVAL_HOURS)

# BlockingScheduler.start() runs in the foreground: this cell blocks until the
# kernel is interrupted.
scheduler.start()


  fields = [
  for column, series in pdf.iteritems():


KeyboardInterrupt: 