In [0]:
%pip uninstall -y databricks_helpers 
%pip install git+https://github.com/data-derp/databricks_helpers#egg=databricks_helpers 

In [0]:
# Exercise identifier handed to DataDerpDatabricksHelpers in the next cell.
exercise_name = "final_day_presentation"

In [0]:
from databricks_helpers.databricks_helpers import DataDerpDatabricksHelpers

# Build the course helper from this notebook's dbutils handle and the exercise name.
helpers = DataDerpDatabricksHelpers(dbutils, exercise_name)

# working_directory is the base path for every read and write below.
current_user = helpers.current_user()
working_directory = helpers.working_directory()

print(f"Your current working directory is: {working_directory}")

In [0]:
# Load the Parquet dataset that every analysis below consumes.
# NOTE(review): the variables are named "gold", but the path points at the
# "/silver" folder; the notebook writes its own results under "/gold" later.
# Confirm which layer is intended before renaming anything.

gold_layer_path = working_directory + "/silver"

gold_df = spark\
    .read\
    .parquet(gold_layer_path)

# Show the schema and a preview so the column names used below can be checked.
print(f"Schema of the raw DataFrame:")
gold_df.printSchema()
display(gold_df)


### 1. How do restaurant ratings and popularity (number of ratings) vary across different cities and localities?

In [0]:
from pyspark.sql.functions import regexp_replace, col, avg, sum, count, round, min, max
import plotly.express as px

def analyze_and_plot_city_performance(input_df):
    """Aggregate restaurant ratings per city, score each city and plot the result.

    For every ``City`` the function computes the mean ``Avg_rating``, the mean
    and total ``Total_ratings`` and the restaurant count. Mean rating and mean
    popularity are then min-max scaled to the 0-1 range and combined into a
    weighted ``city_score`` (60% rating, 40% popularity). The scored table is
    printed with ``show()`` and a grouped bar chart of the two normalized
    scores is rendered, with cities ordered by ``city_score``.

    Args:
        input_df: Spark DataFrame with ``City``, ``Avg_rating`` and
            ``Total_ratings`` columns.

    Returns:
        A tuple ``(by_rating, by_popularity, scored)`` of Spark DataFrames:
        the city aggregation ordered by average rating, the same aggregation
        ordered by average popularity, and the aggregation extended with
        ``rating_score``, ``popularity_score`` and ``city_score``.
    """
    # Use a namespaced alias so this function does not depend on the
    # notebook-level imports that shadow the builtins round/min/max/sum.
    from pyspark.sql import functions as F

    # --- Calculations (Aggregation) ---
    print(" Performing city-level aggregation...")
    city_analysis = input_df.groupBy("City").agg(
        F.round(F.avg("Avg_rating"), 2).alias("avg_city_rating"),
        F.round(F.avg("Total_ratings")).alias("avg_city_popularity"),
        F.sum("Total_ratings").alias("total_city_popularity"),
        F.count("*").alias("total_restaurants_in_city"),
    )

    # Two orderings of the same aggregation, returned to the caller.
    city_analysis_by_rating = city_analysis.orderBy(F.col("avg_city_rating").desc())
    city_analysis_by_popularity = city_analysis.orderBy(F.col("avg_city_popularity").desc())

    # Global min/max of both metrics; a grouping-free agg always yields one row.
    bounds = city_analysis.agg(
        F.min("avg_city_rating").alias("min_rating"),
        F.max("avg_city_rating").alias("max_rating"),
        F.min("avg_city_popularity").alias("min_popularity"),
        F.max("avg_city_popularity").alias("max_popularity"),
    ).first()

    def _min_max_score(column_name, lower, upper):
        """Return a 0-1 min-max scaled column, or 0.0 when the range is empty."""
        # With no data, or when every city has the same value, the range is
        # zero and the division would give nulls (or fail under ANSI mode).
        if lower is None or upper is None or upper == lower:
            return F.lit(0.0)
        return F.round((F.col(column_name) - lower) / (upper - lower), 3)

    # Normalize the columns to a 0-1 scale
    df_normalized = city_analysis.withColumn(
        "rating_score",
        _min_max_score("avg_city_rating", bounds["min_rating"], bounds["max_rating"]),
    ).withColumn(
        "popularity_score",
        _min_max_score("avg_city_popularity", bounds["min_popularity"], bounds["max_popularity"]),
    )

    # Rating is weighted slightly higher than popularity.
    weight_rating = 0.6
    weight_popularity = 0.4

    df_with_score = df_normalized.withColumn(
        "city_score",
        F.round(
            (F.col("rating_score") * weight_rating)
            + (F.col("popularity_score") * weight_popularity),
            3,
        ),
    )

    # --- Display the final results ---
    df_with_score.select(
        "City", "avg_city_rating", "avg_city_popularity", "rating_score",
        "popularity_score", "city_score", "total_restaurants_in_city",
    ).orderBy(F.col("city_score").desc()).show()

    # --- Plotting ---
    # The aggregate has one row per city, so collecting it to pandas is cheap.
    pandas_df = df_with_score.toPandas()

    # Sort by overall score so the bars appear from best to worst city.
    pandas_df_sorted = pandas_df.sort_values("city_score", ascending=False)

    # Reshape wide -> long for Plotly Express. Melting the *sorted* frame keeps
    # the city order on the x-axis (the unsorted frame was melted before).
    df_melted = pandas_df_sorted.melt(
        id_vars=['City', 'city_score'],
        value_vars=['rating_score', 'popularity_score'],
        var_name='metric_type',
        value_name='normalized_score'
    )

    # Create the grouped bar chart
    fig = px.bar(
        df_melted,
        x="City",
        y="normalized_score",
        color="metric_type",  # Creates the groups for rating vs. popularity
        barmode="group",
        title="Rating vs. Popularity Score by City",
        labels={
            "normalized_score": " Score (0 to 1)",
            "City": "City",
            "metric_type": "Metric"
        },
        height=600  # Adjust height for better readability
    )

    fig.show()

    return city_analysis_by_rating, city_analysis_by_popularity, df_with_score

