In [0]:
from pyspark.sql import functions as F

# Read Bronze table
bronze_df = spark.read.table("default.taxi_bronze")

print("Bronze row count:", bronze_df.count())
display(bronze_df.limit(5))


Bronze row count: 10000


pickup_datetime,dropoff_datetime,pickup_latitude,pickup_longitude,dropoff_latitude,dropoff_longitude,passenger_count,fare_amount,trip_distance_km,trip_duration_min,surge_multiplier,pickup_zone,dropoff_zone
2025-10-04 06:01:44,2025-10-04 06:27:03,13.020352,77.557474,12.957031,77.632221,1,248.96,10.732,25.32,1.0,Rajajinagar,Koramangala
2025-10-09 11:00:38,2025-10-09 11:28:12,12.877326,77.572048,13.023753,77.602085,2,340.41,16.604,27.57,1.0,Bannerghatta,Hebbal
2025-10-07 00:20:47,2025-10-07 00:33:47,13.087145,77.589177,13.013447,77.561703,2,200.3,8.718,13.01,1.0,Yelahanka,Rajajinagar
2025-10-01 08:37:05,2025-10-01 08:44:57,12.956569,77.749813,12.975441,77.757532,2,95.69,2.259,7.87,1.0,Whitefield,Whitefield
2025-10-01 03:37:41,2025-10-01 03:52:45,12.967414,77.584438,12.8948,77.55798,1,201.14,8.568,15.07,1.0,MG_Road,Bannerghatta


In [0]:
df = bronze_df.dropDuplicates()

# Drop rows with missing essential values
df = df.dropna(subset=[
    "pickup_datetime",
    "dropoff_datetime",
    "pickup_latitude",
    "pickup_longitude",
    "dropoff_latitude",
    "dropoff_longitude",
    "fare_amount"
])

print("After cleaning:", df.count())


After cleaning: 10000


In [0]:
df = df.withColumn(
    "duration_min",
    (F.col("dropoff_datetime").cast("long") -
     F.col("pickup_datetime").cast("long")) / 60
)

df = df.filter(F.col("duration_min") > 0)


In [0]:
from pyspark.sql import functions as F

df = df.withColumn("pickup_latitude", F.col("pickup_latitude").cast("double")) \
       .withColumn("pickup_longitude", F.col("pickup_longitude").cast("double")) \
       .withColumn("dropoff_latitude", F.col("dropoff_latitude").cast("double")) \
       .withColumn("dropoff_longitude", F.col("dropoff_longitude").cast("double"))


In [0]:
df = df.filter(
    F.col("pickup_latitude").isNotNull() &
    F.col("pickup_longitude").isNotNull() &
    F.col("dropoff_latitude").isNotNull() &
    F.col("dropoff_longitude").isNotNull()
)


In [0]:
df = df.withColumn("pickup_latitude", F.col("pickup_latitude").cast("double")) \
       .withColumn("pickup_longitude", F.col("pickup_longitude").cast("double")) \
       .withColumn("dropoff_latitude", F.col("dropoff_latitude").cast("double")) \
       .withColumn("dropoff_longitude", F.col("dropoff_longitude").cast("double"))

df = df.filter(
    F.col("pickup_latitude").isNotNull() &
    F.col("pickup_longitude").isNotNull() &
    F.col("dropoff_latitude").isNotNull() &
    F.col("dropoff_longitude").isNotNull()
)

print("After lat/long cleaning:", df.count())


After lat/long cleaning: 0


In [0]:
df = df.withColumn(
    "distance_km",
    2 * 6371 * F.asin(
        F.sqrt(
            F.pow(
                F.sin((F.radians(F.col("dropoff_latitude")) -
                       F.radians(F.col("pickup_latitude"))) / 2), 2
            )
            +
            F.cos(F.radians(F.col("pickup_latitude"))) *
            F.cos(F.radians(F.col("dropoff_latitude"))) *
            F.pow(
                F.sin((F.radians(F.col("dropoff_longitude")) -
                       F.radians(F.col("pickup_longitude"))) / 2), 2
            )
        )
    )
)


In [0]:
df.select(
    "pickup_latitude",
    "pickup_longitude",
    "dropoff_latitude",
    "dropoff_longitude",
    "distance_km"
).show(5, truncate=False)


+---------------+----------------+----------------+-----------------+-----------+
|pickup_latitude|pickup_longitude|dropoff_latitude|dropoff_longitude|distance_km|
+---------------+----------------+----------------+-----------------+-----------+
+---------------+----------------+----------------+-----------------+-----------+



In [0]:
df = df.filter(F.col("distance_km") > 0)
print("After distance filter:", df.count())


After distance filter: 0
