In [1]:
from pyspark.sql import SparkSession, DataFrame
from pyspark import  SQLContext
from pyspark.sql import functions as F

In [2]:
spark = SparkSession.builder.appName('parquettodataframe').getOrCreate()

In [5]:
df = spark.read.parquet('flights.parquet')

### **Задача №1**
Постройте сводную таблицу отображающую топ 10 рейсов по коду рейса (TAIL_NUMBER) и числу вылетов за все время. Отсеките значения без указания кода рейса.

In [32]:
top_flights = df \
        .where(df['TAIL_NUMBER'].isNotNull()) \
        .groupBy(df['TAIL_NUMBER']) \
        .agg(F.count('FLIGHT_NUMBER')) \
        .select(F.col('TAIL_NUMBER'),
                F.col('count(FLIGHT_NUMBER)')) \
        .orderBy(F.col('count(FLIGHT_NUMBER)').desc())\
        .limit(10) \
        .withColumnRenamed('count(FLIGHT_NUMBER)', 'count')

In [33]:
top_flights.show(10)

+-----------+-----+
|TAIL_NUMBER|count|
+-----------+-----+
|     N488HA|  212|
|     N484HA|  193|
|     N492HA|  181|
|     N491HA|  180|
|     N477HA|  175|
|     N480HA|  173|
|     N493HA|  173|
|     N483HA|  169|
|     N490HA|  168|
|     N478HA|  165|
+-----------+-----+



In [40]:
#top_flights.write.parquet('results.parquet')

### **Задача №2**
Найдите топ 10 авиамаршрутов (ORIGIN_AIRPORT, DESTINATION_AIRPORT) по наибольшему числу рейсов, а так же посчитайте среднее время в полете (AIR_TIME).

**Требуемые поля:**  

|Колонка| Описание|
|---|---|
|ORIGIN_AIRPORT|Аэропорт вылета|
|DESTINATION_AIRPORT|Аэропорт прибытия|
|tail_count|Число рейсов по маршруту (TAIL_NUMBER)|
|avg_air_time|среднее время в небе по маршруту|

In [55]:
top_routes = df \
        .groupBy(df['ORIGIN_AIRPORT'], df.DESTINATION_AIRPORT) \
        .agg(F.count('TAIL_NUMBER'), F.avg('AIR_TIME')) \
        .select(F.col('ORIGIN_AIRPORT'),
                F.col('DESTINATION_AIRPORT'),
                F.col('count(TAIL_NUMBER)'),
                F.col('avg(AIR_TIME)').alias('avg_air_time')) \
        .orderBy(F.col('count(TAIL_NUMBER)').desc())\
        .limit(10) \
        .withColumnRenamed('count(TAIL_NUMBER)', 'tail_count')

In [56]:
top_routes.show()

+--------------+-------------------+----------+------------------+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|tail_count|      avg_air_time|
+--------------+-------------------+----------+------------------+
|           LAX|                SFO|       693|  54.8698224852071|
|           SFO|                LAX|       646| 55.98098256735341|
|           LAX|                JFK|       593|290.49658703071674|
|           JFK|                LAX|       580|330.28745644599303|
|           LAX|                LAS|       515|42.916179337231966|
|           LAS|                LAX|       513|42.473684210526315|
|           ORD|                LGA|       503| 98.01022494887526|
|           LGA|                ORD|       471|115.42701525054466|
|           LGA|                ATL|       438|112.55421686746988|
|           OGG|                HNL|       435|23.379310344827587|
+--------------+-------------------+----------+------------------+



### **Задача №3**  
Аналитик попросил определить список аэропортов у которых самые больше проблемы с задержкой на вылет рейса. Для этого необходимо вычислить среднее, минимальное, максимальное время задержки и выбрать аэропорты только те где максимальная задержка (DEPARTURE_DELAY) 1000 секунд и больше. Дополнительно посчитать корреляцию между временем задержки и днем недели (DAY_OF_WEEK)

**Требуемые поля:**  

|Поле|Описание|
|-|-|
|ORIGIN_AIRPORT|Код аэропорта отправления|
|avg_delay|Среднее время задержки для аэропорта|
|min_delay|Минимальное время задержки для аэропорта|
|max_delay|Максимальное время задержки для аэропорта|
|corr_delay2day_of_week|Корреляция между временем задержки и днем недели|

