In [1]:
# ==============================================
# STEP 1: Install & Configure PySpark in Colab
# ==============================================
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Initialize Spark Session
spark = SparkSession.builder.appName("NYC_Taxi_Analysis").getOrCreate()

print("✅ Spark Session Created Successfully!")

# ==============================================
# STEP 2: Upload Your Parquet Files
# ==============================================
from google.colab import files

print("⬆️ Please upload both Parquet files...")
# files.upload() opens the browser picker and saves each upload into the
# working directory; the path below assumes Colab's default (/content).
uploaded = files.upload()

# Expected uploads:
#   - yellow_tripdata_2020-01.parquet  (analysed below)
#   - a second Parquet file (rename the commented path below accordingly)

# ==============================================
# STEP 3: Load Datasets into DataFrames
# ==============================================
TRIPS_2020_01_PATH = "/content/yellow_tripdata_2020-01.parquet"
df1 = spark.read.parquet(TRIPS_2020_01_PATH)
# A second upload can be loaded the same way, e.g.:
# df2 = spark.read.parquet("/content/another_file.parquet")

print("📌 Schema of Dataset:")
df1.printSchema()
df1.show(5)  # quick peek at the first rows

# ==============================================
# STEP 4: Query 1 - Add Revenue Column
# ==============================================
# Revenue = sum of the individual charge components of a trip.
# `total_amount` must NOT be added on top: per the TLC data dictionary it is
# already the total of these components, so adding it counted every dollar twice.
revenue_base_components = [
    "fare_amount", "extra", "mta_tax",
    "improvement_surcharge", "tip_amount", "tolls_amount",
]
# congestion_surcharge is part of the charge but is not guaranteed to exist in
# every monthly file; include it whenever the loaded file has it.
revenue_optional_components = [
    c for c in ("congestion_surcharge",) if c in df1.columns
]

revenue_expr = lit(0.0)
for component in revenue_base_components + revenue_optional_components:
    # `a + NULL` is NULL in Spark, so one missing charge would erase the whole
    # row's revenue; treat a missing charge as 0 instead.
    revenue_expr = revenue_expr + coalesce(col(component), lit(0.0))

df_with_revenue = df1.withColumn("Revenue", revenue_expr)
print("✅ Revenue Column Added:")
# total_amount shown next to Revenue as a sanity check: they should roughly agree.
df_with_revenue.select("fare_amount", "extra", "total_amount", "Revenue").show(5)

# ==============================================
# STEP 5: Query 2 - Total Passengers by Area
# ==============================================
# Sum passengers per pickup zone (PULocationID) and list the busiest first.
passenger_area = (
    df1.groupBy("PULocationID")
    .agg(sum("passenger_count").alias("Total_Passengers"))
    .sort(desc("Total_Passengers"))
)
print("🚖 Total Passengers in NYC by Area:")
passenger_area.show(10)

# ==============================================
# STEP 6: Query 3 - Average Fare & Total Earning by Vendor
# ==============================================
# Per vendor: average fare, average trip total, and the *total* earning that the
# section title promises (previously only the average of total_amount was shown,
# labelled as "Total Earning").
vendor_avg = (
    df1.groupBy("VendorID")
    .agg(
        avg("fare_amount").alias("Avg_Fare"),
        avg("total_amount").alias("Avg_Total_Earning"),
        sum("total_amount").alias("Total_Earning"),
    )
    .orderBy("VendorID")  # deterministic row order for display
)
print("💰 Average Fare & Total Earnings by Vendor:")
vendor_avg.show()

# ==============================================
# STEP 7: Query 4 - Moving Count of Payments by Payment Mode
# ==============================================
# Sliding frame per payment type: the current trip plus the 5 trips whose pickup
# precedes it.
# NOTE(review): count() over a fixed 6-row frame is just min(row number, 6) for
# each non-null payment_type (and 0 for the null group, since count(col) skips
# nulls), and trips sharing a pickup timestamp have no defined order inside the
# frame. A time-based frame or a running total may be what was intended — confirm.
windowSpec = (
    Window
    .partitionBy("payment_type")
    .orderBy("tpep_pickup_datetime")
    .rowsBetween(-5, Window.currentRow)
)

payments_in_frame = count("payment_type").over(windowSpec)
moving_count = (
    df1.withColumn("moving_count", payments_in_frame)
    .select("payment_type", "tpep_pickup_datetime", "moving_count")
)
print("💳 Moving Count of Payments by Payment Mode:")
moving_count.show(10)

