In [46]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import *

In [101]:
# Obtain (or reuse) the Spark session used by every cell in this notebook.
spark = SparkSession.builder.getOrCreate()

# Load the query-1 result and substitute defaults for missing departure values.
# Each column needs its own key: a dict literal with a repeated key keeps only
# the last value, which would leave "departure_date" nulls unfilled.
# Parquet files carry their own schema, so no CSV-style 'header' option is set.
# NOTE(review): string defaults are only applied to string-typed columns --
# confirm both columns are stored as strings in 'workspace/query1'.
flights1 = (
    spark.read.parquet('workspace/query1')
    .fillna({"departure_date": "2015-01-01", "departure_time": "00:00:00"})
)
flights1.show()



+--------------+--------------+-------+-------------+--------------------------+---------------------------+-----------------------------+
|departure_date|departure_time|airline|flight_number|daily_flight_serial_number|airline_daily_flights_count|time_since_previous_departure|
+--------------+--------------+-------+-------------+--------------------------+---------------------------+-----------------------------+
|    2015-01-01|      00:00:00|     AA|         1285|                        59|                       1473|                            0|
|    2015-01-01|      00:00:00|     MQ|         3263|                       295|                        929|                            0|
|    2015-01-01|      00:00:00|     OO|         5445|                        33|                       1487|                            0|
|    2015-01-01|      06:05:00|     UA|         1224|                       840|                       1285|                            2|
|    2015-01-01|      06:12

In [287]:
# UDF that formats a (year, month) pair as the string "<year>-<month>".
year_month = spark.udf.register("year_month", lambda year, month: f"{year}-{month}")

# Lookup table read from CSV; the join below uses its AIRLINE and IATA_CODE columns.
airlines_csv = spark.read.option('header', True).csv('workspace/airlines.csv')

# Per-airport monthly statistics from the multi-line JSON file.
airlines = (
    spark.read
    .option("mode", "PERMISSIVE")
    .option("multiline", True)
    .json('workspace/airlines.json')
    .withColumn("year_month", year_month(col("Time.Year"), col("Time.Month")))
    # Airport-level total: diverted + cancelled + delayed flights.
    .withColumn(
        "number_of_delays_for_airport",
        col("Statistics.Flights.Diverted")
        + col("Statistics.Flights.Cancelled")
        + col("Statistics.Flights.Delayed"),
    )
    # One output row per carrier name in the comma-separated list.
    .withColumn("airline_name", explode(split(col("Statistics.Carriers.Names"), ',')))
    .withColumn("airport_code", col("Airport.Code"))
)

# Attach the IATA code by carrier name; names without a match get 'N/A'.
cond = [airlines.airline_name == airlines_csv.AIRLINE]
airlines_new = (
    airlines.join(airlines_csv, cond, 'left')
    .select(
        'year_month',
        'airport_code',
        'number_of_delays_for_airport',
        col('IATA_CODE').alias('airline_iata_code'),
        'airline_name',
    )
    .sort('number_of_delays_for_airport')
    .fillna({'airline_iata_code': 'N/A'})
)

# Overwrite so the cell can be re-run; the default save mode raises an error
# when the target path already exists.
airlines_new.write.mode('overwrite').parquet('workspace/new_airlines')


In [288]:
# UDF that formats a (year, month) pair as the string "<year>-<month>".
year_month = spark.udf.register("year_month", lambda year, month: f"{year}-{month}")

# Row-level predicates on the flights CSV columns.
cond1 = col("DEPARTURE_DELAY") > 0
cond2 = col("ARRIVAL_DELAY") > 0
cond3 = col("CANCELLED") == 1
cond4 = col("DEPARTURE_DELAY") <= 0
cond5 = col("DIVERTED") == 1

# Flag each flight as a delay counted against its origin and/or destination.
# otherwise(0) keeps the flags non-null so that a group with no flagged flights
# sums to 0 instead of NULL.
flights_csv = (
    spark.read.option('header', True).csv('workspace/flights.csv')
    .withColumn("year_month", year_month(col("YEAR"), col("MONTH")))
    # Origin: (late departure AND late arrival) OR cancelled.
    .withColumn("delay_origin", when((cond1 & cond2) | cond3, 1).otherwise(0))
    # Destination: (on-time departure AND late arrival) OR diverted.
    .withColumn("delay_dest", when((cond4 & cond2) | cond5, 1).otherwise(0))
)

# Delay counts per (month, airline, origin airport).
origin = (
    flights_csv
    .select('year_month', 'delay_origin', 'AIRLINE', 'ORIGIN_AIRPORT')
    .groupBy('year_month', 'AIRLINE', 'ORIGIN_AIRPORT')
    .agg(sum("delay_origin").alias("delay_origin"))
    .orderBy('year_month', 'AIRLINE')
)

