# <center> Introduction to Spark In-memory Computing via Python PySpark </center>

In [1]:
import sys
import os
import pyspark

# Echo the Spark-related settings found in the process environment, one per line.
# Direct indexing is kept on purpose: a missing variable raises KeyError.
# SPARK_ROOT is listed twice because the original cell printed it twice
# (likely a copy-paste leftover); keeping it preserves the recorded output.
for env_name in (
    'SPARK_ROOT',
    'SPARK_CONFIG_FILE',
    'SPARK_ROOT',
    'SPARK_MASTER_HOST',
    'SPARK_MASTER_PORT',
    'SPARK_MASTER_WEBUI_PORT',
):
    print(os.environ[env_name])

/software/spackages/linux-rocky8-x86_64/gcc-9.5.0/spark-3.1.1-3takuhnpd3av65aoge5ark5gligt6usb
/home/amann3/ondemand/data/sys/dashboard/batch_connect/sys/ood_jupyter_spark/output/04a1b626-170a-4ead-8f11-0b0661301574/spark-defaults.conf
/software/spackages/linux-rocky8-x86_64/gcc-9.5.0/spark-3.1.1-3takuhnpd3av65aoge5ark5gligt6usb
node0655.palmetto.clemson.edu
6304
3727


### Airlines Data

In [2]:
# Recursively copy the shared airlines directory into this user's scratch space.
# $USER is expanded by the shell; re-running the cell repeats the whole copy.
!cp -R /zfs/citi/airlines /scratch1/$USER/

In [3]:
# List the copied data files with human-readable sizes (one CSV per year).
!ls -lh /scratch1/$USER/airlines/data/

total 2.6G
-rw-r--r-- 1 amann3 cuuser 544M May 31 11:42 2000.csv
-rw-r--r-- 1 amann3 cuuser 573M May 31 11:43 2001.csv
-rw-r--r-- 1 amann3 cuuser 506M May 31 11:43 2002.csv
-rw-r--r-- 1 amann3 cuuser 598M May 31 11:44 2003.csv
-rw-r--r-- 1 amann3 cuuser 639M May 31 11:45 2004.csv
-rw-r--r-- 1 amann3 cuuser 640M May 31 11:45 2005.csv
-rw-r--r-- 1 amann3 cuuser 641M May 31 11:46 2006.csv
-rw-r--r-- 1 amann3 cuuser 671M May 31 11:47 2007.csv
-rw-r--r-- 1 amann3 cuuser 658M May 31 11:47 2008.csv


In [4]:
# Build the SQLContext that the rest of the notebook uses to read data and run SQL.
# NOTE(review): `sc` is never defined in this notebook - presumably the Spark
# kernel injects it at start-up; confirm before running in another environment.
sqlContext = pyspark.SQLContext(sc)
sqlContext

<pyspark.sql.context.SQLContext at 0x14b30d86c4c0>

In [5]:
import getpass
import os

# Resolve the user name that owns the scratch copy of the data set.
# $USER is tried first because that is what the shell cells expand; if it is
# unset, fall back to getpass.getuser() instead of letting None reach the string
# concatenation below (which would fail with an unhelpful TypeError).
uid = os.getenv('USER') or getpass.getuser()

# Directory holding the per-year CSV files; the trailing slash is kept as-is.
airline_data = "/scratch1/" + uid + "/airlines/data/"

In [6]:
# Load every file under airline_data as CSV into a single DataFrame.
# The first line of each file is treated as a header and column types are
# inferred from the data. cache() only marks the DataFrame for caching; nothing
# is materialised until the first action runs on it.
airlines = (
    sqlContext.read.format("com.databricks.spark.csv")
    .options(header="true", inferschema="true")
    .load(airline_data)
    .cache()
)

2023-05-31 11:50:46,952 WARN util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [7]:
%%time
# First action on the DataFrame: the CSV files have to be read and parsed here,
# and the cache requested by .cache() is filled - hence the multi-minute wall time.
airlines.count()



CPU times: user 60.3 ms, sys: 18.1 ms, total: 78.4 ms
Wall time: 3min 40s


                                                                                

59285457

In [8]:
%%time
# Same count again: the sub-second wall time is consistent with the rows now
# being served from the cache rather than re-read from the CSV files.
airlines.count()

