In [15]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("NYC_Taxi_Analysis")
    .master("spark://spark-master:7077")
    .getOrCreate()
)

In [14]:
spark.stop()

##  ***2022 Schema***



In [3]:
#df_2022 = spark.read.parquet("/opt/spark/resources/NYC_Yellow_Taxi_Trips/2022")

In [4]:
#print(f"number of records of 2023 data is : {df_2022.count()}")

In [5]:
#df_2022.printSchema()

##  ***2023 Schema***



In [6]:
#df_2023 = spark.read.parquet("/opt/spark/resources/NYC_Yellow_Taxi_Trips/2023")

In [7]:
#print(f"number of records of 2023 data is : {df_2023.count()}")


In [8]:
#df_2023.printSchema()


## ***2024 Schema***

In [9]:
#df_2024 = spark.read.parquet("/opt/spark/resources/NYC_Yellow_Taxi_Trips/2024")

In [10]:
#print(f"Number of records of 2024 data is : {df_2024.count()}")

In [11]:
#df_2024.printSchema()


##  ***2025 Schema***

In [12]:
#df_2025 = spark.read.parquet("/opt/spark/resources/NYC_Yellow_Taxi_Trips/2025")

In [13]:
#print(f"Number of records of 2025 data is : {df_2025.count()}")

In [14]:
#df_2025.printSchema()

In [15]:

#print(df_2022.schema==df_2023.schema == df_2024.schema == df_2025.schema)  

#the result is False , so there is a schema mismatch , and to resolve this we should use mergeschema in spark 

In [16]:
#df = spark.read.option("mergeSchema", "true").parquet(
#    "/opt/spark/resources/NYC_Yellow_Taxi_Trips/2023",
#    "/opt/spark/resources/NYC_Yellow_Taxi_Trips/2024",
#    "/opt/spark/resources/NYC_Yellow_Taxi_Trips/2025"
#)

#the mergeschema in spark can not work , because it can not merge data types "Failed to merge incompatible data types "BIGINT" and "INT" 

# **Standard Schema**

### ⚠️ Observed Schema Mismatches

- **Data type inconsistencies across years**
  - `VendorID`: `long` (2022-2023) vs `integer` (2024–2025)
  - `passenger_count`: `double` (2022-2023) vs `long` (2024–2025)
  - `RatecodeID`: `double` (2022-2023) vs `long` (2024–2025)
  - `PULocationID`, `DOLocationID`: `long` (2022-2023) vs `integer` (2024–2025)

- **Column name variation**
  - `airport_fee` (2022-2023) vs `Airport_fee` (2024–2025)

- **Schema evolution**
  - `cbd_congestion_fee` present only in 2025


In [3]:
#test for minio 
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)


In [1]:
spark.stop()

NameError: name 'spark' is not defined

In [4]:
#Create a Standard Schema For all Years
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, DoubleType, StringType, TimestampType ,TimestampNTZType

standard_schema = StructType([
    StructField('VendorID', IntegerType(), True),
    StructField('tpep_pickup_datetime', TimestampNTZType(), True),
    StructField('tpep_dropoff_datetime', TimestampNTZType(), True),
    StructField('passenger_count', LongType(), True),
    StructField('trip_distance', DoubleType(), True),
    StructField('RatecodeID', LongType(), True),
    StructField('store_and_fwd_flag', StringType(), True),
    StructField('PULocationID', IntegerType(), True),
    StructField('DOLocationID', IntegerType(), True),
    StructField('payment_type', LongType(), True),
    StructField('fare_amount', DoubleType(), True),
    StructField('extra', DoubleType(), True),
    StructField('mta_tax', DoubleType(), True),
    StructField('tip_amount', DoubleType(), True),
    StructField('tolls_amount', DoubleType(), True),
    StructField('improvement_surcharge', DoubleType(), True),
    StructField('total_amount', DoubleType(), True),
    StructField('congestion_surcharge', DoubleType(), True),
    StructField('Airport_fee', DoubleType(), True), 
    StructField('cbd_congestion_fee', DoubleType(), True),
])


## Enforce Function

In [5]:
#Create an enforce function for compitability
from pyspark.sql.functions import lit, col

