In [1]:
!pip install pyspark




In [2]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session for this notebook. The MySQL JDBC
# connector is requested through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("PySparkMySQLIntegration")
    .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.30")
    .getOrCreate()
)


In [3]:
import os
from getpass import getpass

# The database password must not be stored in the notebook. It is read from
# the MYSQL_PASSWORD environment variable; when that is not set, prompt once
# and cache the answer in this process so later cells do not prompt again.
if not os.environ.get("MYSQL_PASSWORD"):
    os.environ["MYSQL_PASSWORD"] = getpass("MySQL password for user 'nadim': ")

# Load the `flights` table from the remote MySQL database over JDBC.
flights_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://mysql-nadim.alwaysdata.net:3306/nadim_db")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("dbtable", "flights")
    .option("user", "nadim")
    .option("password", os.environ["MYSQL_PASSWORD"])
    .load()
)


In [None]:
# Display a preview of the rows loaded into flights_df.
flights_df.show()


In [None]:
import os
from getpass import getpass

# The database password must not be stored in the notebook. It is read from
# the MYSQL_PASSWORD environment variable; when that is not set, prompt once
# and cache the answer in this process so later cells do not prompt again.
if not os.environ.get("MYSQL_PASSWORD"):
    os.environ["MYSQL_PASSWORD"] = getpass("MySQL password for user 'nadim': ")

# Load the `planes` table from the remote MySQL database over JDBC.
planes_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://mysql-nadim.alwaysdata.net:3306/nadim_db")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("dbtable", "planes")
    .option("user", "nadim")
    .option("password", os.environ["MYSQL_PASSWORD"])
    .load()
)


In [None]:
import os
from getpass import getpass

# The database password must not be stored in the notebook. It is read from
# the MYSQL_PASSWORD environment variable; when that is not set, prompt once
# and cache the answer in this process so later cells do not prompt again.
if not os.environ.get("MYSQL_PASSWORD"):
    os.environ["MYSQL_PASSWORD"] = getpass("MySQL password for user 'nadim': ")

# Load the `weather` table from the remote MySQL database over JDBC.
weather_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://mysql-nadim.alwaysdata.net:3306/nadim_db")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("dbtable", "weather")
    .option("user", "nadim")
    .option("password", os.environ["MYSQL_PASSWORD"])
    .load()
)


In [None]:
import os
from getpass import getpass

# The database password must not be stored in the notebook. It is read from
# the MYSQL_PASSWORD environment variable; when that is not set, prompt once
# and cache the answer in this process so later cells do not prompt again.
if not os.environ.get("MYSQL_PASSWORD"):
    os.environ["MYSQL_PASSWORD"] = getpass("MySQL password for user 'nadim': ")

# Load the `airlines` table from the remote MySQL database over JDBC.
airlines_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://mysql-nadim.alwaysdata.net:3306/nadim_db")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("dbtable", "airlines")
    .option("user", "nadim")
    .option("password", os.environ["MYSQL_PASSWORD"])
    .load()
)


In [None]:
import os
from getpass import getpass

# The database password must not be stored in the notebook. It is read from
# the MYSQL_PASSWORD environment variable; when that is not set, prompt once
# and cache the answer in this process so later cells do not prompt again.
if not os.environ.get("MYSQL_PASSWORD"):
    os.environ["MYSQL_PASSWORD"] = getpass("MySQL password for user 'nadim': ")

# Load the `airports` table from the remote MySQL database over JDBC.
airports_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://mysql-nadim.alwaysdata.net:3306/nadim_db")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("dbtable", "airports")
    .option("user", "nadim")
    .option("password", os.environ["MYSQL_PASSWORD"])
    .load()
)


In [None]:
# Print the column names and types of flights_df.
flights_df.printSchema()


In [None]:
# Print the column names and types of weather_df.
weather_df.printSchema()


In [None]:
# Print the column names and types of planes_df.
planes_df.printSchema()


In [None]:
# Print the column names and types of airports_df.
airports_df.printSchema()


In [None]:
# Print the column names and types of airlines_df.
airlines_df.printSchema()