In [79]:
worse_airports = df \
        .groupBy(df['ORIGIN_AIRPORT']) \
        .agg(F.avg('DEPARTURE_DELAY').alias('avg_delay'), 
             F.min('DEPARTURE_DELAY').alias('min_delay'),
             F.max('DEPARTURE_DELAY').alias('max_delay'),
             F.corr('DEPARTURE_DELAY','DAY_OF_WEEK').alias('corr_delay2day_of_week')) \
        .select(F.col('ORIGIN_AIRPORT'),
                F.col('avg_delay'),
                F.col('min_delay'),
                F.col('max_delay'),
                F.col('corr_delay2day_of_week')) \
        .filter(F.col('max_delay')>1000) \
        .orderBy(F.col('max_delay').desc())

In [80]:
worse_airports.show()

+--------------+------------------+---------+---------+----------------------+
|ORIGIN_AIRPORT|         avg_delay|min_delay|max_delay|corr_delay2day_of_week|
+--------------+------------------+---------+---------+----------------------+
|         11612|  68.9090909090909|    -13.0|   1544.0|   -0.2456865414476189|
|           MEM|10.732793522267206|    -23.0|   1291.0|  -0.02786418460192...|
|           CMH| 9.476110645431685|    -20.0|   1181.0|  0.008794367556119355|
|           SLC| 4.323445171849428|    -23.0|   1155.0|  0.002307544458450...|
|           STT| 24.36283185840708|    -25.0|   1131.0|  0.010873233344995976|
|           EWR|13.905347264904732|    -19.0|   1119.0|  -0.01418841503228...|
|           ROC| 10.03116883116883|    -20.0|   1113.0|  -0.02529636399949557|
|           BWI|12.397660818713451|    -19.0|   1102.0|  -0.02138018634258...|
|           PBI| 13.60600706713781|    -15.0|   1072.0|  -0.03893203165548...|
|           SFO| 11.37895602137279|    -22.0|   1054

### **Задача №4**  
Для дашборда с отображением выполненных рейсов требуется собрать таблицу на основе наших данных.  
Никакой дополнительной фильтрации данных не требуется.

#### **Требуемые поля:**  

|Поле|Описание|
|-|-|
|AIRLINE_NAME|	Название авиалинии (airlines.AIRLINE)|
|TAIL_NUMBER|	Номер рейса (flights.TAIL_NUMBER)|
|ORIGIN_COUNTRY|	Страна отправления (airports.COUNTRY)|
|ORIGIN_AIRPORT_NAME|	Полное название аэропорта отправления (airports.AIRPORT)|
|ORIGIN_LATITUDE|	Широта аэропорта отправления (airports.LATITUDE)|
|ORIGIN_LONGITUDE|	Долгота аэропорта отправления (airports.LONGITUDE)|
|DESTINATION_COUNTRY|	Страна прибытия (airports.COUNTRY)|
|DESTINATION_AIRPORT_NAME|	Полное название аэропорта прибытия (airports.AIRPORT)|
|DESTINATION_LATITUDE|	Широта аэропорта прибытия (airports.LATITUDE)|
|DESTINATION_LONGITUDE|	Долгота аэропорта прибытия (airports.LONGITUDE)|

**Параметры запуска задачи:**  
flights_path - путь к файлу с данными о авиарейсах  
airlines_path - путь к файлу с данными о авиалиниях  
airports_path - путь к файлу с данными о аэропортах  
esult_path - путь куда будет сохранен результат

IATA_CODE	String	Идентификатор авиалинии airlines  
AIRLINE	String	Код авиалиний flights  

IATA_CODE	String	Идентификатор аэропорта airports  
ORIGIN_AIRPORT	String	Код аэропорта отправления flights  

In [39]:
df_f = spark.read.parquet('flights.parquet')
df_a = spark.read.parquet('airports.parquet')
df_l = spark.read.parquet('airlines.parquet')

df = df_f \
    .select(F.col('AIRLINE').alias('IATA_CODE'),
            F.col('TAIL_NUMBER'),
            F.col('ORIGIN_AIRPORT'),
            F.col('DESTINATION_AIRPORT'))

df = df.join(df_l, on = 'IATA_CODE', how = 'inner')

df = df.drop('IATA_CODE')

df = df.withColumnRenamed('ORIGIN_AIRPORT', 'IATA_CODE')

df = df.join(df_a, on = 'IATA_CODE', how = 'inner')