def enforce_schema(df, schema):
    # Rename existing lower-case column if needed
    if "airport_fee" in df.columns:
        df = df.withColumnRenamed("airport_fee", "Airport_fee")
    
    # Add missing columns and cast
    for field in schema.fields:
        if field.name not in df.columns:
            df = df.withColumn(field.name, lit(None).cast(field.dataType))
        else:
            df = df.withColumn(field.name, col(field.name).cast(field.dataType))
    
    # Reorder columns
    df = df.select([field.name for field in schema.fields])
    return df


### Read 2022 Parquet Files and Union them

In [6]:
#read each parquet file of specified year individuall and then union all parqyet files of same year
base_path = "/opt/spark/resources/NYC_Yellow_Taxi_Trips/2022"

dfs_2022_std = {}

for m in range(1, 13):
    month = f"{m:02d}"
    path = f"{base_path}/yellow_tripdata_2022-{month}.parquet"
    
    df = spark.read.parquet(path)

    df_std = enforce_schema(df, standard_schema)
    
    dfs_2022_std[month] = df_std


In [7]:
from functools import reduce
from pyspark.sql import DataFrame

# List of all 12 DataFrames
dfs_list = [dfs_2022_std[month] for month in sorted(dfs_2022_std.keys())]

# Union all months into one DataFrame
df_2022_std = reduce(lambda df1, df2: df1.unionByName(df2), dfs_list)


In [6]:
#df_2022_std.count()

39656098

### Read 2023 Parquet Files and Union them

In [8]:
#read each parquet file of specified year individuall and then union all parqyet
base_path = "/opt/spark/resources/NYC_Yellow_Taxi_Trips/2023"

dfs_2023_std = {}

for m in range(1, 13):
    month = f"{m:02d}"
    path = f"{base_path}/yellow_tripdata_2023-{month}.parquet"
    
    df = spark.read.parquet(path)
    df_std = enforce_schema(df, standard_schema)
    
    dfs_2023_std[month] = df_std


In [9]:
from functools import reduce
from pyspark.sql import DataFrame

# List of all 12 DataFrames
dfs_list = [dfs_2023_std[month] for month in sorted(dfs_2023_std.keys())]

# Union all months into one DataFrame
df_2023_std = reduce(lambda df1, df2: df1.unionByName(df2), dfs_list)


In [24]:
#df_2023_std.count()

### Read 2024 Parquet Files and Union them

In [10]:
#read each parquet file of specified year individuall and then union all parqyet
base_path = "/opt/spark/resources/NYC_Yellow_Taxi_Trips/2024"

dfs_2024_std = {}

for m in range(1, 13):
    month = f"{m:02d}"
    path = f"{base_path}/yellow_tripdata_2024-{month}.parquet"
    
    df = spark.read.parquet(path)
    df_std = enforce_schema(df, standard_schema)
    
    dfs_2024_std[month] = df_std


In [11]:
from functools import reduce
from pyspark.sql import DataFrame

# List of all 12 DataFrames
dfs_list = [dfs_2024_std[month] for month in sorted(dfs_2024_std.keys())]

# Union all months into one DataFrame
df_2024_std = reduce(lambda df1, df2: df1.unionByName(df2), dfs_list)


In [27]:
#df_2024_std.count()

### Read 2025 Parquet Files and Union them

In [12]:
#read each parquet file of specified year individuall and then union all parqyet
base_path = "/opt/spark/resources/NYC_Yellow_Taxi_Trips/2025"

dfs_2025_std = {}

for m in range(1, 11):
    month = f"{m:02d}"
    path = f"{base_path}/yellow_tripdata_2025-{month}.parquet"
    
    df = spark.read.parquet(path)
    df_std = enforce_schema(df, standard_schema)
    
    dfs_2025_std[month] = df_std


In [13]:
from functools import reduce
from pyspark.sql import DataFrame

# List of all 12 DataFrames
dfs_list = [dfs_2025_std[month] for month in sorted(dfs_2025_std.keys())]

# Union all months into one DataFrame
df_2025_std = reduce(lambda df1, df2: df1.unionByName(df2), dfs_list)


