In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [None]:
from pyspark.sql import SparkSession

# Create a Spark session with case sensitivity enabled
spark = SparkSession.builder \
    .appName("Capstone Project") \
    .config("spark.sql.caseSensitive", "true") \
    .getOrCreate()

In [None]:
cosmos_endpoint = "https://capstone-cosmosdb-noel.documents.azure.com:443/"
cosmos_master_key = ""
database_name = "capstone_cosmos_database"
zomato_delivery_operational_analytics_container = "zomato_delivery_operational_analytics_container"
zomato_dataset_metropolitan_container = "zomato_dataset_metropolitan_container"


spark.conf.set("spark.cosmos.accountEndpoint", cosmos_endpoint)
spark.conf.set("spark.cosmos.accountKey", cosmos_master_key)
spark.conf.set("spark.cosmos.database", database_name)

In [None]:
num_partition = spark.sparkContext.defaultParallelism
print(num_partition)

8


### retreiving data from zomato_delivery_operational_analytics_container

In [None]:
spark.conf.set("spark.cosmos.container", zomato_delivery_operational_analytics_container)

zomato_delivery_operational_analytics_df = spark.read.format("cosmos.oltp") \
    .option("spark.cosmos.accountEndpoint", cosmos_endpoint) \
    .option("spark.cosmos.accountKey", cosmos_master_key) \
    .option("spark.cosmos.database", database_name) \
    .option("spark.cosmos.container", zomato_delivery_operational_analytics_container) \
    .load()
 
zomato_delivery_operational_analytics_df.show()

+------+-----------------+-----------+-------------------+--------------------+---------------------------+-----------------------+-----------------+-------------------+--------------------------+--------------------+--------------------+-------------+-------------+----------------+------------------+----------------+----------+-------------------+------------------+--------+
|    ID|Time_Order_picked|Time_Orderd|Restaurant_latitude|Restaurant_longitude|Delivery_location_longitude|Delivery_person_Ratings|Vehicle_condition|Delivery_person_Age|Delivery_location_latitude|Road_traffic_density|                  id|Type_of_order|         City|Time_taken (min)|Delivery_person_ID| Type_of_vehicle|Order_Date|multiple_deliveries|Weather_conditions|Festival|
+------+-----------------+-----------+-------------------+--------------------+---------------------------+-----------------------+-----------------+-------------------+--------------------------+--------------------+--------------------+----

In [None]:
zomato_delivery_operational_analytics_df_modified = zomato_delivery_operational_analytics_df.repartition(num_partition)
zomato_delivery_operational_analytics_df_modified.cache()
zomato_delivery_operational_analytics_df_modified.count()

Out[6]: 44983

In [None]:
zomato_delivery_operational_analytics_df_modified.select([count(when(isnull(col), col)).alias(col) for col in zomato_delivery_operational_analytics_df_modified.columns]).show()

+---+-----------------+-----------+-------------------+--------------------+---------------------------+-----------------------+-----------------+-------------------+--------------------------+--------------------+---+-------------+----+----------------+------------------+---------------+----------+-------------------+------------------+--------+
| ID|Time_Order_picked|Time_Orderd|Restaurant_latitude|Restaurant_longitude|Delivery_location_longitude|Delivery_person_Ratings|Vehicle_condition|Delivery_person_Age|Delivery_location_latitude|Road_traffic_density| id|Type_of_order|City|Time_taken (min)|Delivery_person_ID|Type_of_vehicle|Order_Date|multiple_deliveries|Weather_conditions|Festival|
+---+-----------------+-----------+-------------------+--------------------+---------------------------+-----------------------+-----------------+-------------------+--------------------------+--------------------+---+-------------+----+----------------+------------------+---------------+----------+--

### retreiving data from zomato_dataset_metropolitan_container

In [None]:
spark.conf.set("spark.cosmos.container", zomato_dataset_metropolitan_container)