# The function returns (by_rating, by_popularity, scored); keep the tuple under
# its existing name because the write cell below indexes into it.
city_restaurant_analysis = analyze_and_plot_city_performance(gold_df)

# display() renders one table at a time, so show the scored DataFrame rather
# than the tuple of three DataFrames.
display(city_restaurant_analysis[2])


In [0]:
# Every result in this notebook is written with mode("overwrite"). Writing them
# all to the same gold path means each later write replaces the earlier ones,
# so this dataset goes to its own sub-directory.
output_dir = working_directory + "/gold"

city_restaurant_analysis[0].write.mode("overwrite").parquet(output_dir + "/city_analysis")

### What is the market share and performance of vegetarian vs. non-vegetarian restaurants in key areas?

In [0]:
from pyspark.sql.functions import col, count, sum, round, avg, when
from pyspark.sql.window import Window
import plotly.express as px

def analyze_veg_nonveg_performance(input_df):
    """Compare vegetarian and non-vegetarian restaurants city by city.

    Rows whose ``Vegetarian`` value is null are dropped; the remaining flag is
    relabelled to "Veg" when it equals True and "Non-Veg" otherwise. Two
    grouped bar charts are rendered: market share per city and average rating
    per city.

    Args:
        input_df: Spark DataFrame with ``City``, ``Vegetarian``,
            ``Avg_rating`` and ``Total_ratings`` columns.

    Returns:
        ``(city_market_share_df, city_performance_df)``: restaurant counts and
        market-share percentage per (City, Vegetarian), and the rounded mean
        rating and mean number of ratings per (City, Vegetarian).
    """
    # Keep only rows with a known type and turn the flag into a readable label.
    veg_label = when(col("Vegetarian") == True, "Veg").otherwise("Non-Veg")
    labelled_df = (
        input_df
        .filter(col("Vegetarian").isNotNull())
        .withColumn("Vegetarian", veg_label)
    )

    # --- 1. Market share: count per (city, type) over the city's total ---
    per_city = Window.partitionBy("City")
    counts_df = labelled_df.groupBy("City", "Vegetarian").agg(
        count("*").alias("restaurant_count")
    )
    with_totals_df = counts_df.withColumn(
        "total_restaurants_in_city", sum("restaurant_count").over(per_city)
    )
    city_market_share_df = with_totals_df.withColumn(
        "market_share_pct",
        round((col("restaurant_count") / col("total_restaurants_in_city")) * 100, 2),
    )

    # --- 2. Performance: mean rating and mean number of ratings ---
    city_performance_df = labelled_df.groupBy("City", "Vegetarian").agg(
        round(avg("Avg_rating"), 2).alias("avg_rating"),
        round(avg("Total_ratings")).alias("avg_popularity_score"),
    )

    # Plotly works on pandas frames, so collect both aggregates first.
    market_share_pd = city_market_share_df.toPandas()
    performance_pd = city_performance_df.toPandas()

    # Grouped bars: one bar per restaurant type within each city.
    market_share_fig = px.bar(
        market_share_pd,
        x="City",
        y="market_share_pct",
        color="Vegetarian",
        barmode="group",
        title="Market Share of Veg vs. Non-Veg Restaurants by City",
        labels={
            "market_share_pct": "Market Share (%)",
            "City": "City",
            "Vegetarian": "Restaurant Type",
        },
    )
    market_share_fig.show()

    # Same layout for the average rating, with a pastel palette on white.
    performance_fig = px.bar(
        performance_pd,
        x="City",
        y="avg_rating",
        color="Vegetarian",
        barmode="group",
        title="Performance (Average Rating) of Veg vs. Non-Veg Restaurants by City",
        labels={
            "avg_rating": "Average Rating",
            "City": "City",
            "Vegetarian": "Restaurant Type",
        },
        color_discrete_sequence=px.colors.qualitative.Pastel,
        template="plotly_white",
    )
    performance_fig.show()

    return city_market_share_df, city_performance_df

