In [1]:
import os
import pyspark
# Point to your Java installation
os.environ["JAVA_HOME"] = "/usr/local/opt/openjdk@17"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]

In [2]:
# 1️⃣ Setup PySpark
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder \
    .appName("NYC Yellow Taxi 2025 Analysis") \
    .getOrCreate()

# Optional: improve performance for local testing
spark.conf.set("spark.sql.shuffle.partitions", "4")

print("✅ Spark session created")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/14 18:40:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


✅ Spark session created


In [3]:
# 2️⃣ Load Parquet files
import os

data_dir = "yellow_taxi_2025"  # folder where files are downloaded
parquet_files = [os.path.join(data_dir, f) 
                 for f in 
                 os.listdir(data_dir) 
                 if f.endswith(".parquet")]

print(f"Found {len(parquet_files)} files")

# Load all files into a single DataFrame
df = spark.read.parquet(*parquet_files)
print("✅ Data loaded into DataFrame")

# Save the full dataset as a Parquet file
# df.write.mode("overwrite").parquet("yellow_taxi_2025_full.parquet")

# Show schema
df.printSchema()

# Preview data
df.show(5)

Found 9 files


                                                                                

✅ Data loaded into DataFrame
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable = true)

+--------+--------------------+---------------

                                                                                

In [4]:
print(df.rdd.getNumPartitions())


18


In [5]:
# Show first 5 rows
df.show(5)

# Count number of rows
print(df.count())

# Print schema
df.printSchema()

# Summary statistics for numerical columns
df.describe(["trip_distance", "fare_amount", "total_amount"]).show()

# Number of trips by passenger count
df.groupBy("passenger_count").count().show()

# Average trip distance by payment type
df.groupBy("payment_type").avg("trip_distance").show()

# Filter trips longer than 10 miles
df.filter(df.trip_distance > 10).show(5)

# Top 5 busiest pickup locations
df.groupBy("PULocationID").count().orderBy("count", ascending=False).show(5)


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       2| 2025-02-01 00:12:18|  2025-02-01 00:32:33|              3|         3.12|         1|                 N|         246|    

                                                                                

+-------+------------------+------------------+------------------+
|summary|     trip_distance|       fare_amount|      total_amount|
+-------+------------------+------------------+------------------+
|  count|          35807453|          35807453|          35807453|
|   mean|6.8855818443719885|18.134971358337232| 26.57483308824564|
| stddev| 642.1435149582192|166.20058046739456|166.74687099641136|
|    min|               0.0|           -1807.6|          -1832.85|
|    max|         397994.37|         863372.12|         863380.37|
+-------+------------------+------------------+------------------+



                                                                                

+---------------+--------+
|passenger_count|   count|
+---------------+--------+
|              2| 3792436|
|              4|  601012|
|              5|  139877|
|              8|      66|
|              1|21695759|
|              0|  198983|
|              6|   80843|
|              9|      29|
|              3|  887641|
|              7|      22|
|           NULL| 8410785|
+---------------+--------+



                                                                                

+------------+------------------+
|payment_type|avg(trip_distance)|
+------------+------------------+
|           2| 3.372383868237221|
|           4| 4.296311904207681|
|           1|3.5504206481593372|
|           3|2.8053411390934935|
|           5|3.3333333333333335|
|           0|17.767744409111287|
+------------+------------------+

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+----------



+------------+-------+
|PULocationID|  count|
+------------+-------+
|         161|1552149|
|         132|1539053|
|         237|1529211|
|         236|1347510|
|         186|1141221|
+------------+-------+
only showing top 5 rows


                                                                                

In [13]:
from pyspark.sql.functions import col, when, isnan, count

# 1. Remove rows with critical nulls
critical_columns = ["tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count", 
                    "trip_distance", "fare_amount", "total_amount"]

df_clean = df.dropna(subset=critical_columns)

# 2. Replace nulls in non-critical numeric columns with 0
numeric_columns = ["extra", "mta_tax", "tip_amount", "tolls_amount", 
                   "improvement_surcharge", "congestion_surcharge", "Airport_fee", "cbd_congestion_fee"]