# ==============================================
# STEP 8: Query 5 - Highest 2 Gaining Vendors by Date
# ==============================================
# Best vendors *within each pickup date*. Sorting the whole table and showing
# two rows (the previous approach) only returned the two best (vendor, date)
# pairs overall, not the top vendors per date.
TOP_N_VENDORS = 2

daily_vendor_totals = (
    df1.groupBy("VendorID", to_date("tpep_pickup_datetime").alias("date"))
    .agg(
        sum("passenger_count").alias("passengers"),
        sum("trip_distance").alias("distance"),
    )
)

# Distance is the primary key, passengers breaks ties, VendorID makes the
# ranking fully deterministic.
rank_within_date = (
    Window.partitionBy("date")
    .orderBy(col("distance").desc(), col("passengers").desc(), col("VendorID"))
)

top_vendors = (
    daily_vendor_totals
    .withColumn("vendor_rank", row_number().over(rank_within_date))
    .filter(col("vendor_rank") <= TOP_N_VENDORS)
    .orderBy("date", "vendor_rank")
)
print(f"🏆 Top {TOP_N_VENDORS} Vendors per Date by Distance & Passengers:")
top_vendors.show()

# ==============================================
# STEP 9: Query 6 - Route with Most Passengers
# ==============================================
# Total passengers for every (pickup zone, drop-off zone) pair; the first row
# after sorting is the busiest route.
route_passenger_totals = df1.groupBy("PULocationID", "DOLocationID").agg(
    sum("passenger_count").alias("Total_Passengers")
)
top_route = route_passenger_totals.orderBy(desc("Total_Passengers"))
print("🚏 Route with Most Passengers:")
top_route.show(1)

# ==============================================
# STEP 10: Query 7 - Top Pickup Locations (Last 5/10 Seconds Filter)
# ==============================================
# Simulating last 10 seconds of dataset (example filter)
from datetime import timedelta

# Width of the "most recent" window, in seconds (e.g. 5 or 10).
RECENT_WINDOW_SECONDS = 10

# Anchor the window on the latest pickup actually present in the data, so the
# result really is the "last N seconds" of the dataset. The previous filter was
# hard-coded to 2020-01-01 00:00:05..00:00:15 (the start of the month) and was
# inclusive on both ends.
# NOTE(review): monthly TLC files may contain a few mis-dated trips; if the
# latest pickup is such an outlier the window lands on it. Consider bounding
# the max to the file's month — confirm against the data.
latest_pickup = (
    df1.selectExpr("max(tpep_pickup_datetime) AS latest_pickup")
    .first()["latest_pickup"]
)

if latest_pickup is None:
    # No pickup timestamps at all: yield an empty result instead of failing.
    in_recent_window = lit(False)
else:
    window_start = latest_pickup - timedelta(seconds=RECENT_WINDOW_SECONDS)
    # Half-open interval (window_start, latest_pickup]: exactly N seconds wide.
    # Bounds are passed as timestamp strings, as the original filter did, and
    # Spark casts them when comparing against the timestamp column.
    in_recent_window = (
        (col("tpep_pickup_datetime") > window_start.isoformat(sep=" "))
        & (col("tpep_pickup_datetime") <= latest_pickup.isoformat(sep=" "))
    )

recent_pickups = (
    df1.filter(in_recent_window)
    .groupBy("PULocationID")
    .agg(sum("passenger_count").alias("Total_Passengers"))
    .orderBy(col("Total_Passengers").desc())
)

print(f"🕒 Top Pickup Locations in Last {RECENT_WINDOW_SECONDS} Seconds:")
recent_pickups.show(5)

# ==============================================
# STEP 11: (Optional) Save Processed Data
# ==============================================
# Spark writes a directory of part files at this path; "overwrite" replaces any
# previous run's output.
PROCESSED_OUTPUT_PATH = "/content/nyc_taxi_processed.parquet"
revenue_writer = df_with_revenue.write.mode("overwrite")
revenue_writer.parquet(PROCESSED_OUTPUT_PATH)
print("✅ Processed data saved as parquet.")



✅ Spark Session Created Successfully!
⬆️ Please upload both Parquet files...


Saving yellow_tripdata_2018-01.parquet to yellow_tripdata_2018-01.parquet
Saving yellow_tripdata_2020-01.parquet to yellow_tripdata_2020-01.parquet
📌 Schema of Dataset:
root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- 