In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, round as spark_round
from pyspark.sql.functions import sum as spark_sum

# Start a Spark session for this notebook, or reuse the active one via getOrCreate().
spark = (
    SparkSession.builder
    .appName("CancellationRate")
    .getOrCreate()
)

In [0]:
# Sample Trips data: one tuple per trip request, laid out as
# (id, client_id, driver_id, city_id, status, request_at).
# request_at is an ISO 'YYYY-MM-DD' string, so plain string comparison orders dates correctly.
trips_columns = ['id', 'client_id', 'driver_id', 'city_id', 'status', 'request_at']
trips_data = [
    # 2013-10-01
    (1, 1, 10, 1, 'completed', '2013-10-01'),
    (2, 2, 11, 1, 'cancelled_by_driver', '2013-10-01'),
    (3, 3, 12, 6, 'completed', '2013-10-01'),
    (4, 4, 13, 6, 'cancelled_by_client', '2013-10-01'),
    # 2013-10-02
    (5, 1, 10, 1, 'completed', '2013-10-02'),
    (6, 2, 11, 6, 'completed', '2013-10-02'),
    (7, 3, 12, 6, 'completed', '2013-10-02'),
    # 2013-10-03
    (8, 2, 12, 12, 'completed', '2013-10-03'),
    (9, 3, 10, 12, 'completed', '2013-10-03'),
    (10, 4, 13, 12, 'cancelled_by_driver', '2013-10-03'),
]
trips_df = spark.createDataFrame(trips_data, schema=trips_columns)
display(trips_df)

In [0]:
# Sample Users data: one tuple per user, laid out as (users_id, banned, role).
# banned holds 'Yes'/'No'; role is either 'client' or 'driver'.
users_columns = ['users_id', 'banned', 'role']
users_data = [
    # clients
    (1, 'No', 'client'),
    (2, 'Yes', 'client'),
    (3, 'No', 'client'),
    (4, 'No', 'client'),
    # drivers
    (10, 'No', 'driver'),
    (11, 'No', 'driver'),
    (12, 'No', 'driver'),
    (13, 'No', 'driver'),
]
users_df = spark.createDataFrame(users_data, schema=users_columns)
display(users_df)

In [0]:
# Keep only trips whose client is not banned.
# The inner join also drops trips whose client_id has no matching row in users_df.
trips_with_client = (
    trips_df.alias('t')
    .join(
        users_df.alias('uc'),
        col('t.client_id') == col('uc.users_id'),
        'inner',
    )
    .filter(col('uc.banned') == 'No')
)

display(trips_with_client)

In [0]:
# Join users a second time (aliased 'ud') to keep only trips whose driver is also
# not banned. Then project just the columns used downstream: trip id, status and request date.
trips_filtered = (
    trips_with_client.alias('tc')
    .join(
        users_df.alias('ud'),
        on=col('tc.driver_id') == col('ud.users_id'),
        how='inner',
    )
    .where(col('ud.banned') == 'No')
    .select('tc.id', 'tc.status', 'tc.request_at')
)

display(trips_filtered)

In [0]:
# Flag each trip with cancelled = 1 when its status is anything other than
# 'completed', and 0 otherwise.
cancelled_flag = when(col('status') != 'completed', 1).otherwise(0)
trips_flagged = trips_filtered.withColumn('cancelled', cancelled_flag)

display(trips_flagged)

In [0]:
# Per request date: 'total' trips, and how many of them were 'cancelled'
# (the sum of the 0/1 flag).
total_trips = count('*').alias('total')
cancelled_trips = spark_sum('cancelled').alias('cancelled')
cancellation_stats = trips_flagged.groupBy('request_at').agg(total_trips, cancelled_trips)

display(cancellation_stats)

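In PySpark, the `format_number` function formats a numeric column as a **string** with a fixed number of decimal places.

🔧 Syntax:

```python
from pyspark.sql.functions import format_number

format_number(column, d)
```

- `column`: the numeric column to format.
- `d`: the number of decimal places to round to and display.

Caveats:

- The result is a string column, not a numeric column. It cannot be used in further arithmetic, and it sorts as text.
- Rounding uses HALF_EVEN ("banker's rounding"), and the output includes thousands separators (e.g. `1,234.57`).
- When the rounded value must stay numeric, as a rate usually should, use `round(column, d)` instead. It rounds HALF_UP and returns a number.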

In [0]:
# Cancellation rate per day inside the reporting window, rounded to 2 decimals.
# spark_round (HALF_UP) keeps the rate numeric. format_number would turn it into a
# string and round HALF_EVEN. Spark's `/` already returns a double, so no `* 1.0` is needed.
START_DAY, END_DAY = '2013-10-01', '2013-10-03'  # inclusive; ISO strings compare as dates

result_df = (
    cancellation_stats
    # Filter first, so rates are only computed for days in the window.
    .filter(col('request_at').between(START_DAY, END_DAY))
    .select(
        col('request_at').alias('Day'),
        spark_round(col('cancelled') / col('total'), 2).alias('Cancellation Rate'),
    )
    # groupBy output has no guaranteed row order; sort so the table is deterministic.
    .orderBy('Day')
)

display(result_df)