In [63]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
import time

# New API
spark = SparkSession\
        .builder\
        .master("spark://host-192-168-1-153-ldsa:7077")\
        .appName("team22")\
        .config("spark.shuffle.service.enabled", True)\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.minExecutors", 4)\
        .config("spark.dynamicAllocation.maxExecutors", 4)\
        .config("spark.dynamicAllocation.executorIdleTimeout","60s")\
        .config("spark.executor.cores",8)\
        .getOrCreate()
        
# Old API (RDD)
sc = spark.sparkContext

In [None]:
flight_data = spark\
    .read\
    .format("csv")\
    .option("header","true")\
    .load("hdfs://192.168.1.153:9000/team22/data/*.csv")

flight_data = flight_data.drop("CancellationCode","Distance","AirTime","CRSElapsedTime","Diverted","SecurityDelay","LateAircraftDelay","UniqueCarrier","TaxiIn", "TaxiOut", "FlightNum", "TailNum","ActualElapsedTime","CarrierDelay" ,"WeatherDelay","NASDelay")

# Grandma Question

Filter Origin and Destination

In [60]:
start_time = time.time()

########################

airports_Q1 = flight_data.filter(flight_data["Origin"]=="LAX").filter(flight_data["Dest"]=="HNL")

# Week day delay

week_day_delay = airports_Q1.select("DayOfWeek", "ArrDelay", "DepDelay")
week_day_delay = week_day_delay.withColumn("TotalDelay", week_day_delay["ArrDelay"].cast("float")+week_day_delay["DepDelay"].cast("float")).orderBy("DayOfWeek")
week_day_delay = week_day_delay.select(week_day_delay["DayOfWeek"].cast(IntegerType()),"TotalDelay")
week_day_delay = week_day_delay.groupBy("DayOfWeek").agg(F.mean("TotalDelay"))

week_day_delay.orderBy("avg(TotalDelay)").show()

# Monthly delay

month_delay = airports_Q1.select("DayOfMonth", "ArrDelay", "DepDelay")
month_delay = month_delay.withColumn("TotalDelay", month_delay["ArrDelay"].cast("float")+month_delay["DepDelay"].cast("float"))
month_delay = month_delay.select(month_delay["DayOfMonth"].cast(IntegerType()),"TotalDelay")
month_delay = month_delay.groupBy("DayOfMonth").agg(F.mean("TotalDelay"))

month_delay.orderBy("avg(TotalDelay)").show(5)

########################

end_time = time.time()
print("Time taken:", end_time-start_time)

+---------+------------------+
|DayOfWeek|   avg(TotalDelay)|
+---------+------------------+
|        1|14.553268765133172|
|        2|16.411973918197983|
|        7|17.523692307692308|
|        3| 20.41650671785029|
|        5|23.048989589712185|
|        4|23.234423195558296|
|        6|23.413401476433844|
+---------+------------------+

+----------+------------------+
|DayOfMonth|   avg(TotalDelay)|
+----------+------------------+
|        10| 9.489583333333334|
|         8|11.816489361702128|
|        11|12.013192612137203|
|         9|13.067204301075268|
|        23| 14.71576227390181|
+----------+------------------+
only showing top 5 rows

Time taken: 101.80658030509949


# Least delays/flights

In [61]:
start_time = time.time()

########################

airports_Q2 = flight_data.select("Origin", "DepDelay")
airports_Q2 = airports_Q2.groupBy("Origin").agg(F.mean("DepDelay"))
airports_Q2.orderBy("avg(DepDelay)").show(5)

########################

end_time = time.time()
print("Time taken:", end_time-start_time)

+------+-------------------+
|Origin|      avg(DepDelay)|
+------+-------------------+
|   BQN|-1.9230769230769231|
|   ROP|-1.6528716216216217|
|   GRB|-0.8628618693134822|
|   AZO|-0.5115480649188514|
|   VPS|-0.2964519140989729|
+------+-------------------+
only showing top 5 rows

Time taken: 34.814409494400024


In [62]:
spark.stop()

In [None]:
spark = SparkSession\
        .builder\
        .master("spark://host-192-168-1-153-ldsa:7077")\
        .appName("team22")\
        .config("spark.shuffle.service.enabled", True)\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.minExecutors", 4)\
        .config("spark.dynamicAllocation.maxExecutors", 4)\
        .config("spark.dynamicAllocation.executorIdleTimeout","60s")\
        .config("spark.executor.cores",8)\
        .getOrCreate()
        
spark = SparkSession\
        .builder\
        .master("local[3]")\
        .appName("team22")\
        .config("spark.shuffle.service.enabled", True)\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.executorIdleTimeout","60s")\
        .config("spark.executor.cores",8)\
        .getOrCreate()

[1987, 1988, 1989, 1990, 1991, 1992, 1993, 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008]
