In [11]:
from pyspark.sql import SparkSession

# Obtain (via getOrCreate) the SparkSession that every later cell uses through `spark`.
spark = (
    SparkSession.builder
    .appName("Flight Delays Analysis")
    .getOrCreate()
)

In [12]:
# Python example: load the flight datasets and register them as temporary SQL views.
from pyspark.sql.functions import expr

# Input file paths
tripdelaysFilePath = "departuredelays.csv"
airportsnaFilePath = "airport-codes-na.txt"

# Airport reference data: tab-separated, with a header row; column types are inferred.
airportsna = (spark.read.format("csv")
              .options(header="true", inferSchema="true", sep="\t")
              .load(airportsnaFilePath))
airportsna.createOrReplaceTempView("airports_na")

# Departure delays are read without inferSchema, so `delay` and `distance` are cast to INT
# explicitly. `date` is deliberately left as a string (values such as 01010710 in the output
# below keep their leading zero), which the LIKE '01010%' filter further down relies on.
departureDelays = (spark.read.format("csv")
                   .options(header="true")
                   .load(tripdelaysFilePath)
                   .withColumn("delay", expr("CAST(delay as INT) as delay"))
                   .withColumn("distance", expr("CAST(distance as INT) as distance")))
departureDelays.createOrReplaceTempView("departureDelays")

# Small temporary table: SEA -> SFO flights whose date starts with '01010' and whose delay > 0.
foo = departureDelays.where(
    expr("origin == 'SEA' AND destination == 'SFO' AND date like '01010%' AND delay > 0")
)
foo.createOrReplaceTempView("foo")


In [13]:
# Display the filtered SEA -> SFO rows (20 is show()'s default row limit).
foo.show(n=20)

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



In [5]:
# Build foo2 from foo with an extra `status` column:
# a delay of 10 or less is labelled 'On-time', anything else (including NULL) 'Delayed'.
status_rule = expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END")
foo2 = foo.withColumn("status", status_rule)
foo2.show()

+--------+-----+--------+------+-----------+-------+
|    date|delay|distance|origin|destination| status|
+--------+-----+--------+------+-----------+-------+
|01010710|   31|     590|   SEA|        SFO|Delayed|
|01010955|  104|     590|   SEA|        SFO|Delayed|
|01010730|    5|     590|   SEA|        SFO|On-time|
+--------+-----+--------+------+-----------+-------+



In [6]:
# foo3 is foo2 without the raw `delay` column; the derived `status` label is kept.
foo3 = foo2.drop("delay")
foo3.show(n=20)

+--------+--------+------+-----------+-------+
|    date|distance|origin|destination| status|
+--------+--------+------+-----------+-------+
|01010710|     590|   SEA|        SFO|Delayed|
|01010955|     590|   SEA|        SFO|Delayed|
|01010730|     590|   SEA|        SFO|On-time|
+--------+--------+------+-----------+-------+



In [7]:
# Python example: foo4 is foo3 with its `status` column renamed to `flight_status`.
foo4 = foo3.withColumnRenamed(existing="status", new="flight_status")
foo4.show(n=20)

+--------+--------+------+-----------+-------------+
|    date|distance|origin|destination|flight_status|
+--------+--------+------+-----------+-------------+
|01010710|     590|   SEA|        SFO|      Delayed|
|01010955|     590|   SEA|        SFO|      Delayed|
|01010730|     590|   SEA|        SFO|      On-time|
+--------+--------+------+-----------+-------------+



In [14]:
# Average and maximum departure delay per destination for flights leaving SEA, pivoted so
# January and February each get their own AvgDelay / MaxDelay columns. A destination with no
# SEA flights in one of those months shows NULL there (e.g. CLE in February in the output below).
#
# The month is the first two characters of `date`. Spark SQL SUBSTRING positions are 1-based,
# so position 1 is used; the previous position 0 only worked because Spark treats 0 like 1.
#
# AvgDelay is cast to DECIMAL(6, 2) rather than DECIMAL(4, 2). DECIMAL(4, 2) tops out at 99.99,
# so any destination/month average of 100+ minutes would overflow. That gives NULL with
# spark.sql.ansi.enabled=false, or a runtime error with ANSI mode on.
spark.sql("""
    SELECT * FROM (
        SELECT destination,
               CAST(SUBSTRING(date, 1, 2) AS int) AS month,
               delay
        FROM departureDelays
        WHERE origin = 'SEA'
    )
    PIVOT (
        CAST(AVG(delay) AS DECIMAL(6, 2)) AS AvgDelay,
        MAX(delay) AS MaxDelay
        FOR month IN (1 AS JAN, 2 AS FEB)
    )
    ORDER BY destination
""").show(10)

+-----------+------------+------------+------------+------------+
|destination|JAN_AvgDelay|JAN_MaxDelay|FEB_AvgDelay|FEB_MaxDelay|
+-----------+------------+------------+------------+------------+
|        ABQ|       19.86|         316|       11.42|          69|
|        ANC|        4.44|         149|        7.90|         141|
|        ATL|       11.98|         397|        7.73|         145|
|        AUS|        3.48|          50|       -0.21|          18|
|        BOS|        7.84|         110|       14.58|         152|
|        BUR|       -2.03|          56|       -1.89|          78|
|        CLE|       16.00|          27|        NULL|        NULL|
|        CLT|        2.53|          41|       12.96|         228|
|        COS|        5.32|          82|       12.18|         203|
|        CVG|       -0.50|           4|        NULL|        NULL|
+-----------+------------+------------+------------+------------+
only showing top 10 rows



In [10]:
# The rows that feed the PIVOT query: one row per SEA departure with its destination,
# month and delay. The month is the first two characters of `date`; Spark SQL SUBSTRING
# positions are 1-based, so the substring starts at position 1.
spark.sql("""
SELECT
    destination,
    CAST(SUBSTRING(date, 1, 2) AS int) AS month,
    delay
FROM
    departureDelays
WHERE
    origin = 'SEA'
""").show()

+-----------+-----+-----+
|destination|month|delay|
+-----------+-----+-----+
|        ORD|    1|   92|
|        JFK|    1|   -7|
|        DFW|    1|   -5|
|        MIA|    1|   -3|
|        DFW|    1|   -3|
|        DFW|    1|    1|
|        ORD|    1|  -10|
|        DFW|    1|   -6|
|        DFW|    1|   -2|
|        ORD|    1|   -3|
|        ORD|    1|    0|
|        DFW|    1|   23|
|        DFW|    1|   36|
|        ORD|    1|  298|
|        JFK|    1|    4|
|        DFW|    1|    0|
|        MIA|    1|    2|
|        DFW|    1|    0|
|        DFW|    1|    0|
|        ORD|    1|   83|
+-----------+-----+-----+
only showing top 20 rows

