In [0]:
 
file_path = "/Volumes/my_databricks_workshop/default/taxi_data_volume/yellow_tripdata_2024-01.parquet"

# Read the Parquet file into a DataFrame named 'df'.
df = spark.read.parquet(file_path)

# Display the first few rows to confirm it loaded correctly.
print("Data loaded successfully!")
display(df)

Data loaded successfully!


VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
2,2024-01-01T00:57:55,2024-01-01T01:17:43,1,1.72,1,N,186,79,2,17.7,1.0,0.5,0.0,0.0,1.0,22.7,2.5,0.0
1,2024-01-01T00:03:00,2024-01-01T00:09:36,1,1.8,1,N,140,236,1,10.0,3.5,0.5,3.75,0.0,1.0,18.75,2.5,0.0
1,2024-01-01T00:17:06,2024-01-01T00:35:01,1,4.7,1,N,236,79,1,23.3,3.5,0.5,3.0,0.0,1.0,31.3,2.5,0.0
1,2024-01-01T00:36:38,2024-01-01T00:44:56,1,1.4,1,N,79,211,1,10.0,3.5,0.5,2.0,0.0,1.0,17.0,2.5,0.0
1,2024-01-01T00:46:51,2024-01-01T00:52:57,1,0.8,1,N,211,148,1,7.9,3.5,0.5,3.2,0.0,1.0,16.1,2.5,0.0
1,2024-01-01T00:54:08,2024-01-01T01:26:31,1,4.7,1,N,148,141,1,29.6,3.5,0.5,6.9,0.0,1.0,41.5,2.5,0.0
2,2024-01-01T00:49:44,2024-01-01T01:15:47,2,10.82,1,N,138,181,1,45.7,6.0,0.5,10.0,0.0,1.0,64.95,0.0,1.75
1,2024-01-01T00:30:40,2024-01-01T00:58:40,0,3.0,1,N,246,231,2,25.4,3.5,0.5,0.0,0.0,1.0,30.4,2.5,0.0
2,2024-01-01T00:26:01,2024-01-01T00:54:12,1,5.44,1,N,161,261,2,31.0,1.0,0.5,0.0,0.0,1.0,36.0,2.5,0.0
2,2024-01-01T00:28:08,2024-01-01T00:29:16,1,0.04,1,N,113,113,2,3.0,1.0,0.5,0.0,0.0,1.0,8.0,2.5,0.0


In [0]:
from pyspark.sql.functions import col

# List of columns to sum for the 'Revenue' calculation.
# Note: The 2024 dataset doesn't have 'MTA_tax' or 'improvement_surcharge'. We'll use the available columns.
revenue_columns = [
    'fare_amount', 'extra', 'tip_amount', 
    'tolls_amount', 'total_amount'
]

# Create the new 'Revenue' column.
# .na.fill(0, ...) replaces any missing values in these columns with 0 before adding them.
df_with_revenue = df.na.fill(0, subset=revenue_columns).withColumn(
    "Revenue",
    col("fare_amount") + col("extra") + col("tip_amount") +
    col("tolls_amount") + col("total_amount")
)

# Display the new column to check work.
print("Query 1: 'Revenue' column added.")
display(df_with_revenue.select("Revenue", *revenue_columns))

Query 1: 'Revenue' column added.


Revenue,fare_amount,extra,tip_amount,tolls_amount,total_amount
41.4,17.7,1.0,0.0,0.0,22.7
36.0,10.0,3.5,3.75,0.0,18.75
61.1,23.3,3.5,3.0,0.0,31.3
32.5,10.0,3.5,2.0,0.0,17.0
30.700000000000003,7.9,3.5,3.2,0.0,16.1
81.5,29.6,3.5,6.9,0.0,41.5
126.65,45.7,6.0,10.0,0.0,64.95
59.3,25.4,3.5,0.0,0.0,30.4
68.0,31.0,1.0,0.0,0.0,36.0
12.0,3.0,1.0,0.0,0.0,8.0


