Step 1: Install Java and PySpark


In [3]:
# Install Java
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Download Spark 3.4.1 (recommended stable version)
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz

# Extract Spark
!tar -xzf spark-3.4.1-bin-hadoop3.tgz

# Install PySpark
!pip install -q pyspark


In [4]:
import os

# Tell PySpark where the apt-installed JDK and the extracted Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.4.1-bin-hadoop3",
})


In [5]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session. The Arrow flag enables Arrow-based columnar
# transfer for Spark <-> pandas conversions.
session_builder = (
    SparkSession.builder
    .appName("PySpark in Colab")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
)
spark = session_builder.getOrCreate()

# Bare final expression: Jupyter renders the session summary as the cell output.
spark


In [6]:
from pyspark.sql import SparkSession

# The previous cell already started a session. Calling builder.appName(...).getOrCreate()
# again just returns that existing session; the new app name does not apply to the
# running application. Reuse the active session explicitly, and only build a new one
# (with this notebook's app name) when none is running.
spark = SparkSession.getActiveSession()
if spark is None:
    spark = SparkSession.builder \
        .appName("Taxi Trip Data Analysis") \
        .getOrCreate()


Load Dataset


In [8]:
from google.colab import files

# Prompt for the CSV in the browser; Colab returns {saved_filename: file_bytes}.
uploaded = files.upload()  # Upload a large CSV file manually
if not uploaded:
    raise ValueError("No file was uploaded; re-run the cell and choose the taxi CSV.")

# Colab renames repeat uploads (e.g. "taxi_tripdata (2).csv"), so read the name it
# actually saved instead of hard-coding "taxi_tripdata.csv", which could silently
# point at a stale earlier upload. If several files are chosen, the first is used.
csv_path = next(iter(uploaded))

# Read uploaded file with Spark: first row is the header, column types are inferred.
df = spark.read.csv(csv_path, header=True, inferSchema=True)
df.show(5)




Saving taxi_tripdata.csv to taxi_tripdata (2).csv
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       1| 2021-07-01 00:30:52|  2021-07-01 00:35:36|                 N|         1|          74|         168|    

Step 2: Inspect and Understand the Dataset

In [9]:
# Column names and the types Spark inferred while reading the CSV.
df.printSchema()

# Peek at the first 5 rows to sanity-check parsing.
df.show(5)

# Total number of rows, left as the final expression so Jupyter displays it.
row_count = df.count()
row_count


root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)

+--------+--------------------+---------------------+------------------+----------+-

83691

Step 3: Clean the Dataset


In [10]:
from pyspark.sql.functions import col

# Columns every usable trip must have populated.
required_columns = ["passenger_count", "trip_distance", "fare_amount"]

# Drop rows missing any required value, then keep only strictly positive
# passenger counts, distances and fares.
df_clean = (
    df
    .dropna(subset=required_columns)
    .filter(
        (col("passenger_count") > 0)
        & (col("trip_distance") > 0)
        & (col("fare_amount") > 0)
    )
)


Step 4: Derive Insights

Average Fare Per Passenger

In [11]:
from pyspark.sql.functions import avg

# Split each trip's fare evenly across its passengers, then average over trips.
# Note: every trip is weighted equally, so this is a per-trip mean, not
# total fare / total passengers.
per_passenger_fare = (col("fare_amount") / col("passenger_count")).alias("fare_per_passenger")
df_clean.select(per_passenger_fare).agg(avg("fare_per_passenger")).show()


+-----------------------+
|avg(fare_per_passenger)|
+-----------------------+
|     13.690673765502694|
+-----------------------+



Trip Count by Passenger Count

In [12]:
from pyspark.sql.functions import count

# Number of cleaned trips for each passenger count, most frequent first.
trips_by_passengers = (
    df_clean
    .groupBy("passenger_count")
    .agg(count("*").alias("trip_count"))
    .orderBy("trip_count", ascending=False)
)
trips_by_passengers.show()


+---------------+----------+
|passenger_count|trip_count|
+---------------+----------+
|              1|     41295|
|              2|      3779|
|              5|      1215|
|              6|      1017|
|              3|       597|
|              4|       177|
|              7|         2|
|             32|         1|
+---------------+----------+



Longest and Shortest Trips

In [13]:
# Show the single longest trip, then the single shortest trip, with its fare.
# Ties (e.g. many 0.01-mile trips) are broken arbitrarily by Spark.
shown_columns = ["trip_distance", "fare_amount"]
for distance_order in (col("trip_distance").desc(), col("trip_distance").asc()):
    df_clean.orderBy(distance_order).select(*shown_columns).show(1)


+-------------+-----------+
|trip_distance|fare_amount|
+-------------+-----------+
|       109.87|      359.5|
+-------------+-----------+
only showing top 1 row

+-------------+-----------+
|trip_distance|fare_amount|
+-------------+-----------+
|         0.01|        2.5|
+-------------+-----------+
only showing top 1 row