## Request 1: Counting Airports, Timezones, Companies, Planes, and Cancelled Flights


### 1.1. Nombre d'aéroports au total, en tant que départ et en tant que destination :

In [None]:
# Count the distinct airports seen in flights_df: overall, as departure
# points, and as destinations.
from pyspark.sql.functions import col

# Stack origin and destination codes under one column name so that an airport
# appearing in both roles is only counted once.
origin_codes = flights_df.select(col("origin").alias("airport"))
dest_codes = flights_df.select(col("dest").alias("airport"))
total_airports = origin_codes.union(dest_codes).distinct().count()

departure_airports = flights_df.select("origin").distinct().count()
destination_airports = flights_df.select("dest").distinct().count()

total_airports, departure_airports, destination_airports


### 1.2. Combien d'aéroports n'observent pas l'heure d'été (DST) et combien de fuseaux horaires y a-t-il ?

In [None]:
# Count the airports whose `dst` column equals "N" and the number of distinct
# values in the `tzone` column.
# NOTE(review): "N" is taken to mean "does not observe daylight saving time" - confirm against the data.
from pyspark.sql.functions import col

no_dst_airports = airports_df.where(col("dst") == "N")
airports_without_dst = no_dst_airports.count()

timezone_names = airports_df.select("tzone").distinct()
distinct_timezones = timezone_names.count()

airports_without_dst, distinct_timezones


### 1.3. Combien de compagnies aériennes, d'avions et de vols annulés ?

In [None]:
# Number of distinct carrier codes in airlines_df and of distinct tail numbers
# in planes_df.
# NOTE(review): the section heading also asks for the number of cancelled
# flights; this cell does not compute it.
carrier_codes = airlines_df.select("carrier").dropDuplicates()
distinct_airlines = carrier_codes.count()

tail_numbers = planes_df.select("tailnum").dropDuplicates()
distinct_planes = tail_numbers.count()

distinct_airlines, distinct_planes


### 2. Quel est l'aéroport de départ le plus emprunté ?

In [None]:
from pyspark.sql.functions import desc

# Rank the departure airports by number of flights and keep the code of the
# one with the highest count.
flights_per_origin = flights_df.groupBy("origin").count()
busiest_origin_row = flights_per_origin.orderBy(desc("count")).first()
most_used_departure_airport = busiest_origin_row["origin"]

most_used_departure_airport


### 2.1. Quelles sont les 10 destinations les plus (moins) prisées (en indiquant le nom complet des destinations et le % correspondant pour chaque destination) ?

In [None]:
from pyspark.sql import functions as F

# Flights per destination, enriched with the airport's full name (looked up in
# airports_df by FAA code) and the share of all flights that it represents.
total_flights = flights_df.count()

# One (code, name) row per airport; dropDuplicates guards against a repeated
# FAA code multiplying rows in the join below.
airport_names = airports_df.select(
    F.col("faa").alias("dest"),
    F.col("name").alias("destination_name"),
).dropDuplicates(["dest"])

destination_stats = (
    flights_df.groupBy("dest")
    .count()
    # Left join: a destination missing from airports_df is kept with a null name.
    .join(airport_names, on="dest", how="left")
    .withColumn("percentage", F.round(F.col("count") / F.lit(total_flights) * 100, 2))
    .select("dest", "destination_name", "count", "percentage")
)

# The 10 most popular destinations.
top_10_destinations = destination_stats.orderBy(F.desc("count")).limit(10)

# The 10 least popular destinations.
bottom_10_destinations = destination_stats.orderBy(F.asc("count")).limit(10)

# truncate=False so that long airport names are displayed in full.
top_10_destinations.show(truncate=False)
bottom_10_destinations.show(truncate=False)


### 2.2. Quels sont les 10 avions qui ont le plus (moins) décollé ?

In [None]:
from pyspark.sql.functions import col, desc

# Number of flights per aircraft. Rows without a tail number cannot be
# attributed to a plane, so they are excluded; otherwise they would form one
# "null" group that is ranked as if it were an aircraft.
flights_per_plane = (
    flights_df.filter(col("tailnum").isNotNull())
    .groupBy("tailnum")
    .count()
)