In [30]:
#df_2025_std.count()


# **Union All df Years**

In [14]:
#Check schema match
print(df_2022_std.schema==df_2023_std.schema==df_2024_std.schema==df_2025_std.schema)




True


In [15]:
df = (
    
     df_2024_std
    .unionByName(df_2025_std)
    .unionByName(df_2023_std)
    .unionByName(df_2022_std)
    
)


In [16]:
df.write.mode("overwrite") \
  .parquet("s3a://staging-output/taxi/")


Py4JJavaError: An error occurred while calling o10552.parquet.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:454)
	at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:530)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:792)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2592)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686)
	... 25 more


In [33]:
#print(f"number of records of full df is : {df.count()}")

In [34]:
#df.printSchema()

In [35]:
#df.show(10)

# **Data Cleaning**

### **Rename Columns**

In [17]:
from pyspark.sql import functions as f

df_col_renamed = (
    df
    .withColumnRenamed("VendorID", "Vendor_ID")
    .withColumnRenamed("RatecodeID", "Ratecode_ID")
    .withColumnRenamed("PULocationID", "Pickup_Location_ID")
    .withColumnRenamed("DOLocationID", "Dropoff_Location_ID")
    .withColumnRenamed("extra", "extra_charges")
    .withColumnRenamed("tpep_pickup_datetime","Trip_Pickup_DateTime")
    .withColumnRenamed("tpep_dropoff_datetime","Trip_Dropoff_DateTime")
    .withColumn(
        'trip_duration_min',
        f.round(
            (f.unix_timestamp('Trip_Dropoff_DateTime') - f.unix_timestamp('Trip_Pickup_DateTime')) / 60,
            2
        )
    )
)


In [37]:
#df_col_renamed.printSchema()

### **Dealing With Nulls**

#### Handling NULL Values

The dataset is cleaned by filling NULL values in specific columns with appropriate defaults:

1. **Vendor_ID:** Replace NULLs with `99`.

2. **Ratecode_ID:** Replace NULLs with `99`.

3. **store_and_fwd_flag:** Replace NULLs with `'Unknown'`.

4. **fare_amount:** Replace NULLs with `0.0`.

5. **extra_charges:** Replace NULLs with `0.0`.

6. **mta_tax:** Replace NULLs with `0.0`.

7. **tip_amount:** Replace NULLs with the **mean value of the column** (`mean_tip`).

8. **cbd_congestion_fee:** Replace NULLs with `0.0`.


In [18]:
mean_tip = df_col_renamed.select(f.mean("tip_amount")).collect()[0][0]

df_no_nulls = df_col_renamed.fillna({
    'Vendor_ID': 99,
    'Ratecode_ID': 99,
    'store_and_fwd_flag':'Unknown',
    'fare_amount': 0.0,
    'extra_charges': 0.0,
    'mta_tax': 0.0,
    'tip_amount': mean_tip,
    'cbd_congestion_fee': 0.0
})


In [39]:
#df_no_nulls.count()

### **Filter bad Data**

#### Filter Conditions

The dataset is filtered based on the following conditions:

1. **Passenger count:** Keep trips where the number of passengers is between **1 and 6** (inclusive).

2. **Trip distance:** Keep trips where the distance is between **0 .1and 200**.

3. **Pickup and dropoff locations:** Exclude trips where the pickup and dropoff locations are the same.

4. **Trip duration:** Only keep trips with duration between **0.01 minutes and 120.1 minutes** (i.e., less than 2 hours).

5. **Non-negative fees and amounts:** Exclude rows where any of the following columns are negative:
   - `cbd_congestion_fee`
   - `Airport_fee`
   - `total_amount`
   - `tip_amount`
   - `fare_amount`
   - `extra_charges`
   - `improvement_surcharge`
   - `mta_tax`


In [19]:
df_filtered = df_no_nulls

In [41]:
#df_filtered.printSchema()

In [20]:
from pyspark.sql import functions as f

