In [0]:
# Load the two CSV extracts, treating the first row of each file as the header.
# No schema or inferSchema option is set, so column types are left to the CSV
# reader's defaults.
orders = spark.read.csv(
    "/Volumes/dev/scholarnest/spark_data/data_orders.csv",
    header=True,
)
offers = spark.read.csv(
    "/Volumes/dev/scholarnest/spark_data/data_offers.csv",
    header=True,
)

In [0]:
# Attach offer ids to orders: keep every column of `orders` plus `offers.offer_id`.
# The join is inner, so orders without a matching offers row are dropped, and an
# order matched by several offers rows appears once per match.
# NOTE(review): later cells count rows of this frame per category - confirm that
# counting (order, offer) pairs rather than distinct orders is what is intended.
df = (
    orders
    .join(offers, on=orders["order_gk"] == offers["order_gk"], how="inner")
    .select(orders["*"], offers["offer_id"])
)
df.display()

In [0]:
from pyspark.sql import functions as F

# Replace the two key columns with human-readable labels.
# Caveat: this cell overwrites `df` and drops the very key columns it reads, so
# it cannot be re-run on its own output - re-run the join cell first.

# Key 1 -> "Yes"; anything else (nulls included) -> "No".
driver_assigned_label = F.when(F.col("is_driver_assigned_key") == 1, "Yes").otherwise("No")

# Key 4 -> "Client Cancelled"; anything else (nulls included) -> "System Reject".
order_status_label = F.when(F.col("order_status_key") == 4, "Client Cancelled").otherwise("System Reject")

df = (
    df.withColumn("is_driver_assigned", driver_assigned_label)
      .withColumn("order_status", order_status_label)
      .drop("is_driver_assigned_key", "order_status_key")
)
df.display()


Build a distribution of orders according to reasons for failure: cancellations before and after driver assignment, and reasons for order rejection. Analyse the resulting plot. Which category has the highest number of orders?

In [0]:
from pyspark.sql.functions import count
import matplotlib.pyplot as plt

# Step 1: one row per (driver assignment, order status) pair with its row count.
df_q1 = df.groupBy("is_driver_assigned", "order_status").agg(
    count("order_gk").alias("order_count")
)

# Step 2: bring the (small) aggregate to the driver as a pandas DataFrame.
pdf_q1 = df_q1.toPandas()

# Step 3: spread the categories across the columns. No `index` is given, so the
# result is a single "order_count" row with one column per category pair.
pivot_q1 = pdf_q1.pivot_table(
    values="order_count",
    columns=["order_status", "is_driver_assigned"],
    aggfunc="sum",
    fill_value=0,
)

# Step 4: draw one bar per category pair on an explicitly created axes.
fig, ax = plt.subplots(figsize=(7, 7))
pivot_q1.plot(kind="bar", legend=True, rot=0, ax=ax)
ax.set_title("Order Counts by Driver Assignment and Order Status")
ax.set_ylabel("Count of Orders")
plt.show()


Plot the distribution of failed orders by hour. Do certain hours have an abnormally high proportion of one category or another? Which hours have the most failed orders? How can this be explained?

In [0]:
# Derive the hour bucket as the text that precedes the first ":" in order_datetime.
# NOTE(review): presumably order_datetime looks like "HH:MM:SS" - confirm against the data.
order_hour = F.split(F.col("order_datetime"), ":").getItem(0)

df_time = df.withColumn("order_time", order_hour)
df_time.display()



In [0]:
# Row count per (hour, driver assignment, order status). The count column keeps
# the name "order_gk" because the plotting cell reads it under that name.
df_time_grouped = (
    df_time
    .groupBy("order_time", "is_driver_assigned", "order_status")
    .agg(F.count("order_gk").alias("order_gk"))
)

In [0]:
# Plot failed-order counts per hour, one line per (driver assignment, order status).
grouped_q2 = df_time_grouped.toPandas()

# order_time is text (it comes from a string split), so a pivot on it sorts
# lexicographically and pandas plots it on a positional axis. Casting to an
# integer hour gives a numerically sorted index and a real 0-23 x axis that
# matches `xticks`.
hourly_counts_q2 = (
    grouped_q2
    .assign(order_time=grouped_q2["order_time"].astype(int))
    .pivot(index="order_time",
           columns=["is_driver_assigned", "order_status"],
           values="order_gk")
    # Show every hour 0-23 even if some are absent from the data ...
    .reindex(range(24))
    # ... and treat an absent (hour, category) pair as a count of zero.
    .fillna(0)
)

ax_q2 = hourly_counts_q2.plot(xticks=range(0, 24),
                              figsize=(13, 7),
                              title="Count of Failed Orders Per Hour and Category")
# Assign to `_` so the cell does not echo the list returned by `set`.
_ = ax_q2.set(xlabel="Hour of day", ylabel="Count of failed orders")

Plot the average time to cancellation by hour, with and without an assigned driver. Can we draw any conclusions from this plot?



In [0]:
from pyspark.sql.functions import mean
# Mean cancellation time per (hour, driver assignment). The aggregate keeps the
# source column's name because the plotting cell reads it under that name.
df_cancellation_grouped = (
    df_time
    .groupBy("order_time", "is_driver_assigned")
    .agg(F.mean("cancellations_time_in_seconds").alias("cancellations_time_in_seconds"))
)
df_cancellation_grouped.display()

In [0]:
# Plot the mean time to cancellation per hour, one line per driver-assignment label.
grouped_q3 = df_cancellation_grouped.toPandas()

# order_time is text (it comes from a string split), so a pivot on it sorts
# lexicographically and pandas plots it on a positional axis. Casting to an
# integer hour gives a numerically sorted index and a real 0-23 x axis that
# matches `xticks`.
hourly_mean_q3 = (
    grouped_q3
    .assign(order_time=grouped_q3["order_time"].astype(int))
    .pivot(index="order_time",
           columns="is_driver_assigned",
           values="cancellations_time_in_seconds")
    # Show every hour 0-23. Hours with no data stay NaN (a gap in the line):
    # unlike a count, a missing mean must not be replaced by 0.
    .reindex(range(24))
)

ax_q3 = hourly_mean_q3.plot(xticks=range(0, 24),
                            figsize=(13, 7),
                            title="Average Time to Cancellation Per Hour and Driver Assignment")
# Assign to `_` so the cell does not echo the list returned by `set`.
_ = ax_q3.set(xlabel="Hour of day", ylabel="Average time to cancellation (seconds)")