In [0]:
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType

In [0]:
# Load every column of the raw ride-bookings table into a DataFrame.
df = spark.table("workspace.default.ncr_ride_bookings")

In [0]:

# Size of the raw data: row count (thousands-separated) and column count.
raw_row_count = df.count()
print(f"\nRecords loaded: {raw_row_count:,}")
print(f"Columns: {len(df.columns)}")

In [0]:

# Print the schema of the raw table as loaded (column names, types, nullability),
# before any of the cleaning steps that follow.
df.printSchema()

In [0]:
# Render the first 5 rows of the raw, not-yet-cleaned data for a quick visual check.
df.limit(5).display()

In [0]:
# Null audit on the raw data: one output column per input column holding the
# number of rows where that column is NULL (count() skips the NULLs that when()
# produces for non-null rows).
null_count_exprs = [
    F.count(F.when(F.col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(null_count_exprs).display()


In [0]:
# Convert the raw string metric columns into new double columns
# (raw column name -> new column name). The raw columns are left in place.
numeric_columns = {
    "Avg VTAT": "AvgVTAT",
    "Avg CTAT": "AvgCTAT",
    "Booking Value": "BookingValue",
    "Ride Distance": "RideDistance",
    "Driver Ratings": "DriverRating",
    "Customer Rating": "CustomerRating"
}

print("\nConverting numeric columns:")
for raw_name, typed_name in numeric_columns.items():
    # Strip the literal '"""' wrapper, then inspect the trimmed text to decide
    # whether the value is missing (blank, or "null" in any letter case).
    unquoted = F.regexp_replace(F.col(raw_name), '"""', '')
    trimmed = F.trim(unquoted)
    is_missing = (trimmed == "") | (F.upper(trimmed) == "NULL")
    # Missing -> typed NULL; anything else -> the unquoted text cast to double.
    df = df.withColumn(
        typed_name,
        F.when(is_missing, F.lit(None).cast("double")).otherwise(unquoted.cast("double")),
    )
    print(f"  ✓ {raw_name} → {typed_name} (double)")

In [0]:
# Calendar/time features derived from the raw `Date` and `Time` columns.
# dayofweek() numbers days 1 (Sunday) through 7 (Saturday); the names below
# are listed in that same order.
weekday_names = ["Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"]

day_of_week = F.col("DayOfWeek")
day_name_expr = F.when(day_of_week == 1, weekday_names[0])
for day_number, day_name in enumerate(weekday_names[1:], start=2):
    day_name_expr = day_name_expr.when(day_of_week == day_number, day_name)

df = (
    df
    .withColumn("BookingDate", F.col("Date"))
    .withColumn("BookingTimestamp", F.col("Time"))
    .withColumn("DayOfWeek", F.dayofweek(F.col("Date")))
    .withColumn("DayOfWeekName", day_name_expr)
    .withColumn("Month", F.month(F.col("Date")))
    .withColumn("MonthName", F.date_format(F.col("Date"), "MMMM"))
    .withColumn("Quarter", F.quarter(F.col("Date")))
    .withColumn("Hour", F.hour(F.col("Time")))
    .withColumn("Minute", F.minute(F.col("Time")))
    # Sunday (1) and Saturday (7) count as the weekend.
    .withColumn("IsWeekend", F.when(day_of_week.isin(1, 7), 1).otherwise(0))
)

In [0]:
# Bucket each booking by its hour of day. Buckets are tested in order and the
# first match wins; anything unmatched (including a NULL hour) is "Off-Peak".
# NOTE(review): between() is inclusive, so hour 10 (10:00-10:59) lands in
# "Morning Peak (8am-10am)" (likewise 14 and 19), while the Night bucket stops
# before hour 1. Confirm whether the labels or the bounds are intended.
hour_of_day = F.col("Hour")
df = df.withColumn(
    "TimePeriod",
    F.when(hour_of_day.between(8, 10), "Morning Peak (8am-10am)")
     .when(hour_of_day.between(12, 14), "Lunch Peak (12pm-2pm)")
     .when(hour_of_day.between(17, 19), "Evening Peak (5pm-7pm)")
     .when((hour_of_day >= 22) | (hour_of_day < 1), "Night (10pm-1am)")
     .otherwise("Off-Peak"),
)

print("✓ Date/Time features created:")
for feature_group in [
    "DayOfWeek, DayOfWeekName",
    "Month, MonthName, Quarter",
    "Hour, Minute",
    "IsWeekend",
    "TimePeriod",
]:
    print(f"  - {feature_group}")


In [0]:
# Frequency of each raw "Booking Status" value, most common first.
print("\nUnique Booking Statuses:")
status_counts = df.groupBy("Booking Status").count()
status_counts.orderBy(F.col("count").desc()).display()


In [0]:
# One 0/1 indicator per raw "Booking Status" value (exact string match;
# a NULL status yields 0 for every flag).
booking_status = F.col("Booking Status")
status_flag_map = {
    "IsCompleted": "Completed",
    "IsCustomerCancelled": "Cancelled by Customer",
    "IsDriverCancelled": "Cancelled by Driver",
    "IsNoDriverFound": "No Driver Found",
    "IsIncomplete": "Incomplete",
}
for flag_name, status_value in status_flag_map.items():
    df = df.withColumn(flag_name, F.when(booking_status == status_value, 1).otherwise(0))

# Roll the indicators up into three outcome groups.
df = (
    df
    .withColumn("Cancelled_rides", F.col("IsCustomerCancelled") + F.col("IsDriverCancelled"))
    .withColumn("Completed_rides", F.col("IsCompleted"))
    .withColumn("Incomplete_rides", F.col("IsNoDriverFound") + F.col("IsIncomplete"))
)

print("\n✓ Booking Status Flags Created:")
print("  - " + ", ".join(status_flag_map))
print("  - Completed_rides, Cancelled_rides, Incomplete_rides")


In [0]:
# Overall outcome columns: Cancelled_rides (= customer + driver cancellations),
# Completed_rides, and Incomplete_rides (= no driver found + incomplete).
# They are derived together with the status flags in the previous cell; this
# cell used to re-derive them with identical expressions (a no-op), so it now
# only guards that they are present.
_missing_outcome_cols = {"Cancelled_rides", "Completed_rides", "Incomplete_rides"} - set(df.columns)
assert not _missing_outcome_cols, f"Missing outcome columns: {sorted(_missing_outcome_cols)}"


In [0]:
# Strip the literal triple-quote sequence ('"""') and surrounding whitespace
# from text columns. Columns not present in `df` are skipped without notice.
string_cols = [
    "Booking ID",
    "Customer ID",
    "Vehicle Type",
    "Pickup Location",
    "Drop Location",
    "Reason for cancelling by Customer",
    "Driver Cancellation Reason",
    "Incomplete Rides Reason",
    "Payment Method",
]

present_string_cols = [name for name in string_cols if name in df.columns]
for name in present_string_cols:
    df = df.withColumn(name, F.trim(F.regexp_replace(F.col(name), '"""', '')))
    print(f"Cleaned: {name}")



In [0]:
# Replace NULLs in the typed numeric columns with 0.
# NOTE(review): a filled 0 is indistinguishable from a genuine 0 (the rating
# averages in a later cell filter on `> 0` to compensate); confirm 0 is an
# acceptable placeholder for the other metrics.
print("\nHandling numeric nulls:")
for metric_col in ["AvgVTAT", "AvgCTAT", "BookingValue", "RideDistance", "DriverRating", "CustomerRating"]:
    df = df.withColumn(metric_col, F.coalesce(F.col(metric_col), F.lit(0)))
print("Numeric nulls handled (0)")

In [0]:
# Categorical columns: replace missing values with 'Unknown'.
# "Missing" covers real NULLs and the same placeholder text the numeric
# conversion treats as missing: blank strings and "null" in any letter case.
# A plain coalesce() only caught true NULLs, letting "null" text through.
print("\nHandling categorical nulls:")
categorical_cols = ["Reason for cancelling by Customer", "Driver Cancellation Reason", 
                    "Incomplete Rides Reason", "Payment Method"]
for cat_col in categorical_cols:
    trimmed = F.trim(F.col(cat_col))
    # NULL input: isNull() is true, so the OR is true despite the NULL operands.
    is_missing = F.col(cat_col).isNull() | (trimmed == "") | (F.upper(trimmed) == "NULL")
    df = df.withColumn(cat_col, F.when(is_missing, F.lit("Unknown")).otherwise(F.col(cat_col)))
    print(f"{cat_col}")

In [0]:
# Check 1: the status flags are mutually exclusive, so SumOfFlags equals
# TotalRows exactly when every row's "Booking Status" matches one of the five
# flagged values; a shortfall means some status is not covered by any flag.
# The current frame is also exposed as the temp view "df".
df.createOrReplaceTempView("df")

flag_alias_pairs = [
    ("IsCompleted", "CompletedFlag"),
    ("IsCustomerCancelled", "CustomerCancelledFlag"),
    ("IsDriverCancelled", "DriverCancelledFlag"),
    ("IsNoDriverFound", "NoDriverFoundFlag"),
    ("IsIncomplete", "IncompleteFlag"),
]

# Per-row sum of all five flags (0 or 1 per row).
flag_total = F.col(flag_alias_pairs[0][0])
for flag_name, _ in flag_alias_pairs[1:]:
    flag_total = flag_total + F.col(flag_name)

status_check = df.agg(
    F.count(F.lit(1)).alias("TotalRows"),
    *[F.sum(flag_name).alias(alias) for flag_name, alias in flag_alias_pairs],
    F.sum(flag_total).alias("SumOfFlags"),
)
display(status_check)

In [0]:
# Check 2: min/max of each cleaned numeric column, emitted as Min_<label>,
# Max_<label> pairs in the order listed.
print("\n✓ Numeric Column Ranges:")
range_specs = [
    ("AvgVTAT", "VTAT"),
    ("AvgCTAT", "CTAT"),
    ("RideDistance", "Distance"),
    ("BookingValue", "Value"),
    ("DriverRating", "DriverRating"),
    ("CustomerRating", "CustomerRating"),
]
range_exprs = []
for column_name, label in range_specs:
    range_exprs.append(F.min(column_name).alias(f"Min_{label}"))
    range_exprs.append(F.max(column_name).alias(f"Max_{label}"))
df.select(*range_exprs).display()

In [0]:
# Check 3: per-column NULL counts after cleaning, one "<column>_nulls" output each.
print("\n✓ Null Value Count After Cleaning:")
null_exprs = [
    F.count(F.when(F.col(name).isNull(), name)).alias(name + "_nulls")
    for name in df.columns
]
null_check = df.select(*null_exprs)
null_check.display()


In [0]:
# Revenue efficiency metrics, defined only for completed rides (IsCompleted == 1).
# Previously the "completed only" intent was stated but not enforced, so other
# rows with a value and distance (e.g. incomplete rides) also got a ratio.
# Non-completed rows and rows with a non-positive denominator get 0, which also
# avoids division by zero.
# NOTE(review): RevenuePerMinute assumes AvgCTAT is measured in minutes; confirm.
is_completed = F.col("IsCompleted") == 1
df = (
    df
    .withColumn(
        "RevenuePerKm",
        F.when(is_completed & (F.col("RideDistance") > 0),
               F.col("BookingValue") / F.col("RideDistance")).otherwise(0),
    )
    .withColumn(
        "RevenuePerMinute",
        F.when(is_completed & (F.col("AvgCTAT") > 0),
               F.col("BookingValue") / F.col("AvgCTAT")).otherwise(0),
    )
)


In [0]:
# Rider satisfaction: flag ratings strictly above the mean of recorded ratings.
# Missing ratings were filled with 0 earlier, so only ratings > 0 enter each
# mean (avg() skips the NULLs the when() yields otherwise). Both means are
# computed in one aggregation pass instead of two separate jobs.
rating_means = df.agg(
    F.avg(F.when(F.col("CustomerRating") > 0, F.col("CustomerRating"))).alias("customer"),
    F.avg(F.when(F.col("DriverRating") > 0, F.col("DriverRating"))).alias("driver"),
).first()
# Each mean is None when no row has a positive rating.
avg_customer_rating = rating_means["customer"]
avg_driver_rating = rating_means["driver"]


def _above_avg_flag(rating_col, mean_rating):
    """Return a 0/1 column: 1 where `rating_col` > `mean_rating`, else 0.

    When no mean is available (None), every row gets 0, matching how a
    comparison against NULL falls through to otherwise(0).
    """
    if mean_rating is None:
        return F.lit(0)
    return F.when(F.col(rating_col) > mean_rating, 1).otherwise(0)


def _fmt_rating(value):
    """Format a mean rating to two decimals, or 'n/a' when it is missing.

    Formatting None with ':.2f' would raise TypeError.
    """
    return "n/a" if value is None else f"{value:.2f}"


df = (
    df
    .withColumn("CustomerRatingAboveAvg", _above_avg_flag("CustomerRating", avg_customer_rating))
    .withColumn("DriverRatingAboveAvg", _above_avg_flag("DriverRating", avg_driver_rating))
)

print(f"✓ Average Customer Rating: {_fmt_rating(avg_customer_rating)}")
print(f"✓ Average Driver Rating: {_fmt_rating(avg_driver_rating)}")
print("✓ Derived metrics created:")
print("  - RevenuePerKm")
print("  - RevenuePerMinute")
print("  - CustomerRatingAboveAvg")
print("  - DriverRatingAboveAvg")

In [0]:
# Final output layout: raw columns whose names contain spaces are renamed to
# CamelCase; all other columns keep their current names.
column_renames = {
    "Booking ID": "BookingID",
    "Customer ID": "CustomerID",
    "Pickup Location": "PickupLocation",
    "Drop Location": "DropLocation",
    "Vehicle Type": "VehicleType",
    "Payment Method": "PaymentMethod",
    "Booking Status": "BookingStatus",
    "Reason for cancelling by Customer": "CustomerCancellationReason",
    "Driver Cancellation Reason": "DriverCancellationReason",
    "Incomplete Rides Reason": "IncompleteRideReason",
}

# Source columns in final output order, grouped by theme.
final_column_order = [
    # Identifiers
    "Booking ID", "Customer ID",
    # Date & Time
    "BookingDate", "BookingTimestamp", "DayOfWeek", "DayOfWeekName",
    "Month", "MonthName", "Quarter", "Hour", "Minute", "IsWeekend", "TimePeriod",
    # Location
    "Pickup Location", "Drop Location",
    # Booking Details
    "Vehicle Type", "Payment Method", "Booking Status",
    # Metrics
    "BookingValue", "RideDistance", "AvgVTAT", "AvgCTAT", "DriverRating", "CustomerRating",
    # Derived Metrics
    "RevenuePerKm", "RevenuePerMinute", "CustomerRatingAboveAvg", "DriverRatingAboveAvg",
    # Status Flags
    "IsCompleted", "IsCustomerCancelled", "IsDriverCancelled", "IsNoDriverFound", "IsIncomplete",
    # Cancellation Reasons
    "Reason for cancelling by Customer", "Driver Cancellation Reason", "Incomplete Rides Reason",
]

df_clean = df.select([
    F.col(name).alias(column_renames[name]) if name in column_renames else name
    for name in final_column_order
])

print("\n✓ Final columns selected and organized")
print(f"✓ Total columns: {len(df_clean.columns)}")

In [0]:
# Final review of the cleaned frame: its schema, a 5-row sample, and its size.
df_clean.printSchema()
df_clean.limit(5).display()

clean_row_count = df_clean.count()
print(f"Total Records: {clean_row_count:,}")
print(f"Total Columns: {len(df_clean.columns)}")

In [0]:
# Persist the cleaned data as a table, replacing any existing table of that name.
output_table = "uber_data_clean"
df_clean.write.saveAsTable(output_table, mode="overwrite")

print(f"✓ Cleaned data saved to: {output_table}")
print("✓ Ready for analysis!")