In [0]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session for this notebook, with the
# "spark.sql.caseSensitive" option switched on.
spark = (
    SparkSession.builder
    .appName("NewSparkSession")
    .config("spark.sql.caseSensitive", "true")
    .getOrCreate()
)


In [0]:
# Echo the session object so the notebook renders its summary.
spark

In [0]:
import os

# Cosmos DB connection settings.
# Both values can be supplied through environment variables so that the real
# account key never has to be stored in the notebook. The literals are only
# fallbacks that keep the previous behaviour when the variables are not set.
account_endpoint = os.environ.get(
    "COSMOS_ACCOUNT_ENDPOINT",
    "https://azurecapstonecosmos.documents.azure.com:443/",
)
account_key = os.environ.get("COSMOS_ACCOUNT_KEY", "azure_account_key==")

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

#### Data Loading

In [0]:
# Explicit schema for the order/restaurant records: a mandatory string ID,
# the restaurant coordinates as doubles, and the remaining order attributes
# as nullable strings.
rest_schema = (
    StructType()
    .add("ID", StringType(), nullable=False)
    .add("Restaurant_latitude", DoubleType(), nullable=True)
    .add("Restaurant_longitude", DoubleType(), nullable=True)
    .add("Order_Date", StringType(), nullable=True)
    .add("Time_Orderd", StringType(), nullable=True)
    .add("Time_Order_picked", StringType(), nullable=True)
    .add("Type_of_order", StringType(), nullable=True)
    .add("City", StringType(), nullable=True)
)

In [0]:
# Explicit schema for the delivery-partner records; every field is nullable.
delevery_schema = (
    StructType()
    .add("ID", StringType(), True)
    .add("Delivery_person_ID", StringType(), True)
    .add("Delivery_person_Age", IntegerType(), True)
    .add("Delivery_person_Ratings", DoubleType(), True)
    .add("Delivery_location_latitude", DoubleType(), True)
    .add("Delivery_location_longitude", DoubleType(), True)
    .add("Weather_conditions", StringType(), True)
    .add("Road_traffic_density", StringType(), True)
    .add("Vehicle_condition", IntegerType(), True)
    .add("Type_of_vehicle", StringType(), True)
    .add("multiple_deliveries", IntegerType(), True)
    .add("Festival", StringType(), True)
    .add("Time_taken (min)", IntegerType(), True)
)


In [0]:

# Explicit schema for the restaurant/menu records; every field is nullable.
# Note that the "Cuisine " field name carries a trailing space.
restaurant_schema = (
    StructType()
    .add("Restaurant Name", StringType(), True)
    .add("Dining Rating", FloatType(), True)
    .add("Delivery Rating", FloatType(), True)
    .add("Dining Votes", IntegerType(), True)
    .add("Delivery Votes", IntegerType(), True)
    .add("Cuisine ", StringType(), True)
    .add("Place Name", StringType(), True)
    .add("City", StringType(), True)
    .add("Item Name", StringType(), True)
    .add("Best Seller", StringType(), True)
    .add("Votes", IntegerType(), True)
    .add("Prices", FloatType(), True)
)


In [0]:
# Load the "RestaurantDelevery" container of the "Zomato" database through
# the Cosmos DB connector, applying rest_schema instead of inferring one.
rest_df = (
    spark.read.format("cosmos.oltp")
    .option("spark.cosmos.accountEndpoint", account_endpoint)
    .option("spark.cosmos.accountKey", account_key)
    .option("spark.cosmos.database", "Zomato")
    .option("spark.cosmos.container", "RestaurantDelevery")
    .schema(rest_schema)
    .load()
)

In [0]:
# Load the "Restaurant" container of the "Zomato" database through the
# Cosmos DB connector, applying restaurant_schema instead of inferring one.
restaurant_df = (
    spark.read.format("cosmos.oltp")
    .option("spark.cosmos.accountEndpoint", account_endpoint)
    .option("spark.cosmos.accountKey", account_key)
    .option("spark.cosmos.database", "Zomato")
    .option("spark.cosmos.container", "Restaurant")
    .schema(restaurant_schema)
    .load()
)