df_filtered = df_no_nulls.filter(
    (f.col("passenger_count").between(1, 6)) &
    (f.col("trip_distance").between(0.1,200)) &
    (f.col("Pickup_Location_ID") != f.col("Dropoff_Location_ID")) &
    (f.col("trip_duration_min").between(0.01,120.1))&
    (f.col("cbd_congestion_fee")>=0.0)&
    (f.col("Airport_fee")>=0.0)&
    (f.col("total_amount")>=0.0)&
    (f.col("tip_amount")>=0.0)&
    (f.col("fare_amount")>=0.0)&
    (f.col("extra_charges")>=0.0)&
    (f.col("improvement_surcharge")>=0.0)&
    (f.col("mta_tax")>=0.0)
    
    
)


In [43]:
#check applying of filter condition
#from pyspark.sql import functions as f

#df_filtered.where ( (f.col("passenger_count") == 0) | (f.col("trip_distance")>200) |(f.col("trip_duration_min") >120.1) ).show()

    

In [44]:
#df_filtered.printSchema()

### **Creating new columns**

#### - Columns From Existing Ones

#### Enriching Trip Data with Descriptive Columns

This code adds and transforms several columns in the trip DataFrame:  

- **Vendor_Name** – human-readable vendor names based on `Vendor_ID`.  
- **Ratecode_Description** – descriptive text for `Ratecode_ID`.  
- **Payment_Method** – descriptive text for `payment_type`.  
- **Trip_Distance_Km** – converts `trip_distance` from miles to kilometers.  
- **Year** – extracts the pickup year from `Trip_Pickup_DateTime`.  
- **Month** – extracts the pickup month (abbreviated) from `Trip_Pickup_DateTime`.  
- Renames `trip_distance` to `trip_distance_miles`.  

All mappings use conditional logic (`when/otherwise`) to handle known values and assign `"Unknown"` for others.


In [21]:
from pyspark.sql import functions as f

df_derived_col = (
    df_filtered
    .withColumn(
        "Vendor_Name",
        f.when(f.col("Vendor_ID") == 1, f.lit("Creative Mobile Technologies, LLC"))
         .when(f.col("Vendor_ID") == 2, f.lit("Curb Mobility, LLC"))
         .when(f.col("Vendor_ID") == 6, f.lit("Myle Technologies Inc"))
         .when(f.col("Vendor_ID") == 7, f.lit("Helix"))
         .otherwise(f.lit("Unknown"))
    )
    .withColumn(
        "Ratecode_Description",
        f.when(f.col("Ratecode_ID") == 1, f.lit("Standard rate"))
         .when(f.col("Ratecode_ID") == 2, f.lit("JFK"))
         .when(f.col("Ratecode_ID") == 3, f.lit("Newark"))
         .when(f.col("Ratecode_ID") == 4, f.lit("Nassau or Westchester"))
         .when(f.col("Ratecode_ID") == 5, f.lit("Negotiated fare"))
         .when(f.col("Ratecode_ID") == 6, f.lit("Group ride"))
         .otherwise(f.lit("Unknown"))
    )
    .withColumn(
        "Payment_Method",
        f.when(f.col("payment_type") == 0, f.lit("Flex Fare trip"))
         .when(f.col("payment_type") == 1, f.lit("Credit Card"))
         .when(f.col("payment_type") == 2, f.lit("Cash"))
         .when(f.col("payment_type") == 3, f.lit("No Charge"))
         .when(f.col("payment_type") == 4, f.lit("Dispute"))
         .when(f.col("payment_type") == 5, f.lit("Unknown"))
         .when(f.col("payment_type") == 6, f.lit("Voided trip"))
         .otherwise(f.lit("Unknown"))
    )
    .withColumn(
        "Trip_Distance_Km",
        f.round(f.col("trip_distance") * 1.609 ,2)
    )

    .withColumn(
        "Year", 
        f.year(f.col('Trip_Pickup_DateTime'))
    )

   .withColumn(
        "Month",
         f.date_format(f.col("Trip_Pickup_DateTime"), "MMM")
)
    
    
    .withColumnRenamed('trip_distance','trip_distance_miles')
)


In [46]:
#df_derived_col.show()

#### Columns from lookup_zones_table

#### Adding Zone Information