for c in numeric_columns:
    df_clean = df_clean.withColumn(c, when(col(c).isNull(), 0).otherwise(col(c)))




In [6]:
from pyspark.sql.functions import col, avg, count, when, isnan

# 3️⃣ Data Cleaning
# Remove negative or unrealistic trip distances and fares
df_clean = df.filter(
    (col("trip_distance") > 0) & (col("trip_distance") < 100) & 
    (col("fare_amount") > 0) & (col("fare_amount") < 5000) &
    (col("total_amount") > 0) & (col("total_amount") < 5000)
)

# Handle missing passenger counts (replace NULL with -1 for clarity)
df_clean = df_clean.withColumn(
    "passenger_count",
    when(col("passenger_count").isNull(), -1).otherwise(col("passenger_count"))
)

# 4️⃣ Summary Statistics
# Trip distance, fare, total amount
df_clean.select(
    count("*").alias("total_trips"),
    avg("trip_distance").alias("avg_trip_distance"),
    avg("fare_amount").alias("avg_fare_amount"),
    avg("total_amount").alias("avg_total_amount")
).show()

# Passenger count distribution
df_clean.groupBy("passenger_count").count().orderBy("passenger_count").show()

# Average trip distance by payment type
df_clean.groupBy("payment_type").agg(avg("trip_distance").alias("avg_trip_distance")).show()

# Top 10 pickup and dropoff locations by number of trips
df_clean.groupBy("PULocationID").count().orderBy(col("count").desc()).show(10)
df_clean.groupBy("DOLocationID").count().orderBy(col("count").desc()).show(10)

# 5️⃣ Optional: Save cleaned dataset for faster future analysis
# df_clean.write.mode("overwrite").parquet("yellow_taxi_2025_cleaned.parquet")

# Stop Spark session
# spark.stop()


                                                                                

+-----------+------------------+-----------------+-----------------+
|total_trips| avg_trip_distance|  avg_fare_amount| avg_total_amount|
+-----------+------------------+-----------------+-----------------+
|   32879536|3.4640885841567424|19.70039076920132|28.47785038207891|
+-----------+------------------+-----------------+-----------------+



                                                                                

+---------------+--------+
|passenger_count|   count|
+---------------+--------+
|             -1| 6454404|
|              0|  191194|
|              1|20971024|
|              2| 3644396|
|              3|  848049|
|              4|  551794|
|              5|  138446|
|              6|   80159|
|              7|      14|
|              8|      39|
|              9|      17|
+---------------+--------+



                                                                                

+------------+------------------+
|payment_type| avg_trip_distance|
+------------+------------------+
|           2| 3.339318100928549|
|           4| 4.135155460454747|
|           1|3.3880381368452057|
|           3|  3.33259255470661|
|           0|3.7524660712276434|
|           5|              10.0|
+------------+------------------+



                                                                                

+------------+-------+
|PULocationID|  count|
+------------+-------+
|         161|1462952|
|         237|1462472|
|         132|1407018|
|         236|1280034|
|         186|1081774|
|         162|1054841|
|         230|1053317|
|         142| 937986|
|         138| 909703|
|         170| 895717|
+------------+-------+
only showing top 10 rows
+------------+-------+
|DOLocationID|  count|
+------------+-------+
|         236|1341350|
|         237|1318127|
|         161|1185519|
|         230| 985103|
|         170| 930851|
|         162| 883730|
|          68| 843210|
|         239| 834959|
|         142| 833184|
|         141| 798813|
+------------+-------+
only showing top 10 rows


                                                                                

In [15]:
# 3. Fix negative or invalid values
df_clean = df_clean.filter(
    (col("trip_distance") > 0) &
    (col("fare_amount") >= 0) &
    (col("total_amount") >= 0) &
    (col("passenger_count") > 0)
)

# 4. Optional: remove extreme outliers (e.g., trips > 100 miles)
df_clean = df_clean.filter(col("trip_distance") < 100)

# 5. Check the result
df_clean.describe(["trip_distance", "fare_amount", "total_amount"]).show(20, truncate=False)