CPU times: user 2.42 ms, sys: 1.08 ms, total: 3.5 ms
Wall time: 722 ms


59285457

In [9]:
# Show the inferred column types. Several numeric-looking columns (e.g. DepDelay,
# ArrDelay, AirTime) are inferred as string - presumably because the files contain
# non-numeric placeholders in those columns; verify against the raw data.
airlines.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Car

You can query a DataFrame with SQL statements through the SQLContext once the DataFrame has been registered as a temporary table.

In [10]:
# Expose the DataFrame to SQL queries under the name "airlines".
# createOrReplaceTempView supersedes registerTempTable (deprecated since Spark 2.0)
# and replaces any existing view of the same name, so the cell is safe to re-run.
airlines.createOrReplaceTempView("airlines")

*How many unique airlines are there?*

In [11]:
# Distinct carrier codes, queried with SQL from the registered "airlines" table.
# show() prints only the first 20 rows, so the table below is a sample of the
# codes rather than the total.
# NOTE(review): uniqueAirline.count() would give the actual number of carriers.
uniqueAirline = sqlContext.sql("SELECT DISTINCT UniqueCarrier \
                                FROM airlines")
uniqueAirline.show()

                                                                                

+-------------+
|UniqueCarrier|
+-------------+
|           UA|
|           AA|
|           NW|
|           EV|
|           B6|
|           HP|
|           TW|
|           DL|
|           OO|
|           F9|
|           YV|
|           TZ|
|           US|
|           AQ|
|           MQ|
|           OH|
|           HA|
|           XE|
|           DH|
|           AS|
+-------------+
only showing top 20 rows



*Calculate how many flights were completed by each carrier over time*

In [12]:
%%time
# Flights per carrier code across all loaded files: group the rows by
# UniqueCarrier and count the non-null UniqueCarrier values in each group.
# show() prints only the first 20 groups, in no particular order (no ORDER BY).
carrierFlightCount = sqlContext.sql("SELECT UniqueCarrier, COUNT(UniqueCarrier) AS FlightCount \
                                    FROM airlines GROUP BY UniqueCarrier")
carrierFlightCount.show()

                                                                                

+-------------+-----------+
|UniqueCarrier|FlightCount|
+-------------+-----------+
|           UA|    5094635|
|           AA|    6318386|
|           NW|    4280049|
|           EV|    1697172|
|           B6|     811341|
|           HP|    1208561|
|           TW|     511852|
|           DL|    5912486|
|           OO|    3090853|
|           F9|     336958|
|           YV|     854056|
|           TZ|     208420|
|           US|    4650400|
|           AQ|     154381|
|           MQ|    3954895|
|           OH|    1464176|
|           HA|     274265|
|           XE|    2350309|
|           DH|     693047|
|           AS|    1427189|
+-------------+-----------+
only showing top 20 rows

CPU times: user 13 ms, sys: 2.26 ms, total: 15.3 ms
Wall time: 3.39 s


*How do you display full carrier names?*

In [13]:
# Load the carrier lookup file as CSV (header row, inferred column types) and
# mark it for caching so the join queries below do not re-read it.
carriers = (
    sqlContext.read.format("com.databricks.spark.csv")
    .options(header="true", inferschema="true")
    .load("/zfs/citi/airlines/metadata/carriers.csv")
    .cache()
)

# Expose the lookup to SQL under the name "carriers".
# createOrReplaceTempView supersedes registerTempTable (deprecated since Spark 2.0)
# and replaces any existing view of the same name, so the cell is safe to re-run.
carriers.createOrReplaceTempView("carriers")

In [14]:
# Inspect the lookup table's columns before joining on them.
carriers.printSchema()

root
 |-- Code: string (nullable = true)
 |-- Description: string (nullable = true)



In [15]:
%%time
# Per-carrier flight count joined to the carriers lookup, so each code is shown
# with its Description; results are sorted by carrier code.
# The INNER JOIN keeps only flights whose UniqueCarrier matches a Code in
# carriers, so codes missing from the lookup would be dropped from the result.
carrierFlightCountFullName = sqlContext.sql("SELECT c.Description, a.UniqueCarrier, COUNT(a.UniqueCarrier) AS FlightCount \
                                    FROM airlines AS a \
                                    INNER JOIN carriers AS c \
                                    ON c.Code = a.UniqueCarrier \
                                    GROUP BY a.UniqueCarrier, c.Description \
                                    ORDER BY a.UniqueCarrier")
