In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()


Loading data

In [0]:
df = spark.read.csv("/Volumes/workspace/default/nyc_data/yellow_tripdata_2016-03.csv", header=True, inferSchema=True)
df.show(5)


+--------+--------------------+---------------------+---------------+-------------+----------------+---------------+----------+------------------+-----------------+----------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|pickup_longitude|pickup_latitude|RatecodeID|store_and_fwd_flag|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+----------------+---------------+----------+------------------+-----------------+----------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       1|    01-03-2016 00:00|     01-03-2016 00:07|              1|          2.5|    -73.97674561|    40.76515198|         1|                 N|     -74.00426

Query 1: Add a “Revenue” Column

In [0]:
from pyspark.sql.functions import col

df = df.withColumn("Revenue", 
    col("fare_amount") + col("extra") + col("mta_tax") + 
    col("improvement_surcharge") + col("tip_amount") + 
    col("tolls_amount") + col("total_amount"))


 Query 2: Increasing Count of Total Passengers by Area (Grouped by Pickup Location)

In [0]:
from pyspark.sql.functions import round, sum

df.groupBy(
    round(col("pickup_longitude"), 2).alias("pickup_lon_area"),
    round(col("pickup_latitude"), 2).alias("pickup_lat_area")
).agg(sum("passenger_count").alias("total_passengers")) \
 .orderBy(col("total_passengers").asc()) \
 .show()


+---------------+---------------+----------------+
|pickup_lon_area|pickup_lat_area|total_passengers|
+---------------+---------------+----------------+
|         -73.87|          40.98|               0|
|         -73.95|          40.87|               0|
|         -74.23|          40.53|               0|
|         -73.77|          40.64|               1|
|         -73.91|          40.64|               1|
|         -73.97|          40.84|               1|
|         -73.81|          40.97|               1|
|         -73.73|          40.95|               1|
|         -73.89|          40.89|               1|
|         -73.88|          40.85|               1|
|         -73.97|           40.6|               1|
|          -73.9|          40.65|               1|
|         -74.19|          40.68|               1|
|         -73.81|           40.9|               1|
|         -73.95|           40.9|               1|
|         -73.85|          40.87|               1|
|         -74.01|          40.6

Query 3: Real-time Average Fare & Total Earning by 2 Vendors

In [0]:
from pyspark.sql.functions import avg, round

df.groupBy("VendorID") \
  .agg(
    round(avg("fare_amount"), 2).alias("avg_fare"),
    round(avg("total_amount"), 2).alias("avg_total_earning")
  ) \
  .show()


+--------+--------+-----------------+
|VendorID|avg_fare|avg_total_earning|
+--------+--------+-----------------+
|       1|   12.25|            15.46|
|       2|   12.67|            15.97|
+--------+--------+-----------------+



Query 4: Moving Count of Payments Made by Each Payment Mode

In [0]:
from pyspark.sql.functions import to_timestamp, window, col, count

df_with_time = df.withColumn(
    "pickup_time",
    to_timestamp(col("tpep_pickup_datetime"), "dd-MM-yyyy HH:mm")
)


In [0]:
df_with_time.groupBy(
    window("pickup_time", "1 hour"),  # adjust as needed
    "payment_type"
).agg(count("*").alias("payment_count")) \
 .orderBy("window") \
 .show()