# The 10 planes with the most flights.
top_10_airplanes = flights_per_plane.orderBy(desc("count")).limit(10)

# The 10 planes with the fewest flights.
bottom_10_airplanes = flights_per_plane.orderBy("count").limit(10)

top_10_airplanes.show()
bottom_10_airplanes.show()


### 3. Combien chaque compagnie a desservi de destinations ?

In [None]:
from pyspark.sql import functions as F

# Number of distinct destinations served by each carrier.
# countDistinct is required here: a plain count("dest") returns the number of
# flights per carrier, not the number of different destinations.
destinations_per_airline = flights_df.groupBy("carrier") \
    .agg(F.countDistinct("dest").alias("number_of_destinations_served"))

destinations_per_airline.show()


### 3.1. Combien chaque compagnie a desservi de destinations par aéroport d'origine ?

In [None]:
from pyspark.sql import functions as F

# Number of distinct destinations served by each carrier from each origin
# airport. countDistinct is required: the {"dest": "count"} aggregation counts
# flights, not different destinations. The result column is now named
# number_of_destinations_served instead of count(dest).
destinations_per_airline_per_origin = flights_df.groupBy("origin", "carrier") \
    .agg(F.countDistinct("dest").alias("number_of_destinations_served"))

destinations_per_airline_per_origin.show()


### 4. Trouver tous les vols ayant atterri à Houston (IAH ou HOU) ?



In [None]:
# Keep the flights whose destination is one of the two Houston airports
# (IAH or HOU).
houston_codes = ["IAH", "HOU"]
flights_to_houston = flights_df.where(flights_df["dest"].isin(houston_codes))

flights_to_houston.show()


## 4.1. Combien de vols partent des aéroports de New York (NYC) vers Seattle ?

In [None]:
# How many flights leave the New York City airports (JFK, LGA, EWR) for
# Seattle (SEA)?
# Airport codes are matched exactly with isin(); the previous startswith()
# tests were prefix matches and would also accept any longer code beginning
# with one of these three strings.
nyc_airports = ["JFK", "LGA", "EWR"]
flights_to_seattle = flights_df.filter(
    flights_df["origin"].isin(nyc_airports) & (flights_df["dest"] == "SEA")
)

flights_to_seattle.count()


## 4.2. Combien de compagnies desservent la destination Seattle (SEA) et combien d'avions uniques volent vers Seattle ?

In [None]:
# Both figures are computed on flights_to_seattle, which must already exist
# in the notebook namespace.

# Number of distinct carriers among those flights.
carriers_to_seattle = flights_to_seattle.select("carrier").dropDuplicates()
unique_carriers_to_seattle = carriers_to_seattle.count()

# Number of distinct tail numbers among those flights.
planes_to_seattle = flights_to_seattle.select("tailnum").dropDuplicates()
unique_airplanes_to_seattle = planes_to_seattle.count()

unique_carriers_to_seattle, unique_airplanes_to_seattle


## 5. Trouver le nombre de vols par destination ?

In [None]:
# Number of flights for each destination airport code.
flights_grouped_by_dest = flights_df.groupBy("dest")
flights_per_destination = flights_grouped_by_dest.count()

flights_per_destination.show()


## 5.1. Trier les vols par destination, puis par aéroport d'origine, puis par compagnie dans un ordre alphabétique croissant (en réalisant les jointures nécessaires pour obtenir les noms explicites des aéroports) ?

In [None]:
from pyspark.sql.functions import asc

# Attach the destination airport's full name to every flight. The left join
# keeps flights whose destination code has no match in airports_df.
dest_matches_faa = flights_df["dest"] == airports_df["faa"]
flights_joined_to_airports = flights_df.join(airports_df, dest_matches_faa, "left")
flights_with_airport_names = flights_joined_to_airports.select(
    flights_df["*"],
    airports_df["name"].alias("destination_name"),
)