df = df \
    .select(F.col('AIRLINE').alias('AIRLINE_NAME'),
            F.col('TAIL_NUMBER'),
            F.col('COUNTRY').alias('ORIGIN_COUNTRY'),
            F.col('AIRPORT').alias('ORIGIN_AIRPORT_NAME'),
            F.col('LATITUDE').alias('ORIGIN_LATITUDE'),
            F.col('LONGITUDE').alias('ORIGIN_LONGITUDE'),
            F.col('DESTINATION_AIRPORT').alias('IATA_CODE'))

df = df.join(df_a, on = 'IATA_CODE', how = 'inner')

df = df \
    .select(F.col('AIRLINE_NAME'),
            F.col('TAIL_NUMBER'),
            F.col('ORIGIN_COUNTRY'),
            F.col('ORIGIN_AIRPORT_NAME'),
            F.col('ORIGIN_LATITUDE'),
            F.col('ORIGIN_LONGITUDE'),
            F.col('COUNTRY').alias('DESTINATION_COUNTRY'),
            F.col('AIRPORT').alias('DESTINATION_AIRPORT_NAME'),
            F.col('LATITUDE').alias('DESTINATION_LATITUDE'),
            F.col('LONGITUDE').alias('DESTINATION_LONGITUDE'))

In [40]:
df.count()

266474

In [41]:
df.show(5)

+--------------------+-----------+--------------+--------------------+---------------+----------------+-------------------+------------------------+--------------------+---------------------+
|        AIRLINE_NAME|TAIL_NUMBER|ORIGIN_COUNTRY| ORIGIN_AIRPORT_NAME|ORIGIN_LATITUDE|ORIGIN_LONGITUDE|DESTINATION_COUNTRY|DESTINATION_AIRPORT_NAME|DESTINATION_LATITUDE|DESTINATION_LONGITUDE|
+--------------------+-----------+--------------+--------------------+---------------+----------------+-------------------+------------------------+--------------------+---------------------+
|American Airlines...|     N787AA|           USA|John F. Kennedy I...|       40.63975|       -73.77893|                USA|    Los Angeles Inter...|            33.94254|           -118.40807|
|American Airlines...|     N794AA|           USA|Los Angeles Inter...|       33.94254|      -118.40807|                USA|    John F. Kennedy I...|            40.63975|            -73.77893|
|American Airlines...|     N3ALAA|      

### **Задача №5**  
Отдел аналитики интересует статистика по компаниям о возникших проблемах. Пришла задача построить сводную таблицу о всех авиакомпаниях содержащую следующие данные:

|Колонка|	Описание|
|-|-|
|AIRLINE_NAME|	название авиалинии [airlines.AIRLINE]|
|correct_count|	число выполненных рейсов|
|diverted_count| 	число рейсов выполненных с задержкой|
|cancelled_count|	число отмененных рейсов|
|avg_distance|	средняя дистанция рейсов|
|avg_air_time|	среднее время в небе|
|airline_issue_count|	число задержек из-за проблем с самолетом [CANCELLATION_REASON]|
|weather_issue_count|	число задержек из-за погодных условий [CANCELLATION_REASON]|
|nas_issue_count|	число задержек из-за проблем NAS [CANCELLATION_REASON]|
|security_issue_count|	число задержек из-за службы безопасности [CANCELLATION_REASON]|

In [44]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

In [45]:
airlines_schema = StructType([
    StructField('IATA_CODE', StringType(), True),
    StructField('AIRLINE', StringType(), True)
])

flights_schema = StructType([
    StructField('YEAR', IntegerType(), True),
    StructField('MONTH', IntegerType(), True),
    StructField('DAY', IntegerType(), True),
    StructField('DAY_OF_WEEK', IntegerType(), True),
    StructField('AIRLINE', StringType(), True),
    StructField('FLIGHT_NUMBER', IntegerType(), True),
    StructField('TAIL_NUMBER', StringType(), True),
    StructField('ORIGIN_AIRPORT', StringType(), True),
    StructField('DESTINATION_AIRPORT', StringType(), True),
    StructField('SCHEDULED_DEPARTURE', IntegerType(), True),
    StructField('DEPARTURE_TIME', IntegerType(), True),
    StructField('DEPARTURE_DELAY', DoubleType(), True),
    StructField('TAXI_OUT', IntegerType(), True),
    StructField('WHEELS_OFF', IntegerType(), True),
    StructField('SCHEDULED_TIME', IntegerType(), True),
    StructField('ELAPSED_TIME', IntegerType(), True),
    StructField('AIR_TIME', DoubleType(), True),
    StructField('DISTANCE', IntegerType(), True),
    StructField('WHEELS_ON', IntegerType(), True),
    StructField('TAXI_IN', IntegerType(), True),
    StructField('SCHEDULED_ARRIVAL', IntegerType(), True),
    StructField('ARRIVAL_TIME', IntegerType(), True),
    StructField('ARRIVAL_DELAY', IntegerType(), True),
    StructField('DIVERTED', IntegerType(), True),
    StructField('CANCELLED', IntegerType(), True),
    StructField('CANCELLATION_REASON', StringType(), True),
    StructField('AIR_SYSTEM_DELAY', IntegerType(), True),
    StructField('SECURITY_DELAY', IntegerType(), True),
    StructField('AIRLINE_DELAY', IntegerType(), True),
    StructField('LATE_AIRCRAFT_DELAY', IntegerType(), True),
    StructField('WEATHER_DELAY', IntegerType(), True)
])

