## 1. Environment Setup

In this section, we:
- Import necessary libraries such as PySpark and Pandas.
- Set `PYSPARK_PYTHON` so that PySpark's Python worker processes run with the conda environment's interpreter.
- Initialize the Spark session using `SparkSession`.


In [13]:
# 1. Environment Setup

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col

import os

# Point Spark's Python worker processes at the food_project conda environment.
# This must be set before the SparkSession is created.
os.environ["PYSPARK_PYTHON"] = r"c:\Users\162833\AppData\Local\miniconda3\envs\food_project\python.exe"

# Initialize SparkSession, binding the driver to localhost for local runs
spark = SparkSession.builder \
    .appName("FoodDeliveryAnalytics") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.driver.host", "127.0.0.1") \
    .getOrCreate()
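The interpreter path above is specific to one machine and user account. A more portable option, sketched here under the assumption that the notebook kernel itself was started from the `food_project` conda environment, is to fall back to `sys.executable` (the interpreter running the notebook). The helper name `resolve_pyspark_python` is hypothetical.

```python
import os
import sys


def resolve_pyspark_python(env=None):
    """Return the interpreter path PySpark workers should use.

    Hypothetical helper: keeps an existing PYSPARK_PYTHON value if it points
    at a real file, otherwise falls back to the interpreter running this
    notebook (sys.executable).
    """
    env = os.environ if env is None else env
    configured = env.get("PYSPARK_PYTHON")
    if configured and os.path.isfile(configured):
        return configured
    return sys.executable


# Must run before SparkSession.builder...getOrCreate() for workers to pick it up.
os.environ["PYSPARK_PYTHON"] = resolve_pyspark_python()
```

With this in place, the same cell works on any machine where the kernel runs inside the intended environment, without editing a hard-coded path.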


## 2. Data Loading

### 🔹 zomato.csv
Load restaurant data including names, locations, ratings, and more.

### 🔹 JSON Files (file1 to file5)
Load each JSON file, explode its nested `restaurants` array into one row per restaurant, flatten the nested fields into columns, and combine all five files into one Spark DataFrame.

### 🔹 Country-Code.xlsx
Load the Excel file that maps country codes to country names with Pandas, then convert it to a Spark DataFrame.


In [14]:
# 2.1 Load Zomato CSV data
data_path = "C:/me/restaurant_data_pipeline/data/data1"

df_zomato = spark.read.csv(f"{data_path}/zomato.csv", header=True, inferSchema=True)


In [15]:
# 2.2 Load JSON files
# Read file1.json ... file5.json, flatten each one, and union them into a single DataFrame
json_files = [f"{data_path}/file{i}.json" for i in range(1, 6)]
df_json_combined = None

for file in json_files:
    df_json = spark.read.json(file)

    # One row per restaurant: explode the nested restaurants[].restaurant array
    df_exploded = df_json.withColumn("restaurant", explode(col("restaurants.restaurant")))

    # Flatten the nested struct fields into top-level columns named like the CSV columns
    df_flat = df_exploded.select(
        col("restaurant.id").alias("Restaurant ID"),
        col("restaurant.name").alias("Restaurant Name"),
        col("restaurant.location.country_id").alias("Country Code"),
        col("restaurant.location.city").alias("City"),
        col("restaurant.location.address").alias("Address"),
        col("restaurant.location.locality").alias("Locality"),
        col("restaurant.location.locality_verbose").alias("Locality Verbose"),
        col("restaurant.location.longitude").alias("Longitude"),
        col("restaurant.location.latitude").alias("Latitude"),
        col("restaurant.cuisines").alias("Cuisines"),
        col("restaurant.average_cost_for_two").alias("Average Cost for two"),
        col("restaurant.currency").alias("Currency"),
        col("restaurant.has_table_booking").alias("Has Table Booking"),
        col("restaurant.has_online_delivery").alias("Has Online delivery"),
        col("restaurant.is_delivering_now").alias("Is delivering now"),
        col("restaurant.switch_to_order_menu").alias("Switch to order menu"),
        col("restaurant.price_range").alias("Price range"),
        col("restaurant.user_rating.aggregate_rating").alias("Aggregate rating"),
        col("restaurant.user_rating.rating_color").alias("Rating color"),
        col("restaurant.user_rating.rating_text").alias("Rating text"),
        col("restaurant.user_rating.votes").alias("Votes"),
    )

    # Append this file's rows, matching columns by name
    df_json_combined = df_flat if df_json_combined is None else df_json_combined.unionByName(df_flat)

# Show the first few rows of the combined DataFrame (once, after all files are loaded)
df_json_combined.show(5, truncate=False)

+-------------+--------------------------+------------+---------+-------------------------------------------------------------------------+--------------------------+-------------------------------------+-------------+-------------+----------------------------------------------------+--------------------+--------+-----------------+-------------------+-----------------+--------------------+-----------+----------------+------------+-----------+-----+
|Restaurant ID|Restaurant Name           |Country Code|City     |Address                                                                  |Locality                  |Locality Verbose                     |Longitude    |Latitude     |Cuisines                                            |Average Cost for two|Currency|Has Table Booking|Has Online delivery|Is delivering now|Switch to order menu|Price range|Aggregate rating|Rating color|Rating text|Votes|
+-------------+--------------------------+------------+---------+-----------------------------
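What `explode` plus the nested `select` do to each JSON document can be mirrored in plain Python. This is an illustrative sketch only: it uses a subset of the fields, the helper names (`dig`, `flatten_document`, `flatten_documents`) are hypothetical, and it assumes each parsed document has the `restaurants[].restaurant` shape that the Spark code above reads. Missing keys become `None`, roughly as Spark yields null for a struct field absent from a particular record.

```python
# (output column, key path inside the nested "restaurant" object);
# a subset of the select() list above, for illustration
FIELDS = [
    ("Restaurant ID", ("id",)),
    ("Restaurant Name", ("name",)),
    ("Country Code", ("location", "country_id")),
    ("City", ("location", "city")),
    ("Aggregate rating", ("user_rating", "aggregate_rating")),
    ("Votes", ("user_rating", "votes")),
]


def dig(obj, path):
    """Follow a key path through nested dicts; None if any key is missing."""
    for key in path:
        if not isinstance(obj, dict) or key not in obj:
            return None
        obj = obj[key]
    return obj


def flatten_document(doc):
    """One flat row per element of doc["restaurants"] (the explode step)."""
    rows = []
    for entry in doc.get("restaurants", []):
        restaurant = entry.get("restaurant", {})
        rows.append({name: dig(restaurant, path) for name, path in FIELDS})
    return rows


def flatten_documents(docs):
    """Concatenate the rows of several documents (the union loop)."""
    return [row for doc in docs for row in flatten_document(doc)]
```

Like `explode`, a document with an empty `restaurants` array contributes no rows.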

In [16]:
# 2.3 Load Excel file using Pandas and convert to Spark DataFrame
import pandas as pd

df_country_code_pd = pd.read_excel(f"{data_path}/Country-Code.xlsx")
df_country_code_pd = df_country_code_pd.astype(str)  # Cast every column to str so createDataFrame infers StringType for each (Country Code becomes a string)
df_country_code = spark.createDataFrame(df_country_code_pd)
df_country_code.show(5, truncate=False)


+------------+---------+
|Country Code|Country  |
+------------+---------+
|1           |India    |
|14          |Australia|
|30          |Brazil   |
|37          |Canada   |
|94          |Indonesia|
+------------+---------+
only showing top 5 rows
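Because `astype(str)` turns every column into strings, `Country Code` in this table holds values such as `"1"`, while section 4 casts `Country Code` in the restaurant data to an integer. Any later lookup or join between the two therefore needs the keys brought to one type first; in BigQuery that would most likely mean an explicit cast on one side. The plain-Python sketch below (hypothetical helpers `build_lookup` and `attach_country`) normalises the string codes to `int` before matching.

```python
def build_lookup(country_rows):
    """Map Country Code -> Country, converting the string codes to int.

    country_rows mirrors the table after .astype(str), e.g.
    {"Country Code": "1", "Country": "India"}.
    """
    return {int(row["Country Code"]): row["Country"] for row in country_rows}


def attach_country(restaurants, lookup):
    """Add a Country field to each row; None when the code has no match
    (left-join semantics)."""
    return [{**r, "Country": lookup.get(r["Country Code"])} for r in restaurants]
```

Without the `int(...)` conversion, a lookup keyed by `"1"` would never match the integer code `1`.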



## 3. Data Merging and Schema Alignment

- Ensure both Zomato and JSON datasets follow the same column structure.
- Select and reorder columns to match across DataFrames.
- Union them by column name into one unified DataFrame.
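The align-then-union steps can be modelled with lists of dicts. In this sketch (hypothetical helpers `align` and `union_by_name`, with a shortened column list), every row is projected onto the same ordered columns and a missing column fails fast, which loosely matches how PySpark's `unionByName` matches columns by name and raises when the column sets differ (unless `allowMissingColumns=True` is passed, available since Spark 3.1).

```python
COLUMNS = ["Restaurant ID", "Restaurant Name", "Country Code", "City"]  # subset


def align(rows, columns):
    """Project each row onto `columns`, in that order; fail on missing ones."""
    aligned = []
    for i, row in enumerate(rows):
        missing = [c for c in columns if c not in row]
        if missing:
            raise KeyError(f"row {i} is missing columns: {missing}")
        aligned.append({c: row[c] for c in columns})
    return aligned


def union_by_name(*tables, columns):
    """Concatenate several row lists after aligning each to `columns`."""
    return [row for table in tables for row in align(table, columns)]
```

Extra columns (present in one source only) are dropped by the projection, just as `select(*columns)` drops them before the Spark union.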


In [17]:
# 3.1 Select the same columns, in the same order, from both DataFrames

columns = [
    "Restaurant ID", "Restaurant Name", "Country Code", "City", "Address", "Locality", "Locality Verbose",
    "Longitude", "Latitude", "Cuisines", "Average Cost for two", "Currency", "Has Table Booking",
    "Has Online delivery", "Is delivering now", "Switch to order menu", "Price range", "Aggregate rating",
    "Rating color", "Rating text", "Votes"
]

df_zomato = df_zomato.select(*columns)
df_json_combined = df_json_combined.select(*columns)

# 3.2 Union the two DataFrames, matching columns by name
df_merged = df_json_combined.unionByName(df_zomato)
df_merged.show(5, truncate=False)

+-------------+--------------------------+------------+---------+-------------------------------------------------------------------------+--------------------------+-------------------------------------+-------------+-------------+----------------------------------------------------+--------------------+--------+-----------------+-------------------+-----------------+--------------------+-----------+----------------+------------+-----------+-----+
|Restaurant ID|Restaurant Name           |Country Code|City     |Address                                                                  |Locality                  |Locality Verbose                     |Longitude    |Latitude     |Cuisines                                            |Average Cost for two|Currency|Has Table Booking|Has Online delivery|Is delivering now|Switch to order menu|Price range|Aggregate rating|Rating color|Rating text|Votes|
+-------------+--------------------------+------------+---------+-----------------------------

## 4. Data Type Conversion

Cast the ID, country code, coordinate, cost, price range, rating, and vote columns to numeric types (`int` or `double`) so that they sort, aggregate, and compare as numbers rather than as strings.
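With ANSI mode disabled (`spark.sql.ansi.enabled=false`, the default in Spark 3.x), `cast` returns null for a value it cannot parse instead of raising, so a malformed entry silently becomes null. Comparing each column's null count before and after casting (for example with `df.filter(col(c).isNull()).count()`) is a cheap sanity check. The plain-Python model below is a simplification: the helper names `lenient_cast` and `cast_column` are hypothetical, and Spark's real string-parsing rules differ in details.

```python
def lenient_cast(value, parse):
    """Return parse(value), or None when value is None or unparseable.

    Simplified model of a non-ANSI cast: bad input becomes null (None)
    instead of raising an error.
    """
    if value is None:
        return None
    try:
        return parse(str(value).strip())
    except ValueError:
        return None


def cast_column(rows, name, parse):
    """Cast one column without mutating the input rows; also count the
    non-null inputs that the cast turned into None."""
    out, lost = [], 0
    for row in rows:
        new = lenient_cast(row[name], parse)
        if new is None and row[name] is not None:
            lost += 1
        out.append({**row, name: new})
    return out, lost
```

A non-zero `lost` count points at values worth inspecting before they disappear into nulls.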


In [18]:
# 4. Cast numeric columns to integer and double types

df_merged = df_merged \
    .withColumn("Restaurant ID", col("Restaurant ID").cast("int")) \
    .withColumn("Country Code", col("Country Code").cast("int")) \
    .withColumn("Longitude", col("Longitude").cast("double")) \
    .withColumn("Latitude", col("Latitude").cast("double")) \
    .withColumn("Average Cost for two", col("Average Cost for two").cast("int")) \
    .withColumn("Price range", col("Price range").cast("int")) \
    .withColumn("Aggregate rating", col("Aggregate rating").cast("double")) \
    .withColumn("Votes", col("Votes").cast("int"))



## 5. Data Preview

Display the first 10 rows of the merged DataFrame and print the schema to verify all transformations.


In [19]:
df_merged.printSchema()
df_merged.show(10, truncate=False)

root
 |-- Restaurant ID: integer (nullable = true)
 |-- Restaurant Name: string (nullable = true)
 |-- Country Code: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Locality: string (nullable = true)
 |-- Locality Verbose: string (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Cuisines: string (nullable = true)
 |-- Average Cost for two: integer (nullable = true)
 |-- Currency: string (nullable = true)
 |-- Has Table Booking: string (nullable = true)
 |-- Has Online delivery: string (nullable = true)
 |-- Is delivering now: string (nullable = true)
 |-- Switch to order menu: string (nullable = true)
 |-- Price range: integer (nullable = true)
 |-- Aggregate rating: double (nullable = true)
 |-- Rating color: string (nullable = true)
 |-- Rating text: string (nullable = true)
 |-- Votes: integer (nullable = true)


## 6. Load into BigQuery

Convert the merged restaurant data and the country-code table to pandas DataFrames, then load each into its own BigQuery table.

In [20]:
# Collect both Spark DataFrames to the driver as pandas DataFrames for the BigQuery client
df_merged = df_merged.toPandas()
df_country_code = df_country_code.toPandas()

In [21]:
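from google.cloud import bigquery

# Authenticate with a service-account key file
client = bigquery.Client.from_service_account_json("fooddelivary-456823-65b29f3b5b38key.json")

# Destination tables, in project.dataset.table form
table_id_merged = "fooddelivary-456823.food_analytics.resturant"
table_id_country = "fooddelivary-456823.food_analytics.country_code"

# Load each pandas DataFrame (requires pyarrow) and wait for the job to finish.
# Load jobs append by default (WRITE_APPEND), so re-running this cell adds duplicate rows.
job1 = client.load_table_from_dataframe(df_merged, table_id_merged).result()
job2 = client.load_table_from_dataframe(df_country_code, table_id_country).result()

print("Loaded both tables into BigQuery")


Loaded both tables into BigQuery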
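A malformed table ID is only reported once the load job runs. A small pre-flight check, sketched here with a hypothetical `split_table_id` helper, splits the ID from the right so that a legacy domain-scoped project such as `example.com:proj` stays intact, and applies a simplified version of BigQuery's dataset naming rule (letters, digits, and underscores). It is not a full validator; the BigQuery naming documentation lists the complete rules.

```python
import re

# Dataset names: letters, digits, and underscores only (simplified rule)
DATASET_RE = re.compile(r"^[A-Za-z0-9_]+$")


def split_table_id(table_id):
    """Split 'project.dataset.table' into its three parts.

    rsplit from the right keeps a domain-scoped project ID (which may
    itself contain a dot) in one piece.
    """
    parts = table_id.rsplit(".", 2)
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected project.dataset.table, got {table_id!r}")
    project, dataset, table = parts
    if not DATASET_RE.match(dataset):
        raise ValueError(f"invalid dataset name: {dataset!r}")
    return project, dataset, table
```

Calling it on `table_id_merged` and `table_id_country` before the load jobs surfaces a typo in the project or dataset part without submitting a job.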