# Sort by destination code, then origin code, then carrier code, all ascending.
sort_keys = [asc("dest"), asc("origin"), asc("carrier")]
sorted_flights = flights_with_airport_names.orderBy(*sort_keys)

sorted_flights.show()


## 6. Quelles sont les compagnies qui n'opèrent pas sur tous les aéroports d'origine ?

In [None]:
from pyspark.sql.functions import col

# Carriers that do not operate from every origin airport.
# The reference total is the number of distinct origin airports present in
# flights_df. Comparing against every airport listed in airports_df (as was
# done before) flags every carrier, because no carrier departs from all
# airports in the reference table.
total_origin_airports = flights_df.select("origin").distinct().count()

companies_not_operating_on_all_origins = (
    flights_df.groupBy("carrier", "origin").count()  # one row per (carrier, origin) pair
    .groupBy("carrier").count()                      # number of origins per carrier
    .filter(col("count") < total_origin_airports)
)

companies_not_operating_on_all_origins.show()


## 6.1. Quelles sont les compagnies qui desservent l'ensemble des destinations ?

In [None]:
from pyspark.sql.functions import col, collect_set, size

# Carriers that serve every destination appearing in flights_df.
# The reference list is built from the flights themselves (not from
# airports_df, which also lists airports that no flight goes to). Null
# destinations are excluded because collect_set ignores nulls as well.
all_destinations = (
    flights_df.select("dest")
    .where(col("dest").isNotNull())
    .distinct()
    .collect()
)
all_destinations_list = [row.dest for row in all_destinations]

# An array column cannot be compared to a Python list with ==. Each carrier's
# set of destinations is a subset of all destinations, so the two are equal
# exactly when they have the same size.
companies_serving_all_destinations = (
    flights_df.groupBy("carrier")
    .agg(collect_set("dest").alias("served_destinations"))
    .filter(size(col("served_destinations")) == len(all_destinations_list))
)

companies_serving_all_destinations.show()


## 7. Quelles sont les destinations qui sont exclusives à certaines compagnies ?

In [None]:
from pyspark.sql.functions import col, collect_set, size

# Distinct destinations served by each carrier (collect_set removes the
# duplicates that one-entry-per-flight collection would produce).
destinations_per_company = flights_df.groupBy("carrier").agg(
    collect_set("dest").alias("served_destinations")
)

# A destination is exclusive when exactly one carrier flies to it. The
# previous filter called Column.contains() with a Python list of every
# airport code, which is not a valid comparison and did not express this rule.
carriers_per_destination = flights_df.groupBy("dest").agg(
    collect_set("carrier").alias("carriers")
)
exclusive_destinations = (
    carriers_per_destination
    .filter(size(col("carriers")) == 1)
    # Exactly one element is left in the array: the exclusive carrier.
    .select("dest", col("carriers")[0].alias("carrier"))
    .orderBy("carrier", "dest")
)

exclusive_destinations.show()


## 8. Filtrer les vols pour trouver ceux exploités par United (UA), American (AA) ou Delta (DL) ?

In [None]:
# Keep only the flights operated by United (UA), American (AA) or Delta (DL).
selected_airlines = ["UA", "AA", "DL"]

operated_by_selected = flights_df["carrier"].isin(selected_airlines)
flights_operated_by_selected_airlines = flights_df.where(operated_by_selected)

flights_operated_by_selected_airlines.show()


## Request 4: Flights to Houston and NYC to Seattle Information

In [None]:
# Flights whose destination is one of the Houston airports (IAH or HOU).
to_houston = flights_df["dest"].isin(["IAH", "HOU"])
flights_to_houston = flights_df.where(to_houston)

# Flights leaving one of the NYC airports (JFK, LGA, EWR) for Seattle (SEA).
from_nyc = flights_df["origin"].isin(["JFK", "LGA", "EWR"])
to_seattle = flights_df["dest"] == "SEA"
nyc_to_seattle_flights = flights_df.where(from_nyc & to_seattle)

# Distinct carriers on the NYC -> Seattle flights.
companies_serving_nyc_sea = (
    nyc_to_seattle_flights.select("carrier").dropDuplicates().count()
)