In [0]:
# Load the "DeleveryPartners" container of the "Zomato" database through the
# Cosmos DB connector, applying delevery_schema instead of inferring one.
delevery_part_df = (
    spark.read.format("cosmos.oltp")
    .option("spark.cosmos.accountEndpoint", account_endpoint)
    .option("spark.cosmos.accountKey", account_key)
    .option("spark.cosmos.database", "Zomato")
    .option("spark.cosmos.container", "DeleveryPartners")
    .schema(delevery_schema)
    .load()
)


#### Shape of the data

In [0]:
# Report the frame's dimensions; count() runs a Spark job over the data.
rest_rows = rest_df.count()
rest_cols = len(rest_df.columns)
print("row : ", rest_rows, " columns : ", rest_cols)

row :  59188  columns :  8


In [0]:
# Report the frame's dimensions; count() runs a Spark job over the data.
partner_rows = delevery_part_df.count()
partner_cols = len(delevery_part_df.columns)
print("row : ", partner_rows, " columns : ", partner_cols)

row :  55045  columns :  13


In [0]:
# Report the frame's dimensions; count() runs a Spark job over the data.
restaurant_rows = restaurant_df.count()
restaurant_cols = len(restaurant_df.columns)
print("row : ", restaurant_rows, " columns : ", restaurant_cols)

row :  139783  columns :  12


#### Caching the dataframes

In [0]:
# Mark all three source frames for caching so later actions can reuse them.
rest_df, delevery_part_df, restaurant_df = (
    frame.cache() for frame in (rest_df, delevery_part_df, restaurant_df)
)

#### Removing duplicates

In [0]:
# Remove rows that are exact duplicates across every column, frame by frame.
rest_df, delevery_part_df, restaurant_df = (
    frame.dropDuplicates()
    for frame in (rest_df, delevery_part_df, restaurant_df)
)

In [0]:
# Print the column names and types of the delivery-partner frame.
delevery_part_df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Delivery_person_ID: string (nullable = true)
 |-- Delivery_person_Age: integer (nullable = true)
 |-- Delivery_person_Ratings: double (nullable = true)
 |-- Delivery_location_latitude: double (nullable = true)
 |-- Delivery_location_longitude: double (nullable = true)
 |-- Weather_conditions: string (nullable = true)
 |-- Road_traffic_density: string (nullable = true)
 |-- Vehicle_condition: integer (nullable = true)
 |-- Type_of_vehicle: string (nullable = true)
 |-- multiple_deliveries: integer (nullable = true)
 |-- Festival: string (nullable = true)
 |-- Time_taken (min): integer (nullable = true)



#### Joining data to form delevery_df

In [0]:
# Inner-join order rows with delivery-partner rows on the shared "ID" column;
# IDs present on only one side are dropped.
delevery_df = rest_df.join(
    delevery_part_df,
    on=["ID"],
    how="inner",
)

In [0]:
# delevery_df = delevery_df.filter((col('Delivery_location_latitude') > 1) & (col('Delivery_location_longitude') > 1) & (col('Restaurant_latitude') > 1)& (col('Restaurant_longitude') > 1))

#### Calculating approximate distance from the latitude and longitude

In [0]:
# Install geopy, which the distance helper in the following cell imports.
# NOTE(review): `!pip` runs in a shell on the driver and the version is not
# pinned. The geopy-based UDF presumably also needs the package on the
# executors of a multi-node cluster - confirm, and consider a cluster library
# or a pinned `%pip install` cell at the very top of the notebook instead.
!pip install geopy

In [0]:
from geopy.distance import geodesic


def spherical_distance(lat1, lon1, lat2, lon2):
    """Return the distance in kilometres between two coordinate pairs.

    The value is geopy's ``geodesic`` distance between the two points; despite
    the function's name this is not the haversine (great-circle) formula.

    Args:
        lat1, lon1: latitude and longitude of the first point, in degrees.
        lat2, lon2: latitude and longitude of the second point, in degrees.

    Returns:
        The distance in kilometres, or ``None`` when any coordinate is missing
        (Spark passes SQL NULL to a Python UDF as ``None``).
    """
    # Guard against NULL coordinates so one bad row does not fail the whole job.
    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
        return None
    return geodesic((lat1, lon1), (lat2, lon2)).kilometers


# Spark wrapper around spherical_distance that yields a double column.
spherical_distance_udf = udf(spherical_distance, DoubleType())


