In [262]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, coalesce, concat, lit, explode, row_number, rank, sum
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import json

In [149]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, FloatType

In [150]:
spark = SparkSession.builder.getOrCreate()

In [151]:
schema = StructType([
    StructField("id", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("month", StringType(), True),
    StructField("day", StringType(), True),
    StructField("dep_time", FloatType(), True),
    StructField("sched_dep_time", FloatType(), True),
    StructField("dep_delay", FloatType(), True),
    StructField("arr_time", FloatType(), True),
    StructField("sched_arr_time", FloatType(), True),
    StructField("arr_delay", FloatType(), True),
    StructField("carrier", StringType(), True),
    StructField("flight", StringType(), True),
    StructField("tailnum", StringType(), True),
    StructField("origin", StringType(), True),
    StructField("dest", StringType(), True),
    StructField("air_time", FloatType(), True),
    StructField("distance", FloatType(), True),
    StructField("hour", IntegerType(), True),
    StructField("minute", IntegerType(), True),
    StructField("time_hour", StringType(), True),
    StructField("name", StringType(), True)
])

In [152]:
df = spark.read.csv("/content/flights.csv", header=True, schema=schema)

In [153]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- dep_time: float (nullable = true)
 |-- sched_dep_time: float (nullable = true)
 |-- dep_delay: float (nullable = true)
 |-- arr_time: float (nullable = true)
 |-- sched_arr_time: float (nullable = true)
 |-- arr_delay: float (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: float (nullable = true)
 |-- distance: float (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- time_hour: string (nullable = true)
 |-- name: string (nullable = true)



In [154]:
df.show()

+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
| id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|                name|
+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
|  0|2013|    1|  1|   517.0|         515.0|      2.0|   830.0|         819.0|     11.0|     UA|  1545| N14228|   EWR| IAH|   227.0|  1400.0|   5|    15|2013-01-01 05:00:00|United Air Lines ...|
|  1|2013|    1|  1|   533.0|         529.0|      4.0|   850.0|         830.0|     20.0|     UA|  1714| N24211|   LGA| IAH|   227.0|  1416.0|   5|    29|2013-01-01 05:00:00|United Air Lines ...|
|  2|2013|    1|  1|   54

In [155]:
df = df[['id','name','year','month','day','origin','distance','hour','minute']]

In [156]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- distance: float (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)



In [157]:
df.show(truncate=False)

+---+------------------------+----+-----+---+------+--------+----+------+
|id |name                    |year|month|day|origin|distance|hour|minute|
+---+------------------------+----+-----+---+------+--------+----+------+
|0  |United Air Lines Inc.   |2013|1    |1  |EWR   |1400.0  |5   |15    |
|1  |United Air Lines Inc.   |2013|1    |1  |LGA   |1416.0  |5   |29    |
|2  |American Airlines Inc.  |2013|1    |1  |JFK   |1089.0  |5   |40    |
|3  |JetBlue Airways         |2013|1    |1  |JFK   |1576.0  |5   |45    |
|4  |Delta Air Lines Inc.    |2013|1    |1  |LGA   |762.0   |6   |0     |
|5  |United Air Lines Inc.   |2013|1    |1  |EWR   |719.0   |5   |58    |
|6  |JetBlue Airways         |2013|1    |1  |EWR   |1065.0  |6   |0     |
|7  |ExpressJet Airlines Inc.|2013|1    |1  |LGA   |229.0   |6   |0     |
|8  |JetBlue Airways         |2013|1    |1  |JFK   |944.0   |6   |0     |
|9  |American Airlines Inc.  |2013|1    |1  |LGA   |733.0   |6   |0     |
|10 |JetBlue Airways         |2013|1  

# Let's find the airline with the most flights.

In [158]:
airline_with_the_most_flights = df.groupBy(
    col("name").alias("airline")
).agg(
    count("*").alias("count_of_flights_by_airline")
).orderBy(
    col("count_of_flights_by_airline").desc()
)

# Let's find the longest distance the planes travels.

In [159]:
longest_distance = df.groupBy(
    col("name").alias("AirLine")
).agg(
    max(col("distance")).alias("longest_distance")
).orderBy(col("longest_distance").desc())

In [160]:
df = df.withColumn(
    "minutes_total",
    (col("hour").cast("int") * 60) + col("minute").cast("int")
)

# Let's find the flight that take the most time.

In [161]:
df.show()

+---+--------------------+----+-----+---+------+--------+----+------+-------------+
| id|                name|year|month|day|origin|distance|hour|minute|minutes_total|
+---+--------------------+----+-----+---+------+--------+----+------+-------------+
|  0|United Air Lines ...|2013|    1|  1|   EWR|  1400.0|   5|    15|          315|
|  1|United Air Lines ...|2013|    1|  1|   LGA|  1416.0|   5|    29|          329|
|  2|American Airlines...|2013|    1|  1|   JFK|  1089.0|   5|    40|          340|
|  3|     JetBlue Airways|2013|    1|  1|   JFK|  1576.0|   5|    45|          345|
|  4|Delta Air Lines Inc.|2013|    1|  1|   LGA|   762.0|   6|     0|          360|
|  5|United Air Lines ...|2013|    1|  1|   EWR|   719.0|   5|    58|          358|
|  6|     JetBlue Airways|2013|    1|  1|   EWR|  1065.0|   6|     0|          360|
|  7|ExpressJet Airlin...|2013|    1|  1|   LGA|   229.0|   6|     0|          360|
|  8|     JetBlue Airways|2013|    1|  1|   JFK|   944.0|   6|     0|       

# To find the flight route that takes the most time we need to add a new column (dest).

In [162]:
df2 = spark.read.csv("/content/flights.csv", header=True, schema=schema)

In [163]:
df2 = df2.select(col("id"),col("dest"))

In [164]:
df2.show()

+---+----+
| id|dest|
+---+----+
|  0| IAH|
|  1| IAH|
|  2| MIA|
|  3| BQN|
|  4| ATL|
|  5| ORD|
|  6| FLL|
|  7| IAD|
|  8| MCO|
|  9| ORD|
| 10| PBI|
| 11| TPA|
| 12| LAX|
| 13| SFO|
| 14| DFW|
| 15| BOS|
| 16| LAS|
| 17| FLL|
| 18| ATL|
| 19| PBI|
+---+----+
only showing top 20 rows



In [165]:
df = df.join(df2, on="id", how="inner")

In [166]:
df.show()

+---+--------------------+----+-----+---+------+--------+----+------+-------------+----+
| id|                name|year|month|day|origin|distance|hour|minute|minutes_total|dest|
+---+--------------------+----+-----+---+------+--------+----+------+-------------+----+
|  0|United Air Lines ...|2013|    1|  1|   EWR|  1400.0|   5|    15|          315| IAH|
|  1|United Air Lines ...|2013|    1|  1|   LGA|  1416.0|   5|    29|          329| IAH|
|  2|American Airlines...|2013|    1|  1|   JFK|  1089.0|   5|    40|          340| MIA|
|  3|     JetBlue Airways|2013|    1|  1|   JFK|  1576.0|   5|    45|          345| BQN|
|  4|Delta Air Lines Inc.|2013|    1|  1|   LGA|   762.0|   6|     0|          360| ATL|
|  5|United Air Lines ...|2013|    1|  1|   EWR|   719.0|   5|    58|          358| ORD|
|  6|     JetBlue Airways|2013|    1|  1|   EWR|  1065.0|   6|     0|          360| FLL|
|  7|ExpressJet Airlin...|2013|    1|  1|   LGA|   229.0|   6|     0|          360| IAD|
|  8|     JetBlue Air

In [167]:
df = df.withColumn(
    "route",
    concat(col("origin"), lit("-"),col("dest"))
)

In [168]:
df.show()

+---+--------------------+----+-----+---+------+--------+----+------+-------------+----+-------+
| id|                name|year|month|day|origin|distance|hour|minute|minutes_total|dest|  route|
+---+--------------------+----+-----+---+------+--------+----+------+-------------+----+-------+
|  0|United Air Lines ...|2013|    1|  1|   EWR|  1400.0|   5|    15|          315| IAH|EWR-IAH|
|  1|United Air Lines ...|2013|    1|  1|   LGA|  1416.0|   5|    29|          329| IAH|LGA-IAH|
|  2|American Airlines...|2013|    1|  1|   JFK|  1089.0|   5|    40|          340| MIA|JFK-MIA|
|  3|     JetBlue Airways|2013|    1|  1|   JFK|  1576.0|   5|    45|          345| BQN|JFK-BQN|
|  4|Delta Air Lines Inc.|2013|    1|  1|   LGA|   762.0|   6|     0|          360| ATL|LGA-ATL|
|  5|United Air Lines ...|2013|    1|  1|   EWR|   719.0|   5|    58|          358| ORD|EWR-ORD|
|  6|     JetBlue Airways|2013|    1|  1|   EWR|  1065.0|   6|     0|          360| FLL|EWR-FLL|
|  7|ExpressJet Airlin...|2013

# Codes alone don't tell us anything.

We need to find a match between the airport code and the country it's in.

In [201]:
airports = spark.read.csv("/content/airports.csv",header=True)
airports = airports.drop(col("Information"))

In [204]:
airports = airports.select(col("IATA"), col("Country"))

In [205]:
airports.show()

+----+--------------------+
|IATA|             Country|
+----+--------------------+
| AAA|    French Polynesia|
| AAB|           Australia|
| AAC|               Egypt|
| AAD|             Somalia|
| AAE|             Algeria|
| AAF|United States of ...|
| AAG|              Brazil|
| AAH|             Germany|
| AAI|              Brazil|
| AAJ|            Suriname|
| AAK|            Kiribati|
| AAL|             Denmark|
| AAM|        South Africa|
| AAN|United Arab Emirates|
| AAO|           Venezuela|
| AAP|           Indonesia|
| AAQ|              Russia|
| AAR|             Denmark|
| AAS|           Indonesia|
| AAT|               China|
+----+--------------------+
only showing top 20 rows



In [206]:
df.show()

+---+--------------------+----+-----+---+------+--------+----+------+-------------+----+-------+
| id|                name|year|month|day|origin|distance|hour|minute|minutes_total|dest|  route|
+---+--------------------+----+-----+---+------+--------+----+------+-------------+----+-------+
|  0|United Air Lines ...|2013|    1|  1|   EWR|  1400.0|   5|    15|          315| IAH|EWR-IAH|
|  1|United Air Lines ...|2013|    1|  1|   LGA|  1416.0|   5|    29|          329| IAH|LGA-IAH|
|  2|American Airlines...|2013|    1|  1|   JFK|  1089.0|   5|    40|          340| MIA|JFK-MIA|
|  3|     JetBlue Airways|2013|    1|  1|   JFK|  1576.0|   5|    45|          345| BQN|JFK-BQN|
|  4|Delta Air Lines Inc.|2013|    1|  1|   LGA|   762.0|   6|     0|          360| ATL|LGA-ATL|
|  5|United Air Lines ...|2013|    1|  1|   EWR|   719.0|   5|    58|          358| ORD|EWR-ORD|
|  6|     JetBlue Airways|2013|    1|  1|   EWR|  1065.0|   6|     0|          360| FLL|EWR-FLL|
|  7|ExpressJet Airlin...|2013

In [207]:
df = df.withColumnRenamed("name","airline_name")

In [209]:
df = df.join(
    airports,
    df.origin == airports.IATA,
    "inner"
)

In [212]:
df = df.withColumnRenamed("Country","country_origin")

In [215]:
df = df.drop(col("IATA"))

In [216]:
df = df.join(
    airports,
    df.dest == airports.IATA,
    "inner"
)

In [217]:
df = df.drop(col("IATA"))

In [220]:
df = df.withColumnRenamed("Country","country_dest")

In [221]:
df.show()

+---+--------------------+----+-----+---+------+--------+----+------+-------------+----+-------+--------------------+--------------------+
| id|        airline_name|year|month|day|origin|distance|hour|minute|minutes_total|dest|  route|      country_origin|        country_dest|
+---+--------------------+----+-----+---+------+--------+----+------+-------------+----+-------+--------------------+--------------------+
|  0|United Air Lines ...|2013|    1|  1|   EWR|  1400.0|   5|    15|          315| IAH|EWR-IAH|United States of ...|United States of ...|
|  1|United Air Lines ...|2013|    1|  1|   LGA|  1416.0|   5|    29|          329| IAH|LGA-IAH|United States of ...|United States of ...|
|  2|American Airlines...|2013|    1|  1|   JFK|  1089.0|   5|    40|          340| MIA|JFK-MIA|United States of ...|United States of ...|
|  3|     JetBlue Airways|2013|    1|  1|   JFK|  1576.0|   5|    45|          345| BQN|JFK-BQN|United States of ...|         Puerto Rico|
|  4|Delta Air Lines Inc.|2

In [225]:
df.show()

+---+--------------------+----+-----+---+------+--------+----+------+-------------+----+-------+--------------------+--------------------+
| id|        airline_name|year|month|day|origin|distance|hour|minute|minutes_total|dest|  route|      country_origin|        country_dest|
+---+--------------------+----+-----+---+------+--------+----+------+-------------+----+-------+--------------------+--------------------+
|  0|United Air Lines ...|2013|    1|  1|   EWR|  1400.0|   5|    15|          315| IAH|EWR-IAH|United States of ...|United States of ...|
|  1|United Air Lines ...|2013|    1|  1|   LGA|  1416.0|   5|    29|          329| IAH|LGA-IAH|United States of ...|United States of ...|
|  2|American Airlines...|2013|    1|  1|   JFK|  1089.0|   5|    40|          340| MIA|JFK-MIA|United States of ...|United States of ...|
|  3|     JetBlue Airways|2013|    1|  1|   JFK|  1576.0|   5|    45|          345| BQN|JFK-BQN|United States of ...|         Puerto Rico|
|  4|Delta Air Lines Inc.|2

In [236]:
df.select(col("airline_name"),col("minutes_total")).distinct().orderBy(col("minutes_total").desc()).show()

+--------------------+-------------+
|        airline_name|minutes_total|
+--------------------+-------------+
|Delta Air Lines Inc.|         1439|
|     JetBlue Airways|         1439|
|     JetBlue Airways|         1438|
|Delta Air Lines Inc.|         1435|
|     JetBlue Airways|         1435|
|     JetBlue Airways|         1432|
|United Air Lines ...|         1425|
|United Air Lines ...|         1419|
|     JetBlue Airways|         1410|
|     JetBlue Airways|         1395|
|     JetBlue Airways|         1385|
|United Air Lines ...|         1380|
|     JetBlue Airways|         1380|
|     JetBlue Airways|         1378|
|     JetBlue Airways|         1375|
|     JetBlue Airways|         1373|
|     JetBlue Airways|         1371|
|     JetBlue Airways|         1370|
|     JetBlue Airways|         1369|
|     JetBlue Airways|         1366|
+--------------------+-------------+
only showing top 20 rows



In [237]:
df.show()

+---+--------------------+----+-----+---+------+--------+----+------+-------------+----+-------+--------------------+--------------------+
| id|        airline_name|year|month|day|origin|distance|hour|minute|minutes_total|dest|  route|      country_origin|        country_dest|
+---+--------------------+----+-----+---+------+--------+----+------+-------------+----+-------+--------------------+--------------------+
|  0|United Air Lines ...|2013|    1|  1|   EWR|  1400.0|   5|    15|          315| IAH|EWR-IAH|United States of ...|United States of ...|
|  1|United Air Lines ...|2013|    1|  1|   LGA|  1416.0|   5|    29|          329| IAH|LGA-IAH|United States of ...|United States of ...|
|  2|American Airlines...|2013|    1|  1|   JFK|  1089.0|   5|    40|          340| MIA|JFK-MIA|United States of ...|United States of ...|
|  3|     JetBlue Airways|2013|    1|  1|   JFK|  1576.0|   5|    45|          345| BQN|JFK-BQN|United States of ...|         Puerto Rico|
|  4|Delta Air Lines Inc.|2

In [257]:
windows_spec = Window.partitionBy("airline_name").orderBy(col("distance").desc())
df_ranked = df.withColumn("distance_rank", rank().over(window_spec))
df_ranked = df_ranked.filter(col("distance_rank") == 1)
df_ranked.filter(col("month") == '12').show()

+------+--------------------+----+-----+---+------+--------+----+------+-------------+----+-------+--------------------+--------------------+-------------+
|    id|        airline_name|year|month|day|origin|distance|hour|minute|minutes_total|dest|  route|      country_origin|        country_dest|distance_rank|
+------+--------------------+----+-----+---+------+--------+----+------+-------------+----+-------+--------------------+--------------------+-------------+
| 83247|Alaska Airlines Inc.|2013|   12|  1|   EWR|  2402.0|   7|     5|          425| SEA|EWR-SEA|United States of ...|United States of ...|            1|
| 84240|Alaska Airlines Inc.|2013|   12|  2|   EWR|  2402.0|   7|     5|          425| SEA|EWR-SEA|United States of ...|United States of ...|            1|
| 85257|Alaska Airlines Inc.|2013|   12|  3|   EWR|  2402.0|   7|     5|          425| SEA|EWR-SEA|United States of ...|United States of ...|            1|
| 88138|Alaska Airlines Inc.|2013|   12|  6|   EWR|  2402.0|   7

In [245]:
window_spec = Window.partitionBy("airline_name").orderBy(col("minutes_total"))

df_ranked = df.withColumn("flight_rank", row_number().over(window_spec))

df_ranked = df_ranked.filter(col("flight_rank") == 1)

df_ranked.show()

+------+--------------------+----+-----+---+------+--------+----+------+-------------+----+-------+--------------------+--------------------+-----------+
|    id|        airline_name|year|month|day|origin|distance|hour|minute|minutes_total|dest|  route|      country_origin|        country_dest|flight_rank|
+------+--------------------+----+-----+---+------+--------+----+------+-------------+----+-------+--------------------+--------------------+-----------+
|   858|AirTran Airways C...|2013|    1|  2|   LGA|   762.0|   6|     0|          360| ATL|LGA-ATL|United States of ...|United States of ...|          1|
| 58577|Alaska Airlines Inc.|2013|   11|  4|   EWR|  2402.0|   7|     5|          425| SEA|EWR-SEA|United States of ...|United States of ...|          1|
|     2|American Airlines...|2013|    1|  1|   JFK|  1089.0|   5|    40|          340| MIA|JFK-MIA|United States of ...|United States of ...|          1|
|     4|Delta Air Lines Inc.|2013|    1|  1|   LGA|   762.0|   6|     0|    

In [261]:
df.groupBy(
    col("airline_name")
).agg(
    count("*") # funkcja agregująca
).show()

+--------------------+--------+
|        airline_name|count(1)|
+--------------------+--------+
|   Endeavor Air Inc.|   18460|
|      Virgin America|    5162|
|SkyWest Airlines ...|      32|
|United Air Lines ...|   58665|
|Frontier Airlines...|     685|
|Southwest Airline...|   12275|
|ExpressJet Airlin...|   54173|
|     JetBlue Airways|   54635|
|AirTran Airways C...|    3260|
|     US Airways Inc.|   20536|
|           Envoy Air|   26397|
|Hawaiian Airlines...|     342|
|Alaska Airlines Inc.|     714|
|Delta Air Lines Inc.|   48110|
|  Mesa Airlines Inc.|     601|
|American Airlines...|   32729|
+--------------------+--------+



In [264]:
df.groupBy(
    col("airline_name")
).agg(
    sum(col("distance")).alias("sum_dist")
).show()

+--------------------+-----------+
|        airline_name|   sum_dist|
+--------------------+-----------+
|   Endeavor Air Inc.|  9788152.0|
|      Virgin America|1.2902327E7|
|SkyWest Airlines ...|    16026.0|
|United Air Lines ...|8.9705524E7|
|Frontier Airlines...|  1109700.0|
|Southwest Airline...|1.2229203E7|
|ExpressJet Airlin...|3.0498951E7|
|     JetBlue Airways|5.8384137E7|
|AirTran Airways C...|  2167344.0|
|     US Airways Inc.|1.1365778E7|
|           Envoy Air|1.5033955E7|
|Hawaiian Airlines...|  1704186.0|
|Alaska Airlines Inc.|  1715028.0|
|Delta Air Lines Inc.|5.9507317E7|
|  Mesa Airlines Inc.|   225395.0|
|American Airlines...|4.3864584E7|
+--------------------+-----------+



In [281]:
window_speed = Window.partitionBy("airline_name").orderBy(col("distance").desc())
df_with_win_speed = df.withColumn("sum_dist",sum(col("distance")).over(window_speed))

+---+-----------------+----+-----+---+------+--------+----+------+-------------+----+-------+--------------------+--------------------+---------+
| id|     airline_name|year|month|day|origin|distance|hour|minute|minutes_total|dest|  route|      country_origin|        country_dest| sum_dist|
+---+-----------------+----+-----+---+------+--------+----+------+-------------+----+-------+--------------------+--------------------+---------+
|116|Endeavor Air Inc.|2013|    1|  1|   JFK|  1029.0|   8|    10|          490| MSP|JFK-MSP|United States of ...|United States of ...|9788152.0|
|427|Endeavor Air Inc.|2013|    1|  1|   JFK|   228.0|  15|     0|          900| IAD|JFK-IAD|United States of ...|United States of ...|9788152.0|
|428|Endeavor Air Inc.|2013|    1|  1|   JFK|   301.0|  14|    55|          895| BUF|JFK-BUF|United States of ...|United States of ...|9788152.0|
|433|Endeavor Air Inc.|2013|    1|  1|   JFK|   209.0|  15|     0|          900| SYR|JFK-SYR|United States of ...|United Sta

In [280]:
df_with_speed.show(5)

+----+--------------------+----+-----+---+------+--------+----+------+-------------+----+-------+--------------------+--------------------+--------+
|  id|        airline_name|year|month|day|origin|distance|hour|minute|minutes_total|dest|  route|      country_origin|        country_dest|sum_dist|
+----+--------------------+----+-----+---+------+--------+----+------+-------------+----+-------+--------------------+--------------------+--------+
| 123|AirTran Airways C...|2013|    1|  1|   LGA|   762.0|   8|    10|          490| ATL|LGA-ATL|United States of ...|United States of ...|       1|
|  78|Alaska Airlines Inc.|2013|    1|  1|   EWR|  2402.0|   7|    25|          445| SEA|EWR-SEA|United States of ...|United States of ...|       1|
|  94|American Airlines...|2013|    1|  1|   JFK|  2586.0|   7|    45|          465| SFO|JFK-SFO|United States of ...|United States of ...|       1|
|  55|Delta Air Lines Inc.|2013|    1|  1|   JFK|  2586.0|   7|     0|          420| SFO|JFK-SFO|United St