This  adds new columns to the DataFrame by left-joining with `lookup_zones_df` on `Pickup_Location_ID` and `Dropoff_Location_ID`.  

**Columns added:**  
- `Pickup_Borough`, `Pickup_Zone`, `Pickup_Service_Zone`  
- `Dropoff_Borough`, `Dropoff_Zone`, `Dropoff_Service_Zone`  


In [22]:
lookup_zones_df = (
    spark.read
         .option("header", "true")
         .option("inferschema","True")
         .csv("/opt/spark/resources/taxi_zone_lookup.csv")
)


In [48]:
#lookup_zones_df.show()

In [49]:
#lookup_zones_df.printSchema()

In [23]:
from pyspark.sql import functions as f

pickup_zone = lookup_zones_df.alias('pickup')
dropoff_zone = lookup_zones_df.alias('dropoff')

df_joined = df_derived_col \
    .join(
        pickup_zone,
        df_derived_col['Pickup_Location_ID'] == pickup_zone['LocationID'],
        how='left'
    ) \
    .join(
        dropoff_zone,
        df_derived_col['Dropoff_Location_ID'] == dropoff_zone['LocationID'],
        how='left'
    ) \
    .select(
        df_derived_col['*'],  # all original columns
        f.col('pickup.Borough').alias('Pickup_Borough'),
        f.col('pickup.Zone').alias('Pickup_Zone'),
        f.col('pickup.service_zone').alias('Pickup_Service_Zone'),
        f.col('dropoff.Borough').alias('Dropoff_Borough'),
        f.col('dropoff.Zone').alias('Dropoff_Zone'),
        f.col('dropoff.service_zone').alias('Dropoff_Service_Zone')
    )


In [51]:
#df_joined.count()
#df_joined.show()
#df_joined.printSchema()

# **Final df**


In [24]:
final_df = df_joined.select(
    'Vendor_ID',
    'Vendor_Name',
    'Trip_Pickup_DateTime',
    'Trip_Dropoff_DateTime',
    'passenger_count',
    'Pickup_Location_ID',
    'Pickup_Borough',
    'Pickup_Zone',
    'Pickup_Service_Zone',
    'Dropoff_Location_ID',
    'Dropoff_Borough',
    'Dropoff_Zone',
    'Dropoff_Service_Zone',
    'Trip_Distance_Km',
    'trip_distance_miles',
    'Ratecode_ID',
    'Ratecode_Description',
    'payment_type',
    'Payment_Method',
    'trip_duration_min',
    'fare_amount',
    'extra_charges',
    'mta_tax',
    'tip_amount',
    'tolls_amount',
    'improvement_surcharge',
    'total_amount',
    'congestion_surcharge',
    'Airport_fee',
    'cbd_congestion_fee',
    'Year',
    'Month'
)


In [25]:
# Store the original count
original_count = df.count()

# Store the filtered count
final_count = final_df.count()

# Print results
print(f"Number of records of original df: {original_count}")
print(f"Number of records after Cleaning Data: {final_count}")
print(f"Number of filtered records: {original_count - final_count}")
print(f"Percentage of bad data from whole data: {(original_count - final_count) / original_count * 100:.2f}%")



Number of records of original df: 159372196
Number of records after Cleaning Data: 130902515
Number of filtered records: 28469681
Percentage of bad data from whole data: 17.86%


In [1]:
#final_df.show(200)

In [56]:
#final_df.write.mode("overwrite").parquet("/opt/spark/resources/NYC_Yellow_Taxi_Trips_Cleaned")


In [57]:
#df_cleaned=spark.read.parquet("/opt/spark/resources/NYC_Yellow_Taxi_Trips_Cleaned")

In [58]:
#df_cleaned.count()

In [2]:
from functools import reduce
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import (
    StructType, StructField, IntegerType, LongType, DoubleType,
    StringType, TimestampNTZType,
)
from pyspark.sql.functions import lit, col

