
# Data Streams in a High-Level System


## Basic Setup

In [1]:
%set_env SPARK_VERSION=3.5.7

env: SPARK_VERSION=3.5.7


## Data Stream Processing

We will be using Spark Streaming (since everyone is familiar with Spark) as well as StreamParse (which is a bit lower-level).

### Spark Installation

As a preliminary step, we'll first set Apache Spark up on the current machine.

In [2]:
## Let's install Apache Spark on Colab

!wget -nc https://downloads.apache.org/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop3.tgz
!tar xf spark-$SPARK_VERSION-bin-hadoop3.tgz
!pip install findspark

import os

os.environ["SPARK_HOME"] = "/content/spark-" + os.environ['SPARK_VERSION'] + "-bin-hadoop3"

--2025-12-02 00:46:44--  https://downloads.apache.org/spark/spark-3.5.7/spark-3.5.7-bin-hadoop3.tgz
Resolving downloads.apache.org (downloads.apache.org)... 135.181.214.104, 88.99.208.237, 2a01:4f9:3a:2c57::2, ...
Connecting to downloads.apache.org (downloads.apache.org)|135.181.214.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 400914067 (382M) [application/x-gzip]
Saving to: ‘spark-3.5.7-bin-hadoop3.tgz’


2025-12-02 00:47:06 (17.3 MB/s) - ‘spark-3.5.7-bin-hadoop3.tgz’ saved [400914067/400914067]

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [3]:
import findspark

findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import SQLContext

In [4]:
# Let's set up a connection to local Spark

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

spark = SparkSession.builder.appName("Flights").getOrCreate()
sc = spark.sparkContext

### Downloading the Data

Next, we'll be analyzing **airline flight info** in a streaming fashion.  This info started off in a giant data file from the US Department of Transportation's [Bureau of Transportation Statistics](https://www.transtats.bts.gov/Tables.asp?DB_ID=120&DB_Name=Airline%20On-Time%20Performance%20Data&DB_Short_Name=On-Time).

First we need to download it from where it's being publicly shared on Google Cloud Storage.

In [5]:
!wget -nc https://storage.googleapis.com/penn-cis5450/airlines.csv
!wget -nc https://storage.googleapis.com/penn-cis5450/airports.csv
!wget -nc https://storage.googleapis.com/penn-cis5450/ontime.csv
!wget -nc https://storage.googleapis.com/penn-cis5450/2015-ontime.csv

--2025-12-02 00:48:17--  https://storage.googleapis.com/penn-cis5450/airlines.csv
Resolving storage.googleapis.com (storage.googleapis.com)... 173.194.202.207, 173.194.203.207, 74.125.199.207, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|173.194.202.207|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 359 [text/csv]
Saving to: ‘airlines.csv’


2025-12-02 00:48:17 (118 MB/s) - ‘airlines.csv’ saved [359/359]

--2025-12-02 00:48:17--  https://storage.googleapis.com/penn-cis5450/airports.csv
Resolving storage.googleapis.com (storage.googleapis.com)... 173.194.202.207, 173.194.203.207, 74.125.199.207, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|173.194.202.207|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 23867 (23K) [text/csv]
Saving to: ‘airports.csv’


2025-12-02 00:48:17 (100 MB/s) - ‘airports.csv’ saved [23867/23867]

--2025-12-02 00:48:18--  https://storage.googleapis.com/penn-cis5450/ontim

Let's see what we have downloaded...

In [6]:
!ls -l /content/*.csv

-rw-r--r-- 1 root root 592406591 Nov 15  2024 /content/2015-ontime.csv
-rw-r--r-- 1 root root       359 Sep  5  2022 /content/airlines.csv
-rw-r--r-- 1 root root     23867 Sep  5  2022 /content/airports.csv
-rw-r--r-- 1 root root  28805770 Nov 15  2024 /content/ontime.csv


In [7]:
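# Now, to demonstrate Spark's incremental processing, we'll break the one-month
# ontime.csv file into 10000 chunks of roughly equal size. Note that `split -n N`
# splits by bytes, so a chunk boundary can fall in the middle of a row; use
# `split -n l/10000` to keep rows intact, or `split -l 10000` for 10000-line segments.
! split -n 10000 ontime.csv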

!head ontime.csv

"YEAR","MONTH","DAY_OF_MONTH","AIRLINE_ID","CARRIER","FL_NUM","ORIGIN","DEST","ARR_DELAY_NEW","CANCELLED",
2018,1,2,19393,"WN","1325","SJU","MCO",0.00,0.00,
2018,1,2,19393,"WN","5159","SJU","MCO",0.00,0.00,
2018,1,2,19393,"WN","5890","SJU","MCO",9.00,0.00,
2018,1,2,19393,"WN","6618","SJU","MCO",0.00,0.00,
2018,1,2,19393,"WN","1701","SJU","MDW",8.00,0.00,
2018,1,2,19393,"WN","844","SJU","TPA",23.00,0.00,
2018,1,2,19393,"WN","4679","SJU","TPA",0.00,0.00,
2018,1,2,19393,"WN","6294","SLC","BUR",20.00,0.00,
2018,1,2,19393,"WN","5245","SLC","DAL",0.00,0.00,

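One caveat about the chunks produced above: only the first chunk starts with the CSV header. The stream reader defined later sets `header` to `true`, so Spark will likely treat the first line of *every* chunk as a header and skip it. Below is a minimal plain-Python sketch of an alternative. It assumes no quoted field contains a newline, which holds for the rows shown above. It splits by whole rows and repeats the header at the top of each chunk. The helper name `split_csv_lines` is hypothetical.

```python
def split_csv_lines(text, rows_per_chunk):
    """Split CSV text into chunks of at most rows_per_chunk data rows each,
    repeating the header line at the top of every chunk. Assumes no quoted
    field contains a newline."""
    lines = text.splitlines()
    header, rows = lines[0], lines[1:]
    chunks = []
    for start in range(0, len(rows), rows_per_chunk):
        chunks.append("\n".join([header] + rows[start:start + rows_per_chunk]) + "\n")
    return chunks

sample = ('"YEAR","MONTH","DAY_OF_MONTH","CARRIER"\n'
          '2018,1,2,"WN"\n'
          '2018,1,2,"WN"\n'
          '2018,1,3,"AA"\n')
parts = split_csv_lines(sample, 2)
for i, part in enumerate(parts):
    print(f"--- chunk {i} ---")
    print(part, end="")
```

To apply this to `ontime.csv`, you would read the file, call `split_csv_lines(text, 10000)`, and write each chunk to its own file.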

In [8]:
!head airports.csv

IATA_CODE,AIRPORT,CITY,STATE,COUNTRY,LATITUDE,LONGITUDE
ABE,Lehigh Valley International Airport,Allentown,PA,USA,40.65236,-75.44040
ABI,Abilene Regional Airport,Abilene,TX,USA,32.41132,-99.68190
ABQ,Albuquerque International Sunport,Albuquerque,NM,USA,35.04022,-106.60919
ABR,Aberdeen Regional Airport,Aberdeen,SD,USA,45.44906,-98.42183
ABY,Southwest Georgia Regional Airport,Albany,GA,USA,31.53552,-84.19447
ACK,Nantucket Memorial Airport,Nantucket,MA,USA,41.25305,-70.06018
ACT,Waco Regional Airport,Waco,TX,USA,31.61129,-97.23052
ACV,Arcata Airport,Arcata/Eureka,CA,USA,40.97812,-124.10862
ACY,Atlantic City International Airport,Atlantic City,NJ,USA,39.45758,-74.57717


In [9]:
!head airlines.csv

IATA_CODE,AIRLINE
UA,United Air Lines Inc.
AA,American Airlines Inc.
US,US Airways Inc.
F9,Frontier Airlines Inc.
B6,JetBlue Airways
OO,Skywest Airlines Inc.
AS,Alaska Airlines Inc.
NK,Spirit Air Lines
WN,Southwest Airlines Co.


In [10]:
!ls

2015-ontime.csv		     xbqn  xdhh  xeyb  xgov  xifp  xjwj  xlnd  xndx
airlines.csv		     xbqo  xdhi  xeyc  xgow  xifq  xjwk  xlne  xndy
airports.csv		     xbqp  xdhj  xeyd  xgox  xifr  xjwl  xlnf  xndz
ontime.csv		     xbqq  xdhk  xeye  xgoy  xifs  xjwm  xlng  xnea
sample_data		     xbqr  xdhl  xeyf  xgoz  xift  xjwn  xlnh  xneb
spark-3.5.7-bin-hadoop3      xbqs  xdhm  xeyg  xgpa  xifu  xjwo  xlni  xnec
spark-3.5.7-bin-hadoop3.tgz  xbqt  xdhn  xeyh  xgpb  xifv  xjwp  xlnj  xned
xaaa			     xbqu  xdho  xeyi  xgpc  xifw  xjwq  xlnk  xnee
xaab			     xbqv  xdhp  xeyj  xgpd  xifx  xjwr  xlnl  xnef
xaac			     xbqw  xdhq  xeyk  xgpe  xify  xjws  xlnm  xneg
xaad			     xbqx  xdhr  xeyl  xgpf  xifz  xjwt  xlnn  xneh
xaae			     xbqy  xdhs  xeym  xgpg  xiga  xjwu  xlno  xnei
xaaf			     xbqz  xdht  xeyn  xgph  xigb  xjwv  xlnp  xnej
xaag			     xbra  xdhu  xeyo  xgpi  xigc  xjww  xlnq  xnek
xaah			     xbrb  xdhv  xeyp  xgpj  xigd  xjwx  xlnr  xnel
xaai			     xbrc  xdhw  xeyq  xgpk  xige  xjwy

### From the filesystem to Spark's HDFS filesystem

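On a cluster, Spark reads its input through Hadoop's filesystem layer, typically backed by the Hadoop Distributed File System (HDFS), rather than from the local disk of a single machine.  So we'll copy files into the filesystem Spark uses, via Hadoop's `FileSystem` API.  (On a single Colab machine with no HDFS configured, that default filesystem is most likely just the local disk, so `/in` will be an ordinary local directory.)  Let's start with a connection to the filesystem...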
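A related detail: Spark's Structured Streaming guide recommends placing files into a watched directory atomically, for example with a move or rename, so that a streaming query never reads a half-written file. Below is a minimal local-filesystem sketch of that write-then-rename pattern using only Python's standard library. `publish_atomically` and the `staging` directory are hypothetical names. The atomicity assumes both directories live on the same filesystem.

```python
import os
import tempfile

def publish_atomically(data: bytes, filename: str, watched_dir: str, staging_dir: str) -> str:
    """Write `data` into staging_dir first, then move the finished file into
    watched_dir in one step. os.replace is atomic only when both directories
    are on the same filesystem."""
    os.makedirs(staging_dir, exist_ok=True)
    os.makedirs(watched_dir, exist_ok=True)
    tmp_path = os.path.join(staging_dir, filename)
    with open(tmp_path, "wb") as fh:
        fh.write(data)
        fh.flush()
        os.fsync(fh.fileno())          # make sure the bytes are on disk before the move
    final_path = os.path.join(watched_dir, filename)
    os.replace(tmp_path, final_path)   # the file appears in watched_dir fully written
    return final_path

root = tempfile.mkdtemp()
published = publish_atomically(b"YEAR,MONTH\n2018,1\n", "chunk0.csv",
                               os.path.join(root, "in"), os.path.join(root, "staging"))
print(os.listdir(os.path.join(root, "in")))  # ['chunk0.csv']
```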

In [11]:
# Next, for Spark we will need to copy the files to HDFS
# So first we need to connect to the HDFS filesystem

######
# From https://diogoalexandrefranco.github.io/interacting-with-hdfs-from-pyspark/
#
# Get fs handler from java gateway
######
URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
fs = FileSystem.get(sc._jsc.hadoopConfiguration())

# Make sure we have an empty directory in HDFS
fs.delete(Path('/in'), True)
fs.mkdirs(Path('/in'))


True

### Creating a Streaming (Microbatched) Query

In the first part of this notebook, we will look at *incremental computation* over data as it arrives.  To do this we'll use Spark's **microbatch** capability, where Spark incrementally re-runs a computation each time a "trigger" fires (typically a fixed time interval).

Here's how this will work:

1. We'll define a "stream query" that is executed periodically and tracks, for each carrier, flight number, and route (origin and destination), how many flights we've seen so far and their average delay.
1. The query will pick up CSV files as they are added to the `/in` directory, treating them as a series of chunks in a stream.
1. We'll parse the CSV format into appropriate columns, arriving at a Spark dataframe.
1. The dataframe is registered under an SQL table name (`flights`), so we can query it with SparkSQL.
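Conceptually, the incremental part works like the plain-Python sketch below. It illustrates the idea only and is not how Spark implements it internally. The sketch keeps a running count and sum per group key and folds each new microbatch into that state. It then re-emits the whole table, which is what the `"complete"` output mode used later returns. The airport join and the latitude/longitude columns are omitted. `process_microbatch` is a hypothetical name.

```python
from collections import defaultdict

# Running state: (carrier, fl_num, origin, dest) -> [count of non-null delays, sum of delays]
state = defaultdict(lambda: [0, 0.0])

def process_microbatch(rows):
    """Fold one microbatch of (carrier, fl_num, origin, dest, arr_delay) tuples
    into the running state and return the complete, sorted result table."""
    for carrier, fl_num, origin, dest, delay in rows:
        acc = state[(carrier, fl_num, origin, dest)]  # group appears even if delay is NULL
        if delay is not None:                         # like SQL count()/avg(), ignore NULLs
            acc[0] += 1
            acc[1] += delay
    # "complete" mode: every group seen so far, with its count and average delay
    return [(*key, n, total / n if n else None)
            for key, (n, total) in sorted(state.items())]

# Two microbatches arriving one after the other
r1 = process_microbatch([("WN", 844, "SJU", "TPA", 23.0), ("WN", 792, "SMF", "BUR", 6.0)])
r2 = process_microbatch([("WN", 844, "SJU", "TPA", 1.0)])
print(r1)
print(r2)
```

Keeping (count, sum) rather than the average itself is what makes each update cheap. An average cannot be updated from a new batch alone, but a count and a sum can.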


In [12]:
# We need this to set a schema
from pyspark.sql.types import StructType

# Here's the basic schema of ontime.csv (and its splits)
flightSchema = StructType().add("YEAR", "integer").add("MONTH", "integer")\
  .add("DAY_OF_MONTH", "integer").add("AIRLINE_ID", "integer")\
  .add("CARRIER", "string").add("FL_NUM", "integer").add("ORIGIN", "string")\
  .add("DEST", "string").add("ARR_DELAY", "double").add("CANCELLED", "double")
airlineSchema = StructType().add("IATA_CODE", "string").add("AIRLINE", "string")
airportSchema = StructType().add("IATA_CODE", "string").add("AIRPORT", "string")\
  .add("CITY", "string").add("STATE", "string").add("COUNTRY", "string")\
  .add("LATITUDE", "double").add("LONGITUDE", "double")

# The airlines
airlinesDF = spark.read.option("sep", ",").option("header", "true").\
  schema(airlineSchema).csv('airlines.csv')
airlinesDF.createOrReplaceTempView("airlines")

# The airports
airportsDF = spark.read.option("sep", ",").option("header", "true").\
  schema(airportSchema).csv('airports.csv')
airportsDF.createOrReplaceTempView("airports")

# This will be a stream, and in each microbatch we'll read one file at a time
flightsStreamDF = spark.readStream.option("sep", ",").option("header", "true").\
  option("maxFilesPerTrigger", 1).\
  schema(flightSchema).csv("/in/")
flightsStreamDF.createOrReplaceTempView("flights")


avg_delay = spark.sql("""select CARRIER,FL_NUM, ORIGIN, DEST, org.LATITUDE AS from_lat, org.LONGITUDE AS from_long,
                          dst.LATITUDE AS to_lat, dst.LONGITUDE AS to_long,
                          count(ARR_DELAY) as NbrFlights,
                          avg(ARR_DELAY) as avg_delay
                        from (flights f join airports org on f.origin=org.IATA_CODE) join airports dst on f.dest=dst.IATA_CODE
                        GROUP BY CARRIER, FL_NUM, ORIGIN, DEST, org.LATITUDE, org.LONGITUDE, dst.LATITUDE, dst.LONGITUDE
                        ORDER BY CARRIER, FL_NUM, ORIGIN, DEST""")


### Launching the Stream Query

Now we need to do three things:
1. Launch the streaming query (which initially has no data).
1. Periodically add new data to the input stream (by copying a file into `/in`).
1. Show the updated query results.

The code below shows only the first 4 rows of the output table, but you should be able to see them evolve over time.  The cell stops the query by itself after copying 25 chunks.  If you interrupt it earlier with the Stop button, run `query.stop()` in the next cell.
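The `maxFilesPerTrigger` option (set to 1 in the stream reader above) limits how many newly arrived files each microbatch reads. The toy class below models that behavior in plain Python. On each trigger it returns at most N files it has not returned before, oldest first. Spark's file source also defaults to processing older files first (its `latestFirst` option is false by default). This is a sketch under simplifying assumptions, not Spark's file source, which keeps its own persistent log of processed files. `DirectoryFileSource` is a hypothetical name.

```python
import os
import tempfile

class DirectoryFileSource:
    """Toy directory-watching source: each call to next_batch() returns at most
    max_files_per_trigger files that have not been returned before, oldest first."""

    def __init__(self, directory, max_files_per_trigger=1):
        self.directory = directory
        self.max_files = max_files_per_trigger
        self.seen = set()  # names already handed out (kept only in memory here)

    def next_batch(self):
        new_files = [n for n in os.listdir(self.directory) if n not in self.seen]
        # Oldest modification time first; the name breaks ties deterministically
        new_files.sort(key=lambda n: (os.path.getmtime(os.path.join(self.directory, n)), n))
        batch = new_files[:self.max_files]
        self.seen.update(batch)
        return batch

# Two chunk files with explicit modification times: xaab is older than xaaa
watched = tempfile.mkdtemp()
for mtime, name in [(1000, "xaab"), (2000, "xaaa")]:
    file_path = os.path.join(watched, name)
    with open(file_path, "w") as fh:
        fh.write("2018,1,2\n")
    os.utime(file_path, (mtime, mtime))

source = DirectoryFileSource(watched, max_files_per_trigger=1)
b1, b2, b3 = source.next_batch(), source.next_batch(), source.next_batch()
print(b1, b2, b3)  # ['xaab'] ['xaaa'] []
```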

In [13]:
# We'll need this to periodically sleep
import time

# Start the query: every 1 second, recompute the complete output and store it
# in an in-memory table called flight_info
query = avg_delay.writeStream.outputMode("complete").queryName("flight_info").format("memory").\
    trigger(processingTime='1 seconds').start()

# While the query runs, copy chunk files from /content into /in one at a time,
# wait 3 sec for Spark to process each one, then display the updated output.
# Note: os.listdir() returns file names in arbitrary order.
max_files = 25   # stop after this many chunks (named so it doesn't shadow built-in max)
cur = 0
for filename in os.listdir('/content/'):
  if filename.startswith('xaa'):
    cur += 1
    fs.copyFromLocalFile(Path('/content/' + filename), Path('/in'))
    time.sleep(3)
    print(filename)
    display(spark.sql("select * from flight_info").limit(4).toPandas())
    if cur >= max_files:
      break

query.stop()

xaaa


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay


xaae


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay


xaav


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay


xaaf


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay


xaab


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay


xaaq


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay


xaag


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay


xaad


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay


xaai


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay


xaap


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay


xaam


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay


xaac


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay


xaar


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay


xaal


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay


xaat


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay


xaax


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay
0,WN,792,SMF,BUR,38.69542,-121.59077,34.20062,-118.3585,1,6.0
1,WN,844,SJU,TPA,18.43942,-66.00183,27.97547,-82.53325,1,23.0
2,WN,910,SMF,DEN,38.69542,-121.59077,39.85841,-104.667,1,0.0
3,WN,925,SMF,GEG,38.69542,-121.59077,47.61986,-117.53384,1,0.0


xaan


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay
0,WN,792,SMF,BUR,38.69542,-121.59077,34.20062,-118.3585,1,6.0
1,WN,844,SJU,TPA,18.43942,-66.00183,27.97547,-82.53325,1,23.0
2,WN,910,SMF,DEN,38.69542,-121.59077,39.85841,-104.667,1,0.0
3,WN,925,SMF,GEG,38.69542,-121.59077,47.61986,-117.53384,1,0.0


xaaj


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay
0,WN,792,SMF,BUR,38.69542,-121.59077,34.20062,-118.3585,1,6.0
1,WN,844,SJU,TPA,18.43942,-66.00183,27.97547,-82.53325,1,23.0
2,WN,910,SMF,DEN,38.69542,-121.59077,39.85841,-104.667,1,0.0
3,WN,925,SMF,GEG,38.69542,-121.59077,47.61986,-117.53384,1,0.0


xaas


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay
0,WN,792,SMF,BUR,38.69542,-121.59077,34.20062,-118.3585,1,6.0
1,WN,844,SJU,TPA,18.43942,-66.00183,27.97547,-82.53325,1,23.0
2,WN,910,SMF,DEN,38.69542,-121.59077,39.85841,-104.667,1,0.0
3,WN,925,SMF,GEG,38.69542,-121.59077,47.61986,-117.53384,1,0.0


xaaw


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay
0,WN,792,SMF,BUR,38.69542,-121.59077,34.20062,-118.3585,1,6.0
1,WN,844,SJU,TPA,18.43942,-66.00183,27.97547,-82.53325,1,23.0
2,WN,910,SMF,DEN,38.69542,-121.59077,39.85841,-104.667,1,0.0
3,WN,925,SMF,GEG,38.69542,-121.59077,47.61986,-117.53384,1,0.0


xaau


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay
0,WN,792,SMF,BUR,38.69542,-121.59077,34.20062,-118.3585,1,6.0
1,WN,844,SJU,TPA,18.43942,-66.00183,27.97547,-82.53325,1,23.0
2,WN,910,SMF,DEN,38.69542,-121.59077,39.85841,-104.667,1,0.0
3,WN,925,SMF,GEG,38.69542,-121.59077,47.61986,-117.53384,1,0.0


xaay


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay
0,WN,792,SMF,BUR,38.69542,-121.59077,34.20062,-118.3585,1,6.0
1,WN,844,SJU,TPA,18.43942,-66.00183,27.97547,-82.53325,1,23.0
2,WN,910,SMF,DEN,38.69542,-121.59077,39.85841,-104.667,1,0.0
3,WN,925,SMF,GEG,38.69542,-121.59077,47.61986,-117.53384,1,0.0


xaao


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay
0,WN,792,SMF,BUR,38.69542,-121.59077,34.20062,-118.3585,1,6.0
1,WN,844,SJU,TPA,18.43942,-66.00183,27.97547,-82.53325,1,23.0
2,WN,910,SMF,DEN,38.69542,-121.59077,39.85841,-104.667,1,0.0
3,WN,925,SMF,GEG,38.69542,-121.59077,47.61986,-117.53384,1,0.0


xaah


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay
0,WN,792,SMF,BUR,38.69542,-121.59077,34.20062,-118.3585,1,6.0
1,WN,844,SJU,TPA,18.43942,-66.00183,27.97547,-82.53325,1,23.0
2,WN,910,SMF,DEN,38.69542,-121.59077,39.85841,-104.667,1,0.0
3,WN,925,SMF,GEG,38.69542,-121.59077,47.61986,-117.53384,1,0.0


xaaz


Unnamed: 0,CARRIER,FL_NUM,ORIGIN,DEST,from_lat,from_long,to_lat,to_long,NbrFlights,avg_delay
0,WN,668,TPA,BNA,27.97547,-82.53325,36.12448,-86.67818,1,1.0
1,WN,704,TPA,BHM,27.97547,-82.53325,33.56294,-86.75355,1,3.0
2,WN,792,SMF,BUR,38.69542,-121.59077,34.20062,-118.3585,1,6.0
3,WN,844,SJU,TPA,18.43942,-66.00183,27.97547,-82.53325,1,23.0


## Time-Windowed Processing

So far, we've only used Spark Streaming for incremental recomputation.  In many cases, though, you want to compute over the temporal aspects of the data, for example averages per day.

For this part we'll use the much bigger longitudinal dataset of 2015 on-time performance.  Its schema has considerably more columns than the simpler `ontime.csv`.

For simplicity we will load the whole file into a single dataframe, without streaming.
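The idea behind a *tumbling* window fits in a few lines of plain Python. Each timestamp falls into exactly one fixed-size window. That window starts at the timestamp rounded down to a multiple of the window width, counted from the Unix epoch. The aggregate is then computed per (window, origin, destination). Spark's `window(col, "1 day", "1 day")`, used in the code below, is a tumbling window because its duration equals its slide. This sketch treats naive datetimes as UTC and assumes no missing delays. `tumbling_window_start` and `windowed_avg` are hypothetical names.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # naive datetimes are treated as UTC

def tumbling_window_start(ts: datetime, width: timedelta) -> datetime:
    """Start of the fixed-size, non-overlapping window that contains ts."""
    return EPOCH + ((ts - EPOCH) // width) * width

def windowed_avg(rows, width):
    """rows: (timestamp, origin, dest, arr_delay) tuples with no missing delays.
    Returns {(window_start, window_end, origin, dest): average delay}."""
    acc = defaultdict(lambda: [0, 0.0])
    for ts, origin, dest, delay in rows:
        start = tumbling_window_start(ts, width)
        group = acc[(start, start + width, origin, dest)]
        group[0] += 1
        group[1] += delay
    return {key: total / n for key, (n, total) in acc.items()}

sample_flights = [
    (datetime(2015, 2, 9, 8, 30), "JFK", "HNL", 10),
    (datetime(2015, 2, 9, 22, 5), "JFK", "HNL", 30),
    (datetime(2015, 2, 10, 0, 15), "JFK", "HNL", 5),
]
daily = windowed_avg(sample_flights, timedelta(days=1))
for (start, end, origin, dest), mean_delay in sorted(daily.items()):
    print(start.date(), end.date(), origin, dest, mean_delay)
```

With a slide shorter than the window duration, windows would overlap (sliding windows), and a single flight would be counted in several windows.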

In [14]:
!head 2015-ontime.csv

YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY
2015,1,1,4,AS,98,N407AS,ANC,SEA,0005,2354,-11,21,0015,205,194,169,1448,0404,4,0430,0408,-22,0,0,,,,,,
2015,1,1,4,AA,2336,N3KUAA,LAX,PBI,0010,0002,-8,12,0014,280,279,263,2330,0737,4,0750,0741,-9,0,0,,,,,,
2015,1,1,4,US,840,N171US,SFO,CLT,0020,0018,-2,16,0034,286,293,266,2296,0800,11,0806,0811,5,0,0,,,,,,
2015,1,1,4,AA,258,N3HYAA,LAX,MIA,0020,0015,-5,15,0030,285,281,258,2342,0748,8,0805,0756,-9,0,0,,,,,,
2015,1,1,4,AS,135,N527AS,SEA,ANC,0025,0024,-1,11,0035,235,215,199,1448,0254,5,0320,0259,-21,0,0,,,,,,
2015,1,1,4,DL,806,N3730B,SFO,MSP,0025,0020,-5,18,0038,217,230,206,1589,0604,6,0602,0610,8,0,0,,,,

In [15]:
# We need this to set a schema
from pyspark.sql.types import StructType
from pyspark.sql.functions import window

fs.copyFromLocalFile(Path('/content/2015-ontime.csv'),Path('/2015-ontime.csv'))

# Here's the schema of 2015-ontime.csv (some columns are renamed)
ontimeSchema = StructType().add("YEAR", "integer").add("MONTH", "integer")\
  .add("DAY", "integer").add("DAY_OF_WEEK", "integer").add("AIRLINE_ID", "string")\
  .add("FL_NUM", "integer").add("TAIL_NUM", "string").add("ORIGIN", "string")\
  .add("DEST", "string").add("SCH_DEPARTURE", "integer").add("DEPARTURE", "integer")\
  .add("DEP_DELAY","integer").add("TAXI_OUT","integer").add("WHEELS_OFF","integer")\
  .add("SCH_TIME","integer").add("ELAPSED_TIME","integer").add("AIR_TIME","integer")\
  .add("DISTANCE","integer").add("WHEELS_ON","integer").add("TAXI_IN","integer")\
  .add("SCH_ARRIVAL","integer").add("ARRIVAL_TIME","integer")\
  .add("ARR_DELAY", "integer").add("DIVERTED", "integer").add("CANCELLED", "integer")\
  .add("CANCELLATION_REASON","string").add("AIR_SYSTEM_DELAY", "integer")\
  .add("SECURITY_DELAY", "integer").add("AIRLINE_DELAY", "integer")\
  .add("LATE_AIRCRAFT_DELAY", "integer").add("WEATHER_DELAY", "integer")

# Unlike the flights stream above, this is an ordinary batch read of the whole file
ontimeDF = spark.read.option("sep", ",").option("header", "true").\
  schema(ontimeSchema).csv("/2015-ontime.csv")
ontimeDF.createOrReplaceTempView("ontime")

display(ontimeDF.take(2))

[Row(YEAR=2015, MONTH=1, DAY=1, DAY_OF_WEEK=4, AIRLINE_ID='AS', FL_NUM=98, TAIL_NUM='N407AS', ORIGIN='ANC', DEST='SEA', SCH_DEPARTURE=5, DEPARTURE=2354, DEP_DELAY=-11, TAXI_OUT=21, WHEELS_OFF=15, SCH_TIME=205, ELAPSED_TIME=194, AIR_TIME=169, DISTANCE=1448, WHEELS_ON=404, TAXI_IN=4, SCH_ARRIVAL=430, ARRIVAL_TIME=408, ARR_DELAY=-22, DIVERTED=0, CANCELLED=0, CANCELLATION_REASON=None, AIR_SYSTEM_DELAY=None, SECURITY_DELAY=None, AIRLINE_DELAY=None, LATE_AIRCRAFT_DELAY=None, WEATHER_DELAY=None),
 Row(YEAR=2015, MONTH=1, DAY=1, DAY_OF_WEEK=4, AIRLINE_ID='AA', FL_NUM=2336, TAIL_NUM='N3KUAA', ORIGIN='LAX', DEST='PBI', SCH_DEPARTURE=10, DEPARTURE=2, DEP_DELAY=-8, TAXI_OUT=12, WHEELS_OFF=14, SCH_TIME=280, ELAPSED_TIME=279, AIR_TIME=263, DISTANCE=2330, WHEELS_ON=737, TAXI_IN=4, SCH_ARRIVAL=750, ARRIVAL_TIME=741, ARR_DELAY=-9, DIVERTED=0, CANCELLED=0, CANCELLATION_REASON=None, AIR_SYSTEM_DELAY=None, SECURITY_DELAY=None, AIRLINE_DELAY=None, LATE_AIRCRAFT_DELAY=None, WEATHER_DELAY=None)]

In [16]:
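# Build a timestamp (YMD) from YEAR/MONTH/DAY plus the HHMM-encoded scheduled
# departure time. Note: SCH_DURATION subtracts HHMM-encoded clock times, so it is
# not in minutes, and it ignores time-zone differences between airports.
simplerDF = spark.sql("""select cast(concat(cast(YEAR as string), '-',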
                      cast (MONTH as string), '-', CAST (DAY as string), ' ',
                      cast(cast (SCH_DEPARTURE / 100 as integer) as string), ':',
                           cast (SCH_DEPARTURE % 100 as string) ) as timestamp) as YMD,
                      AIRLINE_ID, ORIGIN, DEST, DISTANCE, ARR_DELAY,
                      (SCH_ARRIVAL - SCH_DEPARTURE + 2400) % 2400 as SCH_DURATION
                      from ontime""")
simplerDF.createOrReplaceTempView("simpler")

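# Group into tumbling 1-day windows (window length == slide) per route;
# avg() averages every numeric column within each group
w = simplerDF.groupBy(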
    window(simplerDF.YMD, "1 day", "1 day"),
    simplerDF.ORIGIN,
    simplerDF.DEST
).avg()

w

DataFrame[window: struct<start:timestamp,end:timestamp>, ORIGIN: string, DEST: string, avg(DISTANCE): double, avg(ARR_DELAY): double, avg(SCH_DURATION): double]
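A worked example of why `SCH_DURATION` is not a duration in minutes. The first 2015 flight (ANC to SEA) is scheduled to depart at `0005` and arrive at `0430`. The SQL expression gives `(430 - 5 + 2400) % 2400 = 425`, but the clock-time difference is 4 h 25 min, i.e. 265 minutes. The two times are also local to different time zones. The file's own `SCHEDULED_TIME` for that flight (`SCH_TIME` in our schema) is 205 minutes, consistent with the one-hour offset between Anchorage and Seattle. The sketch below converts HHMM values to minutes before subtracting. It still ignores time zones, and the helper names are hypothetical.

```python
def hhmm_to_minutes(hhmm: int) -> int:
    """Convert an HHMM-encoded clock time (e.g. 430 for 04:30) to minutes past midnight."""
    return (hhmm // 100) * 60 + hhmm % 100

def clock_difference_minutes(dep_hhmm: int, arr_hhmm: int) -> int:
    """Minutes from a departure clock time to an arrival clock time, wrapping
    past midnight. Both are local clock readings, so time zones are ignored."""
    return (hhmm_to_minutes(arr_hhmm) - hhmm_to_minutes(dep_hhmm)) % (24 * 60)

naive = (430 - 5 + 2400) % 2400           # what the SQL expression computes
fixed = clock_difference_minutes(5, 430)  # 4 h 25 min in minutes
print(naive, fixed)                       # 425 265
```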

In [17]:
from pyspark.sql.functions import window, month, dayofmonth, hour

# Flatten the window struct into month/day/hour columns and rename the averages
windowedDelays = w.select(month(w.window.start).alias("month"),
                          dayofmonth(w.window.start).alias("day"),
                          hour(w.window.start).alias("hour"),
                          w.ORIGIN, w.DEST,
                          w['avg(DISTANCE)'].alias("distance"),
                          w['avg(SCH_DURATION)'].alias("duration"),
                          w['avg(ARR_DELAY)'].alias("delay"))

delayDF = windowedDelays.filter(windowedDelays.month == 2)\
  .orderBy(windowedDelays.delay.desc()).toPandas()

In [18]:
delayDF

Unnamed: 0,month,day,hour,ORIGIN,DEST,distance,duration,delay
0,2,9,0,JFK,HNL,4983.0,615.000000,1467.0
1,2,22,0,EGE,ORD,1007.0,338.000000,1460.0
2,2,8,0,HNL,JFK,4983.0,1425.000000,1391.0
3,2,27,0,DFW,HNL,3784.0,461.000000,1295.0
4,2,28,0,ORD,EGE,1007.0,190.000000,1235.0
...,...,...,...,...,...,...,...,...
104440,2,21,0,EWR,GSP,594.0,204.000000,
104441,2,21,0,DCA,CAK,274.0,155.000000,
104442,2,23,0,MLU,DFW,293.0,146.000000,
104443,2,23,0,GRK,DFW,134.0,90.555556,


In [19]:
import pandas as pd
import numpy as np

# Take the dataframe and one-hot encode the airport origins and destinations
main_df = pd.concat([delayDF, pd.get_dummies(delayDF[['ORIGIN']],prefix='ORIGIN', drop_first=True),
          pd.get_dummies(delayDF[['DEST']],prefix='DEST', drop_first=True)], axis=1)

main_df.drop(['ORIGIN','DEST'], axis=1, inplace=True)

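# Mark outliers (average delay of an hour or more) as NaN so they get dropped,
# and treat early arrivals (negative delay) as 0 delay
main_df['delay'] = main_df['delay'].apply(lambda x: x if x >= 0 and x < 60 else 0 if x < 0 else np.nan)

# Remove rows containing NaN: the outliers marked above, plus windows whose average
# delay was already missing (e.g., every flight in them was cancelled or diverted)
main_df.dropna(how='any', axis=0, inplace=True)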

main_df

Unnamed: 0,month,day,hour,distance,duration,delay,ORIGIN_ABI,ORIGIN_ABQ,ORIGIN_ABR,ORIGIN_ABY,...,DEST_TYR,DEST_TYS,DEST_UST,DEST_VEL,DEST_VLD,DEST_VPS,DEST_WRG,DEST_XNA,DEST_YAK,DEST_YUM
5960,2,26,0,108.0,97.500000,59.888889,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
5961,2,2,0,762.0,220.461538,59.857143,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
5962,2,8,0,475.0,266.000000,59.833333,False,False,False,False,...,False,True,False,False,False,False,False,False,False,False
5963,2,20,0,1325.0,289.500000,59.833333,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
5964,2,25,0,997.0,303.833333,59.833333,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
102001,2,8,0,2520.0,885.000000,0.000000,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
102002,2,7,0,1504.0,370.000000,0.000000,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
102003,2,22,0,2607.0,428.000000,0.000000,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
102004,2,19,0,2520.0,885.000000,0.000000,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
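The cleaning and encoding steps above can be spelled out in plain Python. `clean_delay` mirrors the lambda: early arrivals become 0, while delays of 60 minutes or more and missing values become NaN. `one_hot_drop_first` imitates `pd.get_dummies(..., drop_first=True)`, which drops the alphabetically first category. Rows from the dropped category are encoded as all `False`. That avoids a set of dummy columns that always sums to 1 and would be collinear with the regression intercept. Both function names are hypothetical.

```python
import math

def clean_delay(x):
    """Same rule as the lambda above: early arrivals -> 0, delays of 60+ minutes
    and missing values -> NaN (so that dropna() later removes those rows)."""
    if x is None or math.isnan(x):
        return math.nan
    if x < 0:
        return 0.0
    return x if x < 60 else math.nan

def one_hot_drop_first(values, prefix):
    """Imitate pd.get_dummies(..., drop_first=True): one boolean column per
    category, except the alphabetically first one, which becomes the baseline."""
    categories = sorted(set(values))[1:]
    return [{f"{prefix}_{c}": v == c for c in categories} for v in values]

cleaned = [clean_delay(d) for d in [-5.0, 12.5, 75.0, math.nan]]
encoded = one_hot_drop_first(["ABQ", "ABE", "ABI"], "ORIGIN")
print(cleaned)   # [0.0, 12.5, nan, nan]
for row in encoded:
    print(row)
```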


In [20]:
from sklearn import linear_model
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split

y = main_df['delay']
X = main_df.drop(['delay'], axis=1)

delay_X_train, delay_X_test, delay_y_train, delay_y_test = train_test_split(\
  X, y, test_size=0.20, random_state=42)
regr = linear_model.LinearRegression()

regr.fit(delay_X_train, delay_y_train)

In [21]:
X.columns

Index(['month', 'day', 'hour', 'distance', 'duration', 'ORIGIN_ABI',
       'ORIGIN_ABQ', 'ORIGIN_ABR', 'ORIGIN_ABY', 'ORIGIN_ACT',
       ...
       'DEST_TYR', 'DEST_TYS', 'DEST_UST', 'DEST_VEL', 'DEST_VLD', 'DEST_VPS',
       'DEST_WRG', 'DEST_XNA', 'DEST_YAK', 'DEST_YUM'],
      dtype='object', length=633)

In [22]:
y_pred = regr.predict(delay_X_test)

What do the errors look like?

In [23]:
y_pred - delay_y_test

Unnamed: 0,delay
52753,3.417698
31759,-3.500301
49075,5.828158
50030,2.927529
97828,4.715536
...,...
74926,5.469873
21647,-11.156651
27981,-1.449696
66845,3.146580


Let's analyze the errors more systematically, using the mean squared error and the R² score (coefficient of determination)...
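Here is what the two metrics compute, written out in plain Python. The definitions follow scikit-learn's `mean_squared_error` and `r2_score` in the unweighted case. MSE is the mean of the squared residuals. R² is one minus the ratio of the residual sum of squares to the total sum of squares around the mean. An R² of 1 is a perfect fit, and R² can be negative for a model that does worse than always predicting the mean. The names `mse_manual` and `r2_manual` are hypothetical, chosen so they do not shadow the scikit-learn imports.

```python
def mse_manual(y_true, y_pred):
    """Mean squared error: (1/n) * sum((y_i - yhat_i)^2)."""
    residuals = [t - p for t, p in zip(y_true, y_pred)]
    return sum(r * r for r in residuals) / len(residuals)

def r2_manual(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean) ** 2 for t in y_true)
    return 1 - ss_res / ss_tot

example_true = [10.0, 20.0, 30.0]
example_pred = [12.0, 18.0, 33.0]
print(mse_manual(example_true, example_pred))  # 17/3, about 5.67
print(r2_manual(example_true, example_pred))   # 1 - 17/200, about 0.915
```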

In [24]:
# The coefficients
print("# coefficients: \n", len(regr.coef_))
print('Coefficients: \n', regr.coef_)
# The mean squared error
print("Mean squared error: %.2f"
      % mean_squared_error(delay_y_test, y_pred))
# Coefficient of determination (R^2): 1 is perfect prediction
print('R^2 score: %.2f' % r2_score(delay_y_test, y_pred))

# coefficients: 
 633
Coefficients: 
 [-1.00760127e-13  1.38388851e-01  9.20787752e-13 -4.15497318e-04
 -1.76129686e-03 -7.56785429e+00 -5.10192640e+00 -6.98906566e+00
 -5.60652894e+00 -6.94716771e+00 -5.39554104e+00 -3.82209604e+00
  2.20633020e+00 -9.92247795e+00 -3.13373076e+00 -1.34742466e+00
 -5.76914933e+00 -7.01590586e+00 -5.33075946e+00 -7.05234857e+00
 -3.04114202e+00 -3.27898236e+00 -4.35918828e+00 -5.58127036e+00
 -4.77540477e+00 -8.35488323e+00 -7.50383773e+00 -5.19446415e+00
 -3.35048582e+00 -3.70657082e+00 -8.29625668e+00  3.10726583e+00
 -3.14503607e+00 -5.78197463e+00 -7.78811496e+00 -3.19714586e+00
 -1.04445061e+01 -7.68179712e+00 -3.01575677e+00 -4.69474007e+00
 -7.28075213e+00  2.67339262e+00 -8.36236565e-01 -6.84787742e+00
 -6.79253404e+00 -7.76998073e+00 -6.00406621e+00 -4.02522344e+00
 -9.24055373e+00 -2.35391570e+00 -1.15162081e+00 -3.86191738e+00
 -4.96735007e+00 -2.79549435e+00 -6.31699990e+00 -1.31908958e+00
 -7.00200299e+00 -7.85182743e+00 -6.74958438e+00 -6.

## Exercise

See if you can combine the incremental stream processing portion of the notebook with the time window-based computation!

### Autograder setup

In [25]:
#PLEASE ENSURE YOUR PENN-ID IS ENTERED CORRECTLY. IF NOT, THE AUTOGRADER WON'T KNOW WHO
#TO ASSIGN POINTS TO YOU IN OUR BACKEND
STUDENT_ID = 64660501 # YOUR PENN-ID GOES HERE AS AN INTEGER

In [26]:
%%writefile notebook-config.yaml

grader_api_url: 'https://23whrwph9h.execute-api.us-east-1.amazonaws.com/default/Grader23'
grader_api_key: 'flfkE736fA6Z8GxMDJe2q8Kfk8UDqjsG3GVqOFOa'

Writing notebook-config.yaml


In [27]:
%set_env HW_ID=cis5450_25f_HW9

env: HW_ID=cis5450_25f_HW9


In [28]:
!pip3 install penngrader-client

Collecting penngrader-client
  Downloading penngrader_client-0.5.2-py3-none-any.whl.metadata (15 kB)
Downloading penngrader_client-0.5.2-py3-none-any.whl (10 kB)
Installing collected packages: penngrader-client
Successfully installed penngrader-client-0.5.2


In [29]:
import os
from penngrader.grader import *

grader = PennGrader('notebook-config.yaml', os.environ['HW_ID'], STUDENT_ID, STUDENT_ID)

PennGrader initialized with Student ID: 64660501

Make sure this correct or we will not be able to store your grade


In [30]:
delay_X_train

Unnamed: 0,month,day,hour,distance,duration,ORIGIN_ABI,ORIGIN_ABQ,ORIGIN_ABR,ORIGIN_ABY,ORIGIN_ACT,...,DEST_TYR,DEST_TYS,DEST_UST,DEST_VEL,DEST_VLD,DEST_VPS,DEST_WRG,DEST_XNA,DEST_YAK,DEST_YUM
73587,2,14,0,1634.0,530.000000,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
79526,2,27,0,632.0,241.200000,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
78556,2,26,0,733.0,294.000000,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
21978,2,10,0,125.0,87.333333,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
66452,2,11,0,569.0,261.750000,False,True,False,False,False,...,False,False,False,False,False,False,False,False,False,False
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
12225,2,16,0,1506.0,537.000000,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
60846,2,24,0,216.0,105.000000,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
82780,2,20,0,574.0,120.500000,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
6820,2,19,0,833.0,315.500000,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False


In [31]:
delay_X_test

Unnamed: 0,month,day,hour,distance,duration,ORIGIN_ABI,ORIGIN_ABQ,ORIGIN_ABR,ORIGIN_ABY,ORIGIN_ACT,...,DEST_TYR,DEST_TYS,DEST_UST,DEST_VEL,DEST_VLD,DEST_VPS,DEST_WRG,DEST_XNA,DEST_YAK,DEST_YUM
52753,2,16,0,1927.0,601.333333,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
31759,2,8,0,1008.0,374.750000,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
49075,2,12,0,533.0,252.500000,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
50030,2,18,0,2640.0,765.250000,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
97828,2,12,0,846.0,222.000000,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
74926,2,13,0,1139.0,118.600000,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
21647,2,19,0,1067.0,102.500000,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
27981,2,10,0,1250.0,269.142857,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
66845,2,5,0,493.0,261.500000,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False


### Let's train on the training set

Use a `VectorAssembler` to combine the input columns (`inputCols`) into a single vector, stored in a column called `features`, and apply it to the training data.

Then use MLlib's `LinearRegression` to train a model, much as scikit-learn would.
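The assembler's output can be pictured with the plain-Python sketch below. For every row, the values of the input columns are concatenated in `inputCols` order into one numeric vector. Booleans become 1.0/0.0, and the label column is left out. Because most one-hot entries are 0, Spark may store such a vector in sparse (size, indices, values) form, which `to_sparse` imitates. The function names and sample values are hypothetical.

```python
def assemble_features(rows, input_cols):
    """Concatenate the input columns of each row (a dict) into one list of
    floats, in input_cols order; booleans become 1.0 / 0.0."""
    return [[float(row[c]) for c in input_cols] for row in rows]

def to_sparse(vector):
    """(size, indices, values) form that stores only the non-zero entries."""
    indices = [i for i, v in enumerate(vector) if v != 0.0]
    return len(vector), indices, [vector[i] for i in indices]

example_rows = [
    {"month": 2, "distance": 1634.0, "ORIGIN_ABQ": False, "result": 12.0},
    {"month": 2, "distance": 569.0, "ORIGIN_ABQ": True, "result": 3.5},
]
input_cols = ["month", "distance", "ORIGIN_ABQ"]   # the label "result" is left out
example_features = assemble_features(example_rows, input_cols)
print(example_features)                 # [[2.0, 1634.0, 0.0], [2.0, 569.0, 1.0]]
print(to_sparse(example_features[0]))   # (3, [0, 1], [2.0, 1634.0])
```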

In [33]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Guard against state left over from earlier runs of this cell: neither the
# label nor an assembled vector column may be among the inputs
delay_X_train = delay_X_train.drop(columns=["features", "result"], errors="ignore")

# TODO: Use the VectorAssembler to convert inputCols into a vector in a column called `features`
# and be careful not to include the output you're predicting
vecAssembler = VectorAssembler(
    inputCols=list(delay_X_train.columns),
    outputCol="features")

# Copy, so that adding the label column does not modify delay_X_train itself
labeled_X_train = delay_X_train.copy()
labeled_X_train['result'] = delay_y_train
training = vecAssembler.transform(spark.createDataFrame(labeled_X_train))

# TODO: create an MLLib LinearRegression with the appropriate features and predictions
lr = LinearRegression(featuresCol="features", labelCol="result")

lr_model = lr.fit(training)
lr_model


LinearRegressionModel: uid=LinearRegression_ce554dee131c, numFeatures=633

In [34]:
df2 = training[['features','result']].take(3)

grader.grade('ml_features', str(df2))

Correct! You earned 1/1 points. You are a star!

Your submission has been successfully recorded in the gradebook.


Let's see if prediction works!
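The next cell broadcasts the fitted scikit-learn model to the executors and applies it to batches of rows with a pandas UDF. For a linear model, batch prediction is just a dot product plus an intercept per row: scikit-learn's `LinearRegression.predict` computes `X @ coef_ + intercept_`. The plain-Python sketch below shows that computation for one batch. `predict_batch` and the coefficients are hypothetical.

```python
def predict_batch(batch, coef, intercept):
    """Apply a fitted linear model, y = x . coef + intercept, to every feature
    vector in one batch of rows."""
    predictions = []
    for x in batch:
        if len(x) != len(coef):
            raise ValueError(f"expected {len(coef)} features, got {len(x)}")
        predictions.append(sum(xi * wi for xi, wi in zip(x, coef)) + intercept)
    return predictions

example_coef = [0.5, -2.0]
example_intercept = 1.0
preds = predict_batch([[2.0, 1.0], [4.0, 0.0]], example_coef, example_intercept)
print(preds)   # [0.0, 3.0]
```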

In [35]:
from pyspark.sql.functions import pandas_udf
from pyspark.ml.functions import vector_to_array
import pandas as pd
import numpy as np

# Broadcast the fitted scikit-learn model so each executor gets one copy
bc_model = spark.sparkContext.broadcast(regr)

@pandas_udf("double")
def predict_delays(features_series: pd.Series) -> pd.Series:
    """
    Pandas UDF to run the scikit-learn linear regression on a batch of rows.
    Each element of features_series is a 1-D numpy array holding one row's features.
    """
    if features_series.empty:
        return pd.Series([], dtype="float64")

    model = bc_model.value
    # Stack the per-row arrays into a (rows x features) matrix
    X_batch = np.vstack(features_series.to_numpy())
    if X_batch.shape[1] != model.n_features_in_:
        raise ValueError(f"Feature count mismatch: X_batch has {X_batch.shape[1]} features, "
                         f"but model expects {model.n_features_in_}.")
    return pd.Series(model.predict(X_batch))

# Assemble the held-out test set (assumed to be the intended `data`) with the
# same VectorAssembler as the training data
data = vecAssembler.transform(spark.createDataFrame(delay_X_test))

# Convert the ML vector column to a plain array<double> column before passing it
# to the pandas UDF, so that each row arrives as a numpy array
df = data.withColumn("predicted_y", predict_delays(vector_to_array(data['features'])))
df.select("predicted_y").show()