# --- How to use the function ---
# gold_df is the DataFrame loaded from the working directory earlier in this notebook.
market_share_results, performance_results = analyze_veg_nonveg_performance(gold_df)

In [0]:
# The two results used to be written to the same path with mode("overwrite"),
# so the performance write replaced the market-share output. Each DataFrame
# now gets its own sub-directory under gold/.
output_dir = working_directory + "/gold"

market_share_results.write.mode("overwrite").parquet(output_dir + "/veg_market_share")
performance_results.write.mode("overwrite").parquet(output_dir + "/veg_performance")

### Top 5 restaurants in each city

In [0]:
from pyspark.sql.functions import col, when, row_number
from pyspark.sql import Window
from pyspark.sql import DataFrame

# Select the top-ranked restaurants within each city
def top_five_res(gold_df: DataFrame, top_n: int = 5) -> DataFrame:
    """Return the ``top_n`` best restaurants of every city.

    Restaurants are ranked per ``City`` by ``Avg_rating`` (descending) and,
    for equal ratings, by ``Total_ratings`` (descending).

    Args:
        gold_df: Spark DataFrame with the restaurant columns selected below.
        top_n: Number of restaurants to keep per city; defaults to 5.

    Returns:
        Spark DataFrame with up to ``top_n`` rows per city and the columns
        Name, Area, City, Avg_rating, Total_ratings, Cuisine, Cost_for_two
        and Vegetarian.
    """
    window_spec = Window.partitionBy("City").orderBy(
        col("Avg_rating").desc(), col("Total_ratings").desc()
    )

    ranked_restaurants_df = gold_df.withColumn("rank", row_number().over(window_spec))

    # Filter on the rank *before* projecting it away. The previous order
    # (select, then filter on the dropped column) only worked because Spark
    # resolves missing references behind the scenes.
    top_n_per_city = (
        ranked_restaurants_df
        .filter(col("rank") <= top_n)
        .select("Name", "Area", "City", "Avg_rating", "Total_ratings",
                "Cuisine", "Cost_for_two", "Vegetarian")
    )
    return top_n_per_city


# Rank the loaded dataset and preview the per-city result; the plotting and
# write cells below reuse top_five_restaurants.
top_five_restaurants = top_five_res(gold_df)
display(top_five_restaurants)


In [0]:
import plotly.express as px
import pandas as pd
from pyspark.sql import DataFrame 