In [0]:
from pyspark.sql.functions import sum, desc

# Group by the pickup location ID.
# Sum the passenger_count for each location.
# Order the results from highest to lowest.
passenger_by_area = df_with_revenue.groupBy("PULocationID") \
    .agg(sum("passenger_count").alias("total_passengers")) \
    .orderBy(desc("total_passengers"))

print("Query 2: Total passengers by pickup area.")
display(passenger_by_area)

Query 2: Total passengers by pickup area.


PULocationID,total_passengers
132,208823.0
161,185613.0
237,181491.0
236,172411.0
230,150508.0
142,136354.0
162,134647.0
186,134211.0
138,117049.0
239,112932.0


In [0]:
from pyspark.sql.functions import col, avg, sum, count
 
vendor_filtered_df = df_with_revenue.filter(col("VendorID").isin([1, 2]))
 
vendor_earnings = vendor_filtered_df.groupBy("VendorID") \
    .agg(
        avg("fare_amount").alias("average_fare_per_trip"),
        sum("total_amount").alias("total_earnings_from_all_trips"),
        count("*").alias("total_number_of_trips")
    )

print("Query 3: Analysis of earnings for the 2 vendors.")
display(vendor_earnings)

Query 3: Analysis of earnings for the 2 vendors.


VendorID,average_fare_per_trip,total_earnings_from_all_trips,total_number_of_trips
1,17.639553342871242,18841261.97999331,729732
2,18.34659326009941,60602721.26999088,2234632


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import count
 
window_spec = Window.partitionBy("payment_type").orderBy("tpep_pickup_datetime")
 
moving_payment_count = df_with_revenue.withColumn(
    "moving_count",
    count("*").over(window_spec)
)

print("Query 4: Moving count of payments by payment type.")
 
display(moving_payment_count.select(
    "payment_type", "tpep_pickup_datetime", "moving_count"
).orderBy("payment_type", "tpep_pickup_datetime").limit(100))

Query 4: Moving count of payments by payment type.


payment_type,tpep_pickup_datetime,moving_count
0,2024-01-01T00:00:58,1
0,2024-01-01T00:02:21,2
0,2024-01-01T00:02:24,3
0,2024-01-01T00:02:36,4
0,2024-01-01T00:02:48,5
0,2024-01-01T00:03:35,6
0,2024-01-01T00:03:36,7
0,2024-01-01T00:06:48,8
0,2024-01-01T00:08:49,9
0,2024-01-01T00:08:50,10


In [0]:
from pyspark.sql.functions import to_date, sum, desc, col
 
df_with_date = df_with_revenue.withColumn("pickup_date", to_date("tpep_pickup_datetime"))
 
particular_date_df = df_with_date.filter(col("pickup_date") == "2024-01-25")
 
vendor_daily_stats = particular_date_df.groupBy("VendorID", "pickup_date") \
    .agg(
        sum("Revenue").alias("total_revenue_on_this_day"),
        sum("passenger_count").alias("total_passengers_on_this_day"),
        sum("trip_distance").alias("total_distance_on_this_day")
    )
 
top_2_vendors = vendor_daily_stats.orderBy(desc("total_revenue_on_this_day")).limit(2)


print("Query 5: Highest two gaining vendors on January 25, 2024.")
display(top_2_vendors)

Query 5: Highest two gaining vendors on January 25, 2024.


VendorID,pickup_date,total_revenue_on_this_day,total_passengers_on_this_day,total_distance_on_this_day
2,2024-01-25,4217376.599999869,103717,357327.56999999867
1,2024-01-25,1423179.9999999884,29399,81644.40000000017


In [0]:
from pyspark.sql.functions import sum, desc
 
popular_routes = df_with_revenue.na.drop(subset=["PULocationID", "DOLocationID"]) \
    .groupBy("PULocationID", "DOLocationID") \
    .agg(sum("passenger_count").alias("total_passengers")) \
    .orderBy(desc("total_passengers"))

