# Spark Example
## 1. Libraries

In [1]:
# Import necessary libraries
from pyspark.sql import SparkSession

In [2]:
# Start a Spark session for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("pyspark-jupyter")
    .getOrCreate()
)

In [3]:
# Location of the taxi zone lookup CSV (the original note says it lives in the "sparkdata" volume)
data_path = "../data/taxi_zone_lookup.csv"

# Read the CSV into a DataFrame: first row is the header, column types are inferred
csv_reader = spark.read.options(header=True, inferSchema=True)
df = csv_reader.csv(data_path)

In [4]:
# Print the column names and the types Spark inferred for the taxi zone lookup data
df.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [5]:
# Preview the top 5 rows of the taxi zone lookup table
df.show(n=5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows


In [6]:
# Number of taxi zones per borough, largest borough first
(
    df.groupBy("Borough")
    .count()
    .sort("count", ascending=False)
    .show()
)

+-------------+-----+
|      Borough|count|
+-------------+-----+
|       Queens|   69|
|    Manhattan|   69|
|     Brooklyn|   61|
|        Bronx|   43|
|Staten Island|   20|
|      Unknown|    2|
|          EWR|    1|
+-------------+-----+



In [8]:
# Persist the per-borough zone counts, largest first.
# Spark writes a *directory* named "taxi_results_2.csv" containing part files,
# not a single CSV file.
# mode("overwrite") replaces output left by a previous run; the default save mode
# ("errorifexists") raises when the path already exists, so the cell would fail
# on every re-run of the notebook.
(
    df.groupby("Borough")
    .count()
    .orderBy("count", ascending=False)
    .write.mode("overwrite")
    .csv("taxi_results_2.csv")
)

# Example: RITA Carrier On-Time Performance Data

In [14]:
# Location of the carrier on-time performance CSV
data_path = "../data/rita/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2024_1.csv"

# Read the CSV into a DataFrame: first row is the header, column types are inferred.
# NOTE: this rebinds `df`, replacing the taxi zone DataFrame used by the earlier cells.
csv_reader = spark.read.options(header=True, inferSchema=True)
df = csv_reader.csv(data_path)

In [15]:
# Number of rows in the DataFrame currently bound to `df` (the on-time performance data)
df.count()

7079061

In [16]:
# Print the column names and the types Spark inferred for the on-time performance data
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- FlightDate: date (nullable = true)
 |-- Reporting_Airline: string (nullable = true)
 |-- DOT_ID_Reporting_Airline: integer (nullable = true)
 |-- IATA_CODE_Reporting_Airline: string (nullable = true)
 |-- Tail_Number: string (nullable = true)
 |-- Flight_Number_Reporting_Airline: integer (nullable = true)
 |-- OriginAirportID: integer (nullable = true)
 |-- OriginAirportSeqID: integer (nullable = true)
 |-- OriginCityMarketID: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- OriginCityName: string (nullable = true)
 |-- OriginState: string (nullable = true)
 |-- OriginStateFips: integer (nullable = true)
 |-- OriginStateName: string (nullable = true)
 |-- OriginWac: integer (nullable = true)
 |-- DestAirportID: integer (nullable = true)
 |-- DestAirportSeqID: 

In [17]:
# Preview 10 distinct (date, origin, destination, distance) combinations, ordered by flight date
(
    df.select(
        [
            "FlightDate",
            "OriginAirportID",
            "DestAirportID",
            "OriginCityName",
            "DestCityName",
            "Distance",
        ]
    )
    .distinct()
    .sort("FlightDate")
    .show(n=10)
)

+----------+---------------+-------------+---------------+---------------+--------+
|FlightDate|OriginAirportID|DestAirportID| OriginCityName|   DestCityName|Distance|
+----------+---------------+-------------+---------------+---------------+--------+
|2024-01-01|          14635|        10721| Fort Myers, FL|     Boston, MA|  1249.0|
|2024-01-01|          13487|        10299|Minneapolis, MN|  Anchorage, AK|  2519.0|
|2024-01-01|          13487|        13495|Minneapolis, MN|New Orleans, LA|  1039.0|
|2024-01-01|          11977|        13487|  Green Bay, WI|Minneapolis, MN|   252.0|
|2024-01-01|          13495|        11292|New Orleans, LA|     Denver, CO|  1062.0|
|2024-01-01|          11503|        10397|      Eagle, CO|    Atlanta, GA|  1312.0|
|2024-01-01|          10713|        13487|      Boise, ID|Minneapolis, MN|  1142.0|
|2024-01-01|          11057|        13487|  Charlotte, NC|Minneapolis, MN|   930.0|
|2024-01-01|          11292|        14747|     Denver, CO|    Seattle, WA|  

In [None]:
# Persist the distinct (date, origin, destination, distance) combinations, ordered by flight date.
# Spark writes a *directory* named "rita_results.csv" containing part files,
# not a single CSV file.
# mode("overwrite") replaces output left by a previous run; the default save mode
# ("errorifexists") raises when the path already exists, so the cell would fail
# on every re-run of the notebook.
(
    df.select(
        "FlightDate",
        "OriginAirportID",
        "DestAirportID",
        "OriginCityName",
        "DestCityName",
        "Distance"
    )
    .dropDuplicates()
    .orderBy("FlightDate")
    .write.mode("overwrite")
    .csv("rita_results.csv")
)