In [0]:
# Add the delivery-location-to-restaurant distance (in km, as returned by
# spherical_distance_udf) as a new column.
delevery_df = delevery_df.withColumn(
    "approx_distance",
    spherical_distance_udf(
        col("Delivery_location_latitude"),
        col("Delivery_location_longitude"),
        col("Restaurant_latitude"),
        col("Restaurant_longitude"),
    ),
)

In [0]:
# delevery_df.select('approx_distance').show()

+---------------+
|approx_distance|
+---------------+
|      3.0985498|
|      1.4880947|
|       9.041264|
|       9.074471|
|       3.035833|
|       3.059118|
|       6.136051|
|      1.5690347|
|       16.63525|
|       4.580299|
|       9.162284|
|       4.537508|
|      7.5510554|
|      16.694595|
|      13.592609|
|       7.647483|
|      3.0145607|
|      1.5549024|
|      13.661188|
|       6.118387|
+---------------+
only showing top 20 rows



#### Filtering only metropolitan values

In [0]:
# Keep only the rows whose City equals the literal below.
# NOTE(review): the spelling "Metropolitian" presumably mirrors the source
# data - verify before "correcting" it.
delevery_df = delevery_df.where(col("City") == "Metropolitian")

In [0]:
# Mark the joined, filtered frame for caching; the following cells run
# several actions against it.
delevery_df = delevery_df.cache()

In [0]:
# from geopy.geocoders import Nominatim

# def get_city(lat, lon):
#     try:
#         geolocator = Nominatim(user_agent="my_unique_app")
#         location = geolocator.reverse((lat, lon), exactly_one=True)
#         if location:
#             return location.raw.get('address', {}).get('city', None)
#         return None
#     except Exception as e:
#         print(f"Error: {e}")
#         return None


In [0]:
# udf_city = udf(lambda lat, lon: get_city(lat, lon), StringType())

# delevery_df = delevery_df.withColumn('CityName', udf_city(delevery_df.Restaurant_latitude, delevery_df.Restaurant_longitude))

In [0]:
# delevery_df.select('CityName').distinct().show()

In [0]:
# delevery_df.count()

In [0]:
# Print the column names and types of the joined delivery frame.
delevery_df.printSchema()

root
 |-- ID: string (nullable = false)
 |-- Restaurant_latitude: double (nullable = true)
 |-- Restaurant_longitude: double (nullable = true)
 |-- Order_Date: string (nullable = true)
 |-- Time_Orderd: string (nullable = true)
 |-- Time_Order_picked: string (nullable = true)
 |-- Type_of_order: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Delivery_person_ID: string (nullable = true)
 |-- Delivery_person_Age: integer (nullable = true)
 |-- Delivery_person_Ratings: double (nullable = true)
 |-- Delivery_location_latitude: double (nullable = true)
 |-- Delivery_location_longitude: double (nullable = true)
 |-- Weather_conditions: string (nullable = true)
 |-- Road_traffic_density: string (nullable = true)
 |-- Vehicle_condition: integer (nullable = true)
 |-- Type_of_vehicle: string (nullable = true)
 |-- multiple_deliveries: integer (nullable = true)
 |-- Festival: string (nullable = true)
 |-- Time_taken (min): integer (nullable = true)
 |-- approx_distance: double

#### Checking for null values

In [0]:
# Count the NULLs of every column in one aggregation pass instead of
# launching a separate Spark job per column, then print one
# "<column> : <count>" line per column.
delevery_null_counts = delevery_df.select(
    [
        count(when(col(name).isNull(), 1)).alias(name)
        for name in delevery_df.columns
    ]
).first()

for column in delevery_df.columns:
    print(f"{column} : {delevery_null_counts[column]}")

ID : 0
Restaurant_latitude : 0
Restaurant_longitude : 0
Order_Date : 0
Time_Orderd : 1291
Time_Order_picked : 0
Type_of_order : 0
City : 0
Delivery_person_ID : 0
Delivery_person_Age : 0
Delivery_person_Ratings : 1437
Delivery_location_latitude : 0
Delivery_location_longitude : 0
Weather_conditions : 476
Road_traffic_density : 464
Vehicle_condition : 0
Type_of_vehicle : 0
multiple_deliveries : 0
Festival : 111
Time_taken (min) : 0
approx_distance : 0