+--------------------+------------+-------------+
|              window|payment_type|payment_count|
+--------------------+------------+-------------+
|{2016-03-01 00:00...|           1|         4500|
|{2016-03-01 00:00...|           4|           13|
|{2016-03-01 00:00...|           2|         2522|
|{2016-03-01 00:00...|           3|           44|
|{2016-03-01 01:00...|           1|         2500|
|{2016-03-01 01:00...|           3|           18|
|{2016-03-01 01:00...|           2|         1617|
|{2016-03-01 01:00...|           4|           13|
|{2016-03-01 02:00...|           3|           19|
|{2016-03-01 02:00...|           2|         1089|
|{2016-03-01 02:00...|           1|         1491|
|{2016-03-01 02:00...|           4|            3|
|{2016-03-01 03:00...|           2|          862|
|{2016-03-01 03:00...|           1|          977|
|{2016-03-01 03:00...|           3|           16|
|{2016-03-01 03:00...|           4|            5|
|{2016-03-01 04:00...|           2|          891|


Query 5: Top 2 Vendors by Revenue on a Date, with Passenger Count & Distance

In [0]:
from pyspark.sql.functions import to_date

df_filtered = df.withColumn("trip_date", to_date("tpep_pickup_datetime", "MM-dd-yyyy HH:mm"))

df_filtered.filter(col("trip_date") == "2016-03-01") \
    .groupBy("VendorID") \
    .agg(
        sum("total_amount").alias("total_revenue"),
        sum("passenger_count").alias("total_passengers"),
        sum("trip_distance").alias("total_distance")
    ) \
    .orderBy(col("total_revenue").desc()) \
    .limit(2) \
    .show()


+--------+-------------+----------------+--------------+
|VendorID|total_revenue|total_passengers|total_distance|
+--------+-------------+----------------+--------------+
+--------+-------------+----------------+--------------+



Query 6: Most Passengers Between a Route of Two Locations

In [0]:
df.groupBy(
    round(col("pickup_longitude"), 2).alias("pickup_lon"),
    round(col("pickup_latitude"), 2).alias("pickup_lat"),
    round(col("dropoff_longitude"), 2).alias("dropoff_lon"),
    round(col("dropoff_latitude"), 2).alias("dropoff_lat")
).agg(sum("passenger_count").alias("total_passengers")) \
 .orderBy(col("total_passengers").desc()) \
 .show(1)


+----------+----------+-----------+-----------+----------------+
|pickup_lon|pickup_lat|dropoff_lon|dropoff_lat|total_passengers|
+----------+----------+-----------+-----------+----------------+
|       0.0|       0.0|        0.0|        0.0|           18876|
+----------+----------+-----------+-----------+----------------+
only showing top 1 row


Query 7: Top Pickup Locations in Last 5/10 Seconds

In [0]:
from pyspark.sql.functions import to_timestamp

df = df.withColumn("pickup_time", to_timestamp("tpep_pickup_datetime", "dd-MM-yyyy HH:mm"))


In [0]:
from pyspark.sql.functions import unix_timestamp, max as max_, sum, col, lit

df = df.withColumn("pickup_time", to_timestamp("tpep_pickup_datetime", "dd-MM-yyyy HH:mm"))

# Step 2: Find latest pickup time
latest_time = df.select(max_("pickup_time")).first()[0]

# Step 3: Filter last 10 seconds of data
df.filter(
    unix_timestamp("pickup_time") >= unix_timestamp(lit(latest_time)) - 10
).groupBy(
    "pickup_longitude", "pickup_latitude"
).agg(
    sum("passenger_count").alias("total_passengers")
).orderBy(
    col("total_passengers").desc()
).show()


+----------------+---------------+----------------+
|pickup_longitude|pickup_latitude|total_passengers|
+----------------+---------------+----------------+
|    -73.98389435|    40.75371552|               6|
|    -73.98871613|    40.74850082|               6|
|    -73.98594666|    40.75743103|               6|
|    -73.98747253|    40.71986008|               6|
|    -73.99058533|     40.7493248|               6|
|    -73.98401642|    40.76475525|               6|
|     -73.9460907|    40.77297974|               6|
|    -73.97637177|    40.74420547|               5|
|    -73.98356628|    40.72583008|               5|
|     -73.9523468|    40.80366898|               5|
|    -74.00266266|    40.71846008|               5|
|    -73.87091827|     40.7737999|               5|
|    -73.96340942|    40.77112961|               5|
|    -73.99378967|    40.72467041|               5|
|    -73.97901917|    40.75924683|               5|
|    -73.98064423|    40.73390961|               5|
|    -73.982