**Big Data Analysis on NYC Taxi Trips Using PySpark**



This project analyzes NYC Yellow Taxi trip records for January 2015 (about 12.7 million trips) with PySpark to show how Spark scales to real-world datasets. We cleaned and aggregated the data and explored patterns in fare amounts, passenger counts, and trip distances, showing how a distributed engine processes millions of records and surfaces insights from them.


**Step 1 : Mount drive**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


**Step 2 : Environment Setup**

In [None]:
#import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, desc, when

In [None]:
#initialize spark
spark = SparkSession.builder.appName("Big Data Analysis - NYC Taxi Trips").getOrCreate()

**Step 3 : Load Dataset**

In [None]:
#file path
file_path = "/content/drive/MyDrive/Big Data/yellow_tripdata_2015-01.csv"

#load the CSV file
df = spark.read.csv(file_path, header=True, inferSchema=True)

#inspect the schema and a sample of the data
df.printSchema()
df.show()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RateCodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)


**Step 4 : Data Cleaning**

In [None]:
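#check total record count
print("Total record count: ", df.count())

#count non-null values per column
df.select([count(col(c)).alias(c) for c in df.columns]).show()

#keep only trips with a positive fare, distance and passenger count
#(rows where any of these is null are dropped as well)
df_cleaned = df.filter((col("fare_amount") > 0) & (col("trip_distance") > 0) & (col("passenger_count") > 0))

#record count after cleaning (count() is needed; printing the DataFrame only shows its schema)
print("Records after cleaning: ", df_cleaned.count())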

Total record count:  12748986
+--------+--------------------+---------------------+---------------+-------------+----------------+---------------+----------+------------------+-----------------+----------------+------------+-----------+--------+--------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|pickup_longitude|pickup_latitude|RateCodeID|store_and_fwd_flag|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|   extra| mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+----------------+---------------+----------+------------------+-----------------+----------------+------------+-----------+--------+--------+----------+------------+---------------------+------------+
|12748986|            12748986|             12748986|       12748986|     12748986|        12748986|       12748986|  
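
The cleaning filter keeps a row only when all three comparisons are true. In Spark SQL a comparison against null evaluates to null, and `filter` drops every row whose condition is not true, so rows with a missing fare, distance, or passenger count are removed too. The plain-Python sketch below mirrors that rule on dictionaries standing in for rows; `keep_row` and the sample rows are hypothetical illustrations, not part of the notebook.

```python
from typing import Dict, Optional

# Columns that must be strictly positive for a trip to be kept (same as the Spark filter).
REQUIRED_POSITIVE = ("fare_amount", "trip_distance", "passenger_count")


def keep_row(row: Dict[str, Optional[float]]) -> bool:
    """Return True only if every required column is present and > 0.

    A missing (None) value plays the role of Spark's null: the comparison is
    not true, so the row is dropped.
    """
    for name in REQUIRED_POSITIVE:
        value = row.get(name)
        if value is None or value <= 0:
            return False
    return True


# Hypothetical rows, one per case the filter handles.
sample_rows = [
    {"fare_amount": 6.5, "trip_distance": 1.1, "passenger_count": 1},   # valid trip
    {"fare_amount": 6.5, "trip_distance": 1.1, "passenger_count": 0},   # zero passengers
    {"fare_amount": -4.0, "trip_distance": 0.8, "passenger_count": 2},  # negative fare
    {"fare_amount": 9.0, "trip_distance": None, "passenger_count": 1},  # null distance
]

kept = [row for row in sample_rows if keep_row(row)]
print(len(kept))
```

Only the first row survives, just as only fully valid, non-null rows survive `df.filter(...)`.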

**Step 5 : Exploratory Data Analysis**

**5.1 Most Common Passenger Counts**

In [None]:
#number of trips per passenger count, most common first
df_cleaned.groupBy("passenger_count").count().orderBy("count", ascending=False).show()

+---------------+-------+
|passenger_count|  count|
+---------------+-------+
|              1|8926120|
|              2|1805631|
|              5| 695437|
|              3| 526031|
|              6| 453123|
|              4| 252052|
|              0|   6176|
|              7|      7|
|              9|      6|
|              8|      3|
+---------------+-------+
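
Expressing these counts as shares makes the skew easier to read. The sketch below does the arithmetic in plain Python on the counts copied from the output above (the dictionary `passenger_counts` simply holds those printed values); in Spark the same result could be produced by dividing `count` by its sum over an unpartitioned window. The table also lists 6,176 trips with a passenger count of 0, which the `passenger_count > 0` filter in Step 4 excludes, so this output appears to come from a run made before that filter was applied.

```python
# Trip counts per passenger_count, copied from the groupBy output above.
passenger_counts = {
    1: 8926120,
    2: 1805631,
    5: 695437,
    3: 526031,
    6: 453123,
    4: 252052,
    0: 6176,
    7: 7,
    9: 6,
    8: 3,
}

total_trips = sum(passenger_counts.values())

# Share of trips for each passenger count, largest first.
shares = sorted(
    ((n, c / total_trips) for n, c in passenger_counts.items()),
    key=lambda item: item[1],
    reverse=True,
)

for n, share in shares[:5]:
    print(f"{n} passenger(s): {share:.2%}")
```

Single-passenger trips make up roughly 70% of the rows in this output.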



**5.2 Average Fare by Trip Distance Bucket**

In [None]:
#create a bucket column (trip_distance is recorded in miles in the TLC data)
df_buckets = df_cleaned.withColumn(
    "distance_bucket",
    when(col("trip_distance") < 2, "<2 mi")
    .when((col("trip_distance") >= 2) & (col("trip_distance") < 5), "2-5 mi")
    .otherwise(">5 mi"),
)

#group by bucket and calculate the average fare
(
    df_buckets.groupBy("distance_bucket")
    .agg(avg("fare_amount").alias("avg_fare"))
    .orderBy("distance_bucket")
    .show()
)

+---------------+------------------+
|distance_bucket|          avg_fare|
+---------------+------------------+
|         2-5 mi|13.088639505102561|
|          <2 mi|  6.93148657784056|
|          >5 mi| 31.27973626017181|
+---------------+------------------+
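
The `when`/`otherwise` chain is evaluated top to bottom, so each trip lands in the first bucket whose condition holds, and a distance of exactly 2 or 5 falls into the next bucket up. Sorting by the label string, as `orderBy("distance_bucket")` does, orders the buckets by character code rather than by distance. The plain-Python sketch below mirrors the bucketing rule and sorts with an explicit rank instead. It labels the buckets in miles because the TLC data records `trip_distance` in miles, the fares are the averages from the output above under those labels, and `bucket_for` and `BUCKET_ORDER` are hypothetical names.

```python
# Bucket labels in logical (ascending distance) order.
BUCKET_ORDER = ["<2 mi", "2-5 mi", ">5 mi"]


def bucket_for(trip_distance: float) -> str:
    """Mirror the when/otherwise chain: the first matching condition wins."""
    if trip_distance < 2:
        return "<2 mi"
    if trip_distance < 5:  # already known to be >= 2 at this point
        return "2-5 mi"
    return ">5 mi"


# Average fares from the aggregation output above, labelled in miles.
avg_fare = {
    "2-5 mi": 13.088639505102561,
    "<2 mi": 6.93148657784056,
    ">5 mi": 31.27973626017181,
}

# Sort by each label's position in BUCKET_ORDER instead of alphabetically.
for label in sorted(avg_fare, key=BUCKET_ORDER.index):
    print(f"{label:>7}: ${avg_fare[label]:.2f}")
```

In the Spark query itself, the same effect could be had by building a numeric rank column with a second `when` chain and ordering on that column instead of the label.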



**5.3 Top 5 Most Expensive Trips**

In [None]:
#top 5 trips by total amount
df_cleaned.select("trip_distance", "fare_amount", "mta_tax", "tip_amount", "tolls_amount", "improvement_surcharge", "total_amount") \
.orderBy(col("total_amount").desc()) \
.show(5)

+-------------+-----------+-------+----------+------------+---------------------+------------+
|trip_distance|fare_amount|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+-------------+-----------+-------+----------+------------+---------------------+------------+
|         5.32|       22.0|    0.5| 3950588.8|         0.0|                  0.3|   3950611.6|
|          1.7|     4008.0|    0.5|       0.0|         0.0|                  0.3|      4009.3|
|          0.4|     3005.5|    0.5|       0.0|         0.0|                  0.3|     3006.35|
|         18.5|       52.0|    0.5|       0.0|     1450.09|                  0.3|     1502.89|
|         10.8|       32.5|    0.5|       0.0|     1000.66|                  0.3|     1034.46|
+-------------+-----------+-------+----------+------------+---------------------+------------+
only showing top 5 rows
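
These totals are very likely data-entry errors rather than real fares: the top trip covered 5.32 miles with a $22.00 fare but records a tip of $3,950,588.80. A quick way to see what drives each extreme total is to add up the selected components and find the largest one. The plain-Python sketch below does this on the five printed rows. Because the query did not select the `extra` column, whatever is left over (`residual`) is presumably that surcharge or rounding in the source data; this is an assumption, not something the output shows.

```python
# Component columns in the order they were selected above.
COMPONENTS = ["fare_amount", "mta_tax", "tip_amount", "tolls_amount", "improvement_surcharge"]

# The five printed rows: the components above followed by total_amount.
rows = [
    (22.0, 0.5, 3950588.8, 0.0, 0.3, 3950611.6),
    (4008.0, 0.5, 0.0, 0.0, 0.3, 4009.3),
    (3005.5, 0.5, 0.0, 0.0, 0.3, 3006.35),
    (52.0, 0.5, 0.0, 1450.09, 0.3, 1502.89),
    (32.5, 0.5, 0.0, 1000.66, 0.3, 1034.46),
]

residuals = []
dominant = []
for *parts, total in rows:
    # Amount not explained by the selected columns; "+ 0.0" turns -0.0 into 0.0.
    residuals.append(round(total - sum(parts), 2) + 0.0)
    # Name of the single largest component in this row.
    largest = max(range(len(parts)), key=parts.__getitem__)
    dominant.append(COMPONENTS[largest])

for row, name, residual in zip(rows, dominant, residuals):
    print(f"total={row[-1]:>12,.2f}  driven by {name:<13}  residual={residual:.2f}")
```

Each extreme total is dominated by one implausible field (a tip in the first row, a fare in the next two, tolls in the last two), which suggests adding upper bounds on these columns to the Step 4 cleaning filter; where to set those bounds is a judgment call.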