In [0]:
# Count the NULLs of every column in one aggregation pass instead of
# launching a separate Spark job per column, then print one
# "<column> : <count>" line per column.
restaurant_null_counts = restaurant_df.select(
    [
        count(when(col(name).isNull(), 1)).alias(name)
        for name in restaurant_df.columns
    ]
).first()

for column in restaurant_df.columns:
    print(f"{column} : {restaurant_null_counts[column]}")

Restaurant Name : 0
Dining Rating : 26958
Delivery Rating : 1244
Dining Votes : 0
Delivery Votes : 0
Cuisine  : 0
Place Name : 0
City : 0
Item Name : 0
Best Seller : 0
Votes : 0
Prices : 0


In [0]:
# Show, for each column, how many values isnan() flags as NaN.
delevery_df.select(
    *(
        count(when(isnan(col(name)), name)).alias(name)
        for name in delevery_df.columns
    )
).show()

In [0]:
# Show, for each column, how many values isnan() flags as NaN.
restaurant_df.select(
    *(
        count(when(isnan(col(name)), name)).alias(name)
        for name in restaurant_df.columns
    )
).show()

+---------------+-------------+---------------+------------+--------------+--------+----------+----+---------+-----------+-----+------+
|Restaurant Name|Dining Rating|Delivery Rating|Dining Votes|Delivery Votes|Cuisine |Place Name|City|Item Name|Best Seller|Votes|Prices|
+---------------+-------------+---------------+------------+--------------+--------+----------+----+---------+-----------+-----+------+
|              0|            0|              0|           0|             0|       0|         0|   0|        1|          0|    0|     0|
+---------------+-------------+---------------+------------+--------------+--------+----------+----+---------+-----------+-----+------+



#### Replacing null values with per-category statistics

In [0]:
# Mean "Delivery Rating" per restaurant, collected to the driver as a dict
# keyed by restaurant name.
rate_dict = {
    row["Restaurant Name"]: row["Delivery Rating"]
    for row in (
        restaurant_df.groupBy("Restaurant Name")
        .agg(mean("Delivery Rating").alias("Delivery Rating"))
        .collect()
    )
}


def rate_select(restaurant_name):
    """Look up the restaurant's mean delivery rating; None when unknown."""
    return rate_dict.get(restaurant_name)

In [0]:
# Spark wrapper around rate_select that yields doubles.
rate_udf = udf(rate_select, DoubleType())

# Keep an existing "Delivery Rating"; where it is NULL fall back to the value
# looked up for the row's restaurant.
restaurant_df = restaurant_df.withColumn(
    "Delivery Rating",
    coalesce(col("Delivery Rating"), rate_udf(col("Restaurant Name"))),
)

In [0]:
# Most frequent "Delivery_person_Ratings" value per order type, collected to
# the driver as a dict keyed by "Type_of_order".
rate_dict = {
    row["Type_of_order"]: row["Delivery_person_Ratings"]
    for row in (
        delevery_df.groupBy("Type_of_order")
        .agg(mode("Delivery_person_Ratings").alias("Delivery_person_Ratings"))
        .collect()
    )
}


def rate_select(order_type):
    """Look up the modal rating for the order type; None when unknown."""
    return rate_dict.get(order_type)

In [0]:
# Spark wrapper around rate_select that yields doubles.
rate_udf = udf(rate_select, DoubleType())

# Replace missing ratings with the value looked up for the row's order type.
# isnan() evaluates to false for NULL, and the null report shows the missing
# ratings are NULLs, so the check must cover both NaN and NULL; with isnan()
# alone this imputation never touched them. The condition now matches the one
# used for Delivery_person_Age.
delevery_df = delevery_df.withColumn(
    "Delivery_person_Ratings",
    when(
        isnan(col("Delivery_person_Ratings")) | col("Delivery_person_Ratings").isNull(),
        rate_udf(col("Type_of_order")),
    ).otherwise(col("Delivery_person_Ratings")),
)

