In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder\
                    .appName("Analyzing Airline Data")\
                    .getOrCreate()

In [3]:
from pyspark.sql.types import Row
from datetime import datetime

In [4]:
record = sc.parallelize([Row(id = 1,
                             name = "Jill",
                             active = True,
                             clubs = ['chess', 'hockey'],
                             subjects = {"math": 80, 'english': 56},
                             enrolled = datetime(2014, 8, 1, 14, 1, 5)),
                         Row(id = 2,
                             name = "George",
                             active = False,
                             clubs = ['chess', 'soccer'],
                             subjects = {"math": 60, 'english': 96},
                             enrolled = datetime(2015, 3, 21, 8, 2, 5))
])

In [5]:
record_df = record.toDF()
record_df.show()

+---+------+------+---------------+--------------------+-------------------+
| id|  name|active|          clubs|            subjects|           enrolled|
+---+------+------+---------------+--------------------+-------------------+
|  1|  Jill|  true|[chess, hockey]|{english -> 56, m...|2014-08-01 14:01:05|
|  2|George| false|[chess, soccer]|{english -> 96, m...|2015-03-21 08:02:05|
+---+------+------+---------------+--------------------+-------------------+



In [6]:
record_df.createOrReplaceTempView("records")

In [7]:
all_records_df = sqlContext.sql('SELECT * FROM records')

all_records_df.show()

+---+------+------+---------------+--------------------+-------------------+
| id|  name|active|          clubs|            subjects|           enrolled|
+---+------+------+---------------+--------------------+-------------------+
|  1|  Jill|  true|[chess, hockey]|{english -> 56, m...|2014-08-01 14:01:05|
|  2|George| false|[chess, soccer]|{english -> 96, m...|2015-03-21 08:02:05|
+---+------+------+---------------+--------------------+-------------------+



In [8]:
sqlContext.sql('SELECT id, clubs[1], subjects["english"] FROM records').show()

+---+--------+-----------------+
| id|clubs[1]|subjects[english]|
+---+--------+-----------------+
|  1|  hockey|               56|
|  2|  soccer|               96|
+---+--------+-----------------+



In [9]:
sqlContext.sql('SELECT id, NOT active FROM records').show()

+---+------------+
| id|(NOT active)|
+---+------------+
|  1|       false|
|  2|        true|
+---+------------+



In [10]:
sqlContext.sql('SELECT * FROM records where active').show()

+---+----+------+---------------+--------------------+-------------------+
| id|name|active|          clubs|            subjects|           enrolled|
+---+----+------+---------------+--------------------+-------------------+
|  1|Jill|  true|[chess, hockey]|{english -> 56, m...|2014-08-01 14:01:05|
+---+----+------+---------------+--------------------+-------------------+



In [11]:
sqlContext.sql('SELECT * FROM records where subjects["english"] > 90').show()

+---+------+------+---------------+--------------------+-------------------+
| id|  name|active|          clubs|            subjects|           enrolled|
+---+------+------+---------------+--------------------+-------------------+
|  2|George| false|[chess, soccer]|{english -> 96, m...|2015-03-21 08:02:05|
+---+------+------+---------------+--------------------+-------------------+



In [14]:
#record_df.createGlobalTempView("global_records")
#sqlContext.sql('SELECT * FROM global_temp.global_records').show()

In [15]:
airlinesPath = "../datasets/airlines.csv"
flightsPath = "../datasets/flights.csv"
airportsPath = "../datasets/airports.csv"

In [16]:
airlines = spark.read\
            .format("csv")\
            .option("header", "true")\
            .load(airlinesPath)

In [17]:
airlines.createOrReplaceTempView("airlines")

In [18]:
airlines = spark.sql("SELECT * FROM airlines")
airlines.columns

['Code', 'Description']

In [19]:
airlines.show(5)

+-----+--------------------+
| Code|         Description|
+-----+--------------------+
|19031|Mackey Internatio...|
|19032|Munz Northern Air...|
|19033|Cochise Airlines ...|
|19034|Golden Gate Airli...|
|19035|  Aeromech Inc.: RZZ|
+-----+--------------------+
only showing top 5 rows



In [20]:
flights = spark.read\
            .format("csv")\
            .option("header", "true")\
            .load(flightsPath)

In [21]:
flights.createOrReplaceTempView("flights")

flights.columns

['date',
 'airlines',
 'flight_number',
 'origin',
 'destination',
 'departure',
 'departure_delay',
 'arrival',
 'arrival_delay',
 'air_time',
 'distance']

In [22]:
flights.show(5)

+----------+--------+-------------+------+-----------+---------+---------------+-------+-------------+--------+--------+
|      date|airlines|flight_number|origin|destination|departure|departure_delay|arrival|arrival_delay|air_time|distance|
+----------+--------+-------------+------+-----------+---------+---------------+-------+-------------+--------+--------+
|2014-04-01|   19805|            1|   JFK|        LAX|     0854|          -6.00|   1217|         2.00|  355.00| 2475.00|
|2014-04-01|   19805|            2|   LAX|        JFK|     0944|          14.00|   1736|       -29.00|  269.00| 2475.00|
|2014-04-01|   19805|            3|   JFK|        LAX|     1224|          -6.00|   1614|        39.00|  371.00| 2475.00|
|2014-04-01|   19805|            4|   LAX|        JFK|     1240|          25.00|   2028|       -27.00|  264.00| 2475.00|
|2014-04-01|   19805|            5|   DFW|        HNL|     1300|          -5.00|   1650|        15.00|  510.00| 3784.00|
+----------+--------+-----------

In [23]:
flights.count(), airlines.count()

(476881, 1579)

In [24]:
flights_count = spark.sql("SELECT COUNT(*) FROM flights")
airlines_count = spark.sql("SELECT COUNT(*) FROM airlines")

In [25]:
flights_count, airlines_count

(DataFrame[count(1): bigint], DataFrame[count(1): bigint])

In [26]:
flights_count.collect()[0][0], airlines_count.collect()[0][0]

(476881, 1579)

In [33]:
total_distance_df = spark.sql("SELECT distance FROM flights")\
                        .agg({"distance":"sum"})\
                        .withColumnRenamed("sum(distance)","total_distance")

In [34]:
total_distance_df.show()

+--------------+
|total_distance|
+--------------+
|  3.79052917E8|
+--------------+



In [32]:
"""
total_distance_df = flights.select("distance")\
                        .agg({"distance":"sum"})\
                        .withColumnRenamed("sum(distance)","total_distance")

total_distance_df.show()
"""

'\ntotal_distance_df = flights.select("distance")                        .agg({"distance":"sum"})                        .withColumnRenamed("sum(distance)","total_distance")\n\ntotal_distance_df.show()\n'

In [35]:
all_delays_2012 = spark.sql(
    "SELECT date, airlines, flight_number, departure_delay " +
    "FROM flights WHERE departure_delay > 0 and year(date) = 2012")

In [36]:
all_delays_2012.show(5)

+----+--------+-------------+---------------+
|date|airlines|flight_number|departure_delay|
+----+--------+-------------+---------------+
+----+--------+-------------+---------------+



In [37]:
all_delays_2014 = spark.sql(
    "SELECT date, airlines, flight_number, departure_delay " +
    "FROM flights WHERE departure_delay > 0 and year(date) = 2014")

all_delays_2014.show(5)

+----------+--------+-------------+---------------+
|      date|airlines|flight_number|departure_delay|
+----------+--------+-------------+---------------+
|2014-04-01|   19805|            2|          14.00|
|2014-04-01|   19805|            4|          25.00|
|2014-04-01|   19805|            6|         126.00|
|2014-04-01|   19805|            7|         125.00|
|2014-04-01|   19805|            8|           4.00|
+----------+--------+-------------+---------------+
only showing top 5 rows



In [38]:
all_delays_2014.createOrReplaceTempView("all_delays")

In [39]:
all_delays_2014.orderBy(all_delays_2014.departure_delay.desc()).show(5)

+----------+--------+-------------+---------------+
|      date|airlines|flight_number|departure_delay|
+----------+--------+-------------+---------------+
|2014-04-28|   19393|          388|          99.00|
|2014-04-28|   19393|          768|          99.00|
|2014-04-29|   20409|         1780|          99.00|
|2014-04-29|   19805|         2311|          99.00|
|2014-04-29|   20366|         4187|          99.00|
+----------+--------+-------------+---------------+
only showing top 5 rows



In [40]:
delay_count = spark.sql("SELECT COUNT(departure_delay) FROM all_delays")

In [41]:
delay_count.show()

+----------------------+
|count(departure_delay)|
+----------------------+
|                179015|
+----------------------+



In [42]:
delay_count.collect()[0][0]

179015

In [43]:
delay_percent = delay_count.collect()[0][0] / flights_count.collect()[0][0] * 100
delay_percent

37.53871510922012

In [45]:
delay_per_airline = spark.sql("SELECT airlines, departure_delay FROM flights")\
                        .groupBy("airlines")\
                        .agg({"departure_delay":"avg"})\
                        .withColumnRenamed("avg(departure_delay)", "departure_delay")

In [46]:
delay_per_airline.orderBy(delay_per_airline.departure_delay.desc()).show(5)

+--------+------------------+
|airlines|   departure_delay|
+--------+------------------+
|   19393|13.429567657134724|
|   20366|12.296210112379818|
|   19977| 8.818392620527979|
|   20436| 8.716275167785234|
|   20409|  8.31110357194785|
+--------+------------------+
only showing top 5 rows



In [47]:
delay_per_airline.createOrReplaceTempView("delay_per_airline")

In [48]:
delay_per_airline = spark.sql("SELECT * FROM delay_per_airline ORDER BY departure_delay DESC")

In [49]:
delay_per_airline.show(5)

+--------+------------------+
|airlines|   departure_delay|
+--------+------------------+
|   19393|13.429567657134724|
|   20366|12.296210112379818|
|   19977| 8.818392620527979|
|   20436| 8.716275167785234|
|   20409|  8.31110357194785|
+--------+------------------+
only showing top 5 rows



In [50]:
delay_per_airline = spark.sql("SELECT * FROM delay_per_airline " +
                              "JOIN airlines ON airlines.code = delay_per_airline.airlines " +
                              "ORDER BY departure_delay DESC")

delay_per_airline.show(5)

+--------+------------------+-----+--------------------+
|airlines|   departure_delay| Code|         Description|
+--------+------------------+-----+--------------------+
|   19393|13.429567657134724|19393|Southwest Airline...|
|   20366|12.296210112379818|20366|ExpressJet Airlin...|
|   19977| 8.818392620527979|19977|United Air Lines ...|
|   20436| 8.716275167785234|20436|Frontier Airlines...|
|   20409|  8.31110357194785|20409| JetBlue Airways: B6|
+--------+------------------+-----+--------------------+
only showing top 5 rows



In [51]:
delay_per_airline.drop("code").show(5)

+--------+------------------+--------------------+
|airlines|   departure_delay|         Description|
+--------+------------------+--------------------+
|   19393|13.429567657134724|Southwest Airline...|
|   20366|12.296210112379818|ExpressJet Airlin...|
|   19977| 8.818392620527979|United Air Lines ...|
|   20436| 8.716275167785234|Frontier Airlines...|
|   20409|  8.31110357194785| JetBlue Airways: B6|
+--------+------------------+--------------------+
only showing top 5 rows