def display_top_five(top_five_restaurants: DataFrame) -> None:
    """Render a faceted dot plot of restaurant ratings, one facet row per city.

    The Spark DataFrame is collected to pandas and plotted with average rating
    on the x-axis and restaurant name on the y-axis. Each facet's y-axis title
    is then set to the city name taken from the facet annotation, and the
    annotation text itself is blanked.

    Args:
        top_five_restaurants: Spark DataFrame with Name, City, Avg_rating,
            Total_ratings, Cuisine and Cost_for_two columns.
    """
    # Plotly needs a pandas frame.
    restaurants_pd = top_five_restaurants.toPandas()

    # Dot plot: the marker colour and the text label both carry the rating.
    fig = px.scatter(
        restaurants_pd,
        x="Avg_rating",
        y="Name",
        facet_row="City",
        color="Avg_rating",
        hover_data=["Total_ratings", "Cuisine", "Cost_for_two"],
        color_continuous_scale='Plasma',
        facet_row_spacing=0.03,
        text="Avg_rating",
    )

    # --- Style customizations: larger outlined markers, labels to the right ---
    marker_style = {"size": 16, "line": {"width": 2, "color": 'DarkSlateGrey'}}
    label_font = {"family": "Arial", "size": 12, "color": 'DarkSlateGrey'}
    fig.update_traces(
        marker=marker_style,
        textfont=label_font,
        textposition='middle right',
    )

    # Tall canvas for the vertical stack of facets, with larger fonts.
    fig.update_layout(
        title="<b>Top 5 Restaurant Ratings by City</b>",
        title_font={"size": 28, "family": "Arial, bold"},
        width=1400,
        height=2000,
        xaxis_title="<b>Average Rating</b>",
        template="plotly_white",
        yaxis={"tickfont": {"size": 14}},
        xaxis={"tickfont": {"size": 12}},
    )

    # Let every facet keep its own y-axis categories and tick labels.
    fig.update_yaxes(matches=None, showticklabels=True)

    # Move the city name from the facet annotation to the y-axis title.
    # NOTE(review): assumes the n-th annotation belongs to the n-th y-axis
    # (yaxis1, yaxis2, ...) — confirm against the rendered figure.
    for axis_number, facet_label in enumerate(fig.layout.annotations, start=1):
        # Annotation text looks like "City=Delhi"; keep the part after "=".
        city_name = facet_label.text.split('=')[-1]

        y_axis = fig.layout[f'yaxis{axis_number}']
        y_axis.title = f'<b>{city_name}</b>'
        y_axis.title.font = dict(size=18)

        # Clear the original annotation so the name is not shown twice.
        facet_label.text = ""

    fig.show()

# Plot the per-city ranking computed in the previous cell.
display_top_five(top_five_restaurants)


In [0]:
# Every result in this notebook is written with mode("overwrite"). Writing them
# all to the same gold path means this write would replace the earlier outputs
# (and be replaced by later ones), so it goes to its own sub-directory.
output_dir = working_directory + "/gold"

top_five_restaurants.write.mode("overwrite").parquet(output_dir + "/top_five_restaurants")

### To build a profile of what a "successful" restaurant looks like, guiding new business strategies.

In [0]:
from pyspark.sql.functions import col, when, count

# Label every restaurant with a success tier derived from rating, popularity and delivery time.
def success_score(gold_df: DataFrame) -> DataFrame:
    """Add a ``Success_Score`` column with the tier of each restaurant.

    Tiers are evaluated in order:
      * "Effective": Avg_rating >= 4.5, Total_ratings >= 500, Delivery_time < 30
      * "Efficient": 4.0 <= Avg_rating < 4.5, 100 <= Total_ratings < 500,
        30 <= Delivery_time < 40
      * "Relevant": everything else

    NOTE(review): the bands are not cumulative. A restaurant that beats an
    "Efficient" bound without reaching every "Effective" bound (for example
    rating 4.8 with 400 ratings) falls through to "Relevant" — confirm this
    is intended.

    Args:
        gold_df: Spark DataFrame with ``Avg_rating``, ``Total_ratings`` and
            ``Delivery_time`` columns.

    Returns:
        The input DataFrame with the extra string column ``Success_Score``.
    """
    rating = col("Avg_rating")
    rating_count = col("Total_ratings")
    delivery_minutes = col("Delivery_time")

    is_effective = (rating >= 4.5) & (rating_count >= 500) & (delivery_minutes < 30)
    is_efficient = (
        (rating >= 4.0) & (rating < 4.5)
        & (rating_count >= 100) & (rating_count < 500)
        & (delivery_minutes >= 30) & (delivery_minutes < 40)
    )

    tier = (
        when(is_effective, "Effective")
        .when(is_efficient, "Efficient")
        .otherwise("Relevant")
    )
    return gold_df.withColumn("Success_Score", tier)

# Keep only the identifying columns plus the tier, and give the tier column a
# presentation-friendly name (note: it contains a space).
success_profile_df = success_score(gold_df).select("Name", "Area", "City", "Success_Score") \
                     .withColumnRenamed("Success_Score", "Success Score")
# Print how many restaurants fall into each tier.
success_profile_df.groupBy("Success Score") \
    .count() \
    .withColumnRenamed("count", "Restaurant Count") \
    .show()

# Show the schema and the labelled rows.
success_profile_df.printSchema()
display(success_profile_df)


In [0]:
import plotly.express as px
import pandas as pd

