Create the staging schema (if it does not already exist)

In [0]:
%sql

-- Make sure the staging schema is present; IF NOT EXISTS makes this a no-op on re-runs.
CREATE SCHEMA IF NOT EXISTS staging;


In [0]:
# Load the food delivery table from the landing schema into a DataFrame.
landing_df = spark.table("landing.food_delivery_data")

In [0]:
# Column renaming: landing names -> snake_case staging names.
# Most targets are simply the lower-cased source name; the two trailing
# entries override the names whose spelling/word-splitting is also corrected.
col_mapping = {
    **{
        source_name: source_name.lower()
        for source_name in (
            'ID',
            'Delivery_person_ID',
            'Delivery_person_Age',
            'Delivery_person_Ratings',
            'Restaurant_latitude',
            'Restaurant_longitude',
            'Delivery_location_latitude',
            'Delivery_location_longitude',
            'Order_Date',
            'Time_Orderd',
            'Time_Order_picked',
            'Weatherconditions',
            'Road_traffic_density',
            'Vehicle_condition',
            'Type_of_order',
            'Type_of_vehicle',
            'multiple_deliveries',
            'Festival',
            'City',
            'Time_taken_min',
        )
    },
    # Explicit overrides (not a plain lower-casing of the source name).
    'Time_Orderd': 'time_ordered',
    'Weatherconditions': 'weather_conditions',
}

# Apply all renames in a single pass.
col_renamed_df = landing_df.withColumnsRenamed(col_mapping)


In [0]:
from pyspark.sql.functions import * 

# Clean and transform the renamed landing data for the staging layer:
#   * cast the numeric columns to int / float,
#   * keep only the second space-separated token of time_taken_min and
#     weather_conditions,
#   * build full order / pick-up timestamps from order_date + time of day,
#   * derive the time between order placement and pick-up (in minutes),
#   * add calendar attributes (year, month, quarter, weekday name),
#   * replace "NaN" placeholders in city / weather_conditions with "Others".
updated_df = (
    col_renamed_df
    .withColumn("delivery_person_age", col("delivery_person_age").cast("int"))
    .withColumn("delivery_person_ratings", col("delivery_person_ratings").cast("float"))
    .withColumn("restaurant_latitude", col("restaurant_latitude").cast("float"))
    .withColumn("restaurant_longitude", col("restaurant_longitude").cast("float"))
    .withColumn("delivery_location_latitude", col("delivery_location_latitude").cast("float"))
    .withColumn("delivery_location_longitude", col("delivery_location_longitude").cast("float"))
    .withColumn("time_taken_min", split(col("time_taken_min"), " ")[1].cast("int"))
    .withColumn("weather_conditions", split(col("weather_conditions"), " ")[1])
    # The timestamps must be built before order_date is converted to a date below.
    .withColumn(
        "time_ordered_formatted",
        to_timestamp(
            concat(col("order_date"), lit(" "), col("time_ordered")),
            "yyyy-MM-dd HH:mm:ss",
        ),
    )
    .withColumn(
        "time_order_picked_formatted",
        to_timestamp(
            concat(col("order_date"), lit(" "), split(col("time_order_picked"), " ")[1]),
            "yyyy-MM-dd HH:mm:ss",
        ),
    )
    # Both timestamps above use the order's calendar date. When an order is
    # placed shortly before midnight and picked up after it, the pick-up
    # timestamp would land ~24h *before* the order timestamp and produce a
    # large negative preparation time. Roll such pick-ups forward by one day.
    # Rows where either timestamp is null are left untouched.
    .withColumn(
        "time_order_picked_formatted",
        when(
            col("time_order_picked_formatted") < col("time_ordered_formatted"),
            col("time_order_picked_formatted") + expr("INTERVAL 1 DAY"),
        ).otherwise(col("time_order_picked_formatted")),
    )
    .withColumn("order_date", to_date(col("order_date"), "yyyy-MM-dd"))
    # Minutes elapsed between order placement and pick-up by the delivery person.
    .withColumn(
        "order_prepare_time_mins",
        (
            unix_timestamp(col("time_order_picked_formatted"))
            - unix_timestamp(col("time_ordered_formatted"))
        ) / 60,
    )
    .withColumn("year", year("order_date"))
    .withColumn("month", month("order_date"))
    .withColumn("quarter", quarter("order_date"))
    # Full weekday name (e.g. "Monday") of the order date.
    .withColumn("weekday_name", date_format(col("order_date"), "EEEE"))
    .withColumn("city", when(col("city").like("%NaN%"), "Others").otherwise(col("city")))
    .withColumn(
        "weather_conditions",
        when(col("weather_conditions").like("%NaN%"), "Others").otherwise(col("weather_conditions")),
    )
)

# Handle missing values: default the numeric measures to 0.
staging_df = updated_df.fillna({"delivery_person_age": 0, "delivery_person_ratings": 0, "time_taken_min": 0})

# Write the cleaned data into a Delta table (this is our Staging Layer)
staging_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("staging.food_delivery_stage")