In [7]:
from pyspark.sql import SparkSession

# Inicjalizacja sesji Spark z konfiguracją dla JDBC
spark = SparkSession.builder \
    .appName("FlightsAnalysisApp") \
    .config("spark.jars", "spark-bigquery-connector.jar") \
    .getOrCreate()

In [8]:
# Funkcja do ładowania danych z bazy danych do PySpark DataFrame
def load_data_from_db(dbtable, spark_session):
    return spark.read.format("bigquery") \
            .option("project", "mentoring-372319") \
            .option("dataset", "flights_database") \
            .option("table", dbtable) \
            .load()



# Załadowanie danych z różnych tabel
spark_df_bookings = load_data_from_db("bookings", spark)
spark_df_tickets = load_data_from_db("tickets", spark)
spark_df_ticket_flights = load_data_from_db("ticket_flights", spark)
spark_df_flights = load_data_from_db("flights", spark)
spark_df_airports = load_data_from_db("airports_data", spark)

# # Wyświetlenie danych
# spark_df_bookings.show()
# spark_df_tickets.show()
# spark_df_ticket_flights.show()
# spark_df_flights.show()
# spark_df_airports.show()

In [9]:
# # Joining of tables
# df_combined = df_bookings.merge(df_tickets, on='book_ref')\
#                          .merge(df_ticket_flights, on='ticket_no')\
#                          .merge(df_flights, on='flight_id')\
#                          .merge(df_airports, left_on='arrival_airport', right_on='airport_code')

# print(df_combined.head())



spark_df_combined = spark_df_bookings.join(
        spark_df_tickets,
        on=['book_ref'],
        how='left'
    )

spark_df_combined = spark_df_combined.join(
        spark_df_ticket_flights,
        on=['ticket_no'],
        how='left'
    )

spark_df_combined = spark_df_combined.join(
        spark_df_flights,
        on=['flight_id'],
        how='left'
    )

spark_df_combined = spark_df_combined.join(
        spark_df_airports,
        spark_df_combined['arrival_airport'] == spark_df_airports['airport_code'],
        how='left'
    )


# # Wyświetlenie danych
# spark_df_bookings.show()
# spark_df_tickets.show()
# spark_df_ticket_flights.show()
# spark_df_flights.show()
# spark_df_airports.show()

#spark_df_combined.head(5)

## Sum of bookings made per arrival airport daily - average 

In [12]:
from pyspark.sql.functions import count, countDistinct, col

# Krok 1: Oblicz liczbę rezerwacji na lotnisko
liczba_rezerwacji_per_lotnisko = spark_df_combined.groupBy("airport_name").agg(count("book_date").alias("l_rezerwacji"))

# Krok 2: Oblicz liczbę dni z rezerwacjami na lotnisko
liczba_dni_z_rezerwacjami = spark_df_combined.groupBy("airport_name").agg(countDistinct("book_date").alias("l_dni"))

# Krok 3: Połącz oba DataFrame na podstawie nazwy lotniska
srednia_liczba_rezerwacji_na_dzien = liczba_rezerwacji_per_lotnisko.join(liczba_dni_z_rezerwacjami, on="airport_name")

# Krok 4: Oblicz średnią liczbę rezerwacji na dzień dla każdego lotniska
srednia_liczba_rezerwacji_na_dzien = srednia_liczba_rezerwacji_na_dzien.withColumn("srednia_rezerwacji_na_dzien", col("l_rezerwacji") / col("l_dni"))

In [14]:
srednia_liczba_rezerwacji_na_dzien = srednia_liczba_rezerwacji_na_dzien.orderBy(col('srednia_rezerwacji_na_dzien').desc())

# Pokaż wynik
srednia_liczba_rezerwacji_na_dzien.show()



