In [1]:
from pyspark.sql import SparkSession

In [2]:
df = spark.read.option("inferSchema", "true").option("header", "true").csv("/mydata/hflight.csv")

In [3]:
df.head(3)

[Row(Year=2011, Month=1, DayofMonth=1, DayOfWeek=6, DepTime='1400', ArrTime='1500', UniqueCarrier='AA', FlightNum=428, TailNum='N576AA', ActualElapsedTime='60', AirTime='40', ArrDelay='-10', DepDelay='0', Origin='IAH', Dest='DFW', Distance=224, TaxiIn='7', TaxiOut='13', Cancelled=0, CancellationCode=None, Diverted=0),
 Row(Year=2011, Month=1, DayofMonth=2, DayOfWeek=7, DepTime='1401', ArrTime='1501', UniqueCarrier='AA', FlightNum=428, TailNum='N557AA', ActualElapsedTime='60', AirTime='45', ArrDelay='-9', DepDelay='1', Origin='IAH', Dest='DFW', Distance=224, TaxiIn='6', TaxiOut='9', Cancelled=0, CancellationCode=None, Diverted=0),
 Row(Year=2011, Month=1, DayofMonth=3, DayOfWeek=1, DepTime='1352', ArrTime='1502', UniqueCarrier='AA', FlightNum=428, TailNum='N541AA', ActualElapsedTime='70', AirTime='48', ArrDelay='-8', DepDelay='-8', Origin='IAH', Dest='DFW', Distance=224, TaxiIn='5', TaxiOut='17', Cancelled=0, CancellationCode=None, Diverted=0)]

In [5]:
df.show(3)

+----+-----+----------+---------+-------+-------+-------------+---------+-------+-----------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|ArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|
+----+-----+----------+---------+-------+-------+-------------+---------+-------+-----------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+
|2011|    1|         1|        6|   1400|   1500|           AA|      428| N576AA|               60|     40|     -10|       0|   IAH| DFW|     224|     7|     13|        0|            null|       0|
|2011|    1|         2|        7|   1401|   1501|           AA|      428| N557AA|               60|     45|      -9|       1|   IAH| DFW|     224|     6|      9|        0|            null|       0|
|2011|    

In [6]:
df.count()

227496

In [7]:
from pyspark.sql.functions import count

In [9]:
df.select(count("TailNum")).show()

+--------------+
|count(TailNum)|
+--------------+
|        226701|
+--------------+



In [10]:
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)



In [11]:
from pyspark.sql.functions import min, max

In [12]:
df.select(max('DepTime'), min('DepTime')).show()

+------------+------------+
|max(DepTime)|min(DepTime)|
+------------+------------+
|          NA|           1|
+------------+------------+



In [13]:
from pyspark.sql.functions import sum, count, avg, expr, col, when

In [14]:
rt = df.groupBy("Origin", "Dest").agg( avg('DepDelay'), avg('ArrDelay'))

In [16]:
rt.show()

+------+----+------------------+------------------+
|Origin|Dest|     avg(DepDelay)|     avg(ArrDelay)|
+------+----+------------------+------------------+
|   HOU| PHL| 33.19124423963134|23.541666666666668|
|   IAH| LIT| 7.740771812080537| 6.764063811922754|
|   IAH| GSP|  9.08272506082725| 9.617359413202934|
|   IAH| DEN| 10.37734288864388| 7.441828254847645|
|   IAH| TUS| 7.783870967741936| 7.801679586563307|
|   IAH| ABQ| 6.703891708967851| 7.928692699490663|
|   HOU| LIT|7.4404432132963985| 3.533333333333333|
|   IAH| ANC|            24.952|26.080645161290324|
|   IAH| SMF| 9.093966369930762|  4.66271018793274|
|   HOU| TUL|11.208174904942966| 7.855238095238096|
|   HOU| MDW|13.204150579150578|3.8126815101645692|
|   HOU| OKC|12.509865824782953| 9.441897233201582|
|   IAH| DSM|12.796850393700787|15.951104100946372|
|   IAH| DFW| 8.764620938628159| 5.510741008930727|
|   IAH| BOS|10.519141531322505| 6.559045956951716|
|   HOU| MAF|10.507042253521126| 6.918367346938775|
|   IAH| RIC

### 목적지 공항에 대해 연착 건수를 구하고, 연착 건수가 2000회 이상인 공항을 출력하세요

In [17]:
from pyspark.sql.types import IntegerType
df = df.withColumn("DepDelay", df["DepDelay"].cast(IntegerType()))
cnt_cond = lambda x : sum( when(x, 1).otherwise(0))
rt = df.groupBy("Dest").agg( cnt_cond(col('DepDelay') > 5).alias("result") )

In [18]:
rt.show()

+----+------+
|Dest|result|
+----+------+
| MSY|  2145|
| SNA|   441|
| GRR|   194|
| GSO|   159|
| OAK|   348|
| DCA|   604|
| MLU|    62|
| CID|   140|
| LEX|   204|
| ORF|   186|
| CRW|   100|
| SAV|   286|
| CMH|   336|
| MOB|   388|
| PNS|   434|
| HNL|   163|
| SHV|   199|
| SJC|   269|
| CVG|   380|
| LGA|   935|
+----+------+
only showing top 20 rows



In [19]:
test = rt.toPandas()

In [20]:
test

Unnamed: 0,Dest,result
0,MSY,2145
1,SNA,441
2,GRR,194
3,GSO,159
4,OAK,348
...,...,...
111,PHL,942
112,DSM,230
113,SAT,1611
114,SLC,534
