In [2]:
 #cначала создадим Spark сессию
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, ByteType, ShortType

spark = SparkSession.builder.appName('home_work').getOrCreate()

In [3]:
#считываем airlines.csv
schema = StructType([
    StructField("IATA_CODE", StringType(), True),
    StructField("AIRLINE", StringType(), True),
])
dfal = spark.read.csv("source_data/airlines.csv", header=True, schema=schema)

In [4]:
#считываем airports.csv
schema = StructType([
    StructField("IATA_CODE", StringType(), True),
    StructField("AIRPORT", StringType(), True),
    StructField("CITY", StringType(), True),
    StructField("STATE", StringType(), True),
    StructField("COUNTRY", StringType(), True),
    StructField("LATITUDE", DoubleType(), True),
    StructField("LONGITUDE", DoubleType(), True),
])
dfap = spark.read.csv("source_data/airports.csv", header=True, schema=schema)

In [5]:
#считываем flights.csv
schema = StructType([
    StructField("YEAR", StringType(), True),
    StructField("MONTH", StringType(), True),
    StructField("DAY", StringType(), True),
    StructField("DAY_OF_WEEK", StringType(), True),
    StructField("AIRLINE", StringType(), True),
    StructField("FLIGHT_NUMBER", StringType(), True),
    StructField("TAIL_NUMBER", StringType(), True),
    StructField("ORIGIN_AIRPORT", StringType(), True),
    StructField("DESTINATION_AIRPORT", StringType(), True),
    StructField("SCHEDULED_DEPARTURE", StringType(), True),
    StructField("DEPARTURE_TIME", StringType(), True),
    StructField("DEPARTURE_DELAY", ShortType(), True),
    StructField("TAXI_OUT", ByteType(), True),
    StructField("WHEELS_OFF", StringType(), True),
    StructField("SCHEDULED_TIME", StringType(), True),
    StructField("ELAPSED_TIME", StringType(), True),
    StructField("AIR_TIME", StringType(), True),
    StructField("DISTANCE", StringType(), True),          
    StructField("WHEELS_ON", StringType(), True),
    StructField("TAXI_IN", StringType(), True),
    StructField("SCHEDULED_ARRIVAL", StringType(), True),
    StructField("ARRIVAL_TIME", StringType(), True),
    StructField("ARRIVAL_DELAY", StringType(), True),
    StructField("DIVERTED", StringType(), True),             
    StructField("CANCELLED", StringType(), True),
    StructField("CANCELLATION_REASON", StringType(), True),
    StructField("AIR_SYSTEM_DELAY", StringType(), True),
    StructField("SECURITY_DELAY", StringType(), True),
    StructField("AIRLINE_DELAY", StringType(), True),
    StructField("LATE_AIRCRAFT_DELAY", StringType(), True),
    StructField("WEATHER_DELAY", StringType(), True),
])
dff = spark.read.csv("source_data/flights.csv", header=True, schema=schema, sep=";").distinct()