print("Query 6: Top 10 routes with the most passengers.")
display(popular_routes.limit(10))

Query 6: Top 10 routes with the most passengers.


PULocationID,DOLocationID,total_passengers
237,236,28162
236,237,24866
236,236,20641
237,237,19224
161,237,13025
142,239,11797
239,142,11240
237,161,11156
161,236,11007
239,238,10963


In [0]:
from pyspark.sql.functions import col, max as spark_max, sum, desc
from datetime import timedelta
 
latest_timestamp = df_with_revenue.select(spark_max("tpep_pickup_datetime")).first()[0]
 
ten_seconds_before = latest_timestamp - timedelta(seconds=10)
 
last_10_seconds_activity = df_with_revenue \
    .filter(col("tpep_pickup_datetime") >= ten_seconds_before) \
    .groupBy("PULocationID") \
    .agg(sum("passenger_count").alias("total_passengers")) \
    .orderBy(desc("total_passengers"))

print(f"Query 7: Top pickup locations in the 10 seconds before the latest timestamp ({latest_timestamp}).")
display(last_10_seconds_activity)

Query 7: Top pickup locations in the 10 seconds before the latest timestamp (2024-02-01 00:01:15).


PULocationID,total_passengers
161,1


In [0]:
 
table_name = "my_databricks_workshop.default.nyc_taxi_processed"
 
df_with_revenue.write.mode("overwrite").saveAsTable(table_name)

print(f"SUCCESS! Your final data has been saved and the table '{table_name}' has been created.")

SUCCESS! Your final data has been saved and the table 'my_databricks_workshop.default.nyc_taxi_processed' has been created.


In [0]:
%sql

-- This SQL query will now work perfectly
SELECT * FROM my_databricks_workshop.default.nyc_taxi_processed LIMIT 10;

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,Revenue
2,2024-01-01T00:57:55,2024-01-01T01:17:43,1,1.72,1,N,186,79,2,17.7,1.0,0.5,0.0,0.0,1.0,22.7,2.5,0.0,41.4
1,2024-01-01T00:03:00,2024-01-01T00:09:36,1,1.8,1,N,140,236,1,10.0,3.5,0.5,3.75,0.0,1.0,18.75,2.5,0.0,36.0
1,2024-01-01T00:17:06,2024-01-01T00:35:01,1,4.7,1,N,236,79,1,23.3,3.5,0.5,3.0,0.0,1.0,31.3,2.5,0.0,61.1
1,2024-01-01T00:36:38,2024-01-01T00:44:56,1,1.4,1,N,79,211,1,10.0,3.5,0.5,2.0,0.0,1.0,17.0,2.5,0.0,32.5
1,2024-01-01T00:46:51,2024-01-01T00:52:57,1,0.8,1,N,211,148,1,7.9,3.5,0.5,3.2,0.0,1.0,16.1,2.5,0.0,30.700000000000003
1,2024-01-01T00:54:08,2024-01-01T01:26:31,1,4.7,1,N,148,141,1,29.6,3.5,0.5,6.9,0.0,1.0,41.5,2.5,0.0,81.5
2,2024-01-01T00:49:44,2024-01-01T01:15:47,2,10.82,1,N,138,181,1,45.7,6.0,0.5,10.0,0.0,1.0,64.95,0.0,1.75,126.65
1,2024-01-01T00:30:40,2024-01-01T00:58:40,0,3.0,1,N,246,231,2,25.4,3.5,0.5,0.0,0.0,1.0,30.4,2.5,0.0,59.3
2,2024-01-01T00:26:01,2024-01-01T00:54:12,1,5.44,1,N,161,261,2,31.0,1.0,0.5,0.0,0.0,1.0,36.0,2.5,0.0,68.0
2,2024-01-01T00:28:08,2024-01-01T00:29:16,1,0.04,1,N,113,113,2,3.0,1.0,0.5,0.0,0.0,1.0,8.0,2.5,0.0,12.0