# Distinct tail numbers on the NYC -> Seattle flights.
unique_planes_nyc_sea = (
    nyc_to_seattle_flights.select("tailnum").dropDuplicates().count()
)

(flights_to_houston.count(), nyc_to_seattle_flights.count(), companies_serving_nyc_sea, unique_planes_nyc_sea)


## Request 5: Number of Flights per Destination

In [None]:
# Number of flights per destination.
flights_per_destination = flights_df.groupBy("dest").count()

# Flights enriched with airport and airline details, sorted by destination,
# origin and carrier.
# - The airlines join is done on the column *name* so the result has a single
#   `carrier` column. Joining on `flights_df["carrier"] == airlines_df["carrier"]`
#   keeps both columns and makes orderBy("carrier") fail as ambiguous.
# - Left joins are used so that flights whose destination or carrier has no
#   match in the lookup tables are kept instead of being silently dropped.
sorted_flights = (
    flights_df.join(airports_df, flights_df["dest"] == airports_df["faa"], "left")
    .join(airlines_df, on="carrier", how="left")
    .orderBy("dest", "origin", "carrier")
)

(flights_per_destination, sorted_flights)


In [None]:
from pyspark.sql.functions import col, count

# Count the number of flights per destination.
flights_per_destination = flights_df.groupBy("dest").agg(count("*").alias("num_flights"))

# Join the flights with airports_df to get the destination airport's full
# name (`name`); flights without a matching airport are kept with a null name.
flights_with_airports = (
    flights_df.join(airports_df, flights_df.dest == airports_df.faa, "left_outer")
    .select("dest", "name", "origin", "carrier")
)

# Join with airlines_df to get the airline's full name.
# The join is done on the column *name* so only one `carrier` column remains;
# joining on `flights_with_airports.carrier == airlines_df.carrier` keeps both
# and makes the following select("carrier") fail as ambiguous.
# NOTE(review): assumes airlines_df exposes the airline's full name in a
# column called `airline` - confirm against airlines_df.printSchema().
airline_names = airlines_df.select("carrier", "airline")
flights_with_airports_and_airlines = (
    flights_with_airports.join(airline_names, on="carrier", how="left_outer")
    .select("name", "origin", "carrier", "airline")
)

# Sort by destination name, origin and airline name in ascending order.
sorted_flights = flights_with_airports_and_airlines.orderBy(col("name"), col("origin"), col("airline"))

# Show the sorted flights DataFrame.
sorted_flights.show()


## Request 6: Companies Not Operating on All Airports

In [None]:
# `size` was used below without ever being imported (NameError on a fresh
# kernel); `col` is imported here too so the cell does not depend on earlier cells.
from pyspark.sql.functions import col, collect_set, size

# All distinct origins and destinations seen in the flights. Nulls are dropped
# because collect_set (used below) ignores them; keeping them here would make
# the totals one larger than any carrier's set could ever be.
unique_origins = flights_df.select("origin").dropna().distinct()
unique_destinations = flights_df.select("dest").dropna().distinct()

# Total number of distinct origins and destinations.
num_unique_origins = unique_origins.count()
num_unique_destinations = unique_destinations.count()

# Set of origins and set of destinations served by each carrier.
airlines_origins = flights_df.groupBy("carrier").agg(collect_set("origin").alias("served_origins"))
airlines_destinations = flights_df.groupBy("carrier").agg(collect_set("dest").alias("served_destinations"))

# Carriers missing at least one origin, and carriers covering every destination.
airlines_not_all_origins = airlines_origins.filter(size(col("served_origins")) < num_unique_origins)
airlines_all_destinations = airlines_destinations.filter(size(col("served_destinations")) == num_unique_destinations)

# Show carriers that do not operate from all origins, then those that serve
# all destinations.
airlines_not_all_origins.show()
airlines_all_destinations.show()


## Request 7: Quelles sont les destinations qui sont exclusives à certaines compagnies?

## Request 8: Filtrer les vols pour trouver ceux exploités par United, American ou Delta.