In [40]:
from pyspark.sql import SparkSession

"""
The SparkSession, introduced in Spark 2.0,
provides a unified entry point for programming Spark with the Structured APIs.
You can use a SparkSession to access Spark functionality:
just import the class and create an instance in your code. """

spark = SparkSession.builder.getOrCreate()


# Read and create a temporary view
# Infer schema (note that for larger files you # may want to specify the schema)
df = spark.read\
  .format('csv')\
  .option('header', 'true')\
  .load('airlinedelaycauses_DelayedFlights.csv')

21/11/25 21:34:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [46]:
x = df.createOrReplaceTempView("us_delay_flights_tbl")

In [45]:
# Get the details of schema 
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: string (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: string (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: string (

In [50]:
x[0]

TypeError: 'NoneType' object is not subscriptable

In [53]:
df.head(5)

21/11/26 10:27:23 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , Year, Month, DayofMonth, DayOfWeek, DepTime, CRSDepTime, ArrTime, CRSArrTime, UniqueCarrier, FlightNum, TailNum, ActualElapsedTime, CRSElapsedTime, AirTime, ArrDelay, DepDelay, Origin, Dest, Distance, TaxiIn, TaxiOut, Cancelled, CancellationCode, Diverted, CarrierDelay, WeatherDelay, NASDelay, SecurityDelay, LateAircraftDelay
 Schema: _c0, Year, Month, DayofMonth, DayOfWeek, DepTime, CRSDepTime, ArrTime, CRSArrTime, UniqueCarrier, FlightNum, TailNum, ActualElapsedTime, CRSElapsedTime, AirTime, ArrDelay, DepDelay, Origin, Dest, Distance, TaxiIn, TaxiOut, Cancelled, CancellationCode, Diverted, CarrierDelay, WeatherDelay, NASDelay, SecurityDelay, LateAircraftDelay
Expected: _c0 but found: 
CSV file: file:///Users/ankurmahiwal/Documents/GitHub/spark_practice/airlinedelaycauses_DelayedFlights.csv


[Row(_c0='0', Year='2008', Month='1', DayofMonth='3', DayOfWeek='4', DepTime='2003.0', CRSDepTime='1955', ArrTime='2211.0', CRSArrTime='2225', UniqueCarrier='WN', FlightNum='335', TailNum='N712SW', ActualElapsedTime='128.0', CRSElapsedTime='150.0', AirTime='116.0', ArrDelay='-14.0', DepDelay='8.0', Origin='IAD', Dest='TPA', Distance='810', TaxiIn='4.0', TaxiOut='8.0', Cancelled='0', CancellationCode='N', Diverted='0', CarrierDelay=None, WeatherDelay=None, NASDelay=None, SecurityDelay=None, LateAircraftDelay=None),
 Row(_c0='1', Year='2008', Month='1', DayofMonth='3', DayOfWeek='4', DepTime='754.0', CRSDepTime='735', ArrTime='1002.0', CRSArrTime='1000', UniqueCarrier='WN', FlightNum='3231', TailNum='N772SW', ActualElapsedTime='128.0', CRSElapsedTime='145.0', AirTime='113.0', ArrDelay='2.0', DepDelay='19.0', Origin='IAD', Dest='TPA', Distance='810', TaxiIn='5.0', TaxiOut='10.0', Cancelled='0', CancellationCode='N', Diverted='0', CarrierDelay=None, WeatherDelay=None, NASDelay=None, Securi

[Row(date='2015-12-12', time='13:42:10', size=257886, r_version='3.2.2', r_arch='i386', r_os='mingw32', package='HistData', version='0.7-6', country='CZ', ip_id=1)]

In [90]:
# We’ll find all flights whose distance is greater than 1,000 miles:

spark.sql("""SELECT Distance,Origin,Dest
             FROM us_delay_flights_tbl where distance > 1000
             ORDER BY Distance desc""").show(20)

[Stage 23:>                                                         (0 + 8) / 8]

+--------+------+----+
|Distance|Origin|Dest|
+--------+------+----+
|    4962|   EWR| HNL|
|    4962|   EWR| HNL|
|    4962|   HNL| EWR|
|    4962|   EWR| HNL|
|    4962|   EWR| HNL|
|    4962|   EWR| HNL|
|    4962|   EWR| HNL|
|    4962|   EWR| HNL|
|    4962|   EWR| HNL|
|    4962|   EWR| HNL|
|    4962|   EWR| HNL|
|    4962|   EWR| HNL|
|    4962|   HNL| EWR|
|    4962|   EWR| HNL|
|    4962|   HNL| EWR|
|    4962|   EWR| HNL|
|    4962|   EWR| HNL|
|    4962|   HNL| EWR|
|    4962|   EWR| HNL|
|    4962|   EWR| HNL|
+--------+------+----+
only showing top 20 rows



                                                                                

In [95]:
# DATAFRAME API Code

df.select("Distance", "Origin", "Dest"). \
   where(df["Distance"] > 1000). \
   orderBy('Distance',ascending=False).show(10)
#.orderBy(df["Distance"].desc).show(10)

+--------+------+----+
|Distance|Origin|Dest|
+--------+------+----+
|    4962|   EWR| HNL|
|    4962|   HNL| EWR|
|    4962|   HNL| EWR|
|    4962|   EWR| HNL|
|    4962|   EWR| HNL|
|    4962|   EWR| HNL|
|    4962|   EWR| HNL|
|    4962|   EWR| HNL|
|    4962|   EWR| HNL|
|    4962|   EWR| HNL|
+--------+------+----+
only showing top 10 rows



[Stage 26:>                                                         (0 + 8) / 8]                                                                                

In [60]:
# Find all flights between San Francisco (SFO) and Chicago (ORD) with at least a two-hour of arrival delay:

spark.sql("""select Year,Month,ArrDelay,Origin,Dest 
             FROM us_delay_flights_tbl
             WHERE ArrDelay > 120 AND Origin = 'SFO' AND Dest = 'ORD'
            ORDER by ArrDelay Desc""").show(10)

[Stage 16:>                                                         (0 + 8) / 8]

+----+-----+--------+------+----+
|Year|Month|ArrDelay|Origin|Dest|
+----+-----+--------+------+----+
|2008|    9|   519.0|   SFO| ORD|
|2008|   10|   419.0|   SFO| ORD|
|2008|    7|   415.0|   SFO| ORD|
|2008|    1|   388.0|   SFO| ORD|
|2008|    1|   380.0|   SFO| ORD|
|2008|    9|   380.0|   SFO| ORD|
|2008|    1|   352.0|   SFO| ORD|
|2008|    2|   346.0|   SFO| ORD|
|2008|    7|   328.0|   SFO| ORD|
|2008|    2|   325.0|   SFO| ORD|
+----+-----+--------+------+----+
only showing top 10 rows



                                                                                

In [116]:
# Results of above query using pyspark syntax
df.select("Year","Month","ArrDelay","Origin","Dest"). \
   where((df["ArrDelay"] > 120) & (df["Origin"] == "SFO") & (df["Dest"] == "ORD")). \
   orderBy("ArrDelay",ascending=False). \
   show(10)



+----+-----+--------+------+----+
|Year|Month|ArrDelay|Origin|Dest|
+----+-----+--------+------+----+
|2008|    9|   519.0|   SFO| ORD|
|2008|   10|   419.0|   SFO| ORD|
|2008|    7|   415.0|   SFO| ORD|
|2008|    1|   388.0|   SFO| ORD|
|2008|    9|   380.0|   SFO| ORD|
|2008|    1|   380.0|   SFO| ORD|
|2008|    1|   352.0|   SFO| ORD|
|2008|    2|   346.0|   SFO| ORD|
|2008|    7|   328.0|   SFO| ORD|
|2008|    2|   325.0|   SFO| ORD|
+----+-----+--------+------+----+
only showing top 10 rows



21/11/26 17:41:23 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 974549 ms exceeds timeout 120000 ms
21/11/26 17:41:23 WARN SparkContext: Killing executors is not supported by current scheduler.


In [108]:
df.printSchema

<bound method DataFrame.printSchema of DataFrame[_c0: string, Year: string, Month: string, DayofMonth: string, DayOfWeek: string, DepTime: string, CRSDepTime: string, ArrTime: string, CRSArrTime: string, UniqueCarrier: string, FlightNum: string, TailNum: string, ActualElapsedTime: string, CRSElapsedTime: string, AirTime: string, ArrDelay: string, DepDelay: string, Origin: string, Dest: string, Distance: string, TaxiIn: string, TaxiOut: string, Cancelled: string, CancellationCode: string, Diverted: string, CarrierDelay: string, WeatherDelay: string, NASDelay: string, SecurityDelay: string, LateAircraftDelay: string]>

In [100]:

df['ArrDelay'].

Column<'ArrDelay'>

In [38]:
df['age'].isNull().c

Column<'(age IS NULL)'>

In [64]:
"""Label all US flights, 
   regardless of origin and destination,
   with an indication of the delays they experienced: 
   Very Long Delays (> 6 hours), Long Delays (2–6 hours), etc """


spark.sql("""SELECT ArrDelay, Origin, Dest,
              CASE
                  WHEN ArrDelay > 360 THEN 'Very Long Delays'
                  WHEN ArrDelay > 120 AND ArrDelay < 360 THEN 'Long Delays'
                  WHEN ArrDelay > 60 AND ArrDelay < 120 THEN 'Short Delays'
                  WHEN ArrDelay > 0 and ArrDelay < 60  THEN  'Tolerable Delays'
                  WHEN ArrDelay = 0 THEN 'No Delays'
                  ELSE 'Early'
               END AS Flight_Delays
               FROM us_delay_flights_tbl
               ORDER BY Origin, Arrdelay DESC""").show(10)

[Stage 17:>                                                         (0 + 8) / 8]

+--------+------+----+-------------+
|ArrDelay|Origin|Dest|Flight_Delays|
+--------+------+----+-------------+
|    99.0|   ABE| ATL| Short Delays|
|    99.0|   ABE| ORD| Short Delays|
|    98.0|   ABE| ORD| Short Delays|
|    98.0|   ABE| ORD| Short Delays|
|    98.0|   ABE| ORD| Short Delays|
|    98.0|   ABE| ATL| Short Delays|
|    98.0|   ABE| ORD| Short Delays|
|    96.0|   ABE| ORD| Short Delays|
|    96.0|   ABE| ATL| Short Delays|
|    95.0|   ABE| ATL| Short Delays|
+--------+------+----+-------------+
only showing top 10 rows