In [57]:
    airlines = spark.read \
        .option("header", "true") \
        .schema(airlines_schema) \
        .parquet('airlines.parquet')

    airlines = airlines \
        .select(F.col('IATA_CODE'),
                F.col('AIRLINE').alias('AIRLINE_NAME'))

    flights = spark.read \
        .option("header", "true") \
        .schema(flights_schema) \
        .parquet('flights.parquet')

    flights = flights \
        .groupby(F.col('AIRLINE')) \
        .agg(F.sum(F.when((F.col('CANCELLED')!=1) & (F.col('DIVERTED')!=1), 1).otherwise(0)).alias('correct_count'),
             F.sum(F.when(F.col('DIVERTED') == 1, 1).otherwise(0)).alias('diverted_count'),
             F.sum(F.when(F.col('CANCELLED') == 1, 1).otherwise(0)).alias('cancelled_count'),
             F.avg(F.when(flights['DISTANCE'].isNotNull(), flights['DISTANCE'])).alias('avg_distance'),
             F.avg(F.when(flights['AIR_TIME'].isNotNull(), flights['AIR_TIME'])).alias('avg_air_time'),
             F.sum(F.when(F.col('CANCELLATION_REASON') == 'A', 1).otherwise(0)).alias('airline_issue_count'),
             F.sum(F.when(F.col('CANCELLATION_REASON') == 'B', 1).otherwise(0)).alias('weather_issue_count'),
             F.sum(F.when(F.col('CANCELLATION_REASON') == 'C', 1).otherwise(0)).alias('nas_issue_count'),
             F.sum(F.when(F.col('CANCELLATION_REASON') == 'D', 1).otherwise(0)).alias('security_issue_count')) \
        .select(F.col('AIRLINE'),
                F.col('correct_count'),
                F.col('diverted_count'),
                F.col('cancelled_count'),
                F.col('avg_distance'),
                F.col('avg_air_time'),
                F.col('airline_issue_count'),
                F.col('weather_issue_count'),
                F.col('nas_issue_count'),
                F.col('security_issue_count'))

    result_dataset = airlines \
        .join(flights, flights['AIRLINE'] == airlines['IATA_CODE'], 'inner') \
        .select(airlines['AIRLINE_NAME'],
                flights['correct_count'],
                flights['diverted_count'],
                flights['cancelled_count'],
                flights['avg_distance'],
                flights['avg_air_time'],
                flights['airline_issue_count'],
                flights['weather_issue_count'],
                flights['nas_issue_count'],
                flights['security_issue_count']) \
        .orderBy(airlines['AIRLINE_NAME'])

    result_dataset.show(truncate=False, n=5)
    #result_dataset.write.mode('overwrite').parquet(result_path)

+----------------------------+-------------+--------------+---------------+------------------+------------------+-------------------+-------------------+---------------+--------------------+
|AIRLINE_NAME                |correct_count|diverted_count|cancelled_count|avg_distance      |avg_air_time      |airline_issue_count|weather_issue_count|nas_issue_count|security_issue_count|
+----------------------------+-------------+--------------+---------------+------------------+------------------+-------------------+-------------------+---------------+--------------------+
|Alaska Airlines Inc.        |8584         |18            |37             |1194.5375622178492|157.80929636533085|22                 |14                 |1              |0                   |
|American Airlines Inc.      |35587        |114           |546            |1045.4510166358596|140.21229662517212|158                |348                |40             |0                   |
|American Eagle Airlines Inc.|13934        |4