In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.window import Window
from pyspark.sql.functions import col, udf, when, desc, sum, dense_rank

In [None]:
# Create (or reuse) the SparkSession for this notebook, using a local master.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("dataframes-flights")
    .getOrCreate()
)

In [None]:
# Load the flights CSV file: the first row provides the column names and the
# column types are inferred from the data.
flights_file = "data/flights.csv"

flights_df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load(flights_file)
)

In [None]:
# 1 - Show only the flights whose distance is greater than 1000.
(
    flights_df
    .where(col("distance") > 1000)
    .show()
)

In [None]:
# 2 - Total delay per destination, largest total first.
# NOTE: `sum` is the pyspark.sql.functions aggregate imported at the top of the
# notebook (not the Python builtin), and the resulting column is called "Count"
# even though it holds a sum of delays.
delay_df = (
    flights_df
    .groupBy("destination")
    .agg(sum(col("delay")).alias("Count"))
    .orderBy(desc("Count"))
)

delay_df.show(10)

In [None]:
# 3 - Export the per-destination totals as CSV, replacing any existing output.
delay_df.write.csv("delay.csv", mode="overwrite")

In [None]:
# 4 - Same export after redistributing the rows into 10 partitions.
# This targets the same "delay.csv" path in overwrite mode, so it replaces
# whatever was previously written there.
(
    delay_df
    .repartition(10)
    .write
    .mode("overwrite")
    .format("csv")
    .save("delay.csv")
)

In [None]:
# 5 - Flights from SFO to ORD with a delay above 120, longest delay first.
(
    flights_df
    .select("delay", "origin", "destination")
    .where(
        (col("origin") == "SFO")
        & (col("destination") == "ORD")
        & (col("delay") > 120)
    )
    .orderBy(desc("delay"))
    .show()
)

In [None]:
# 6 - Tag every flight with a human-readable delay category.
# The `when` branches are evaluated from top to bottom, so each branch only
# needs its lower bound: larger delays were already taken by an earlier branch.
# NOTE(review): a null delay matches no branch and therefore also ends up
# labelled "En avance" - confirm this is the intended behaviour.
(
    flights_df
    .select("delay", "origin", "destination")
    .withColumn(
        "delay_type",
        when(col("delay") >= 360, "Très long retard")
        .when(col("delay") >= 120, "Long retard")
        .when(col("delay") >= 60, "Retard modéré")
        .when(col("delay") > 0, "Retard tolérable")
        .when(col("delay") == 0, "Pas de retard")
        .otherwise("En avance"),
    )
    .show()
)

In [None]:
# 7 - Total delay per (origin, destination) pair, restricted to a fixed set of
# origin and destination airports, listed by origin.
df = (
    flights_df
    .select("origin", "destination", "delay")
    .where(
        col("origin").isin("SEA", "SFO", "JFK")
        & col("destination").isin("SEA", "SFO", "JFK", "DEN", "ORD", "LAX", "ATL")
    )
    .groupBy("origin", "destination")
    .agg(sum(col("delay")).alias("TotalDelays"))
    .orderBy(col("origin"))
)

df.show()

In [None]:
# 8 - Within each origin, rank the destinations by total delay (highest first)
# and keep the rows whose dense rank is at most 3.
windowSpec = Window.partitionBy("origin").orderBy(col("TotalDelays").desc())

(
    df
    .withColumn("dense_rank", dense_rank().over(windowSpec))
    .where(col("dense_rank") <= 3)
    .show()
)

In [None]:
# 9
def route(origin, destination):
    """Build the "<origin>-<destination>" label for a flight.

    Args:
        origin: Origin value as a string, or None when it is missing.
        destination: Destination value as a string, or None when it is missing.

    Returns:
        The two values joined by "-", or None if either one is missing.
    """
    # Spark passes SQL NULLs to a Python UDF as None; `None + "-"` would raise
    # TypeError and fail the whole job, so propagate the missing value instead.
    if origin is None or destination is None:
        return None
    return origin + "-" + destination


# Wrap `route` as a Spark UDF that returns a string, then use it to add a
# "route" column built from the origin and destination columns.
route_col = udf(route, returnType=StringType())

flights_df.withColumn("route", route_col(col("origin"), col("destination"))).show()