zomato_dataset_metropolitan_df = spark.read.format("cosmos.oltp") \
    .option("spark.cosmos.accountEndpoint", cosmos_endpoint) \
    .option("spark.cosmos.accountKey", cosmos_master_key) \
    .option("spark.cosmos.database", database_name) \
    .option("spark.cosmos.container", zomato_dataset_metropolitan_container) \
    .load()
 
zomato_dataset_metropolitan_df.show()

+---------------+--------------------+-------------+------------+-------------+-----+------+------------+--------------+--------------------+----------+--------------------+--------------+
|Delivery Rating|           Item Name|Dining Rating|    Cuisine |   Place Name|Votes|Prices|Dining Votes|Delivery Votes|                  id|      City|     Restaurant Name|   Best Seller|
+---------------+--------------------+-------------+------------+-------------+-----+------+------------+--------------+--------------------+----------+--------------------+--------------+
|            3.8|   Chicken Sandwhich|          3.7|       Shake|      Chembur|    0|   129|          16|           330|ae8cbb7b-b8d5-4a8...|    Mumbai|        Shivsu Pizza|          null|
|            3.8|Chilli Cheese Toa...|          3.7|       Shake|      Chembur|    0|   139|          16|           330|af779ee1-9310-4ae...|    Mumbai|        Shivsu Pizza|          null|
|            3.8|      Pepper Chicken|          3.7|   

In [None]:
zomato_dataset_metropolitan_df_modified = zomato_dataset_metropolitan_df.repartition(num_partition)
zomato_dataset_metropolitan_df_modified.cache()
zomato_dataset_metropolitan_df_modified.count()

Out[10]: 123651

In [None]:
zomato_dataset_metropolitan_df_modified.select([count(when(isnull(col), col)).alias(col) for col in zomato_dataset_metropolitan_df_modified.columns]).show()

+---------------+---------+-------------+--------+----------+-----+------+------------+--------------+---+----+---------------+-----------+
|Delivery Rating|Item Name|Dining Rating|Cuisine |Place Name|Votes|Prices|Dining Votes|Delivery Votes| id|City|Restaurant Name|Best Seller|
+---------------+---------+-------------+--------+----------+-----+------+------------+--------------+---+----+---------------+-----------+
|           1280|        0|        32236|       0|         0|    0|     0|           0|             0|  0|   0|              0|      95709|
+---------------+---------+-------------+--------+----------+-----+------+------------+--------------+---+----+---------------+-----------+



# Transformations

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [None]:
zomato_dataset_metropolitan_df_modified = zomato_dataset_metropolitan_df_modified \
    .withColumn('Delivery Rating', col('Delivery Rating').cast(FloatType())).withColumnRenamed('Delivery Rating', 'Delivery_Rating') \
    .withColumn('Item Name', col('Item Name').cast(StringType())).withColumnRenamed('Item Name', 'Item_Name') \
    .withColumn('Dining Rating', col('Dining Rating').cast(FloatType())).withColumnRenamed('Dining Rating', 'Dining_Rating') \
    .withColumnRenamed('Cuisine ', 'Cuisine') \
    .withColumn('Place Name', col('Place Name').cast(StringType())).withColumnRenamed('Place Name', 'Place_Name') \
    .withColumn('Votes', col('Votes').cast(IntegerType())) \
    .withColumn('Prices', col('Prices').cast(FloatType())) \
    .withColumn('Dining Votes', col('Dining Votes').cast(IntegerType())).withColumnRenamed('Dining Votes', 'Dining_Votes') \
    .withColumn('Delivery Votes', col('Delivery Votes').cast(IntegerType())).withColumnRenamed('Delivery Votes', 'Delivery_Votes') \
    .withColumn('City', col('City').cast(StringType())) \
    .withColumn('Restaurant Name', col('Restaurant Name').cast(StringType())).withColumnRenamed('Restaurant Name', 'Restaurant_Name') \
    .withColumn('Best Seller', col('Best Seller').cast(StringType())).withColumnRenamed('Best Seller', 'Best_Seller') \
    .withColumn('id', col('id').cast(StringType())).withColumnRenamed('id', 'unique_row_id')