# 6. Count remaining nulls
df_clean.select([count(when(col(c).isNull(), c)).alias(c) for c in df_clean.columns]).show()

                                                                                

+-------+------------------+------------------+------------------+
|summary|trip_distance     |fare_amount       |total_amount      |
+-------+------------------+------------------+------------------+
|count  |26239606          |26239606          |26239606          |
|mean   |3.3988158553141297|19.4794916074598  |29.241312937403386|
|stddev |4.5090285653434385|193.49874265216437|193.99932050537655|
|min    |0.01              |0.0               |0.0               |
|max    |99.82             |863372.12         |863380.37         |
+-------+------------------+------------------+------------------+



25/11/14 18:48:30 ERROR Executor: Exception in task 1.0 in stage 48.0 (TID 303)
scala.MatchError: java.lang.OutOfMemoryError: Java heap space (of class java.lang.OutOfMemoryError)
	at org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2$.attachFilePath(FileDataSourceV2.scala:127)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:142)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:695)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache

Py4JJavaError: An error occurred while calling o1349.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 48.0 failed 1 times, most recent failure: Lost task 4.0 in stage 48.0 (TID 306) (macbookpro.lan executor driver): scala.MatchError: java.lang.OutOfMemoryError: Java heap space (of class java.lang.OutOfMemoryError)
	at org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2$.attachFilePath(FileDataSourceV2.scala:127)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:142)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:695)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:143)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:111)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$3(DAGScheduler.scala:2935)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2935)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2927)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2927)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1295)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1295)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1295)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3207)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3141)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3130)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)
Caused by: scala.MatchError: java.lang.OutOfMemoryError: Java heap space (of class java.lang.OutOfMemoryError)
	at org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2$.attachFilePath(FileDataSourceV2.scala:127)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:142)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:695)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:143)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:111)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)


In [12]:
from pyspark.sql.functions import col, sum as sum_

df.select([sum_(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()



[Stage 44:>                                                       (0 + 16) / 18]



25/11/14 18:44:14 ERROR Executor: Exception in task 8.0 in stage 44.0 (TID 274)
scala.MatchError: java.lang.OutOfMemoryError: Java heap space (of class java.lang.OutOfMemoryError)
	at org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2$.attachFilePath(FileDataSourceV2.scala:127)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:142)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:695)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache

Py4JJavaError: An error occurred while calling o1037.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 44.0 failed 1 times, most recent failure: Lost task 7.0 in stage 44.0 (TID 273) (macbookpro.lan executor driver): scala.MatchError: java.lang.OutOfMemoryError: Java heap space (of class java.lang.OutOfMemoryError)
	at org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2$.attachFilePath(FileDataSourceV2.scala:127)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:142)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:695)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:143)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:111)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$3(DAGScheduler.scala:2935)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2935)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2927)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2927)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1295)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1295)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1295)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3207)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3141)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3130)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)
Caused by: scala.MatchError: java.lang.OutOfMemoryError: Java heap space (of class java.lang.OutOfMemoryError)
	at org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2$.attachFilePath(FileDataSourceV2.scala:127)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:142)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:695)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:143)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:111)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)


In [8]:
df.filter(col("passenger_count").isNull()).show()


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       2| 2025-05-01 00:05:05|  2025-05-01 00:34:27|           NULL|         8.12|      NULL|              NULL|          90|    

                                                                                

In [6]:
# Repartition into 18 partitions
df_repart = df.repartition(18)

# Save as Parquet
df_repart.write.mode("overwrite").parquet("yellow_taxi_2025_full.parquet")

print("Dataset saved successfully with 18 partitions.")


25/11/14 18:35:46 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
25/11/14 18:35:46 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
25/11/14 18:35:46 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
25/11/14 18:35:46 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
25/11/14 18:35:46 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
25/11/14 18:35:46 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
25/11/14 18:35:46 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
25/11/14 18:35:46 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
25/11/14 18:35:46 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
25/11/14 18:35:46 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
25/11/14 18:35:46 WARN TaskMemoryManager

ConnectionRefusedError: [Errno 61] Connection refused