# Delay counts per (month, airline, destination airport).
dest = (
    flights_csv
    .select('year_month', 'delay_dest', 'AIRLINE', 'DESTINATION_AIRPORT')
    .groupBy('year_month', 'AIRLINE', 'DESTINATION_AIRPORT')
    .agg(sum("delay_dest").alias("delay_dest"))
    .orderBy('year_month', 'AIRLINE')
)

dest_renamed = dest.withColumnRenamed('AIRLINE', 'AIRLINE1').withColumnRenamed('year_month', 'month_year')
# origin_renamed is built but not written in this cell; the origin output keeps
# its original column names (year_month, AIRLINE).
origin_renamed = origin.withColumnRenamed('AIRLINE', 'AIRLINE1').withColumnRenamed('year_month', 'month_year')

# Overwrite so the cell can be re-run; the default save mode raises an error
# when the target path already exists.
dest_renamed.write.mode('overwrite').parquet('workspace/dest')
origin.write.mode('overwrite').parquet('workspace/origin')


In [290]:
# Read the intermediate parquet datasets back from the workspace directory.
# Parquet files carry their own schema, so no CSV-style 'header' option is set.
new_airlines = spark.read.parquet('workspace/new_airlines')
new_origin = spark.read.parquet('workspace/origin')
new_dest = spark.read.parquet('workspace/dest')

In [291]:
# Print the column names and types of the dataset read from 'workspace/origin'.
new_origin.printSchema()

root
 |-- year_month: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- delay_origin: long (nullable = true)



In [292]:
# Print the column names and types of the dataset read from 'workspace/dest'.
new_dest.printSchema()

root
 |-- month_year: string (nullable = true)
 |-- AIRLINE1: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- delay_dest: long (nullable = true)



In [301]:
# Match origin-side and destination-side rows on (airline, month, airport).
cond1 = new_origin.AIRLINE == new_dest.AIRLINE1
cond2 = new_origin.year_month == new_dest.month_year
cond3 = new_origin.ORIGIN_AIRPORT == new_dest.DESTINATION_AIRPORT

# Outer join keeps combinations that appear on only one side.
full = new_origin.join(new_dest, cond1 & cond2 & cond3, 'outer').orderBy('year_month', 'AIRLINE')

new_full = (
    full
    # Unified keys: take the origin-side value, falling back to the
    # destination-side value for rows that exist only on that side.
    .withColumn("year_month_", coalesce(full.year_month, full.month_year))
    .withColumn("airline_", coalesce(full.AIRLINE, full.AIRLINE1))
    .withColumn("airport_", coalesce(full.ORIGIN_AIRPORT, full.DESTINATION_AIRPORT))
    # NULL + x is NULL in Spark SQL, so a missing side must count as 0;
    # otherwise every row unmatched by the outer join loses its delay count.
    .withColumn(
        "delay_",
        coalesce(col('delay_dest'), lit(0)) + coalesce(col('delay_origin'), lit(0)),
    )
)

In [309]:
# Keep only the unified key columns and the combined delay count.
new_full = (
    new_full
    .select('year_month_', 'airline_', 'airport_', 'delay_')
    .orderBy('year_month_', 'airport_')
)
# Overwrite so the cell can be re-run; the default save mode raises an error
# when the target path already exists.
# NOTE(review): if new_full has already been re-read from 'workspace/new_full',
# Spark will refuse to overwrite the path it is reading from -- re-run the join
# cell first in that case.
new_full.write.mode('overwrite').parquet('workspace/new_full')

In [310]:
# Re-read the combined delay counts from parquet; parquet carries its own
# schema, so no CSV-style 'header' option is set.
new_full = spark.read.parquet('workspace/new_full')

In [319]:
# Match airport statistics to per-airline delay counts on
# (airline IATA code, month, airport code).
cond1 = new_airlines.airline_iata_code == new_full.airline_
cond2 = new_airlines.year_month == new_full.year_month_
cond3 = new_airlines.airport_code == new_full.airport_

# Left join keeps every airport/airline row even when no delay count matches.
res = new_airlines.join(new_full, cond1 & cond2 & cond3, 'left')

# Total the matched delay counts per airport/airline/month row.
# `sum` here is pyspark.sql.functions.sum (brought in by the star import).
res_ = (
    res
    .groupBy('year_month', 'airport_code', 'airline_iata_code', 'number_of_delays_for_airport', 'airline_name')
    .agg(sum("delay_").alias("number_of_delays_for_airline_in_airport"))
)

# Persist the aggregated frame (res_), not the raw join (res), so that
# number_of_delays_for_airline_in_airport is part of the output.
# Overwrite so the cell can be re-run; the default save mode raises an error
# when the target path already exists.
res_.write.mode('overwrite').parquet('workspace/task2_parquet')
res_.write.mode('overwrite').csv('workspace/task2_csv')