In [1]:
!pip install pyspark  # Run only once per Colab session

from pyspark.sql import SparkSession

# Start Spark
spark = SparkSession.builder.appName("NYC_Taxi_Project").getOrCreate()




In [2]:
from google.colab import files
uploaded = files.upload()


Saving yellow_tripdata_2020-01.parquet to yellow_tripdata_2020-01.parquet
Saving yellow_tripdata_2018-01.parquet to yellow_tripdata_2018-01.parquet


In [4]:
df1 = spark.read.parquet("yellow_tripdata_2018-01.parquet")
df2 = spark.read.parquet("yellow_tripdata_2020-01.parquet")

# Make sure schemas match (same columns)
df = df1.union(df2)

df.printSchema()
df.show(10)



root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+----

In [5]:
from pyspark.sql.functions import year

df.withColumn("year", year("tpep_pickup_datetime")).select("year").distinct().show()


+----+
|year|
+----+
|2003|
|2018|
|2009|
|2001|
|2008|
|2017|
|2002|
|2019|
|2020|
|2021|
+----+



In [7]:
from pyspark.sql.functions import col

df = df.withColumn("Revenue",
    col("fare_amount") + col("extra") + col("mta_tax") +
    col("improvement_surcharge") + col("tip_amount") +
    col("tolls_amount") + col("total_amount")
)

df.select("Revenue").show(10)


+------------------+
|           Revenue|
+------------------+
|              11.6|
|              30.6|
|              16.6|
|              69.6|
|              33.1|
|              11.6|
|24.700000000000003|
|              12.6|
|              17.0|
|              13.6|
+------------------+
only showing top 10 rows



In [8]:
from pyspark.sql.functions import sum as _sum

df.groupBy("PULocationID") \
  .agg(_sum("passenger_count").alias("total_passengers")) \
  .orderBy("total_passengers", ascending=False) \
  .show(10)


+------------+----------------+
|PULocationID|total_passengers|
+------------+----------------+
|         237|       1006231.0|
|         161|        999902.0|
|         236|        954194.0|
|         230|        871699.0|
|         162|        841749.0|
|         186|        799081.0|
|         234|        740300.0|
|          48|        735460.0|
|         170|        730158.0|
|         142|        721599.0|
+------------+----------------+
only showing top 10 rows



In [9]:
from pyspark.sql.functions import avg

df.groupBy("VendorID") \
  .agg(
      avg("fare_amount").alias("avg_fare"),
      avg("total_amount").alias("avg_total_earning")
  ) \
  .orderBy("VendorID") \
  .show(10)


+--------+------------------+------------------+
|VendorID|          avg_fare| avg_total_earning|
+--------+------------------+------------------+
|       1|12.055622533380596|16.180159728476944|
|       2|12.678202108439645|17.249695015684615|
|       5| 48.84400000000001|            63.754|
+--------+------------------+------------------+



In [10]:
from pyspark.sql.window import Window
from pyspark.sql.functions import count

# Ensure datetime is sorted properly
window_spec = Window.partitionBy("payment_type").orderBy("tpep_pickup_datetime").rowsBetween(-10, 0)

df.withColumn("moving_count", count("*").over(window_spec)) \
  .select("payment_type", "tpep_pickup_datetime", "moving_count") \
  .show(10)


+------------+--------------------+------------+
|payment_type|tpep_pickup_datetime|moving_count|
+------------+--------------------+------------+
|           0| 2020-01-01 00:01:51|           1|
|           0| 2020-01-01 00:02:00|           2|
|           0| 2020-01-01 00:02:38|           3|
|           0| 2020-01-01 00:02:38|           4|
|           0| 2020-01-01 00:03:00|           5|
|           0| 2020-01-01 00:03:00|           6|
|           0| 2020-01-01 00:06:00|           7|
|           0| 2020-01-01 00:06:02|           8|
|           0| 2020-01-01 00:08:00|           9|
|           0| 2020-01-01 00:10:37|          10|
+------------+--------------------+------------+
only showing top 10 rows



In [11]:
from pyspark.sql.functions import to_date, rank, sum as _sum
from pyspark.sql.window import Window

# Extract trip date from pickup datetime
df_daily = df.withColumn("trip_date", to_date("tpep_pickup_datetime"))

# Aggregate daily total stats per vendor
agg_df = df_daily.groupBy("VendorID", "trip_date").agg(
    _sum("passenger_count").alias("total_passengers"),
    _sum("trip_distance").alias("total_distance"),
    _sum("total_amount").alias("total_earning")
)