In [0]:
# Floor of the average "Delivery_person_Age" per order type, collected to the
# driver as a dict keyed by "Type_of_order".
age_dict = {
    row["Type_of_order"]: row["Delivery_person_Age"]
    for row in (
        delevery_df.groupBy("Type_of_order")
        .agg(floor(avg("Delivery_person_Age")).alias("Delivery_person_Age"))
        .collect()
    )
}


def age_select(order_type):
    """Look up the floored average age for the order type; None when unknown."""
    return age_dict.get(order_type)


In [0]:
# Spark wrapper around age_select that yields integers.
age_udf = udf(age_select, IntegerType())

# Where the age is NULL or NaN, substitute the value looked up for the row's
# order type; otherwise keep the existing age.
delevery_df = delevery_df.withColumn(
    "Delivery_person_Age",
    when(
        isnan(col("Delivery_person_Age")) | isnull(col("Delivery_person_Age")),
        age_udf(col("Type_of_order")),
    ).otherwise(col("Delivery_person_Age")),
)

In [0]:
# Sanity check: number of rows whose Delivery_person_Age is still NULL.
delevery_df.filter(isnull(col('Delivery_person_Age'))).count()

Out[39]: 0

#### Replacing NaN 

In [0]:
# Whole-frame statistics used as replacement values: the most frequent
# multiple_deliveries and Road_traffic_density, and the mean rating.
mode_multiple_dele = delevery_df.agg(mode("multiple_deliveries")).first()[0]
mode_mtraffic = delevery_df.agg(mode("Road_traffic_density")).first()[0]
avg_delevery_rate = delevery_df.agg(mean("Delivery_person_Ratings")).first()[0]

In [0]:
def _fill_missing(df, column, replacement):
    """Return df with the NaN or NULL entries of column set to replacement."""
    # isnan() is false for NULL, so both conditions are needed to catch every
    # missing value.
    is_missing = isnan(col(column)) | col(column).isNull()
    return df.withColumn(column, when(is_missing, replacement).otherwise(col(column)))


# Most frequent Festival value across the frame.
mode_festival = delevery_df.agg(mode('Festival')).collect()[0][0]

# Replace missing values with the column's own mode. multiple_deliveries was
# previously filled with the traffic-density mode (mode_mtraffic); it now uses
# mode_multiple_dele.
delevery_df = _fill_missing(delevery_df, 'Festival', mode_festival)
delevery_df = _fill_missing(delevery_df, 'Road_traffic_density', mode_mtraffic)
delevery_df = _fill_missing(delevery_df, 'multiple_deliveries', mode_multiple_dele)

In [0]:
# Whole-frame average rating and most frequent traffic density.
avg_rating = delevery_df.agg(avg("Delivery_person_Ratings")).first()[0]
mode_traffic_density = delevery_df.agg(mode("Road_traffic_density")).first()[0]

In [0]:
# Any rating that is still NULL or NaN is replaced by the overall average.
delevery_df = delevery_df.withColumn(
    "Delivery_person_Ratings",
    when(
        col("Delivery_person_Ratings").isNull() | isnan(col("Delivery_person_Ratings")),
        avg_rating,
    ).otherwise(col("Delivery_person_Ratings")),
)

In [0]:
# Mean of "Dining Rating" over the whole restaurant frame.
avg_dining_rating = restaurant_df.agg(mean("Dining Rating")).first()[0]

In [0]:
# Fill missing "Dining Rating" values with the mean computed beforehand.
restaurant_df = restaurant_df.na.fill({"Dining Rating": avg_dining_rating})

#### Converting the Decimal time to hh:mm

In [0]:
def decimal_to_time(value):
    """Convert a fraction-of-a-day value to an "HH:MM" string.

    The input is read as a day fraction (0.5 is noon). Only the fractional
    part is used, so 1.25 is treated like 0.25 and exactly 1.0 becomes
    "00:00" rather than the invalid "24:00". The result is rounded to the
    nearest minute, because stored fractions such as 0.458333333 sit just
    below the exact value and truncation would give "10:59" instead of
    "11:00". A value that rounds up to a full day wraps to "00:00".

    Args:
        value: a number, or a string holding one.

    Returns:
        The "HH:MM" string, or ``value`` unchanged when it cannot be
        interpreted as a finite number (e.g. None, "NaN", "11:30").
    """
    minutes_per_day = 24 * 60
    try:
        day_fraction = float(value) % 1
        # int(x + 0.5) rounds the non-negative x to the nearest minute. The
        # built-in round() is deliberately avoided: the notebook's
        # `from pyspark.sql.functions import *` shadows it with Spark's round.
        total_minutes = int(day_fraction * minutes_per_day + 0.5) % minutes_per_day
    except (ValueError, TypeError, OverflowError):
        return value

    hours = total_minutes // 60
    minutes = total_minutes % 60
    return f"{hours:02d}:{minutes:02d}"