zomato_dataset_metropolitan_df_modified.show()

+---------------+--------------------+-------------+----------+--------------+-----+------+------------+--------------+--------------------+----------+--------------------+--------------+
|Delivery_Rating|           Item_Name|Dining_Rating|   Cuisine|    Place_Name|Votes|Prices|Dining_Votes|Delivery_Votes|       unique_row_id|      City|     Restaurant_Name|   Best_Seller|
+---------------+--------------------+-------------+----------+--------------+-----+------+------------+--------------+--------------------+----------+--------------------+--------------+
|            4.2|       Sandesh Plain|          3.8|  Desserts|      C Scheme|    0|120.48|         872|             0|5baa741c-05e5-45f...|    Jaipur|               Kanha|          null|
|            4.0|Paneer Grilled Sa...|         null| Beverages|   RTC X roads|    0| 100.0|           0|            15|3dc7bf49-f3e5-44f...| Hyderabad| Yashwanth Pizza Den|    BESTSELLER|
|            4.0|Chicken Pepperoni...|         null|  Desser

In [None]:
zomato_dataset_metropolitan_df_modified.select([count(when(isnull(col), col)).alias(col) for col in zomato_dataset_metropolitan_df_modified.columns]).show()

+---------------+---------+-------------+-------+----------+-----+------+------------+--------------+-------------+----+---------------+-----------+
|Delivery_Rating|Item_Name|Dining_Rating|Cuisine|Place_Name|Votes|Prices|Dining_Votes|Delivery_Votes|unique_row_id|City|Restaurant_Name|Best_Seller|
+---------------+---------+-------------+-------+----------+-----+------+------------+--------------+-------------+----+---------------+-----------+
|           1280|        0|        32236|      0|         0|    0|     0|           0|             0|            0|   0|              0|      95709|
+---------------+---------+-------------+-------+----------+-----+------+------------+--------------+-------------+----+---------------+-----------+



In [None]:
zomato_delivery_operational_analytics_df_modified = zomato_delivery_operational_analytics_df_modified \
    .withColumn("Restaurant_latitude", col("Restaurant_latitude").cast(DoubleType())) \
    .withColumn("Restaurant_longitude", col("Restaurant_longitude").cast(DoubleType())) \
    .withColumn("Delivery_location_latitude", col("Delivery_location_latitude").cast(DoubleType())) \
    .withColumn("Delivery_location_longitude", col("Delivery_location_longitude").cast(DoubleType())) \
    .withColumn("Delivery_person_Ratings", col("Delivery_person_Ratings").cast(DoubleType())) \
    .withColumn("Delivery_person_Age", col("Delivery_person_Age").cast(IntegerType())) \
    .withColumn("multiple_deliveries", col("multiple_deliveries").cast(IntegerType())) \
    .withColumn("Time_taken (min)", col("Time_taken (min)").cast(IntegerType())) \
    .withColumn("Weather_conditions", col("Weather_conditions").cast(StringType())) \
    .withColumn("Road_traffic_density", col("Road_traffic_density").cast(StringType())) \
    .withColumn("Vehicle_condition", col("Vehicle_condition").cast(IntegerType())) \
    .withColumn("Type_of_order", col("Type_of_order").cast(StringType())) \
    .withColumn("Type_of_vehicle", col("Type_of_vehicle").cast(StringType())) \
    .withColumn("City", col("City").cast(StringType())) \
    .withColumn("Festival", col("Festival").cast(StringType())) \
    .withColumn("Delivery_person_ID", col("Delivery_person_ID").cast(StringType())) \
    .withColumnRenamed('id', 'unique_row_id')
    # .withColumn("Time_Orderd", date_format(to_timestamp(col("Time_Orderd"), 'HH:mm'), 'HH:mm').alias("Time_Orderd")) \
    # .withColumn("Time_Order_picked", date_format(to_timestamp(col("Time_Order_picked"), 'HH:mm'), 'HH:mm').alias("Time_Order_picked")) \
    # .withColumn("Order_Date", date_format(to_timestamp(col("Order_Date"), 'dd-mm-yyyy'), 'dd-MM-yyyy').alias('Order_Date'))