# Window spec to rank vendors by earnings per day
vendor_window = Window.partitionBy("trip_date").orderBy(col("total_earning").desc())

# Apply rank and filter top 2
top_vendors = agg_df.withColumn("rank", rank().over(vendor_window)) \
                    .filter("rank <= 2") \
                    .orderBy("trip_date", "rank")

top_vendors.show(10)


+--------+----------+----------------+------------------+------------------+----+
|VendorID| trip_date|total_passengers|    total_distance|     total_earning|rank|
+--------+----------+----------------+------------------+------------------+----+
|       2|2001-01-05|             5.0|              1.53|               8.8|   1|
|       2|2002-12-31|            14.0| 8.049999999999999|43.260000000000005|   1|
|       2|2003-01-01|             6.0|             47.89|            165.42|   1|
|       2|2008-12-31|            34.0|130.17999999999998| 592.1500000000001|   1|
|       2|2009-01-01|            51.0|141.91000000000003| 713.6900000000003|   1|
|       2|2017-01-02|             1.0|               0.0|               2.9|   1|
|       2|2017-01-03|             1.0|               0.0|               2.5|   1|
|       2|2017-12-31|           532.0| 673.1999999999999| 3482.430000000006|   1|
|       2|2018-01-01|        269871.0| 486080.0100000093|2329998.2300013932|   1|
|       1|2018-0

In [12]:
df.groupBy("PULocationID", "DOLocationID") \
  .agg(_sum("passenger_count").alias("total_passengers")) \
  .orderBy("total_passengers", ascending=False) \
  .show(10)


+------------+------------+----------------+
|PULocationID|DOLocationID|total_passengers|
+------------+------------+----------------+
|         264|         264|        231494.0|
|         237|         236|        154622.0|
|         236|         236|        136353.0|
|         236|         237|        128240.0|
|         237|         237|        114256.0|
|         239|         238|         74881.0|
|         239|         142|         71138.0|
|         142|         239|         67625.0|
|         238|         239|         63189.0|
|         141|         236|         62051.0|
+------------+------------+----------------+
only showing top 10 rows



In [14]:
from pyspark.sql.functions import unix_timestamp

# Get the latest timestamp in the dataset
latest_ts = df.select(unix_timestamp("tpep_pickup_datetime").alias("ts")) \
              .agg({"ts": "max"}).collect()[0][0]

df.withColumn("ts", unix_timestamp("tpep_pickup_datetime")) \
  .filter(f"ts >= {latest_ts - 10}") \
  .groupBy("PULocationID") \
  .agg(_sum("passenger_count").alias("recent_passengers")) \
  .orderBy("recent_passengers", ascending=False) \
  .show(10)


+------------+-----------------+
|PULocationID|recent_passengers|
+------------+-----------------+
|          90|              1.0|
+------------+-----------------+



In [15]:
df.write.parquet("combined_nyc_data.parquet", mode="overwrite")


In [16]:
# Bonus (Optional): Save Results as Parquet
import os
os.listdir("combined_nyc_data.parquet")


['._SUCCESS.crc',
 'part-00002-db2fbe48-7a4b-4ad6-9091-22765c654565-c000.snappy.parquet',
 '.part-00002-db2fbe48-7a4b-4ad6-9091-22765c654565-c000.snappy.parquet.crc',
 '_SUCCESS',
 'part-00000-db2fbe48-7a4b-4ad6-9091-22765c654565-c000.snappy.parquet',
 '.part-00000-db2fbe48-7a4b-4ad6-9091-22765c654565-c000.snappy.parquet.crc']

In [17]:
from google.colab import files
files.download("combined_nyc_data.parquet/YOUR_ACTUAL_FILENAME.parquet")


FileNotFoundError: Cannot find file: combined_nyc_data.parquet/YOUR_ACTUAL_FILENAME.parquet

In [18]:
from google.colab import files
files.download("combined_nyc_data.parquet/part-00000-a885ebf6-99fc-43b1-a84a-6af58bfb90f4-c000.snappy.parquet")


FileNotFoundError: Cannot find file: combined_nyc_data.parquet/part-00000-a885ebf6-99fc-43b1-a84a-6af58bfb90f4-c000.snappy.parquet

In [19]:
files.download("combined_nyc_data.parquet/part-00002-a885ebf6-99fc-43b1-a84a-6af58bfb90f4-c000.snappy.parquet")


FileNotFoundError: Cannot find file: combined_nyc_data.parquet/part-00002-a885ebf6-99fc-43b1-a84a-6af58bfb90f4-c000.snappy.parquet