carrierFlightCountFullName.show()



+--------------------+-------------+-----------+
|         Description|UniqueCarrier|FlightCount|
+--------------------+-------------+-----------+
|Pinnacle Airlines...|           9E|     521059|
|American Airlines...|           AA|    6318386|
| Aloha Airlines Inc.|           AQ|     154381|
|Alaska Airlines Inc.|           AS|    1427189|
|     JetBlue Airways|           B6|     811341|
|Continental Air L...|           CO|    2925290|
|    Independence Air|           DH|     693047|
|Delta Air Lines Inc.|           DL|    5912486|
|Atlantic Southeas...|           EV|    1697172|
|Frontier Airlines...|           F9|     336958|
|AirTran Airways C...|           FL|    1265138|
|Hawaiian Airlines...|           HA|     274265|
|America West Airl...|           HP|    1208561|
|American Eagle Ai...|           MQ|    3954895|
|Northwest Airline...|           NW|    4280049|
|         Comair Inc.|           OH|    1464176|
|Skywest Airlines ...|           OO|    3090853|
|Trans World Airwa..

                                                                                

*What is the average departure delay for each airline?*

In [16]:
%%time
# Average departure delay per carrier code, joined to the carriers lookup and
# sorted by carrier code.
# Only UniqueCarrier is in the GROUP BY, so FIRST() is used to pick a Description
# and code for each group; those two columns are left unaliased, which is why the
# output headers read first(Description) / first(UniqueCarrier).
# NOTE(review): DepDelay is a string column per printSchema(), so AVG relies on
# Spark casting it implicitly - presumably non-numeric values become NULL and are
# skipped; confirm, or cast explicitly.
avgDepartureDelay = sqlContext.sql("SELECT FIRST(c.Description), FIRST(a.UniqueCarrier), AVG(a.DepDelay) AS AvgDepDelay \
                                    FROM airlines AS a \
                                    INNER JOIN carriers AS c \
                                    ON c.Code = a.UniqueCarrier \
                                    GROUP BY a.UniqueCarrier \
                                    ORDER BY a.UniqueCarrier")
avgDepartureDelay.show()



+--------------------+--------------------+-------------------+
|  first(Description)|first(UniqueCarrier)|        AvgDepDelay|
+--------------------+--------------------+-------------------+
|Pinnacle Airlines...|                  9E| 7.9279144892173035|
|American Airlines...|                  AA|   9.31183542798288|
| Aloha Airlines Inc.|                  AQ| 1.5993176899118409|
|Alaska Airlines Inc.|                  AS|   9.59814943329714|
|     JetBlue Airways|                  B6| 11.262714178314551|
|Continental Air L...|                  CO|  7.772605178542145|
|    Independence Air|                  DH|  9.612639389688926|
|Delta Air Lines Inc.|                  DL| 7.4335417135660515|
|Atlantic Southeas...|                  EV| 13.483736343326541|
|Frontier Airlines...|                  F9|  6.096932123645889|
|AirTran Airways C...|                  FL|  10.27801937883596|
|Hawaiian Airlines...|                  HA|-0.5165400834606493|
|America West Airl...|                  

                                                                                

In [17]:
# Release the cached data now that the analysis is finished.
# carriers was cached as well when it was loaded, so free it too; the
# airlines call stays last so the value displayed by the cell is unchanged.
carriers.unpersist()
airlines.unpersist()

DataFrame[Year: int, Month: int, DayofMonth: int, DayOfWeek: int, DepTime: string, CRSDepTime: int, ArrTime: string, CRSArrTime: int, UniqueCarrier: string, FlightNum: int, TailNum: string, ActualElapsedTime: string, CRSElapsedTime: string, AirTime: string, ArrDelay: string, DepDelay: string, Origin: string, Dest: string, Distance: int, TaxiIn: string, TaxiOut: string, Cancelled: int, CancellationCode: string, Diverted: int, CarrierDelay: string, WeatherDelay: string, NASDelay: string, SecurityDelay: string, LateAircraftDelay: string]