### dropping unnecessary column containing row-identifier information from Cosmos

In [None]:
zomato_dataset_metropolitan_df_modified.drop('unique_row_id')
zomato_delivery_operational_analytics_df_modified.drop('unique_row_id')

Out[16]: DataFrame[ID: string, Time_Order_picked: string, Time_Orderd: string, Restaurant_latitude: double, Restaurant_longitude: double, Delivery_location_longitude: double, Delivery_person_Ratings: double, Vehicle_condition: int, Delivery_person_Age: int, Delivery_location_latitude: double, Road_traffic_density: string, Type_of_order: string, City: string, Time_taken (min): int, Delivery_person_ID: string, Type_of_vehicle: string, Order_Date: string, multiple_deliveries: int, Weather_conditions: string, Festival: string]

### operational analytics DF has a column named 'City' - consisting of values 'Urban' and 'Metropolitian'. we can filter out just 'Metropolitian' as we are looking to analyze that data

In [None]:
print(zomato_delivery_operational_analytics_df_modified.count())
zomato_delivery_operational_analytics_df_modified = zomato_delivery_operational_analytics_df_modified.filter(col('City') == 'Metropolitian')
print(zomato_delivery_operational_analytics_df_modified.count())

44983
33623


### handle issues in 'Time_Orderd' and 'Time_Order_picked' in zomato_delivery_operational_analytics_df_modified

In [None]:
# Convert Order_Date to 'yyyy-MM-dd' format
zomato_delivery_operational_analytics_df_modified = zomato_delivery_operational_analytics_df_modified.withColumn("Order_Date", to_date(col("Order_Date"), "dd-MM-yyyy"))