+--------------------+------------+-----+---------------------------+
|        airport_name|l_rezerwacji|l_dni|srednia_rezerwacji_na_dzien|
+--------------------+------------+-----+---------------------------+
|{"en": "Domodedov...|     1164744|  390|          2986.523076923077|
|{"en": "Sheremety...|      960332|  389|          2468.719794344473|
|{"en": "Pulkovo A...|      893576|  389|         2297.1105398457585|
|{"en": "Vnukovo I...|      587874|  389|         1511.2442159383033|
|{"en": "Irkutsk A...|      398329|  389|         1023.9820051413882|
|{"en": "Kazan Int...|      341207|  389|          877.1388174807198|
|{"en": "Tolmachev...|      273107|  388|          703.8840206185567|
|{"en": "Khabarovs...|      245007|  389|          629.8380462724936|
|{"en": "Sochi Int...|      232123|  388|          598.2551546391752|
|{"en": "Bolshoye ...|      181155|  387|          468.1007751937984|
|{"en": "Bryansk A...|      142780|  388|          367.9896907216495|
|{"en": "Belgorod ..

                                                                                

In [34]:
spark_df_combined_departure_airport = spark_df_bookings.join(
        spark_df_tickets,
        on=['book_ref'],
        how='left'
    )

spark_df_combined_departure_airport = spark_df_combined_departure_airport.join(
        spark_df_ticket_flights,
        on=['ticket_no'],
        how='left'
    )

spark_df_combined_departure_airport = spark_df_combined_departure_airport.join(
        spark_df_flights,
        on=['flight_id'],
        how='left'
    )

spark_df_combined_departure_airport = spark_df_combined_departure_airport.join(
        spark_df_airports,
        spark_df_combined_departure_airport['departure_airport'] == spark_df_airports['airport_code'],
        how='left'
    )


In [35]:
spark_df_combined_departure_airport.show()

[Stage 233:>                                                        (0 + 1) / 1]

+---------+-------------+--------+-------------------+------------+------------+--------------+--------------------+---------------+------+---------+-------------------+-------------------+-----------------+---------------+-------+-------------+-------------------+-------------------+------------+--------------------+--------------------+--------------------+-------------+
|flight_id|    ticket_no|book_ref|          book_date|total_amount|passenger_id|passenger_name|        contact_data|fare_conditions|amount|flight_no|scheduled_departure|  scheduled_arrival|departure_airport|arrival_airport| status|aircraft_code|   actual_departure|     actual_arrival|airport_code|        airport_name|                city|         coordinates|     timezone|
+---------+-------------+--------+-------------------+------------+------------+--------------+--------------------+---------------+------+---------+-------------------+-------------------+-----------------+---------------+-------+-------------+---

                                                                                

## Average departuring passenger count per flight on arrival airport

In [15]:
from pyspark.sql.functions import count, countDistinct, col

# Krok 1: Oblicz liczbę pasazerow na lotnisko
liczba_pasazerow_per_lotnisko = spark_df_combined.groupBy("airport_name").agg(count("ticket_no").alias("l_pasazerow"))

# Krok 2: Oblicz liczbę lotow
liczba_lotow = spark_df_combined.groupBy("airport_name").agg(countDistinct("flight_id").alias("l_lotow"))

# Krok 3: Połącz oba DataFrame na podstawie nazwy lotniska
srednia_liczba_pasazerow_na_lot = liczba_pasazerow_per_lotnisko.join(liczba_lotow, on="airport_name")

# Krok 4: Oblicz średnią liczbę pasazerow na lot dla każdego lotniska
srednia_liczba_pasazerow_na_lot = srednia_liczba_pasazerow_na_lot.withColumn("srednia_liczba_pasazerow_na_lot", col("l_pasazerow") / col("l_lotow"))

srednia_liczba_pasazerow_na_lot = srednia_liczba_pasazerow_na_lot.orderBy(col('srednia_liczba_pasazerow_na_lot').desc())

In [16]:
# Pokaż wynik
srednia_liczba_pasazerow_na_lot.show()

24/07/22 13:05:56 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /10.186.0.16:50140 is closed
24/07/22 13:05:56 WARN BlockManagerMasterEndpoint: Error trying to remove broadcast 79 from block manager BlockManagerId(17, kuba-cluster-w-0.europe-central2-b.c.mentoring-372319.internal, 36487, None)
java.io.IOException: Connection from /10.186.0.16:50140 closed
	at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:147) ~[spark-network-common_2.12-3.5.0.jar:3.5.0]
	at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117) ~[spark-network-common_2.12-3.5.0.jar:3.5.0]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[netty-tran

+--------------------+-----------+-------+-------------------------------+
|        airport_name|l_pasazerow|l_lotow|srednia_liczba_pasazerow_na_lot|
+--------------------+-----------+-------+-------------------------------+
|{"en": "Kazan Int...|     341207|   1181|             288.91363251481795|
|{"en": "Irkutsk A...|     398329|   1425|             279.52912280701753|
|{"en": "Ugolny Ai...|      21746|    111|              195.9099099099099|
|{"en": "Pulkovo A...|     893576|   6175|             144.70866396761133|
|{"en": "Kemerovo ...|      53401|    393|             135.88040712468194|
|{"en": "Anapa Vit...|     100820|    779|             129.42233632862644|
|{"en": "Khrabrovo...|     122756|    952|              128.9453781512605|
|{"en": "Bratsk Ai...|      48248|    393|             122.76844783715013|
|{"en": "Khabarovs...|     245007|   2118|              115.6784702549575|
|{"en": "Chita-Kad...|      53545|    499|             107.30460921843688|
|{"en": "Naryan Ma...|   

24/07/22 13:08:25 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /10.186.0.16:50120 is closed
24/07/22 13:08:25 WARN BlockManagerMasterEndpoint: Error trying to remove broadcast 99 from block manager BlockManagerId(16, kuba-cluster-w-0.europe-central2-b.c.mentoring-372319.internal, 33423, None)
java.io.IOException: Connection from /10.186.0.16:50120 closed
	at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:147) ~[spark-network-common_2.12-3.5.0.jar:3.5.0]
	at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117) ~[spark-network-common_2.12-3.5.0.jar:3.5.0]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[netty-tran

## Average departuring passenger count per flight on arrival airport (for business class only)

In [17]:
from pyspark.sql.functions import count, countDistinct, col


# Krok 0: Filtruj tylko rezerwacje dla klasy biznes
business_class_df = spark_df_combined.filter(col("fare_conditions") == "Business")

# Krok 1: Oblicz liczbę pasażerów na lotnisko
liczba_pasazerow_per_lotnisko = business_class_df.groupBy("airport_name").agg(count("ticket_no").alias("l_pasazerow"))

# Krok 2: Oblicz liczbę lotow
liczba_lotow = spark_df_combined.groupBy("airport_name").agg(countDistinct("flight_id").alias("l_lotow"))

# Krok 3: Połącz oba DataFrame na podstawie nazwy lotniska
srednia_liczba_pasazerow_na_lot = liczba_pasazerow_per_lotnisko.join(liczba_lotow, on="airport_name")

# Krok 4: Oblicz średnią liczbę pasazerow na lot dla każdego lotniska
srednia_liczba_pasazerow_na_lot = srednia_liczba_pasazerow_na_lot.withColumn("srednia_liczba_pasazerow_na_lot", col("l_pasazerow") / col("l_lotow"))

srednia_liczba_pasazerow_na_lot = srednia_liczba_pasazerow_na_lot.orderBy(col('srednia_liczba_pasazerow_na_lot').desc())

# Pokaż wynik
srednia_liczba_pasazerow_na_lot.show()



+--------------------+-----------+-------+-------------------------------+
|        airport_name|l_pasazerow|l_lotow|srednia_liczba_pasazerow_na_lot|
+--------------------+-----------+-------+-------------------------------+
|{"en": "Kazan Int...|      97689|   1181|              82.71718882303134|
|{"en": "Irkutsk A...|     109581|   1425|              76.89894736842105|
|{"en": "Ugolny Ai...|       7720|    111|              69.54954954954955|
|{"en": "Bratsk Ai...|      16895|    393|              42.98982188295165|
|{"en": "Kemerovo ...|      13548|    393|              34.47328244274809|
|{"en": "Sochi Int...|      75816|   2342|              32.37233134073441|
|{"en": "Pulkovo A...|     189136|   6175|              30.62931174089069|
|{"en": "Yuzhno-Sa...|      35235|   1157|             30.453759723422646|
|{"en": "Khabarovs...|      62476|   2118|             29.497639282341833|
|{"en": "Anapa Vit...|      22288|    779|             28.611039794608473|
|{"en": "Tolmachev...|   

                                                                                

## Average number of flights per arrival airport per day

In [19]:
from pyspark.sql.functions import count, countDistinct, col

# Krok 1: Oblicz liczbę lotow
liczba_lotow = spark_df_combined.groupBy("airport_name").agg(countDistinct("flight_id").alias("l_lotow"))

# Krok 2: Oblicz liczbe dni z rezerwacjami
liczba_dni_z_rezerwacjami = spark_df_combined.groupBy("airport_name").agg(countDistinct("book_date").alias("l_dni"))

# Krok 3: Połącz oba DataFrame na podstawie nazwy lotniska
srednia_liczba_lotow_na_dzien = liczba_dni_z_rezerwacjami.join(liczba_lotow, on="airport_name")

# Krok 4: Oblicz średnią liczbę lotow na dzien dla każdego lotniska
srednia_liczba_lotow_na_dzien = srednia_liczba_lotow_na_dzien.withColumn("srednia_liczba_lotow_na_dzien", col("l_lotow") / col("l_dni"))

srednia_liczba_lotow_na_dzien = srednia_liczba_lotow_na_dzien.orderBy(col('srednia_liczba_lotow_na_dzien').desc())

# Pokaż wynik
srednia_liczba_lotow_na_dzien.show()



+--------------------+-----+-------+-----------------------------+
|        airport_name|l_dni|l_lotow|srednia_liczba_lotow_na_dzien|
+--------------------+-----+-------+-----------------------------+
|{"en": "Domodedov...|  390|  14444|            37.03589743589744|
|{"en": "Sheremety...|  389|  14103|           36.254498714652954|
|{"en": "Vnukovo I...|  389|   8034|           20.652956298200515|
|{"en": "Pulkovo A...|  389|   6175|           15.874035989717223|
|{"en": "Tolmachev...|  388|   3924|            10.11340206185567|
|{"en": "Bolshoye ...|  387|   2622|            6.775193798449612|
|{"en": "Sochi Int...|  388|   2342|            6.036082474226804|
|{"en": "Bryansk A...|  388|   2341|            6.033505154639175|
|{"en": "Novy Uren...|  388|   2289|            5.899484536082475|
|{"en": "Koltsovo ...|  387|   2281|             5.89405684754522|
|{"en": "Khabarovs...|  389|   2118|           5.4447300771208225|
|{"en": "Yemelyano...|  388|   1722|            4.438144329896

                                                                                

## Average flight delay on airport per day

In [29]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct, col, avg

# Krok 0: Filtruj tylko loty ze statusem Arrived
arrived_df = spark_df_combined.filter(col("status") == "Arrived")

# Oblicz liczbę unikalnych lotów na lotnisko
arrived_df = arrived_df.groupBy("airport_name").agg(countDistinct("flight_id").alias("number_of_flights"))

# Krok 1: Filtruj wiersze z wartościami NULL w kolumnach actual_arrival i scheduled_arrival
spark_df_combined_filtered = spark_df_combined.dropna(subset=["actual_arrival", "scheduled_arrival"])

# Oblicz różnicę pomiędzy rzeczywistym a planowanym czasem przylotu
srednie_opoznienie_na_lot = spark_df_combined_filtered.withColumn("opoznienie", ( col("actual_arrival") - col("scheduled_arrival") ) / 60)

# Oblicz średnie opóźnienie na lotnisko
srednie_opoznienie_na_lot = srednie_opoznienie_na_lot.groupBy("airport_name").agg(avg("opoznienie").alias("srednie_opoznienie_na_opozniony_lot_(min)"))

# Krok 3: Połącz oba DataFrame na podstawie odpowiednich nazw kolumn
result_df = arrived_df.join(srednie_opoznienie_na_lot, arrived_df.airport_name == srednie_opoznienie_na_lot.airport_name, how="left").drop(srednie_opoznienie_na_lot.airport_name)

# Krok 4: Posortuj wyniki według średniego opóźnienia na lot w porządku malejącym
result_df = result_df.orderBy(col("srednie_opoznienie_na_opozniony_lot_(min)").desc())

# Pokaż wynik
result_df.show()


                                                                                

+-----------------+--------------------+-----------------------------------+
|number_of_flights|        airport_name|srednie_opoznienie_na_opozniony_lot|
+-----------------+--------------------+-----------------------------------+
|               52|{"en": "Nyagan Ai...|                   17.1729873586161|
|              103|{"en": "Ugolny Ai...|                 17.096651371668457|
|              522|{"en": "Nadym Air...|                  16.13683923705722|
|              887|{"en": "Khrabrovo...|                 15.933899752369568|
|             1093|{"en": "Kazan Int...|                  15.84957159426797|
|              315|{"en": "Nalchik A...|                 15.617204863380834|
|              262|{"en": "Ust-Ilims...|                 15.419891801914273|
|              785|{"en": "Begishevo...|                 15.273426667263049|
|              341|{"en": "Kursk Eas...|                 15.167074663402692|
|             1078|{"en": "Nizhnevar...|                 15.081087301769973|