# Spark wrapper around decimal_to_time that yields a string column.
decimal_to_time_udf = udf(decimal_to_time, StringType())

In [0]:
# Normalise Time_Orderd: the literal "NaN" becomes NULL, values that look
# like a plain decimal number go through decimal_to_time_udf, and anything
# else is kept as it is.
delevery_df = delevery_df.withColumn(
    "Time_Orderd",
    when(col("Time_Orderd") == "NaN", lit(None))
    .when(
        col("Time_Orderd").rlike("^\\d+\\.\\d+$"),
        decimal_to_time_udf(col("Time_Orderd")),
    )
    .otherwise(col("Time_Orderd")),
)

In [0]:
# Normalise Time_Order_picked: the literal "NaN" becomes NULL, values that
# look like a plain decimal number go through decimal_to_time_udf, and
# anything else is kept as it is.
delevery_df = delevery_df.withColumn(
    "Time_Order_picked",
    when(col("Time_Order_picked") == "NaN", lit(None))
    .when(
        col("Time_Order_picked").rlike("^\\d+\\.\\d+$"),
        decimal_to_time_udf(col("Time_Order_picked")),
    )
    .otherwise(col("Time_Order_picked")),
)

#### Type Conversion of Date and Time

In [0]:
# Parse the Order_Date strings (day-month-year) into a date column.
delevery_df = delevery_df.withColumn(
    "Order_Date",
    to_date("Order_Date", format="dd-MM-yyyy"),
)

In [0]:
# Round-trip Time_Orderd through a timestamp so it ends up as an "HH:mm"
# string.
delevery_df = delevery_df.withColumn(
    "Time_Orderd",
    date_format(to_timestamp(col("Time_Orderd"), "HH:mm"), "HH:mm"),
)

#### Splitting the order date and order time into new columns

In [0]:
# Derive calendar features from the order time and date. Note that
# "Order_Week" holds the day of the week (dayofweek), not a week number.
delevery_df = (
    delevery_df
    .withColumn("Order_hour", hour(col("Time_Orderd")))
    .withColumn("Order_day", dayofmonth(col("Order_Date")))
    .withColumn("Order_month", month(col("Order_Date")))
    .withColumn("Order_Week", dayofweek(col("Order_Date")))
)

In [0]:
# Discard the rows that have no order time.
delevery_df = delevery_df.where(col("Time_Orderd").isNotNull())

In [0]:
# Print the schema again to check the converted and derived columns.
delevery_df.printSchema()