from pyspark.sql import DataFrame 

def display_success_score(top_five_restaurants: DataFrame) -> None:
    """Render a heatmap of the success-tier mix (in percent) for every city.

    Args:
        top_five_restaurants: Spark DataFrame with ``City`` and
            ``Success Score`` columns. Despite its name (kept so existing
            callers keep working) this must be the success-profile DataFrame,
            not the top-five ranking. The function previously ignored this
            argument and read the global ``success_profile_df`` instead.
    """
    # 1. Convert the PySpark DataFrame passed in to pandas
    success_pd = top_five_restaurants.toPandas()

    # 2. Cross-tabulate to get the count of each tier per city
    city_counts = pd.crosstab(success_pd['City'], success_pd['Success Score'])

    # 3. Normalize each row so the tiers of a city add up to 100%
    city_ratios = city_counts.div(city_counts.sum(axis=1), axis=0) * 100

    # 4. Create the heatmap
    fig = px.imshow(city_ratios,
                    text_auto='.0f',  # Print cell values without decimals
                    aspect="auto",
                    labels=dict(x="Success Score", y="City", color="Percentage (%)"),
                    color_continuous_scale=px.colors.sequential.GnBu,
                    title="<b>Heatmap of Success Score Ratios by City</b>"
                )

    # --- Style Customizations ---
    fig.update_layout(
        title_font=dict(size=24, family="Arial, bold"),
        xaxis_title="<b>Success Score Category</b>",
        yaxis_title="<b>City</b>"
    )
    fig.update_xaxes(side="top") # Move x-axis labels to the top for a cleaner look

    fig.show()

# Pass the success profile itself; the top-five ranking has no "Success Score" column.
display_success_score(success_profile_df)



In [0]:
# Every result in this notebook is written with mode("overwrite"). Writing them
# all to the same gold path means this write would replace the earlier outputs,
# so it goes to its own sub-directory.
# NOTE(review): the "Success Score" column name contains a space; older Spark
# versions are believed to reject such names when writing Parquet — confirm on
# the target runtime.
output_dir = working_directory + "/gold"

success_profile_df.write.mode("overwrite").parquet(output_dir + "/success_profile")

## Relationship between Delivery time and Average Rating
### Does delivery time have a significant impact on a restaurant's average rating?

In [0]:
# Mean rating for every distinct delivery time, ordered by delivery time.
delivery_time_df = (
    gold_df
    .filter("avg_rating IS NOT NULL")
    .groupBy("Delivery_time")
    .agg({"Avg_rating": "avg"})
    .toPandas()
    .sort_values("Delivery_time")
)

# The dict-style agg names its output column "avg(Avg_rating)".
fig = px.line(delivery_time_df, x="Delivery_time", y="avg(Avg_rating)")
fig.show()

# The reverse view: mean delivery time for every distinct rating value.
avg_rating_df = (
    gold_df
    .filter("avg_rating IS NOT NULL")
    .groupBy("Avg_rating")
    .agg({"Delivery_time": "avg"})
    .toPandas()
    .sort_values("Avg_rating")
)

avg_rating_fig = px.line(avg_rating_df, x="Avg_rating", y="avg(Delivery_time)")
avg_rating_fig.show()
     

## Most Popular & Underserved Cuisines

In [0]:
from pyspark.sql.functions import explode, split, log, ln, length, avg, row_number, col
from pyspark.sql.window import Window
import plotly.express as px
from pyspark.sql import SparkSession # Assuming 'spark' is passed in or accessible

def weighted_average(cuisines_df):
    """Add ``Weighted_Rating`` = Avg_rating + ln(Total_ratings) / 20.

    The logarithmic term gives restaurants with more ratings a small bonus on
    top of their average rating.
    """
    popularity_bonus = ln(col('Total_ratings')) / 20
    return cuisines_df.withColumn('Weighted_Rating', col('Avg_rating') + popularity_bonus)

def find_top_cuisine(cuisines_df, window_spec):
    """Return the rank-1 cuisine under ``window_spec``.

    Rows without an ``Avg_rating`` are ignored, ``Weighted_Rating`` is
    averaged per (City, Cuisine), and the rows are ranked with ``row_number``
    over ``window_spec``. Whether rank 1 is the best or the worst cuisine
    depends on the ordering of the window that is passed in.
    """
    rated_df = cuisines_df.filter('Avg_rating IS NOT NULL')
    city_cuisine_scores = rated_df.groupBy("City", "Cuisine").agg(
        avg("Weighted_Rating").alias("avg_Weighted_Rating")
    )
    ranked_df = city_cuisine_scores.withColumn("rank", row_number().over(window_spec))
    return ranked_df.filter("rank = 1")

