# PySpark on Colab

This Colab notebook provides a *free* PySpark environment and is <u>much kinder to your GCP budget.</u>

It works, but it runs Spark in local mode on a single Colab machine rather than on the kind of cluster infrastructure that AWS or GCP provide.

To begin,

1. *Copy this notebook into Google Drive,*
2. *Make a fresh copy for every project or assignment,*
3. Choose Runtime | Run all to confirm that everything works,
4. <u><b>Only then</b></u> should you add your own code.

## Shell Gymnastics

Some commands to exercise the shell:

In [1]:
shell = "$SHELL"
!echo {shell}
!printf "My shell is %s\n" "{shell}"
now = "$(date)"
!printf "The date is %s\n" "{now}"

/bin/bash
My shell is /bin/bash
The date is Wed Mar  5 09:45:44 PM UTC 2025


### JDK Setup

In [2]:
# Install Java 8 (headless JDK)
# -qq: quiet level 2, no output except for errors
# > /dev/null: discard everything the command writes to stdout
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Check the active Java version (this reports the default JVM,
# which may differ from the one just installed)
!java -version

openjdk version "11.0.26" 2025-01-21
OpenJDK Runtime Environment (build 11.0.26+4-post-Ubuntu-1ubuntu122.04)
OpenJDK 64-Bit Server VM (build 11.0.26+4-post-Ubuntu-1ubuntu122.04, mixed mode, sharing)


### Install Spark


In [3]:
!pip install -q findspark
!pip install pyspark



In [4]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [5]:
sc = spark.sparkContext

#### Test PySpark RDD operations.




#### PySpark Wide Transformations

![](https://media.licdn.com/dms/image/v2/D4D12AQFvdf6KAsGdiA/article-cover_image-shrink_423_752/article-cover_image-shrink_423_752/0/1682015360609?e=1734566400&v=beta&t=sC7wrA8KDuW1os3ZTTXqzYniViDI4L7BkgLjsxG8J5w)

In [7]:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
# Use distinct loop names (key, groups) so the RDDs x and y are not visually shadowed
cogroup = str([(key, tuple(map(list, groups))) for key, groups in sorted(x.cogroup(y).collect())])

assert cogroup == "[('a', ([1], [2])), ('b', ([4], []))]", "cogroup operation fails"
cogroup

"[('a', ([1], [2])), ('b', ([4], []))]"

In [8]:
join = str(x.join(y).collect())

assert join == "[('a', (1, 2))]", "join operation fails"
join

"[('a', (1, 2))]"

In [9]:
# Create an RDD with key-value pairs
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

# Group the RDD by key
grouped_rdd = rdd.groupByKey()

# collect() returns each group as a ResultIterable, so printing the result
# directly shows object references rather than the grouped values
print(grouped_rdd.collect())
# Convert each group to a list to see its values,
# e.g. a -> [1, 1] and b -> [1] (key order may vary)
for key, value in grouped_rdd.collect():
    print(key, list(value))

[('b', <pyspark.resultiterable.ResultIterable object at 0x787d50178250>), ('a', <pyspark.resultiterable.ResultIterable object at 0x787d39c3e910>)]
b [1]
a [1, 1]
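The three wide transformations above (`cogroup`, `join`, `groupByKey`) all bring together values that share a key. As a rough plain-Python sketch of the results they produce (this is not how Spark implements them, and the helper names `group_by_key`, `cogroup`, and `inner_join` are hypothetical):

```python
from collections import defaultdict

def group_by_key(pairs):
    """Collect all values for each key, like groupByKey() with each group turned into a list."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

def cogroup(left, right):
    """For every key on either side, pair the left values with the right values."""
    lg, rg = group_by_key(left), group_by_key(right)
    return {k: (lg.get(k, []), rg.get(k, [])) for k in sorted(set(lg) | set(rg))}

def inner_join(left, right):
    """Emit (key, (l, r)) for every combination of values that share a key."""
    return [(k, (l, r))
            for k, (ls, rs) in cogroup(left, right).items()
            for l in ls for r in rs]

x = [("a", 1), ("b", 4)]
y = [("a", 2)]
print(cogroup(x, y))
print(inner_join(x, y))
print(group_by_key([("a", 1), ("b", 1), ("a", 1)]))
```

Seen this way, a join is a cogroup followed by a per-key cross product, which is why both need all values for a key in one place.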


#### Test PySpark Dataframe Creation

In [10]:
data = [('James',3000),('Anna',4001),('Robert',6200)]
df = spark.createDataFrame(data,["name","salary"])
# df.show()

# Convert the DataFrame to an RDD
rdd = df.rdd

assert str(rdd.collect()) == "[Row(name='James', salary=3000), Row(name='Anna', salary=4001), Row(name='Robert', salary=6200)]", "rdd.collect fails"
rdd.collect()

[Row(name='James', salary=3000),
 Row(name='Anna', salary=4001),
 Row(name='Robert', salary=6200)]

#### Create Employees and Departments Tables

In [11]:
# Employees Table
emp = [
    (1, "Smith", -1, 33, "2018", "10", "M", 3000),
    (2, "Rose", 1, 24, "2010", "20", "M", 4000),
    (3, "Williams", 1, 20, "2010", "10", "M", 1000),
    (4, "Jones", 2, 22, "2005", "10", "F", 2000),
    (5, "Brown", 2, 18, "2010", "40", "", -1),
    (6, "Brown", 2, 19, "2010", "50", "", -1),
]
empColumns = ["emp_id", "name", "boss_id", "age", "year_joined",
              "emp_dept_id", "gender", "salary"]

empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
empDF.show(truncate=False)

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- boss_id: long (nullable = true)
 |-- age: long (nullable = true)
 |-- year_joined: string (nullable = true)
 |-- emp_dept_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+------+--------+-------+---+-----------+-----------+------+------+
|emp_id|name    |boss_id|age|year_joined|emp_dept_id|gender|salary|
+------+--------+-------+---+-----------+-----------+------+------+
|1     |Smith   |-1     |33 |2018       |10         |M     |3000  |
|2     |Rose    |1      |24 |2010       |20         |M     |4000  |
|3     |Williams|1      |20 |2010       |10         |M     |1000  |
|4     |Jones   |2      |22 |2005       |10         |F     |2000  |
|5     |Brown   |2      |18 |2010       |40         |      |-1    |
|6     |Brown   |2      |19 |2010       |50         |      |-1    |
+------+--------+-------+---+-----------+-----------+------+------+



In [12]:
# Average age by department
grouped_df = empDF.groupBy("emp_dept_id").avg("age")
grouped_df.show()

+-----------+--------+
|emp_dept_id|avg(age)|
+-----------+--------+
|         20|    24.0|
|         10|    25.0|
|         40|    18.0|
|         50|    19.0|
+-----------+--------+
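As a quick sanity check on the grouped averages above, the same numbers can be reproduced in plain Python. This is only a sketch: the `rows` list is copied by hand from the employees table, and the variable names are illustrative.

```python
from collections import defaultdict

# (age, emp_dept_id) pairs copied from the employees table above
rows = [(33, "10"), (24, "20"), (20, "10"), (22, "10"), (18, "40"), (19, "50")]

# Group ages by department, then average each group
ages_by_dept = defaultdict(list)
for age, dept in rows:
    ages_by_dept[dept].append(age)

avg_age = {dept: sum(ages) / len(ages) for dept, ages in ages_by_dept.items()}
print(avg_age)
```

Department 10 has three employees aged 33, 20, and 22, so its average is 75 / 3 = 25.0, matching the Spark output.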



In [13]:
# Departments Table
dept = [
    ("Finance", 10),
    ("Marketing", 20),
    ("Sales", 30),
    ("IT", 40),
    ("Services", 50),
]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

root
 |-- dept_name: string (nullable = true)
 |-- dept_id: long (nullable = true)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
|Services |50     |
+---------+-------+



#### Test Table Joins

In [14]:
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"inner") \
     .show(truncate=False)

empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"fullouter") \
    .show(truncate=False)

+------+--------+-------+---+-----------+-----------+------+------+---------+-------+
|emp_id|name    |boss_id|age|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+-------+---+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1     |33 |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1      |20 |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2      |22 |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1      |24 |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2      |18 |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2      |19 |2010       |50         |      |-1    |Services |50     |
+------+--------+-------+---+-----------+-----------+------+------+---------+-------+

+------+--------+-------+----+-----------+-----------+------+------+---------+-------+
|emp_id|name    |boss_id|age |year_joined|emp_dept_i
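An inner join keeps only keys present on both sides, while a full outer join also keeps unmatched rows and fills the missing side with nulls. With the two tables above, department 30 (Sales) has no employees, so it should appear only in the full outer result. A plain-Python sketch of the key sets involved (the `int(...)` conversion stands in for the cast Spark applies when comparing the string `emp_dept_id` with the long `dept_id`; the variable names are illustrative):

```python
# Distinct join keys taken from the two tables above
emp_dept_ids = {"10", "20", "40", "50"}   # empDF.emp_dept_id (strings)
dept_ids = {10, 20, 30, 40, 50}           # deptDF.dept_id (integers)

emp_keys = {int(d) for d in emp_dept_ids}

inner_keys = sorted(emp_keys & dept_ids)       # keys matched on both sides
full_outer_keys = sorted(emp_keys | dept_ids)  # keys from either side
only_in_dept = sorted(dept_ids - emp_keys)     # rows padded with nulls on the employee side

print(inner_keys, full_outer_keys, only_in_dept)
```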

#### Test Reading a JSON file from the web

In [15]:
import pandas as pd
zipDF = pd.read_json('https://github.com/qmmr/mongodb/raw/master/zips.json', lines=True)
zipDF

Unnamed: 0,city,loc,pop,state,_id
0,ACMAR,"[-86.51557, 33.584132]",6055,AL,35004
1,ADAMSVILLE,"[-86.959727, 33.588437]",10616,AL,35005
2,ADGER,"[-87.167455, 33.434277]",3205,AL,35006
3,KEYSTONE,"[-86.812861, 33.236868]",14218,AL,35007
4,NEW SITE,"[-85.951086, 32.941445]",19942,AL,35010
...,...,...,...,...,...
29465,FREEDOM,"[-111.029178, 43.017167]",212,WY,83120
29466,GROVER,"[-110.924392, 42.796472]",335,WY,83122
29467,LA BARGE,"[-110.210865, 42.24734]",606,WY,83123
29468,SMOOT,"[-110.922351, 42.619238]",414,WY,83126


In [16]:
!rm -rf big-data-repo/
!git clone https://github.com/singhj/big-data-repo.git

Cloning into 'big-data-repo'...
remote: Enumerating objects: 630, done.
remote: Counting objects: 100% (207/207), done.
remote: Compressing objects: 100% (76/76), done.
remote: Total 630 (delta 163), reused 166 (delta 131), pack-reused 423 (from 1)
Receiving objects: 100% (630/630), 56.67 MiB | 33.98 MiB/s, done.
Resolving deltas: 100% (341/341), done.


# HW 4 - Airplane Analysis

In [136]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, hour, minute, to_timestamp, when, lit, expr, avg, count
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, TimestampType

In [139]:
from pyspark.sql.types import StructType, StructField, StringType

# Create a schema with all StringType fields
flight_schema = StructType([
    StructField("Carrier", StringType(), True),                        # A
    StructField("FlightNumber", StringType(), True),                   # B
    StructField("DepartureAirport", StringType(), True),               # C
    StructField("ArrivalAirport", StringType(), True),                 # D
    StructField("FlightDate", StringType(), True),                     # E
    StructField("DayOfWeek", StringType(), True),                      # F
    StructField("ScheduledDepartureTimeOAG", StringType(), True),      # G
    StructField("ScheduledDepartureTimeCRS", StringType(), True),      # H
    StructField("ActualDepartureTime", StringType(), True),            # I
    StructField("ScheduledArrivalTimeOAG", StringType(), True),        # J
    StructField("ScheduledArrivalTimeCRS", StringType(), True),        # K
    StructField("ActualArrivalTime", StringType(), True),              # L
    StructField("OAGvsCRSDepartureTimeDiff", StringType(), True),      # M
    StructField("OAGvsCRSArrivalTimeDiff", StringType(), True),        # N
    StructField("ScheduledElapsedTimeCRS", StringType(), True),        # O
    StructField("ActualGateToGateTime", StringType(), True),           # P
    StructField("DepartureDelay", StringType(), True),                 # Q
    StructField("ArrivalDelay", StringType(), True),                   # R
    StructField("ElapsedTimeDifference", StringType(), True),          # S
    StructField("WheelsOffTime", StringType(), True),                  # T
    StructField("WheelsOnTime", StringType(), True),                   # U
    StructField("AircraftTailNumber", StringType(), True),             # V
    StructField("CancellationCode", StringType(), True),               # W
    StructField("DelayMinutesCodeE", StringType(), True),              # X
    StructField("DelayMinutesCodeF", StringType(), True),              # Y
    StructField("DelayMinutesCodeG", StringType(), True),              # Z
    StructField("DelayMinutesCodeH", StringType(), True)               # AA
])

def clean_df(df):
    # Keep columns 0-1 (A-B: Carrier, Flight Number) and 6-30 (C-AA),
    # skipping the undocumented columns 2-5
    columns_to_keep = df.columns[:2] + df.columns[6:31]

    # Select only these columns
    return df.select(columns_to_keep)
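The selection in `clean_df` keeps 2 + 25 = 27 columns, which matches the 27 fields (A through AA) in `flight_schema`. A small plain-Python check of that index arithmetic, assuming the raw file loads with 71 default-named columns (`_c0` to `_c70`) as Spark names them when no header is read:

```python
# Default column names for a headerless read: _c0, _c1, ..., _c70 (assumed width)
raw_columns = [f"_c{i}" for i in range(71)]

# Same selection as clean_df: columns 0-1, then 6-30
kept = raw_columns[:2] + raw_columns[6:31]

print(len(kept))
print(kept[:3], kept[-1])
```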

In [140]:
import os
os.getcwd()

'/content'

In [141]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [142]:
file_path_month1 = "/content/drive/My Drive/Colab Notebooks/072024.asc"
file_path_month2 = "/content/drive/My Drive/Colab Notebooks/062024.asc"

In [143]:
print("file1 :" , os.path.exists(file_path_month1))
print("file2 :" , os.path.exists(file_path_month2))

file1 : True
file2 : True


In [144]:
# .schema(flight_schema) is deliberately left out here so the raw format can be
# inspected first: per the instructions, the file may have 4 more columns than
# the provided schema states
flight_df_month1 = spark.read.option("delimiter", "|") \
    .option("header", "false") \
    .csv(file_path_month1)

In [51]:
flight_df_month1.show(3)

+---+----+----+----+---+----+---+---+--------+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+------+----+
|_c0| _c1| _c2| _c3|_c4| _c5|_c6|_c7|     _c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|_c24|  _c25|_c26|_c27|_c28|_c29|_c30|_c31|_c32|_c33|_c34|_c35|_c36|_c37|_c38|_c39|_c40|_c41|_c42|_c43|_c44|_c45|_c46|_c47|_c48|_c49|_c50|_c51|_c52|_c53|_c54|_c55|_c56|_c57|_c58|_c59|_c60|_c61|_c62|_c63|_c64|_c65|_c66|_c67|_c68|  _c69|_c70|
+---+----+----+----+---+----+---+---+--------+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+---

In [145]:
flight_df_month2 = spark.read.option("delimiter", "|") \
    .option("header", "false") \
    .csv(file_path_month2)

In [147]:
flight_df_month1_clean = clean_df(flight_df_month1)
flight_df_month2_clean = clean_df(flight_df_month2)
combined_df = flight_df_month1_clean.union(flight_df_month2_clean)

# Apply the schema's column names to the dataframe; every column is already a
# string, so renaming avoids a round trip through the RDD API
combined_df_with_schema = combined_df.toDF(*flight_schema.fieldNames())

In [148]:
# Now you should be able to view it
combined_df_with_schema.show(3)

+-------+------------+----------------+--------------+----------+---------+-------------------------+-------------------------+-------------------+-----------------------+-----------------------+-----------------+-------------------------+-----------------------+-----------------------+--------------------+--------------+------------+---------------------+-------------+------------+------------------+----------------+-----------------+-----------------+-----------------+-----------------+
|Carrier|FlightNumber|DepartureAirport|ArrivalAirport|FlightDate|DayOfWeek|ScheduledDepartureTimeOAG|ScheduledDepartureTimeCRS|ActualDepartureTime|ScheduledArrivalTimeOAG|ScheduledArrivalTimeCRS|ActualArrivalTime|OAGvsCRSDepartureTimeDiff|OAGvsCRSArrivalTimeDiff|ScheduledElapsedTimeCRS|ActualGateToGateTime|DepartureDelay|ArrivalDelay|ElapsedTimeDifference|WheelsOffTime|WheelsOnTime|AircraftTailNumber|CancellationCode|DelayMinutesCodeE|DelayMinutesCodeF|DelayMinutesCodeG|DelayMinutesCodeH|
+-------+---

In [149]:
print('\nCombined DataFrame with Schema:')
combined_df_with_schema.printSchema()


Combined DataFrame with Schema:
root
 |-- Carrier: string (nullable = true)
 |-- FlightNumber: string (nullable = true)
 |-- DepartureAirport: string (nullable = true)
 |-- ArrivalAirport: string (nullable = true)
 |-- FlightDate: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- ScheduledDepartureTimeOAG: string (nullable = true)
 |-- ScheduledDepartureTimeCRS: string (nullable = true)
 |-- ActualDepartureTime: string (nullable = true)
 |-- ScheduledArrivalTimeOAG: string (nullable = true)
 |-- ScheduledArrivalTimeCRS: string (nullable = true)
 |-- ActualArrivalTime: string (nullable = true)
 |-- OAGvsCRSDepartureTimeDiff: string (nullable = true)
 |-- OAGvsCRSArrivalTimeDiff: string (nullable = true)
 |-- ScheduledElapsedTimeCRS: string (nullable = true)
 |-- ActualGateToGateTime: string (nullable = true)
 |-- DepartureDelay: string (nullable = true)
 |-- ArrivalDelay: string (nullable = true)
 |-- ElapsedTimeDifference: string (nullable = true)
 |-- WheelsOffTi

In [159]:
# Note: importing sum here shadows Python's built-in sum with the Spark aggregate
from pyspark.sql.functions import avg, col, when, lit, sum


In [150]:
delay_analysis = combined_df_with_schema.withColumn(
    "ArrivalDelayMinutes", col("ArrivalDelay").cast("int")
)

In [152]:
delay_analysis.show(3)

+-------+------------+----------------+--------------+----------+---------+-------------------------+-------------------------+-------------------+-----------------------+-----------------------+-----------------+-------------------------+-----------------------+-----------------------+--------------------+--------------+------------+---------------------+-------------+------------+------------------+----------------+-----------------+-----------------+-----------------+-----------------+-------------------+
|Carrier|FlightNumber|DepartureAirport|ArrivalAirport|FlightDate|DayOfWeek|ScheduledDepartureTimeOAG|ScheduledDepartureTimeCRS|ActualDepartureTime|ScheduledArrivalTimeOAG|ScheduledArrivalTimeCRS|ActualArrivalTime|OAGvsCRSDepartureTimeDiff|OAGvsCRSArrivalTimeDiff|ScheduledElapsedTimeCRS|ActualGateToGateTime|DepartureDelay|ArrivalDelay|ElapsedTimeDifference|WheelsOffTime|WheelsOnTime|AircraftTailNumber|CancellationCode|DelayMinutesCodeE|DelayMinutesCodeF|DelayMinutesCodeG|DelayMinute

In [157]:
# average delays by carrier
carrier_delays = delay_analysis.groupBy("Carrier").agg(avg("ArrivalDelayMinutes").alias("AverageDelay")).sort(col("AverageDelay").asc())
carrier_delays.show(5)

+-------+------------------+
|Carrier|      AverageDelay|
+-------+------------------+
|     HA| 4.851308042036223|
|     AS| 6.690976365074471|
|     WN|11.552964896861914|
|     UA| 14.03860860718458|
|     DL| 14.15234606099134|
+-------+------------------+
only showing top 5 rows



In [156]:
delay_analysis = delay_analysis.withColumn(
    "ActualDepartureTime", col("ActualDepartureTime").cast("int")
)

delay_by_departure_analysis = delay_analysis.withColumn(
    "DepartureTimeSeg",
    # Times are HHMM integers; anything outside 0600-2159, including a null
    # departure time, falls through to "night"
    when((600 <= col("ActualDepartureTime")) & (col("ActualDepartureTime") < 1000), "morning")
    .when((1000 <= col("ActualDepartureTime")) & (col("ActualDepartureTime") < 1400), "mid-day")
    .when((1400 <= col("ActualDepartureTime")) & (col("ActualDepartureTime") < 1800), "afternoon")
    .when((1800 <= col("ActualDepartureTime")) & (col("ActualDepartureTime") < 2200), "evening")
    .otherwise("night")
)

# Average arrival delay per departure-time segment, smallest first
delay_by_departure = (
    delay_by_departure_analysis.groupBy("DepartureTimeSeg")
    .agg(avg("ArrivalDelay").alias("AverageDelay"))
    .sort(col("AverageDelay").asc())
)
delay_by_departure.show()

+----------------+------------------+
|DepartureTimeSeg|      AverageDelay|
+----------------+------------------+
|         morning| 1.365927678617355|
|         mid-day|  9.47185567625246|
|       afternoon|18.127534559007593|
|           night|25.276052812942073|
|         evening|28.788535376021247|
+----------------+------------------+
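The bucketing rule used for `DepartureTimeSeg` can be written as an ordinary function, which makes the boundaries easy to test. This is a plain-Python sketch of the same rule, not part of the Spark job; `departure_segment` is a hypothetical helper name.

```python
def departure_segment(hhmm):
    """Map an HHMM integer (e.g. 1345 for 13:45) to a time-of-day label."""
    if hhmm is None:
        # Mirrors .otherwise("night"): a missing time matches no earlier condition
        return "night"
    if 600 <= hhmm < 1000:
        return "morning"
    if 1000 <= hhmm < 1400:
        return "mid-day"
    if 1400 <= hhmm < 1800:
        return "afternoon"
    if 1800 <= hhmm < 2200:
        return "evening"
    return "night"

for t in (559, 600, 1000, 1759, 2159, 2200, None):
    print(t, departure_segment(t))
```

Each lower bound is inclusive and each upper bound exclusive, so 1000 counts as mid-day and 2200 as night.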



In [162]:
delay_analysis_by_airports = delay_analysis.withColumn(
    "DepartureDelayMinutes", col("DepartureDelay").cast("int")
)

# Total departure delay minutes per departure airport
delay_time_by_airports_dep = (
    delay_analysis_by_airports.groupBy("DepartureAirport")
    .agg(sum(col("DepartureDelayMinutes")).alias("TotalDepartureDelay"))
    .withColumnRenamed("DepartureAirport", "Airport")
)
# Number of delayed departures (delay > 0) per departure airport
delay_cnt_by_airports_dep = (
    delay_analysis_by_airports.groupBy("DepartureAirport")
    .agg(sum(when(col("DepartureDelayMinutes") > 0, 1)).alias("DepartureDelayCnt"))
    .withColumnRenamed("DepartureAirport", "Airport")
)

# Total arrival delay minutes per arrival airport
delay_time_by_airports_arr = (
    delay_analysis_by_airports.groupBy("ArrivalAirport")
    .agg(sum(col("ArrivalDelayMinutes")).alias("TotalArrivalDelay"))
    .withColumnRenamed("ArrivalAirport", "Airport")
)
# Number of delayed arrivals (delay > 0) per arrival airport
delay_cnt_by_airports_arr = (
    delay_analysis_by_airports.groupBy("ArrivalAirport")
    .agg(sum(when(col("ArrivalDelayMinutes") > 0, 1)).alias("ArrivalDelayCnt"))
    .withColumnRenamed("ArrivalAirport", "Airport")
)

# Combine the four per-airport aggregates on the shared Airport column
delay_by_airports = (
    delay_time_by_airports_dep
    .join(delay_cnt_by_airports_dep, "Airport", "left")
    .join(delay_time_by_airports_arr, "Airport", "left")
    .join(delay_cnt_by_airports_arr, "Airport", "left")
)

delay_by_airports.show()

+-------+-------------------+-----------------+-----------------+---------------+
|Airport|TotalDepartureDelay|DepartureDelayCnt|TotalArrivalDelay|ArrivalDelayCnt|
+-------+-------------------+-----------------+-----------------+---------------+
|    BGM|               1221|               18|              357|             20|
|    DLG|                -56|               30|               42|             31|
|    PSE|               2865|               63|             4168|             85|
|    MSY|             186132|             4107|           139062|           4113|
|    PPG|               1289|               14|              232|             14|
|    GEG|              38291|             1400|            50205|           1908|
|    SNA|              97340|             2673|            54809|           2688|
|    BUR|              66411|             2329|            49690|           2300|
|    GRB|              14601|              263|            17217|            416|
|    GTF|       

In [164]:
delay_analysis_complete = delay_by_airports.withColumn(
    "TotalDelayMinutes", col("TotalDepartureDelay") + col("TotalArrivalDelay")
).withColumn(
    "TotalDelayCount", col("DepartureDelayCnt") + col("ArrivalDelayCnt")
).withColumn(
    "AverageDelayDuration",
    when(col("TotalDelayCount") > 0, col("TotalDelayMinutes") / col("TotalDelayCount")).otherwise(0)
)

most_frequent_delays = delay_analysis_complete.orderBy(col("TotalDelayCount").desc())
most_frequent_delays.select(
    "Airport",
    "TotalDelayCount",
    "TotalDelayMinutes",
    "AverageDelayDuration"
).show(10)

+-------+---------------+-----------------+--------------------+
|Airport|TotalDelayCount|TotalDelayMinutes|AverageDelayDuration|
+-------+---------------+-----------------+--------------------+
|    DFW|          56661|          2775378|  48.982157039233336|
|    DEN|          49853|          1643310|   32.96311154795097|
|    ORD|          48382|          2220672|  45.898722665454095|
|    ATL|          48017|          2137658|   44.51877460066227|
|    CLT|          40421|          2165592|   53.57591351030405|
|    SEA|          33792|           779207|  23.058919270833332|
|    LAS|          31974|          1091102|   34.12466378932883|
|    IAH|          28461|          1435387|   50.43347036295281|
|    LAX|          28119|           926724|   32.95721753974181|
|    PHX|          27419|           966771|    35.2591633538787|
+-------+---------------+-----------------+--------------------+
only showing top 10 rows
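The `AverageDelayDuration` column is total delay minutes divided by the number of delayed flights, with a guard against dividing by zero. Because the minute totals sum every flight's delay (including negative values for early flights) while the counts include only positive delays, the ratio is an approximation rather than a strict mean over delayed flights. A plain-Python sketch of the arithmetic, checked against the DFW row above (`average_delay` is a hypothetical helper name):

```python
def average_delay(total_minutes, delay_count):
    """Total delay minutes per delayed flight; 0 when there are no delayed flights."""
    return total_minutes / delay_count if delay_count > 0 else 0

# DFW row from the table above: 2,775,378 minutes over 56,661 delays
dfw = average_delay(2775378, 56661)
print(round(dfw, 2))

# The guard avoids a ZeroDivisionError for airports with no delays
print(average_delay(10, 0))
```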



In [167]:
# Number of departures per airport
dep_airport_cnt = (
    delay_analysis_by_airports.groupBy("DepartureAirport")
    .agg(count("*").alias("DepartureCnt"))
    .withColumnRenamed("DepartureAirport", "Airport")
)

# Number of arrivals per airport
arr_airport_cnt = (
    delay_analysis_by_airports.groupBy("ArrivalAirport")
    .agg(count("*").alias("ArrivalCnt"))
    .withColumnRenamed("ArrivalAirport", "Airport")
)

busy_airport = dep_airport_cnt.join(arr_airport_cnt, "Airport", "outer")

# Rank airports by total traffic (departures + arrivals)
busy_airport_ranked = busy_airport.withColumn("TotalCnt", col("DepartureCnt") + col("ArrivalCnt"))
busy_airport_ranked.sort(col("TotalCnt").desc()).show(10)

+-------+------------+----------+--------+
|Airport|DepartureCnt|ArrivalCnt|TotalCnt|
+-------+------------+----------+--------+
|    ATL|       59567|     59561|  119128|
|    DFW|       57164|     57155|  114319|
|    DEN|       55986|     55976|  111962|
|    ORD|       55735|     55734|  111469|
|    CLT|       43837|     43834|   87671|
|    SEA|       35554|     35560|   71114|
|    LAX|       34465|     34467|   68932|
|    LAS|       32205|     32213|   64418|
|    PHX|       31251|     31242|   62493|
|    IAH|       30165|     30167|   60332|
+-------+------------+----------+--------+
only showing top 10 rows
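One thing to watch with the outer join above: an airport that appears on only one side gets a null for the other count, and in Spark adding a null to a number yields null, so its `TotalCnt` would be null as well. A plain-Python sketch of a total that treats a missing count as 0 (in Spark this is usually done with `coalesce`; the airport code `XXX` is made up for illustration):

```python
# Counts for two real rows from the table above plus one made-up airport
dep_cnt = {"ATL": 59567, "DFW": 57164, "XXX": 3}   # "XXX" has departures only
arr_cnt = {"ATL": 59561, "DFW": 57155}

# Union of airports from both sides, with a missing count treated as 0
total_cnt = {
    airport: dep_cnt.get(airport, 0) + arr_cnt.get(airport, 0)
    for airport in dep_cnt.keys() | arr_cnt.keys()
}

for airport, total in sorted(total_cnt.items(), key=lambda kv: -kv[1]):
    print(airport, total)
```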



# Question 5

In [173]:
%cd big-data-repo/text-proc/poe-stories

/content/big-data-repo/text-proc/poe-stories


In [174]:
!ls

A_DESCENT_INTO_THE_MAELSTROM	      THE_FALL_OF_THE_HOUSE_OF_USHER
BERENICE			      THE_IMP_OF_THE_PERVERSE
ELEONORA			      THE_ISLAND_OF_THE_FAY
LANDORS_COTTAGE			      THE_MASQUE_OF_THE_RED_DEATH
MESMERIC_REVELATION		      THE_PIT_AND_THE_PENDULUM
SILENCE-A_FABLE			      THE_PREMATURE_BURIAL
THE_ASSIGNATION			      THE_PURLOINED_LETTER
THE_BLACK_CAT			      THE_THOUSAND-AND-SECOND_TALE_OF_SCHEHERAZADE
THE_CASK_OF_AMONTILLADO		      VON_KEMPELEN_AND_HIS_DISCOVERY
THE_DOMAIN_OF_ARNHEIM		      WILLIAM_WILSON
THE_FACTS_IN_THE_CASE_OF_M._VALDEMAR


In [175]:
import glob

In [180]:
text_files = glob.glob('*')

for file_name in text_files:
  print(f"-{file_name}")

-THE_ISLAND_OF_THE_FAY
-MESMERIC_REVELATION
-THE_FACTS_IN_THE_CASE_OF_M._VALDEMAR
-THE_BLACK_CAT
-THE_MASQUE_OF_THE_RED_DEATH
-THE_PIT_AND_THE_PENDULUM
-THE_IMP_OF_THE_PERVERSE
-THE_FALL_OF_THE_HOUSE_OF_USHER
-WILLIAM_WILSON
-ELEONORA
-THE_THOUSAND-AND-SECOND_TALE_OF_SCHEHERAZADE
-BERENICE
-THE_CASK_OF_AMONTILLADO
-THE_PURLOINED_LETTER
-THE_PREMATURE_BURIAL
-THE_DOMAIN_OF_ARNHEIM
-A_DESCENT_INTO_THE_MAELSTROM
-THE_ASSIGNATION
-VON_KEMPELEN_AND_HIS_DISCOVERY
-LANDORS_COTTAGE
-SILENCE-A_FABLE


In [182]:
for file_name in text_files:
  with open(file_name, 'r') as file:
    content = file.read()
    print(f"{file_name} : {len(content)} characters, {len(content.split())} words")

THE_ISLAND_OF_THE_FAY : 12254 characters, 1939 words
MESMERIC_REVELATION : 23764 characters, 3679 words
THE_FACTS_IN_THE_CASE_OF_M._VALDEMAR : 22522 characters, 3481 words
THE_BLACK_CAT : 23807 characters, 3848 words
THE_MASQUE_OF_THE_RED_DEATH : 15020 characters, 2393 words
THE_PIT_AND_THE_PENDULUM : 37276 characters, 6050 words
THE_IMP_OF_THE_PERVERSE : 15209 characters, 2392 words
THE_FALL_OF_THE_HOUSE_OF_USHER : 45711 characters, 7061 words
WILLIAM_WILSON : 50710 characters, 7935 words
ELEONORA : 14713 characters, 2399 words
THE_THOUSAND-AND-SECOND_TALE_OF_SCHEHERAZADE : 35023 characters, 5606 words
BERENICE : 20280 characters, 3132 words
THE_CASK_OF_AMONTILLADO : 308902 characters, 48593 words
THE_PURLOINED_LETTER : 44623 characters, 6974 words
THE_PREMATURE_BURIAL : 34951 characters, 5373 words
THE_DOMAIN_OF_ARNHEIM : 38089 characters, 5815 words
A_DESCENT_INTO_THE_MAELSTROM : 41752 characters, 6923 words
THE_ASSIGNATION : 28012 characters, 4354 words
VON_KEMPELEN_AND_HIS_DISCOVE

In [183]:
import re, string
import requests

# Download the stopword list once and keep it as a set for fast membership tests
stopwords_list = requests.get("https://gist.githubusercontent.com/rg089/35e00abf8941d72d419224cfd5b5925d/raw/12d899b70156fd0041fa9778d657330b024b959c/stopwords.txt").content
stopwords = set(stopwords_list.decode().splitlines())


def remove_stopwords(words):
    # Lowercase, replace non-alphanumeric characters with spaces, then drop stopwords
    list_ = re.sub(r'[^a-zA-Z0-9]', " ", words.lower()).split()
    return [itm for itm in list_ if itm not in stopwords]


def clean_text(text):
    text = text.lower()
    text = re.sub(r'\[.*?\]', '', text)                                # drop bracketed notes
    text = re.sub(r'[%s]' % re.escape(string.punctuation), ' ', text)  # punctuation -> space
    text = re.sub(r'[\d]+', ' ', text)                                 # digits -> space
    return ' '.join(remove_stopwords(text))
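To see what the cleaning steps do to a single line, here is a self-contained sketch that repeats the two functions with a tiny stand-in stopword set. The three-word set and the sample sentence are assumptions made so the example runs without the network request; the real list is much longer.

```python
import re
import string

# Hypothetical stand-in for the downloaded stopword list
stopwords = {"the", "a", "of"}

def remove_stopwords(words):
    # Lowercase, replace non-alphanumeric characters with spaces, then drop stopwords
    tokens = re.sub(r'[^a-zA-Z0-9]', " ", words.lower()).split()
    return [tok for tok in tokens if tok not in stopwords]

def clean_text(text):
    text = text.lower()
    text = re.sub(r'\[.*?\]', '', text)                                # drop bracketed notes
    text = re.sub(r'[%s]' % re.escape(string.punctuation), ' ', text)  # punctuation -> space
    text = re.sub(r'[\d]+', ' ', text)                                 # digits -> space
    return ' '.join(remove_stopwords(text))

sample = "The [editor's note] Cask of Amontillado, 1846!"
cleaned = clean_text(sample)
print(cleaned)
```

The bracketed note, the punctuation, the year, and the stopwords are all removed, leaving only the two content words.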


In [189]:
cleaned_text = {}

for file_name in text_files:
  with open(file_name, 'r') as file:
    content = file.read()
    clean_content = clean_text(content)
    cleaned_text[file_name] = clean_content

cleaned_text[text_files[0]]

'nullus enim locus sine genio servius musique marmontel contes moraux translations insisted calling moral tales mockery spirit musique seul des talents qui jouissent lui tous autres veulent des temoins confounds pleasure derivable sweet sounds capacity creating talent music susceptible complete enjoyment party exercise common talents produces effects fully enjoyed solitude idea raconteur failed entertain sacrificed expression national love point doubtless tenable higher order music estimated exclusively proposition form admitted love lyre sake spiritual pleasure reach fallen mortality owes music accessory sentiment seclusion happiness experienced contemplation natural scenery truth man behold aright glory god earth solitude behold glory presence human life life form green things grow soil voiceless stain landscape war genius scene love regard dark valleys gray rocks waters silently smile forests sigh uneasy slumbers watchful mountains love regard colossal members vast animate sentient 

In [322]:
txt_df = spark.createDataFrame(cleaned_text.items(), ["file_name", "text"])
txt_df.show(truncate=False)

+--------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [206]:
select_df = txt_df.filter(col("file_name") == "A_DESCENT_INTO_THE_MAELSTROM")
paragraph = select_df.collect()[0]["text"]

In [209]:
import nltk
nltk.download('averaged_perceptron_tagger_eng')

[nltk_data] Downloading package averaged_perceptron_tagger_eng to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger_eng.zip.


True

In [210]:
import nltk
nltk.download('punkt_tab')
nltk.download('averaged_perceptron_tagger')
from nltk.tokenize import sent_tokenize, word_tokenize
sent_text = nltk.sent_tokenize(paragraph) # this gives us a list of sentences
# now loop over each sentence and tokenize it separately
all_tagged = [nltk.pos_tag(nltk.word_tokenize(sent)) for sent in sent_text]

[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


In [221]:
nltk.download('tagsets')
from nltk.data import load
tag_dict = load('help/tagsets/upenn_tagset.pickle')

[nltk_data] Downloading package tagsets to /root/nltk_data...
[nltk_data]   Unzipping help/tagsets.zip.


In [231]:
tag_dict.keys()

dict_keys(['PRP$', 'VBG', 'FW', 'VB', 'POS', "''", 'VBP', 'VBN', 'JJ', 'WP', 'VBZ', 'DT', 'RP', '$', 'NN', ')', '(', 'RBR', 'VBD', ',', '.', 'TO', 'LS', 'RB', ':', 'NNS', 'NNP', '``', 'WRB', 'CC', 'PDT', 'RBS', 'PRP', 'CD', 'EX', 'IN', 'WP$', 'MD', 'NNPS', '--', 'JJS', 'JJR', 'SYM', 'UH', 'WDT'])

In [301]:
def tag_pos(text):
    """Group the words of `text` by the first two letters of their POS tag."""
    pos_dict = {}

    sent_text = nltk.sent_tokenize(text)
    tagged = [nltk.pos_tag(nltk.word_tokenize(sent)) for sent in sent_text]

    for sentence in tagged:
        for word, pos in sentence:
            # Collapse fine-grained tags (NN, NNS, NNP, ...) to a two-letter prefix
            prefix = pos[:2]
            pos_dict.setdefault(prefix, []).append(word)

    return pos_dict
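
The grouping step in `tag_pos` can be tried without NLTK. The sketch below is a minimal illustration, assuming the tagger output is a list of `(word, tag)` pairs. `group_by_prefix` and the sample pairs are hypothetical names and data introduced here; they are not part of the notebook.

```python
from collections import defaultdict

def group_by_prefix(tagged_pairs, width=2):
    """Group words by the first `width` characters of their POS tag."""
    groups = defaultdict(list)
    for word, tag in tagged_pairs:
        groups[tag[:width]].append(word)
    return dict(groups)

# Hand-written (word, tag) pairs standing in for nltk.pos_tag output (assumption)
sample = [("rocks", "NNS"), ("cliff", "NN"), ("fell", "VBD"), ("falling", "VBG")]
grouped = group_by_prefix(sample)
print(grouped)
```

Truncating to two letters merges `NN` with `NNS` and `VBD` with `VBG`, which is why the prefix dictionary has far fewer keys than the full Penn tag set.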

In [272]:
malestrom_tag_dict = tag_pos(paragraph)

In [273]:
print(malestrom_tag_dict)

{'NNS': ['ways', 'ways', 'models', 'minutes', 'sons', 'years', 'hours', 'limbs', 'nerves', 'feet', 'crags', 'yards', 'foundations', 'winds', 'fancies', 'waters', 'ramparts', 'lines', 'miles', 'miles', 'barren', 'intervals', 'rocks', 'rocks', 'norwegians', 'flimen', 'names', 'places', 'minutes', 'buffaloes', 'minutes', 'waters', 'channels', 'vortices', 'descents', 'minutes', 'whirlpools', 'streaks', 'streaks', 'vortices', 'degrees', 'winds', 'lifts', 'norwegians', 'accounts', 'jonas', 'confounds', 'details', 'fathoms', 'rocks', 'cataracts', 'vortices', 'pits', 'pieces', 'fragments', 'intervals', 'boats', 'yachts', 'ships', 'howlings', 'bellowings', 'struggles', 'stocks', 'trees', 'bristles', 'rocks', 'hours', 'stones', 'houses', 'fathoms', 'portions', 'records', 'anecdotes', 'bears', 'attempts', 'vortices', 'islands', 'waves', 'shelves', 'confines', 'experiments', 'norwegians', 'brothers', 'tons', 'islands', 'eddies', 'opportunities', 'coastmen', 'islands', 'grounds', 'hours', 'places',

In [275]:
print(malestrom_tag_dict['VB'])

['raise', 'timid', 'morrow', 'watch', 'deck', 'elder', 'shake', 'slack', 'keel', 'hold', 'slow', 'channel']


In [307]:
for key in malestrom_tag_dict.keys():
  if key not in tag_dict.keys():
    print(key)

In [311]:
penn_keys = tag_dict.keys()

def penn_tag_pos(text):
    # Create a dictionary with a NEW empty list for each key
    pos_dict = {key: [] for key in penn_keys}

    sent_text = nltk.sent_tokenize(text)
    tagged = [nltk.pos_tag(nltk.word_tokenize(sent)) for sent in sent_text]

    for sentence in tagged:
        for pos_pair in sentence:
            word, pos = pos_pair

            if pos in pos_dict:
                pos_dict[pos].append(word)

    return pos_dict

In [312]:
malestrom_tag_dict_penn = penn_tag_pos(paragraph)
print(malestrom_tag_dict_penn)

{'PRP$': [], 'VBG': ['everlasting', 'rushing', 'crashing', 'everlasting', 'writhing', 'rustling', 'falling', 'longing', 'rustling', 'lightning'], 'FW': [], 'VB': ['wind', 'tall'], 'POS': [], "''": [], 'VBP': ['slumber', 'crags', 'river', 'river', 'palpitate', 'oozy', 'sigh', 'rock', 'strange', 'lie', 'roll', 'river', 'stone', 'shore', 'stone', 'discover', 'face', 'sorrow', 'murmur', 'wind', 'sunk', 'rock', 'rock', 'magi', 'magi', 'heaven', 'holy', 'hold'], 'VBN': ['agitated', 'fallen', 'leaned', 'called', 'tormented', 'crumbled', 'heaven', 'ceased', 'changed'], 'JJ': ['silent', 'listen', 'demon', 'dreary', 'quiet', 'saffron', 'sickly', 'red', 'tumultuous', 'convulsive', 'bed', 'pale', 'gigantic', 'solitude', 'nod', 'indistinct', 'murmur', 'subterrene', 'sigh', 'boundary', 'boundary', 'horrible', 'lofty', 'low', 'heaven', 'tall', 'sound', 'high', 'poisonous', 'slumber', 'overhead', 'loud', 'cataract', 'heaven', 'quiet', 'mist', 'huge', 'light', 'tall', 'gray', 'walked', 'read', 'turned'

In [313]:
from pyspark.sql import Row

In [315]:
all_results = []

for file_name in text_files:
  select_df = txt_df.filter(col("file_name") == file_name)
  paragraph = select_df.collect()[0]["text"]
  pos_dict = penn_tag_pos(paragraph)


  row_dict = {
      "file_name": file_name,
      "paragraph": paragraph,
  }

  for pos, words in pos_dict.items():
    prefix = pos[:2]
    if prefix in row_dict:
      row_dict[prefix].extend(words)
    else:
      row_dict[prefix] = list(words)

  all_results.append(Row(**row_dict))

pos_df = spark.createDataFrame(all_results)

PySparkValueError: [CANNOT_DETERMINE_TYPE] Some of types cannot be determined after inferring.

In [317]:
all_results = []

# Get all unique prefixes from penn_keys
all_prefixes = set(key[:2] for key in penn_keys)

for file_name in text_files:
    select_df = txt_df.filter(col("file_name") == file_name)
    paragraph = select_df.collect()[0]["text"]
    pos_dict = penn_tag_pos(paragraph)

    # Initialize row dictionary with file info
    row_dict = {
        "file_name": file_name,
        "paragraph": paragraph
    }

    # Initialize all prefix keys with empty lists
    for prefix in all_prefixes:
        row_dict[prefix] = []

    # Now populate with actual words from this file
    for pos, words in pos_dict.items():
        prefix = pos[:2]
        row_dict[prefix].extend(words)

    all_results.append(Row(**row_dict))

pos_df = spark.createDataFrame(all_results)

PySparkValueError: [CANNOT_DETERMINE_TYPE] Some of types cannot be determined after inferring.

In [319]:
# Track prefixes by file
file_to_prefixes = {}

for file_name in text_files:
    select_df = txt_df.filter(col("file_name") == file_name)
    paragraph = select_df.collect()[0]["text"]
    pos_dict = penn_tag_pos(paragraph)

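    # Collect unique prefixes from this file.
    # Note: penn_tag_pos pre-populates every Penn tag with an empty list, so this
    # set is identical for every file; it does not show which tags actually occur.
    file_prefixes = {pos[:2] for pos in pos_dict.keys()}
    file_to_prefixes[file_name] = file_prefixes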

# Analyze differences
all_prefixes = set()
for prefixes in file_to_prefixes.values():
    all_prefixes.update(prefixes)

print(f"Total unique prefixes across all files: {len(all_prefixes)}")
print(f"All prefixes: {sorted(all_prefixes)}")

# Check which files are missing which prefixes
print("\nPrefix distribution across files:")
for file_name, prefixes in file_to_prefixes.items():
    missing = all_prefixes - prefixes
    if missing:
        print(f"File {file_name} is missing {len(missing)} prefixes: {sorted(missing)}")
    else:
        print(f"File {file_name} has all prefixes")

Total unique prefixes across all files: 31
All prefixes: ['$', "''", '(', ')', ',', '--', '.', ':', 'CC', 'CD', 'DT', 'EX', 'FW', 'IN', 'JJ', 'LS', 'MD', 'NN', 'PD', 'PO', 'PR', 'RB', 'RP', 'SY', 'TO', 'UH', 'VB', 'WD', 'WP', 'WR', '``']

Prefix distribution across files:
File THE_ISLAND_OF_THE_FAY has all prefixes
File MESMERIC_REVELATION has all prefixes
File THE_FACTS_IN_THE_CASE_OF_M._VALDEMAR has all prefixes
File THE_BLACK_CAT has all prefixes
File THE_MASQUE_OF_THE_RED_DEATH has all prefixes
File THE_PIT_AND_THE_PENDULUM has all prefixes
File THE_IMP_OF_THE_PERVERSE has all prefixes
File THE_FALL_OF_THE_HOUSE_OF_USHER has all prefixes
File WILLIAM_WILSON has all prefixes
File ELEONORA has all prefixes
File THE_THOUSAND-AND-SECOND_TALE_OF_SCHEHERAZADE has all prefixes
File BERENICE has all prefixes
File THE_CASK_OF_AMONTILLADO has all prefixes
File THE_PURLOINED_LETTER has all prefixes
File THE_PREMATURE_BURIAL has all prefixes
File THE_DOMAIN_OF_ARNHEIM has all prefixes
File A_D

In [321]:
all_results = []

# Define which prefixes to keep (traditional POS tags only)
valid_prefixes = {'CC', 'CD', 'DT', 'EX', 'FW', 'IN', 'JJ', 'LS', 'MD', 'NN', 'PD', 'PO', 'PR', 'RB', 'RP', 'TO', 'UH', 'VB', 'WD', 'WP', 'WR'}

for file_name in text_files:
    select_df = txt_df.filter(col("file_name") == file_name)
    paragraph = select_df.collect()[0]["text"]
    pos_dict = penn_tag_pos(paragraph)

    row_dict = {
        "file_name": file_name,
        "paragraph": paragraph
    }

    # Initialize all valid prefix keys with empty lists
    for prefix in valid_prefixes:
        row_dict[prefix] = []

    # Fill in the words, skipping non-standard prefixes
    for pos, words in pos_dict.items():
        prefix = pos[:2]
        if prefix in valid_prefixes:
            row_dict[prefix].extend(words)

    all_results.append(Row(**row_dict))

# Create DataFrame
pos_df = spark.createDataFrame(all_results)

PySparkValueError: [CANNOT_DETERMINE_TYPE] Some of types cannot be determined after inferring.
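
A likely cause of the `CANNOT_DETERMINE_TYPE` error: when no schema is given, Spark infers an array column's element type from the values it sees, so a column whose list is empty in every row gives it nothing to infer from. The sketch below is plain Python with no Spark involved. `columns_empty_everywhere` is a hypothetical helper and the two rows are made-up data; it only illustrates how to spot such columns before calling `createDataFrame`.

```python
def columns_empty_everywhere(rows):
    """Return the keys whose value is an empty list in every row."""
    if not rows:
        return set()
    # Keys that hold a list in at least one row
    list_keys = {k for row in rows for k, v in row.items() if isinstance(v, list)}
    return {
        k for k in list_keys
        if all(len(row.get(k, [])) == 0 for row in rows)
    }

# Made-up rows standing in for the per-file dictionaries (assumption)
rows = [
    {"file_name": "a", "NN": ["rock"], "FW": []},
    {"file_name": "b", "NN": [], "FW": []},
]
suspects = columns_empty_everywhere(rows)
print(sorted(suspects))
```

With `Row` objects rather than dictionaries, converting each one with `row.asDict()` first should allow the same check. Supplying an explicit schema avoids the inference step altogether.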

In [323]:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

# Define valid prefixes (traditional POS tags only)
valid_prefixes = {'CC', 'CD', 'DT', 'EX', 'FW', 'IN', 'JJ', 'LS', 'MD', 'NN', 'PD', 'PO', 'PR', 'RB', 'RP', 'TO', 'UH', 'VB', 'WD', 'WP', 'WR'}

# Create schema
schema_fields = [
    StructField("file_name", StringType(), True),
    StructField("paragraph", StringType(), True)
]

for prefix in valid_prefixes:
    schema_fields.append(StructField(prefix, ArrayType(StringType()), True))

schema = StructType(schema_fields)

# Process files
all_results = []
for file_name in text_files:
    select_df = txt_df.filter(col("file_name") == file_name)
    paragraph = select_df.collect()[0]["text"]
    pos_dict = penn_tag_pos(paragraph)

    row_dict = {
        "file_name": file_name,
        "paragraph": paragraph
    }

    # Initialize all valid prefix keys with empty lists
    for prefix in valid_prefixes:
        row_dict[prefix] = []

    # Fill in the words, skipping non-standard prefixes
    for pos, words in pos_dict.items():
        prefix = pos[:2]
        if prefix in valid_prefixes:
            # Ensure all elements are strings
            string_words = [str(word) for word in words]
            row_dict[prefix].extend(string_words)

    all_results.append(row_dict)  # Use dict instead of Row with explicit schema

# Create DataFrame with explicit schema
pos_df = spark.createDataFrame(all_results, schema=schema)

In [325]:
pos_df.show(1,truncate=False)

+---------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
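
The cell that follows converts raw tag counts to rates per 1000 words and then computes a coefficient of variation (standard deviation divided by mean) for each prefix. The arithmetic can be checked by hand on toy numbers. This is a minimal sketch using only the standard library; the function names and the three `(count, total)` pairs are hypothetical. `statistics.stdev` is the sample standard deviation, which matches Spark's `stddev`.

```python
from statistics import mean, stdev

def per_thousand(count, total):
    """Rate of a tag per 1000 tagged words."""
    return count * 1000 / total

def coefficient_of_variation(values):
    """Sample standard deviation divided by the mean."""
    return stdev(values) / mean(values)

# Hypothetical (noun_count, total_words) pairs for three stories
stories = [(300, 1000), (620, 2000), (145, 500)]
rates = [per_thousand(count, total) for count, total in stories]
cv = coefficient_of_variation(rates)
print(rates, round(cv, 4))
```

Normalizing to a per-1000 rate makes stories of different lengths comparable; the coefficient of variation then expresses the spread of those rates relative to their mean.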

In [326]:
valid_prefixes = {'CC', 'CD', 'DT', 'EX', 'FW', 'IN', 'JJ', 'LS', 'MD', 'NN', 'PD', 'PO', 'PR', 'RB', 'RP', 'TO', 'UH', 'VB', 'WD', 'WP', 'WR'}


# 1. Calculate the total number of words in each story for each POS type
pos_counts_df = pos_df.select("file_name", *[size(col(prefix)).alias(f"{prefix}_count") for prefix in valid_prefixes])

# 2. Calculate the total words in each story
from pyspark.sql.functions import expr
total_words_expr = " + ".join([f"{prefix}_count" for prefix in valid_prefixes])
pos_counts_df = pos_counts_df.withColumn("total_words", expr(total_words_expr))

# 3. Calculate proportions (per 1000 words)
for prefix in valid_prefixes:
    pos_counts_df = pos_counts_df.withColumn(
        f"{prefix}_per_1000",
        (col(f"{prefix}_count") * 1000 / col("total_words"))
    )

# 4. Select just the file names and the proportions
result_df = pos_counts_df.select(
    "file_name",
    "total_words",
    *[f"{prefix}_per_1000" for prefix in valid_prefixes]
)

# 5. Show the results
result_df.show()

# 6. Calculate statistics to test the conjecture
# Import the module under an alias so Spark's min/max do not shadow the Python built-ins
from pyspark.sql import functions as F

summary_stats = result_df.select(
    [F.avg(f"{prefix}_per_1000").alias(f"{prefix}_avg") for prefix in valid_prefixes] +
    [F.stddev(f"{prefix}_per_1000").alias(f"{prefix}_stddev") for prefix in valid_prefixes] +
    [F.min(f"{prefix}_per_1000").alias(f"{prefix}_min") for prefix in valid_prefixes] +
    [F.max(f"{prefix}_per_1000").alias(f"{prefix}_max") for prefix in valid_prefixes]
)

summary_stats.show()

# 7. Coefficient of variation (stddev / mean) for each POS prefix
# Rule of thumb: CV < 0.1 suggests high consistency, CV > 0.2 suggests substantial variation
cv_stats = summary_stats.select(
    *[
        (col(f"{prefix}_stddev") / col(f"{prefix}_avg")).alias(f"{prefix}_cv")
        for prefix in valid_prefixes
    ]
)

cv_stats.show()

+--------------------+-----------+-------------------+-------------------+-------------------+-------------------+------------------+-----------------+-------------------+------------------+-------------------+------------------+-----------+------------------+-----------+------------------+-----------+-------------------+-----------+-----------+-------------------+-------------------+------------------+
|           file_name|total_words|        PR_per_1000|        RP_per_1000|        WR_per_1000|        MD_per_1000|       NN_per_1000|      PD_per_1000|        DT_per_1000|       VB_per_1000|        CD_per_1000|       EX_per_1000|PO_per_1000|       JJ_per_1000|LS_per_1000|       IN_per_1000|TO_per_1000|        WP_per_1000|WD_per_1000|UH_per_1000|        FW_per_1000|        CC_per_1000|       RB_per_1000|
+--------------------+-----------+-------------------+-------------------+-------------------+-------------------+------------------+-----------------+-------------------+---------------