root
 |-- ID: string (nullable = false)
 |-- Restaurant_latitude: double (nullable = true)
 |-- Restaurant_longitude: double (nullable = true)
 |-- Order_Date: date (nullable = true)
 |-- Time_Orderd: string (nullable = true)
 |-- Time_Order_picked: string (nullable = true)
 |-- Type_of_order: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Delivery_person_ID: string (nullable = true)
 |-- Delivery_person_Age: double (nullable = true)
 |-- Delivery_person_Ratings: double (nullable = true)
 |-- Delivery_location_latitude: double (nullable = true)
 |-- Delivery_location_longitude: double (nullable = true)
 |-- Weather_conditions: string (nullable = true)
 |-- Road_traffic_density: string (nullable = true)
 |-- Vehicle_condition: integer (nullable = true)
 |-- Type_of_vehicle: string (nullable = true)
 |-- multiple_deliveries: integer (nullable = true)
 |-- Festival: string (nullable = true)
 |-- Time_taken (min): integer (nullable = true)
 |-- approx_distance: float (nu

In [0]:
# Row count of the cleaned delivery frame.
delevery_df.count()

Out[53]: 32497

In [0]:
# Row count of the cleaned restaurant frame.
restaurant_df.count()

Out[54]: 101524

In [0]:
# Give the duration column a name without spaces or parentheses.
delevery_df = delevery_df.withColumnRenamed(
    existing="Time_taken (min)",
    new="Time_taken",
)

#### Replacing spaces in column names with underscores to avoid issues in SQL

In [0]:
# Rename every column in one step, swapping each space for an underscore
# (e.g. "Restaurant Name" -> "Restaurant_Name", "Cuisine " -> "Cuisine_").
restaurant_df = restaurant_df.toDF(
    *[name.replace(" ", "_") for name in restaurant_df.columns]
)

#### Writing the dataframes to DBFS storage as Parquet

In [0]:
# Destination directory for the delivery dataset.
dbfs_path = "/Volumes/databricks28/default/delivery"

# Collapse to a single partition so one parquet part file is produced, then
# write it, replacing whatever the directory already holds.
write_delevery_df = delevery_df.repartition(1)
write_delevery_df.write.parquet(dbfs_path, mode="overwrite")

In [0]:
# Destination directory for the restaurant dataset.
dbfs_path = "/Volumes/databricks28/default/restaurant"

# Collapse to a single partition so one parquet part file is produced, then
# write it, replacing whatever the directory already holds.
write_restaurant_df = restaurant_df.repartition(1)
write_restaurant_df.write.parquet(dbfs_path, mode="overwrite")


In [0]:
# List the restaurant output directory to confirm the write produced files.
dbutils.fs.ls("/Volumes/databricks28/default/restaurant")

Out[64]: [FileInfo(path='dbfs:/Volumes/databricks28/default/restaurant/_SUCCESS', name='_SUCCESS', size=0, modificationTime=1727950519000),
 FileInfo(path='dbfs:/Volumes/databricks28/default/restaurant/_committed_3957586234054945592', name='_committed_3957586234054945592', size=125, modificationTime=1727948937000),
 FileInfo(path='dbfs:/Volumes/databricks28/default/restaurant/_committed_8370579358370310129', name='_committed_8370579358370310129', size=237, modificationTime=1727950519000),
 FileInfo(path='dbfs:/Volumes/databricks28/default/restaurant/_started_3957586234054945592', name='_started_3957586234054945592', size=0, modificationTime=1727948937000),
 FileInfo(path='dbfs:/Volumes/databricks28/default/restaurant/_started_8370579358370310129', name='_started_8370579358370310129', size=0, modificationTime=1727950518000),
 FileInfo(path='dbfs:/Volumes/databricks28/default/restaurant/part-00000-tid-8370579358370310129-c7059c2a-598b-46dd-94a4-709f202ae419-14283-1.c000.snappy.parquet', 

In [0]:
# List the delivery output directory to confirm the write produced files.
dbutils.fs.ls("/Volumes/databricks28/default/delivery")

Out[63]: [FileInfo(path='dbfs:/Volumes/databricks28/default/delivery/_SUCCESS', name='_SUCCESS', size=0, modificationTime=1727950516000),
 FileInfo(path='dbfs:/Volumes/databricks28/default/delivery/_committed_327163005425335967', name='_committed_327163005425335967', size=236, modificationTime=1727950516000),
 FileInfo(path='dbfs:/Volumes/databricks28/default/delivery/_committed_9172083225396536082', name='_committed_9172083225396536082', size=125, modificationTime=1727948917000),
 FileInfo(path='dbfs:/Volumes/databricks28/default/delivery/_started_2183175902330022446', name='_started_2183175902330022446', size=0, modificationTime=1727948832000),
 FileInfo(path='dbfs:/Volumes/databricks28/default/delivery/_started_327163005425335967', name='_started_327163005425335967', size=0, modificationTime=1727950516000),
 FileInfo(path='dbfs:/Volumes/databricks28/default/delivery/_started_9172083225396536082', name='_started_9172083225396536082', size=0, modificationTime=1727948916000),
 FileInfo

In [0]:

# storage_account_name = "azurestoragecapstone"
# storage_account_access_key = "<REDACTED - load from a secret scope or environment variable; never commit the key>"
# container_name = "intermediate"
# delivery_path = ""

# spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_access_key)

# df.write.format("parquet") \
#     .mode("overwrite") \
#     .save(f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{delivery_path}")