standard_schema = StructType([
    StructField('VendorID', IntegerType(), True),
    StructField('tpep_pickup_datetime', TimestampNTZType(), True),
    StructField('tpep_dropoff_datetime', TimestampNTZType(), True),
    StructField('passenger_count', LongType(), True),
    StructField('trip_distance', DoubleType(), True),
    StructField('RatecodeID', LongType(), True),
    StructField('store_and_fwd_flag', StringType(), True),
    StructField('PULocationID', IntegerType(), True),
    StructField('DOLocationID', IntegerType(), True),
    StructField('payment_type', LongType(), True),
    StructField('fare_amount', DoubleType(), True),
    StructField('extra', DoubleType(), True),
    StructField('mta_tax', DoubleType(), True),
    StructField('tip_amount', DoubleType(), True),
    StructField('tolls_amount', DoubleType(), True),
    StructField('improvement_surcharge', DoubleType(), True),
    StructField('total_amount', DoubleType(), True),
    StructField('congestion_surcharge', DoubleType(), True),
    StructField('Airport_fee', DoubleType(), True),
    StructField('cbd_congestion_fee', DoubleType(), True),
])

def enforce_schema(df: DataFrame, schema: StructType) -> DataFrame:
    if "airport_fee" in df.columns:
        df = df.withColumnRenamed("airport_fee", "Airport_fee")
    for field in schema.fields:
        if field.name not in df.columns:
            df = df.withColumn(field.name, lit(None).cast(field.dataType))
        else:
            df = df.withColumn(field.name, col(field.name).cast(field.dataType))
    return df.select([field.name for field in schema.fields])

def load_year_standardized(spark: SparkSession, year: int, max_month: int) -> DataFrame:
    base_path = f"/opt/spark/resources/NYC_Yellow_Taxi_Trips/{year}"
    dfs_std = {}
    for m in range(1, max_month + 1):
        month = f"{m:02d}"
        path = f"{base_path}/yellow_tripdata_{year}-{month}.parquet"
        df = spark.read.parquet(path)
        dfs_std[month] = enforce_schema(df, standard_schema)
    dfs_list = [dfs_std[month] for month in sorted(dfs_std.keys())]
    return reduce(lambda df1, df2: df1.unionByName(df2), dfs_list)

def main():
    spark = (
        SparkSession.builder
        .appName("NYC_Taxi_Union_All_Years")
        .master("spark://spark-master:7077")
        .getOrCreate()
    )

    try:
        print("Loading year 2022...")
        df_2022_std = load_year_standardized(spark, 2022, 12)
        count_2022 = df_2022_std.count()
        print(f"2022 loaded: {count_2022} records")
        
        print("Loading year 2023...")
        df_2023_std = load_year_standardized(spark, 2023, 12)
        count_2023 = df_2023_std.count()
        print(f"2023 loaded: {count_2023} records")
        
        print("Loading year 2024...")
        df_2024_std = load_year_standardized(spark, 2024, 12)
        count_2024 = df_2024_std.count()
        print(f"2024 loaded: {count_2024} records")
        
        print("Loading year 2025...")
        df_2025_std = load_year_standardized(spark, 2025, 10)
        count_2025 = df_2025_std.count()
        print(f"2025 loaded: {count_2025} records")

        # Schema check
        schema_match = (df_2022_std.schema == df_2023_std.schema == 
                       df_2024_std.schema == df_2025_std.schema)
        print(f"Schema match: {schema_match}")

        # Union all years
        print("Unioning all years...")
        df_all_years_staged = (
            df_2024_std
            .unionByName(df_2025_std)
            .unionByName(df_2023_std)
            .unionByName(df_2022_std)
        )

        # Write to staging (this is the action - no need for count after)
        print("Writing to staging location...")
        df_all_years_staged.write.mode("overwrite").parquet("/opt/spark/resources/Staging_output")
        
        print("SUCCESS: Staging write completed")
        print(f"Total records: {count_2022 + count_2023 + count_2024 + count_2025}")
        
    except Exception as e:
        print(f"ERROR: {str(e)}")
        import traceback
        traceback.print_exc()
        raise
    finally:
        spark.stop()

if __name__ == "__main__":
    main()

Loading year 2022...


ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
RuntimeError: reentrant call inside <_io.BufferedReader name=42>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip

ERROR: An error occurred while calling o10802.parquet


Py4JError: An error occurred while calling o10802.parquet

In [5]:
spark.stop()