# UDF to convert decimal to HH:mm
def convert_decimal_to_time(decimal_value):
    try:
        decimal_value = float(decimal_value)
        total_minutes = decimal_value * 1440
        hours = int(total_minutes // 60)
        minutes = int(total_minutes % 60)
        return f"{hours:02}:{minutes:02}"
    except (ValueError, TypeError):
        return None

convert_decimal_to_time_udf = udf(convert_decimal_to_time, StringType())

# using UDF to handle decimal values
zomato_delivery_operational_analytics_df_modified = zomato_delivery_operational_analytics_df_modified \
     .withColumn('Time_Orderd', when(col('Time_Orderd').cast('float').isNotNull(), 
                                    convert_decimal_to_time_udf(col('Time_Orderd'))).otherwise(col('Time_Orderd'))) \
     .withColumn('Time_Order_picked', when(col('Time_Order_picked').cast('float').isNotNull(), 
                                    convert_decimal_to_time_udf(col('Time_Order_picked'))).otherwise(col('Time_Order_picked')))



# standardize all time formats to HH:mm:ss
zomato_delivery_operational_analytics_df_modified = zomato_delivery_operational_analytics_df_modified \
     .withColumn('Time_Orderd', \
         when(col('Time_Orderd').rlike(r'^\d{2}:\d{2}:\d{2}$'), date_format(to_timestamp(col('Time_Orderd'), 'HH:mm:ss'), 'HH:mm:ss')) \
        .when(col('Time_Orderd').rlike(r'^\d{2}:\d{2}$'), date_format(to_timestamp(col('Time_Orderd'), 'HH:mm'), 'HH:mm:ss')) \
        .otherwise(None))

zomato_delivery_operational_analytics_df_modified = zomato_delivery_operational_analytics_df_modified \
     .withColumn('Time_Order_picked', \
         when(col('Time_Order_picked').rlike(r'^\d{2}:\d{2}:\d{2}$'), date_format(to_timestamp(col('Time_Order_picked'), 'HH:mm:ss'), 'HH:mm:ss')) \
        .when(col('Time_Order_picked').rlike(r'^\d{2}:\d{2}$'), date_format(to_timestamp(col('Time_Order_picked'), 'HH:mm'), 'HH:mm:ss')) \
        .otherwise(None))

# Combine Date and Time into yyyy-MM-dd HH:mm:ss format
zomato_delivery_operational_analytics_df_modified = zomato_delivery_operational_analytics_df_modified \
     .withColumn('Time_Orderd', to_timestamp(concat(col('Order_Date'), lit(' '), col('Time_Orderd')), 'yyyy-MM-dd HH:mm:ss')) \
     .withColumn('Time_Order_picked', to_timestamp(concat(col('Order_Date'), lit(' '), col('Time_Order_picked')), 'yyyy-MM-dd HH:mm:ss'))

In [None]:
zomato_delivery_operational_analytics_df_modified.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Time_Order_picked: timestamp (nullable = true)
 |-- Time_Orderd: timestamp (nullable = true)
 |-- Restaurant_latitude: double (nullable = true)
 |-- Restaurant_longitude: double (nullable = true)
 |-- Delivery_location_longitude: double (nullable = true)
 |-- Delivery_person_Ratings: double (nullable = true)
 |-- Vehicle_condition: integer (nullable = true)
 |-- Delivery_person_Age: integer (nullable = true)
 |-- Delivery_location_latitude: double (nullable = true)
 |-- Road_traffic_density: string (nullable = true)
 |-- unique_row_id: string (nullable = false)
 |-- Type_of_order: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Time_taken (min): integer (nullable = true)
 |-- Delivery_person_ID: string (nullable = true)
 |-- Type_of_vehicle: string (nullable = true)
 |-- Order_Date: date (nullable = true)
 |-- multiple_deliveries: integer (nullable = true)
 |-- Weather_conditions: string (nullable = true)
 |-- Festival: stri

In [None]:
zomato_delivery_operational_analytics_df_modified.show()

+------+-------------------+-------------------+-------------------+--------------------+---------------------------+-----------------------+-----------------+-------------------+--------------------------+--------------------+--------------------+-------------+-------------+----------------+------------------+----------------+----------+-------------------+------------------+--------+
|    ID|  Time_Order_picked|        Time_Orderd|Restaurant_latitude|Restaurant_longitude|Delivery_location_longitude|Delivery_person_Ratings|Vehicle_condition|Delivery_person_Age|Delivery_location_latitude|Road_traffic_density|       unique_row_id|Type_of_order|         City|Time_taken (min)|Delivery_person_ID| Type_of_vehicle|Order_Date|multiple_deliveries|Weather_conditions|Festival|
+------+-------------------+-------------------+-------------------+--------------------+---------------------------+-----------------------+-----------------+-------------------+--------------------------+----------------

In [None]:
zomato_delivery_operational_analytics_df_modified.select([count(when(isnull(col), col)).alias(col) for col in zomato_delivery_operational_analytics_df_modified.columns]).show()

+---+-----------------+-----------+-------------------+--------------------+---------------------------+-----------------------+-----------------+-------------------+--------------------------+--------------------+-------------+-------------+----+----------------+------------------+---------------+----------+-------------------+------------------+--------+
| ID|Time_Order_picked|Time_Orderd|Restaurant_latitude|Restaurant_longitude|Delivery_location_longitude|Delivery_person_Ratings|Vehicle_condition|Delivery_person_Age|Delivery_location_latitude|Road_traffic_density|unique_row_id|Type_of_order|City|Time_taken (min)|Delivery_person_ID|Type_of_vehicle|Order_Date|multiple_deliveries|Weather_conditions|Festival|
+---+-----------------+-----------+-------------------+--------------------+---------------------------+-----------------------+-----------------+-------------------+--------------------------+--------------------+-------------+-------------+----+----------------+------------------

In [None]:
tempDF = zomato_delivery_operational_analytics_df_modified
original = tempDF.count()
remaining = tempDF.dropna(subset=['Time_Orderd', 'Time_Order_picked', 'multiple_deliveries', 'Delivery_person_Age']).count()

print(f'Loss by dropping: {(original-remaining)*100/original}')   # so drop

Loss by dropping: 7.53948190226928


In [None]:
zomato_delivery_operational_analytics_df_modified = zomato_delivery_operational_analytics_df_modified.dropna(subset=['Time_Orderd', 'Time_Order_picked', 'multiple_deliveries', 'Delivery_person_Age'])

In [None]:
# Split into month, year, day of week from Order_Date column

zomato_delivery_operational_analytics_df_modified = zomato_delivery_operational_analytics_df_modified \
    .withColumn('Year_ID', year(col('Order_Date'))).withColumn('Month_ID', month(col('Order_Date'))) \
    .withColumn('Day_of_Week', dayofweek(col('Order_Date'))) \
    .withColumn('Week_of_Year', weekofyear(col('Order_Date'))) \
    .withColumn('QTR_ID', quarter(col('Order_Date')))

In [None]:
### handling issues in zomato_dataset_metropolitan_df_modified
zomato_dataset_metropolitan_df_modified.select([count(when(isnull(col), col)).alias(col) for col in zomato_dataset_metropolitan_df_modified.columns]).show()

+---------------+---------+-------------+-------+----------+-----+------+------------+--------------+-------------+----+---------------+-----------+
|Delivery_Rating|Item_Name|Dining_Rating|Cuisine|Place_Name|Votes|Prices|Dining_Votes|Delivery_Votes|unique_row_id|City|Restaurant_Name|Best_Seller|
+---------------+---------+-------------+-------+----------+-----+------+------------+--------------+-------------+----+---------------+-----------+
|           1280|        0|        32236|      0|         0|    0|     0|           0|             0|            0|   0|              0|      95709|
+---------------+---------+-------------+-------+----------+-----+------+------------+--------------+-------------+----+---------------+-----------+



In [None]:
# Compute the average ratings
avg_delivery_rating = zomato_dataset_metropolitan_df_modified.select(mean(col("Delivery_Rating"))).first()[0]
avg_dining_rating = zomato_dataset_metropolitan_df_modified.select(mean(col("Dining_Rating"))).first()[0]

# Fill NA with average ratings
zomato_dataset_metropolitan_df_modified = zomato_dataset_metropolitan_df_modified.fillna({"Delivery_Rating": avg_delivery_rating, "Dining_Rating": avg_dining_rating})

In [None]:
# write to blob as parquet [or SQL]

# Storage account info
storage_account_name = "capstonestoragezomato"
storage_account_key = ""
container_name = "cleaned-data"

# configuration
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_key)



# path at which operational_analytics data will be saved
blob_path_operationalAnalytics = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/operational_analytics"
# writing to blob storage
zomato_delivery_operational_analytics_df_modified.write.mode("overwrite").parquet(blob_path_operationalAnalytics)


# path at which metropolitan_data will be saved
blob_path_metropolitanData = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/metropolitan_data"
# writing to blob storage
zomato_dataset_metropolitan_df_modified.write.mode("overwrite").parquet(blob_path_metropolitanData)

In [None]:
print((zomato_delivery_operational_analytics_df.count(), len(zomato_delivery_operational_analytics_df.columns)))
print((zomato_delivery_operational_analytics_df_modified.count(), len(zomato_delivery_operational_analytics_df_modified.columns)))

# reason for drop is majorly because of the exclusion of Urban data

(44983, 21)
(31088, 26)


In [None]:
print((zomato_dataset_metropolitan_df.count(), len(zomato_dataset_metropolitan_df.columns)))
print((zomato_dataset_metropolitan_df_modified.count(), len(zomato_dataset_metropolitan_df_modified.columns)))

(123651, 13)
(123651, 13)