In [12]:
query = '''
    WITH cte AS (
        SELECT 
            dfal.AIRLINE,
            dff.TAXI_OUT,
            dff.DEPARTURE_DELAY,
            CASE WHEN dff.MONTH IN (3, 4, 5) THEN 'spring' 
                 WHEN dff.MONTH IN (6, 7, 8) THEN 'summer'
                 WHEN dff.MONTH IN (9, 10, 11) THEN 'autumn'
                 ELSE 'winter' 
            END AS SEASON,
            CASE WHEN dfap.LATITUDE < 32.8 THEN 'south'
                 WHEN dfap.LATITUDE > 52 THEN 'north'
                 ELSE 'middle'
            END AS REGION 
        FROM {dff} AS dff
        JOIN {dfal} AS dfal ON dfal.IATA_CODE = dff.AIRLINE
        JOIN {dfap} AS dfap ON dfap.IATA_CODE = dff.ORIGIN_AIRPORT
        WHERE dfal.IATA_CODE = 'WN'
        AND dff.TAXI_OUT IS NOT NULL
        AND dff.DEPARTURE_DELAY IS NOT NULL
    )
 
    SELECT DISTINCT
        cte.AIRLINE, 
        CAST((SELECT (AVG(c.TAXI_OUT * c.DEPARTURE_DELAY) - (AVG(c.TAXI_OUT) * AVG(c.DEPARTURE_DELAY))) / (STDDEV_POP(c.TAXI_OUT) * STDDEV_POP(c.DEPARTURE_DELAY)) FROM cte as c) AS decimal(14,3)) AS CORRELATION_WITH_DEPARTURE_DELAY,
        MAX(cte.TAXI_OUT) OVER(PARTITION BY NULL) AS MAX_ALL,
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY cte.TAXI_OUT) OVER (PARTITION BY NULL) AS MEDIANA_ALL,
        PERCENTILE_CONT(0.9) WITHIN GROUP (ORDER BY cte.TAXI_OUT) OVER (PARTITION BY NULL) AS 90_PERCENTILE_ALL,
        cte.SEASON, 
        MAX(cte.TAXI_OUT) OVER(PARTITION BY cte.SEASON) AS MAX_SEASON,
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY cte.TAXI_OUT) OVER (PARTITION BY cte.SEASON) AS MEDIANA_SEASON,
        PERCENTILE_CONT(0.9) WITHIN GROUP (ORDER BY cte.TAXI_OUT) OVER (PARTITION BY cte.SEASON) AS 90_PERCENTILE_SEASON,
        cte.REGION,
        MAX(cte.TAXI_OUT) OVER(PARTITION BY cte.SEASON, cte.REGION) AS MAX_REGION,
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY cte.TAXI_OUT) OVER (PARTITION BY cte.SEASON, cte.REGION) AS MEDIANA_REGION,
        PERCENTILE_CONT(0.9) WITHIN GROUP (ORDER BY cte.TAXI_OUT) OVER (PARTITION BY cte.SEASON, cte.REGION) AS 90_PERCENTILE_REGION
    FROM cte AS cte
'''
df_total = spark.sql(query, dff=dff, dfal=dfal, dfap=dfap)

In [13]:
df_total.show()

+--------------------+--------------------------------+-------+-----------+-----------------+------+----------+--------------+--------------------+------+----------+--------------+--------------------+
|             AIRLINE|CORRELATION_WITH_DEPARTURE_DELAY|MAX_ALL|MEDIANA_ALL|90_PERCENTILE_ALL|SEASON|MAX_SEASON|MEDIANA_SEASON|90_PERCENTILE_SEASON|REGION|MAX_REGION|MEDIANA_REGION|90_PERCENTILE_REGION|
+--------------------+--------------------------------+-------+-----------+-----------------+------+----------+--------------+--------------------+------+----------+--------------+--------------------+
|Southwest Airline...|                           0.046|    127|       10.0|             18.0|winter|       127|          10.0|                19.0|middle|       127|          11.0|                20.0|
|Southwest Airline...|                           0.046|    127|       10.0|             18.0|winter|       127|          10.0|                19.0| south|       122|          10.0|            

In [22]:
#запишем данные во внутреннюю БД
df_total.write \
    .mode("overwrite") \
    .saveAsTable("new_test_table")

In [23]:
#проверим наличие данных
spark.read.table("new_test_table").show()

+--------------------+--------------------------------+-------+-----------+-----------------+------+----------+--------------+--------------------+------+----------+--------------+--------------------+
|             AIRLINE|CORRELATION_WITH_DEPARTURE_DELAY|MAX_ALL|MEDIANA_ALL|90_PERCENTILE_ALL|SEASON|MAX_SEASON|MEDIANA_SEASON|90_PERCENTILE_SEASON|REGION|MAX_REGION|MEDIANA_REGION|90_PERCENTILE_REGION|
+--------------------+--------------------------------+-------+-----------+-----------------+------+----------+--------------+--------------------+------+----------+--------------+--------------------+
|Southwest Airline...|                           0.046|    127|       10.0|             18.0|spring|       123|          10.0|                17.0|middle|       123|          10.0|                18.0|
|Southwest Airline...|                           0.046|    127|       10.0|             18.0|spring|       123|          10.0|                17.0| south|       122|          10.0|            

In [101]:
#остановим сессию
spark.stop()