def _plot_city_cuisine_map(ranked_cuisines_df, city_coords, title):
    """Join one-cuisine-per-city rows with coordinates and render them on a map."""
    map_pd = (
        ranked_cuisines_df
        .join(city_coords, on="City", how="inner")  # cities without coordinates are dropped
        .select("City", "Cuisine", "avg_Weighted_Rating", "Latitude", "Longitude")
        .toPandas()
    )

    fig = px.scatter_mapbox(
        map_pd, lat="Latitude", lon="Longitude", color="avg_Weighted_Rating",
        hover_name="City", hover_data=["Cuisine", "avg_Weighted_Rating"],
        size="avg_Weighted_Rating", zoom=3, mapbox_style="carto-positron", title=title
    )
    fig.show()


def analyze_cuisines_and_visualize(gold_df):
    """Rank cuisines by weighted rating and map the best and lowest per city.

    ``CuisineList`` is split into one row per cuisine, a weighted rating is
    computed with ``weighted_average``, the overall cuisine ranking is printed,
    and two maps are rendered: the highest and the lowest ranked cuisine of
    every city that has hard-coded coordinates below.

    Args:
        gold_df: Spark DataFrame with ``Name``, ``CuisineList`` (comma
            separated string), ``City``, ``Avg_rating`` and ``Total_ratings``.
    """
    # trim is not part of this notebook's top-level imports.
    from pyspark.sql.functions import trim

    # 1. One row per cuisine. Split on commas together with the whitespace
    #    around them and trim the ends, so "Chinese" and " Chinese" are counted
    #    as the same cuisine instead of two.
    cuisines_df = (
        gold_df
        .withColumn("Cuisine", explode(split("CuisineList", r"\s*,\s*")))
        .withColumn("Cuisine", trim(col("Cuisine")))
        .select("Name", "Cuisine", "City", "Avg_rating", "Total_ratings")
    )

    # 2. Weighted_Rating = Avg_rating + ln(Total_ratings)/20
    weighted_cuisine_city_df = cuisines_df.transform(weighted_average).filter(
        length('Cuisine') < 16  # Filter out junk values
    )

    # 3. Best cuisines overall
    overall_best_cuisines = weighted_cuisine_city_df.filter('Avg_rating IS NOT NULL')\
        .groupBy("Cuisine")\
        .agg(avg("Weighted_Rating").alias("avg_Weighted_Rating"))\
        .orderBy(col("avg_Weighted_Rating").desc())

    overall_best_cuisines.show()

    # 4. Best (top) cuisine per city: rank 1 under a descending window
    window_spec_best = Window.partitionBy("City").orderBy(col("avg_Weighted_Rating").desc())
    top_cuisines_df = weighted_cuisine_city_df.transform(find_top_cuisine, window_spec_best)

    # 5. Underserved (lowest) cuisine per city: rank 1 under an ascending window
    window_spec_low = Window.partitionBy("City").orderBy(col("avg_Weighted_Rating").asc())
    low_cuisines_df = weighted_cuisine_city_df.transform(find_top_cuisine, window_spec_low)

    # 6. City coordinates (hard-coded for Indian cities)
    city_coords_data = [
        ("Mumbai", 19.0760, 72.8777), ("Delhi", 28.7041, 77.1025),
        ("Bengaluru", 12.9716, 77.5946), ("Chennai", 13.0827, 80.2707),
        ("Kolkata", 22.5726, 88.3639), ("Hyderabad", 17.3850, 78.4867),
        ("Ahmedabad", 23.0225, 72.5714), ("Pune", 18.5204, 73.8567),
        ("Jaipur", 26.9124, 75.7873), ("Surat", 21.1702, 72.8311)
    ]
    city_coords = spark.createDataFrame(city_coords_data, ["City", "Latitude", "Longitude"])

    # 7. Map of the best cuisine per city
    _plot_city_cuisine_map(top_cuisines_df, city_coords, "Best Rated Cuisine in each region")

    # 8. Map of the underserved cuisine per city
    _plot_city_cuisine_map(low_cuisines_df, city_coords, "Least Rated Cuisine in each region")


# Run the cuisine analysis on the loaded dataset; it prints the overall ranking and shows both maps.
analyze_cuisines_and_visualize(gold_df)