# Bronze to Silver

---

In [0]:
# Required Imports

# Core SparkSession and DataFrame API
from pyspark.sql import SparkSession, DataFrame

import pyspark.sql.functions as F
from pyspark.sql import types as T
from pyspark.sql import Window, Row

import os
import ast

In [0]:
# Azure Storage Account Details:
storage_account_name = "adlsyelpdata"
account_key = "BxtVS/wmFdQMoV0n1q7tagxVM1TCQNObKTt8mNx/MKPcysRqWv4jsBTjSp9q6VRH+68M1zuRAwA9+AStEZq7Rg=="

In [0]:
# Creating Spark Session
spark = SparkSession.builder \
  .appName("Bronze_to_Silver") \
  .getOrCreate()


# Connecting to Azure Storage using Account Key
spark.conf.set(
  f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net",
  account_key
)

In [0]:
# Bronze container files
display(dbutils.fs.ls(f"abfss://bronze@adlsyelpdata.dfs.core.windows.net/"))

path,name,size,modificationTime
abfss://bronze@adlsyelpdata.dfs.core.windows.net/us_city/,us_city/,0,1733131620000
abfss://bronze@adlsyelpdata.dfs.core.windows.net/yelp_api/,yelp_api/,0,1732979576000
abfss://bronze@adlsyelpdata.dfs.core.windows.net/yelp_open/,yelp_open/,0,1732979570000


In [0]:
# Displaying files present in folder yelp_open
display(dbutils.fs.ls(f"abfss://bronze@adlsyelpdata.dfs.core.windows.net/yelp_open"))

path,name,size,modificationTime
abfss://bronze@adlsyelpdata.dfs.core.windows.net/yelp_open/yelp_academic_dataset_business.json,yelp_academic_dataset_business.json,118863795,1732979653000
abfss://bronze@adlsyelpdata.dfs.core.windows.net/yelp_open/yelp_academic_dataset_checkin.json,yelp_academic_dataset_checkin.json,286958945,1732979698000
abfss://bronze@adlsyelpdata.dfs.core.windows.net/yelp_open/yelp_academic_dataset_review.json,yelp_academic_dataset_review.json,5341868833,1732980299000
abfss://bronze@adlsyelpdata.dfs.core.windows.net/yelp_open/yelp_academic_dataset_tip.json,yelp_academic_dataset_tip.json,180604475,1732979691000
abfss://bronze@adlsyelpdata.dfs.core.windows.net/yelp_open/yelp_academic_dataset_user.json,yelp_academic_dataset_user.json,3363329011,1732979911000


In [0]:
# Displaying files present in folder us_city
display(dbutils.fs.ls(f"abfss://bronze@adlsyelpdata.dfs.core.windows.net/us_city"))

path,name,size,modificationTime
abfss://bronze@adlsyelpdata.dfs.core.windows.net/us_city/us-cities-attributes.csv,us-cities-attributes.csv,1614,1733131926000
abfss://bronze@adlsyelpdata.dfs.core.windows.net/us_city/us-cities-demographics.json,us-cities-demographics.json,850747,1733131636000
abfss://bronze@adlsyelpdata.dfs.core.windows.net/us_city/us-cities-humidity.csv,us-cities-humidity.csv,9075077,1733131956000
abfss://bronze@adlsyelpdata.dfs.core.windows.net/us_city/us-cities-pressure.csv,us-cities-pressure.csv,12155911,1733131958000
abfss://bronze@adlsyelpdata.dfs.core.windows.net/us_city/us-cities-temperature.csv,us-cities-temperature.csv,13971171,1733131958000
abfss://bronze@adlsyelpdata.dfs.core.windows.net/us_city/us-cities-weather-description.csv,us-cities-weather-description.csv,21858089,1733131960000
abfss://bronze@adlsyelpdata.dfs.core.windows.net/us_city/us-cities-wind-direction.csv,us-cities-wind-direction.csv,10171003,1733131954000
abfss://bronze@adlsyelpdata.dfs.core.windows.net/us_city/us-cities-wind-speed.csv,us-cities-wind-speed.csv,7457531,1733131924000


---

## Normalization of Yelp dataset

In [0]:
# Function to describe a dataframe
def describe_dataframe(df: DataFrame, sample_rows: int = 5):
    """
    A reusable function to describe all details of a PySpark DataFrame.
    
    Parameters:
        df (DataFrame): The PySpark DataFrame to describe.
        sample_rows (int): Number of sample rows to display. Default is 5.
        
    Returns:
        None: Prints the DataFrame details.
    """
    print("=== DataFrame Overview ===")
    
    # Number of rows and columns
    print(f"\nNumber of rows: {df.count()}")
    print(f"Number of columns: {len(df.columns)}")
    
    # Schema
    print("\nSchema:")
    df.printSchema()
    
    # Display sample data
    print(f"\nSample Data (First {sample_rows} Rows):")
    df.show(sample_rows, truncate=False)

In [0]:
def read_file_from_adls(file_path, options=None):
    """
    Reads a file from Azure Data Lake Storage (ADLS) into a PySpark DataFrame.
    Automatically detects the file type based on the file extension.

    Parameters:
        file_path (str): The ADLS file path (abfss://...).
        options (dict): Additional options to pass to the reader (e.g., for CSV headers).

    Returns:
        DataFrame: A PySpark DataFrame containing the data.
    """
    try:
        # Default options
        if options is None:
            options = {}

        # Detect file type from the file extension
        file_extension = os.path.splitext(file_path)[-1].lower()
        
        if file_extension == ".json":
            df = spark.read.options(**options).json(file_path)
        elif file_extension == ".csv":
            df = spark.read.options(**options).csv(file_path)
        elif file_extension == ".parquet":
            df = spark.read.parquet(file_path)
        elif file_extension == ".avro":
            df = spark.read.format("avro").options(**options).load(file_path)
        else:
            raise ValueError(f"Unsupported file extension: {file_extension}")

        print(f"Successfully loaded file: {file_path} as {file_extension.upper()}")
        return df
    except Exception as e:
        print(f"Error loading file {file_path}: {e}")
        return None


In [0]:
def write_parquet_to_adls(df, path, mode="overwrite", partition_by=None):
    """
    Write a PySpark DataFrame to Parquet format with options for overwrite mode and partitioning.

    Parameters:
        df (DataFrame): The PySpark DataFrame to write.
        path (str): The destination path for the Parquet file.
        mode (str): Write mode. Options: 'overwrite', 'append', 'ignore', 'error'. Default is 'overwrite'.
        partition_by (list): List of column names to partition the data by. Default is None.
    
    Returns:
        None
    """
    try:
        if partition_by:
            df.write.parquet(path, mode=mode, partitionBy=partition_by)
        else:
            df.write.parquet(path, mode=mode)
        
        print(f"Successfully written Parquet file to {path}")
    except Exception as e:
        print(f"Error writing Parquet file to {path}: {e}")

In [0]:
# Yelp Open dataset Paths
business_path = "abfss://bronze@adlsyelpdata.dfs.core.windows.net/yelp_open/yelp_academic_dataset_business.json"
checkin_path = "abfss://bronze@adlsyelpdata.dfs.core.windows.net/yelp_open/yelp_academic_dataset_checkin.json"
review_path = "abfss://bronze@adlsyelpdata.dfs.core.windows.net/yelp_open/yelp_academic_dataset_review.json"
tip_path = "abfss://bronze@adlsyelpdata.dfs.core.windows.net/yelp_open/yelp_academic_dataset_tip.json"
user_path = "abfss://bronze@adlsyelpdata.dfs.core.windows.net/yelp_open/yelp_academic_dataset_user.json"


# City Demographics Path
city_demograph_path = "abfss://bronze@adlsyelpdata.dfs.core.windows.net/us_city/us-cities-demographics.json"
city_attributes_path = "abfss://bronze@adlsyelpdata.dfs.core.windows.net/us_city/us-cities-attributes.json"

### business.json

In [0]:
business_df = read_file_from_adls(business_path)
describe_dataframe(business_df)

Successfully loaded file: abfss://bronze@adlsyelpdata.dfs.core.windows.net/yelp_open/yelp_academic_dataset_business.json as .JSON
=== DataFrame Overview ===

Number of rows: 150346
Number of columns: 14

Schema:
root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- C

#### business_attributes

In [0]:
business_attributes_df = business_df.select("business_id", "attributes.*")
describe_dataframe(business_attributes_df)

=== DataFrame Overview ===

Number of rows: 150346
Number of columns: 40

Schema:
root
 |-- business_id: string (nullable = true)
 |-- AcceptsInsurance: string (nullable = true)
 |-- AgesAllowed: string (nullable = true)
 |-- Alcohol: string (nullable = true)
 |-- Ambience: string (nullable = true)
 |-- BYOB: string (nullable = true)
 |-- BYOBCorkage: string (nullable = true)
 |-- BestNights: string (nullable = true)
 |-- BikeParking: string (nullable = true)
 |-- BusinessAcceptsBitcoin: string (nullable = true)
 |-- BusinessAcceptsCreditCards: string (nullable = true)
 |-- BusinessParking: string (nullable = true)
 |-- ByAppointmentOnly: string (nullable = true)
 |-- Caters: string (nullable = true)
 |-- CoatCheck: string (nullable = true)
 |-- Corkage: string (nullable = true)
 |-- DietaryRestrictions: string (nullable = true)
 |-- DogsAllowed: string (nullable = true)
 |-- DriveThru: string (nullable = true)
 |-- GoodForDancing: string (nullable = true)
 |-- GoodForKids: string (nul

In [0]:
# Function to parse boolean-like attributes using built-in PySpark functions
def convert_to_boolean(df, columns):
    for col_name in columns:
        df = df.withColumn(
            col_name,
            F.when(F.col(col_name) == 'True', True)
            .when(F.col(col_name) == 'False', False)
            .otherwise(None)
        )
    return df

# List of boolean-like attributes
bool_attrs = [
    "AcceptsInsurance",
    "BYOB",
    "BikeParking",
    "BusinessAcceptsBitcoin",
    "BusinessAcceptsCreditCards",
    "ByAppointmentOnly",
    "Caters",
    "CoatCheck",
    "Corkage",
    "DogsAllowed",
    "DriveThru",
    "GoodForDancing",
    "GoodForKids",
    "HappyHour",
    "HasTV",
    "Open24Hours",
    "OutdoorSeating",
    "RestaurantsCounterService",
    "RestaurantsDelivery",
    "RestaurantsGoodForGroups",
    "RestaurantsReservations",
    "RestaurantsTableService",
    "RestaurantsTakeOut",
    "WheelchairAccessible"
]

# Apply the transformation
business_attributes_df = convert_to_boolean(business_attributes_df, bool_attrs)

In [0]:
# Function to clean and standardize string attributes using PySpark functions
def convert_to_cleaned_string(df, columns):
    """
    Cleans and standardizes string attributes in a PySpark DataFrame.

    Parameters:
        df (DataFrame): Input PySpark DataFrame.
        columns (list): List of column names to be cleaned.

    Returns:
        DataFrame: PySpark DataFrame with updated columns.
    """
    for col_name in columns:
        df = df.withColumn(
            col_name,
            F.when(F.col(col_name).isNull() | (F.col(col_name) == ''), None)  # Handle null or empty strings
            .otherwise(F.lower(F.regexp_replace(F.regexp_replace(F.col(col_name), "u'", ""), "'", "")))  # Standardize strings
        )
    return df

# List of string-like attributes
str_attrs = [
    "AgesAllowed", 
    "Alcohol",
    "BYOBCorkage",
    "NoiseLevel",
    "RestaurantsAttire",
    "Smoking",
    "WiFi",
]

# Apply the function to your DataFrame
business_attributes_df = convert_to_cleaned_string(business_attributes_df, str_attrs)

In [0]:
# Function to convert string-like integers to native integer type using PySpark functions
def convert_to_integer(df, columns):
    """
    Converts string-like integers (e.g., '1', '2') into native integer types in a PySpark DataFrame.

    Parameters:
        df (DataFrame): Input PySpark DataFrame.
        columns (list): List of column names to be converted.

    Returns:
        DataFrame: PySpark DataFrame with updated columns.
    """
    for col_name in columns:
        df = df.withColumn(
            col_name,
            F.when(F.col(col_name).isNull() | (F.col(col_name) == 'None'), None)  # Handle null or 'None'
            .otherwise(F.col(col_name).cast('int'))  # Cast valid values to integers
        )
    return df

# List of integer-like attributes
int_attrs = [
    "RestaurantsPriceRange2",
]

# Apply the function to your DataFrame
business_attributes_df = convert_to_integer(business_attributes_df, int_attrs)

In [0]:
def parse_boolean_dict(x):
    # Convert dicts masked as strings to string:boolean format
    if x is None or x == 'None' or x == '':
        return None
    return ast.literal_eval(x)

parse_boolean_dict_udf = F.udf(parse_boolean_dict, T.MapType(T.StringType(), T.BooleanType()))

bool_dict_attrs = [
    "Ambience",
    "BestNights",
    "BusinessParking",
    "DietaryRestrictions",
    "GoodForMeal",
    "HairSpecializesIn",
    "Music"
]

for attr in bool_dict_attrs:
    business_attributes_df = business_attributes_df.withColumn(attr, parse_boolean_dict_udf(attr))
    # Get all keys of the MapType
    key_rows = business_attributes_df.select(F.explode(attr)).select("key").distinct().collect()
    # Convert each key into column (with proper name)
    exprs = ["{}['{}'] as {}".format(attr, row.key, attr+"_"+row.key.replace('-', '_')) for row in key_rows]
    business_attributes_df = business_attributes_df.selectExpr("*", *exprs).drop(attr)

In [0]:
describe_dataframe(business_attributes_df)

=== DataFrame Overview ===

Number of rows: 150346
Number of columns: 82

Schema:
root
 |-- business_id: string (nullable = true)
 |-- AcceptsInsurance: boolean (nullable = true)
 |-- AgesAllowed: string (nullable = true)
 |-- Alcohol: string (nullable = true)
 |-- BYOB: boolean (nullable = true)
 |-- BYOBCorkage: string (nullable = true)
 |-- BikeParking: boolean (nullable = true)
 |-- BusinessAcceptsBitcoin: boolean (nullable = true)
 |-- BusinessAcceptsCreditCards: boolean (nullable = true)
 |-- ByAppointmentOnly: boolean (nullable = true)
 |-- Caters: boolean (nullable = true)
 |-- CoatCheck: boolean (nullable = true)
 |-- Corkage: boolean (nullable = true)
 |-- DogsAllowed: boolean (nullable = true)
 |-- DriveThru: boolean (nullable = true)
 |-- GoodForDancing: boolean (nullable = true)
 |-- GoodForKids: boolean (nullable = true)
 |-- HappyHour: boolean (nullable = true)
 |-- HasTV: boolean (nullable = true)
 |-- NoiseLevel: string (nullable = true)
 |-- Open24Hours: boolean (null

In [0]:
output_path = "abfss://silver@adlsyelpdata.dfs.core.windows.net/business_attributes.parquet"
write_parquet_to_adls(business_attributes_df, output_path)

Successfully written Parquet file to abfss://silver@adlsyelpdata.dfs.core.windows.net/business_attributes.parquet


#### city_demographics.csv

In [0]:
city_demograph_df = read_file_from_adls(city_demograph_path)
describe_dataframe(city_demograph_df)

Successfully loaded file: abfss://bronze@adlsyelpdata.dfs.core.windows.net/us_city/us-cities-demographics.json as .JSON
=== DataFrame Overview ===

Number of rows: 2891
Number of columns: 12

Schema:
root
 |-- average_household_size: double (nullable = true)
 |-- city: string (nullable = true)
 |-- count: long (nullable = true)
 |-- female_population: long (nullable = true)
 |-- foreign_born: long (nullable = true)
 |-- male_population: long (nullable = true)
 |-- median_age: double (nullable = true)
 |-- number_of_veterans: long (nullable = true)
 |-- race: string (nullable = true)
 |-- state: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- total_population: long (nullable = true)


Sample Data (First 5 Rows):
+----------------------+---------+------+-----------------+------------+---------------+----------+------------------+---------------------------------+----------+----------+----------------+
|average_household_size|city     |count |female_population|fore

In [0]:
# Flatten the `fields` struct
city_demograph_df = city_demograph_df.select(*city_demograph_df.columns)

# Prepare the `race` column
city_demograph_df = city_demograph_df.withColumn(
    "race",
    F.regexp_replace(F.lower(F.col("race")), "[\\s-]", "_")  # Replace spaces and hyphens with underscores
)

# Group by all columns except `race` and `count` and pivot `race` into columns
group_columns = [col for col in city_demograph_df.columns if col not in ["race", "count"]]
city_demograph_df = city_demograph_df.groupby(*group_columns).pivot("race").max("count")


In [0]:
describe_dataframe(city_demograph_df)

=== DataFrame Overview ===

Number of rows: 596
Number of columns: 15

Schema:
root
 |-- average_household_size: double (nullable = true)
 |-- city: string (nullable = true)
 |-- female_population: long (nullable = true)
 |-- foreign_born: long (nullable = true)
 |-- male_population: long (nullable = true)
 |-- median_age: double (nullable = true)
 |-- number_of_veterans: long (nullable = true)
 |-- state: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- total_population: long (nullable = true)
 |-- american_indian_and_alaska_native: long (nullable = true)
 |-- asian: long (nullable = true)
 |-- black_or_african_american: long (nullable = true)
 |-- hispanic_or_latino: long (nullable = true)
 |-- white: long (nullable = true)


Sample Data (First 5 Rows):
+----------------------+----------------+-----------------+------------+---------------+----------+------------------+--------------+----------+----------------+---------------------------------+-----+----------

In [0]:
# Select and prepare city-state data from business_df
cities_df = (
    business_df
    .selectExpr("city", "state as state_code")
    .filter(F.col("city").isNotNull() & F.col("state_code").isNotNull())  # Filter null values
    .distinct()  # Ensure unique city-state pairs
)

# Join with demographics data
cities_df = (
    cities_df
    .join(city_demograph_df, ["city", "state_code"], how="left")  # Left join with demographics data
    .withColumn("city_id", F.monotonically_increasing_id())  # Add unique city identifier
)

In [0]:
describe_dataframe(cities_df)

=== DataFrame Overview ===

Number of rows: 1467
Number of columns: 16

Schema:
root
 |-- city: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- average_household_size: double (nullable = true)
 |-- female_population: long (nullable = true)
 |-- foreign_born: long (nullable = true)
 |-- male_population: long (nullable = true)
 |-- median_age: double (nullable = true)
 |-- number_of_veterans: long (nullable = true)
 |-- state: string (nullable = true)
 |-- total_population: long (nullable = true)
 |-- american_indian_and_alaska_native: long (nullable = true)
 |-- asian: long (nullable = true)
 |-- black_or_african_american: long (nullable = true)
 |-- hispanic_or_latino: long (nullable = true)
 |-- white: long (nullable = true)
 |-- city_id: long (nullable = false)


Sample Data (First 5 Rows):
+------------+----------+----------------------+-----------------+------------+---------------+----------+------------------+-----+----------------+------------------------

In [0]:
output_path = "abfss://silver@adlsyelpdata.dfs.core.windows.net/cities.parquet"
write_parquet_to_adls(cities_df, output_path)

Successfully written Parquet file to abfss://silver@adlsyelpdata.dfs.core.windows.net/cities.parquet


#### addresses
Pull address information from business.json, but instead of city take newly created city_id.

In [0]:
addresses_df = (
    business_df
    .selectExpr("address", "latitude", "longitude", "postal_code", "city", "state as state_code")
    .join(cities_df.select("city", "state_code", "city_id"), ["city", "state_code"], how='left')
    .drop("city", "state_code")
    .distinct()
    .withColumn("address_id", F.row_number().over(Window.orderBy("city_id", "address")))
)

In [0]:
addresses_df.filter(F.col("city_id").isNull()).show()

+-------+--------+---------+-----------+-------+----------+
|address|latitude|longitude|postal_code|city_id|address_id|
+-------+--------+---------+-----------+-------+----------+
+-------+--------+---------+-----------+-------+----------+



In [0]:
output_path = "abfss://silver@adlsyelpdata.dfs.core.windows.net/addresses.parquet"
write_parquet_to_adls(addresses_df, output_path)

Successfully written Parquet file to abfss://silver@adlsyelpdata.dfs.core.windows.net/addresses.parquet


#### business categories

In [0]:
# Parse and normalize categories
business_categories_df = business_df.select(
    "business_id",
    F.split(
        F.regexp_replace("categories", "Wills, Trusts, & Probates", "Wills_Trusts_Probates"),
        r",\s*"
    ).alias("categories")
)

# Create unique categories with IDs
categories_df = business_categories_df.select(
    F.explode("categories").alias("category")
).dropDuplicates()

categories_df = categories_df.withColumn(
    "category_id", F.row_number().over(Window.orderBy("category"))
)

# Create business-category mapping
business_categories_df = business_categories_df.select(
    "business_id", F.explode("categories").alias("category")
).join(categories_df, "category", how="left").drop("category")


In [0]:
describe_dataframe(business_categories_df)

=== DataFrame Overview ===

Number of rows: 668516
Number of columns: 2

Schema:
root
 |-- business_id: string (nullable = true)
 |-- category_id: integer (nullable = true)


Sample Data (First 5 Rows):
+----------------------+-----------+
|business_id           |category_id|
+----------------------+-----------+
|Pns2l4eNsfO8kk83dixA6A|359        |
|Pns2l4eNsfO8kk83dixA6A|1200       |
|Pns2l4eNsfO8kk83dixA6A|798        |
|Pns2l4eNsfO8kk83dixA6A|8          |
|Pns2l4eNsfO8kk83dixA6A|551        |
+----------------------+-----------+
only showing top 5 rows



In [0]:
output_path = "abfss://silver@adlsyelpdata.dfs.core.windows.net/business_categories.parquet"
write_parquet_to_adls(business_categories_df, output_path)

Successfully written Parquet file to abfss://silver@adlsyelpdata.dfs.core.windows.net/business_categories.parquet


#### business hours

In [0]:
# Select hours columns
business_hours_df = business_df.select("business_id", "hours.*")

# Days of the week
hour_attrs = [
    "Monday",
    "Tuesday",
    "Wednesday",
    "Thursday",
    "Friday",
    "Saturday",
    "Sunday",
]

# Process hours using PySpark functions
for attr in hour_attrs:
    business_hours_df = business_hours_df \
        .withColumn(
            f"{attr}_from", 
            (F.split(F.col(attr), "-")[0].substr(0, 2).cast("int") * 100 + 
             F.split(F.col(attr), "-")[0].substr(4, 2).cast("int"))
        ) \
        .withColumn(
            f"{attr}_to", 
            (F.split(F.col(attr), "-")[1].substr(0, 2).cast("int") * 100 + 
             F.split(F.col(attr), "-")[1].substr(4, 2).cast("int"))
        ) \
        .drop(attr)

In [0]:
describe_dataframe(business_hours_df)

=== DataFrame Overview ===

Number of rows: 150346
Number of columns: 15

Schema:
root
 |-- business_id: string (nullable = true)
 |-- Monday_from: integer (nullable = true)
 |-- Monday_to: integer (nullable = true)
 |-- Tuesday_from: integer (nullable = true)
 |-- Tuesday_to: integer (nullable = true)
 |-- Wednesday_from: integer (nullable = true)
 |-- Wednesday_to: integer (nullable = true)
 |-- Thursday_from: integer (nullable = true)
 |-- Thursday_to: integer (nullable = true)
 |-- Friday_from: integer (nullable = true)
 |-- Friday_to: integer (nullable = true)
 |-- Saturday_from: integer (nullable = true)
 |-- Saturday_to: integer (nullable = true)
 |-- Sunday_from: integer (nullable = true)
 |-- Sunday_to: integer (nullable = true)


Sample Data (First 5 Rows):
+----------------------+-----------+---------+------------+----------+--------------+------------+-------------+-----------+-----------+---------+-------------+-----------+-----------+---------+
|business_id           |Mon

In [0]:
output_path = "abfss://silver@adlsyelpdata.dfs.core.windows.net/business_hours.parquet"
write_parquet_to_adls(business_hours_df, output_path)

Successfully written Parquet file to abfss://silver@adlsyelpdata.dfs.core.windows.net/business_hours.parquet


#### businesses

In [0]:
# Create surrogate keys for faster joins (optional optimization)
addresses_df = addresses_df.withColumn(
    "address_key",
    F.concat_ws("_", F.col("address"), F.col("latitude"), F.col("longitude"), F.col("postal_code"))
)
business_df = business_df.withColumn(
    "address_key",
    F.concat_ws("_", F.col("address"), F.col("latitude"), F.col("longitude"), F.col("postal_code"))
)

# Join on address_key and select relevant columns
businesses_df = business_df.join(
    addresses_df, "address_key", how="left"
).select(
    "business_id",
    "address_id",
    F.col("is_open").cast("boolean").alias("is_open"),
    "name",
    "review_count",
    "stars"
)

# Optional: Filter out unmatched businesses
businesses_df = businesses_df.filter(F.col("address_id").isNotNull())

In [0]:
describe_dataframe(businesses_df)

=== DataFrame Overview ===

Number of rows: 152495
Number of columns: 6

Schema:
root
 |-- business_id: string (nullable = true)
 |-- address_id: integer (nullable = true)
 |-- is_open: boolean (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- stars: double (nullable = true)


Sample Data (First 5 Rows):
+----------------------+----------+-------+-------------------------+------------+-----+
|business_id           |address_id|is_open|name                     |review_count|stars|
+----------------------+----------+-------+-------------------------+------------+-----+
|RfkB6YGl0YUrt_ECXr7lxA|1         |true   |Tranquil Salon & Spa     |23          |4.5  |
|ttenaAAadxy5L-2oDR1ztQ|2         |true   |Beijing                  |9           |3.0  |
|ajfBGoRGUzhD0fTXLENcrw|3         |true   |Asahi Sushi & Asian Grill|56          |4.0  |
|Cpa72Va_GG2v3RLbZLkMFg|4         |true   |Studio Health Medical Spa|5           |5.0  |
|ONmS8r1C0F0vglz2nKj

In [0]:
output_path = "abfss://silver@adlsyelpdata.dfs.core.windows.net/businesses.parquet"
write_parquet_to_adls(businesses_df, output_path)

Successfully written Parquet file to abfss://silver@adlsyelpdata.dfs.core.windows.net/businesses.parquet


### review.json

In [0]:
review_df = read_file_from_adls(review_path)

Successfully loaded file: abfss://bronze@adlsyelpdata.dfs.core.windows.net/yelp_open/yelp_academic_dataset_review.json as .JSON


In [0]:
describe_dataframe(review_df)

=== DataFrame Overview ===

Number of rows: 6990280
Number of columns: 9

Schema:
root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)


Sample Data (First 5 Rows):
+----------------------+----+-------------------+-----+----------------------+-----+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
review_df = review_df.withColumn("ts", F.to_timestamp("date"))

In [0]:
describe_dataframe(review_df)

=== DataFrame Overview ===

Number of rows: 6990280
Number of columns: 10

Schema:
root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- ts: timestamp (nullable = true)


Sample Data (First 5 Rows):
+----------------------+----+-------------------+-----+----------------------+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
output_path = "abfss://silver@adlsyelpdata.dfs.core.windows.net/review.parquet"
write_parquet_to_adls(review_df, output_path)

Successfully written Parquet file to abfss://silver@adlsyelpdata.dfs.core.windows.net/review.parquet


### user.json

In [0]:
user_df = read_file_from_adls(user_path)

Successfully loaded file: abfss://bronze@adlsyelpdata.dfs.core.windows.net/yelp_open/yelp_academic_dataset_user.json as .JSON


In [0]:
describe_dataframe(user_df)

=== DataFrame Overview ===

Number of rows: 1987897
Number of columns: 22

Schema:
root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- friends: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: string (null

#### elite_years

In [0]:
# Transform the 'elite' column into individual years
elite_years_df = user_df.select("user_id", "elite") \
    .filter(F.col("elite").isNotNull()) \
    .withColumn("year", F.explode(F.split(F.col("elite"), ","))) \
    .filter((F.col("year") != "") & (F.col("year").isNotNull())) \
    .withColumn("year", F.col("year").cast("integer")) \
    .select("user_id", "year")

In [0]:
output_path = "abfss://silver@adlsyelpdata.dfs.core.windows.net/elite_years.parquet"
write_parquet_to_adls(elite_years_df, output_path)

Successfully written Parquet file to abfss://silver@adlsyelpdata.dfs.core.windows.net/elite_years.parquet


#### friends

In [0]:
# Transform the 'friends' column into individual friend relationships
friends_df = user_df.select("user_id", "friends") \
    .filter(F.col("friends").isNotNull()) \
    .withColumn("friend_id", F.explode(F.split(F.col("friends"), ",\\s*"))) \
    .filter((F.col("friend_id") != "") & (F.col("friend_id").isNotNull())) \
    .select("user_id", "friend_id") \
    .distinct()

In [0]:
output_path = "abfss://silver@adlsyelpdata.dfs.core.windows.net/friends.parquet"
write_parquet_to_adls(friends_df, output_path)

Successfully written Parquet file to abfss://silver@adlsyelpdata.dfs.core.windows.net/friends.parquet


In [0]:
# Drop unnecessary fields and cast 'yelping_since' to timestamp
user_df = user_df.drop("elite", "friends") \
    .filter(F.col("yelping_since").isNotNull()) \
    .withColumn("yelping_since", F.to_timestamp("yelping_since"))

In [0]:
output_path = "abfss://silver@adlsyelpdata.dfs.core.windows.net/user.parquet"
write_parquet_to_adls(user_df, output_path)

Successfully written Parquet file to abfss://silver@adlsyelpdata.dfs.core.windows.net/user.parquet


### checkin.json

In [0]:
checkin_df = read_file_from_adls(checkin_path)
describe_dataframe(checkin_df)

Successfully loaded file: abfss://bronze@adlsyelpdata.dfs.core.windows.net/yelp_open/yelp_academic_dataset_checkin.json as .JSON
=== DataFrame Overview ===

Number of rows: 131930
Number of columns: 2

Schema:
root
 |-- business_id: string (nullable = true)
 |-- date: string (nullable = true)


Sample Data (First 5 Rows):
+----------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|business_id           |date                                                                              

In [0]:
# Transform the 'date' column into individual timestamps
checkin_df = checkin_df.select("business_id", "date") \
    .filter(F.col("date").isNotNull()) \
    .withColumn("ts", F.explode(F.split(F.col("date"), ",\\s*"))) \
    .filter((F.col("ts") != "") & (F.col("ts").isNotNull())) \
    .withColumn("ts", F.to_timestamp("ts")) \
    .select("business_id", "ts")

In [0]:
output_path = "abfss://silver@adlsyelpdata.dfs.core.windows.net/checkin.parquet"
write_parquet_to_adls(checkin_df, output_path)

Successfully written Parquet file to abfss://silver@adlsyelpdata.dfs.core.windows.net/checkin.parquet


### tip.json

In [0]:
tip_df = read_file_from_adls(tip_path)
describe_dataframe(tip_df)

Successfully loaded file: abfss://bronze@adlsyelpdata.dfs.core.windows.net/yelp_open/yelp_academic_dataset_tip.json as .JSON
=== DataFrame Overview ===

Number of rows: 908915
Number of columns: 5

Schema:
root
 |-- business_id: string (nullable = true)
 |-- compliment_count: long (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: string (nullable = true)


Sample Data (First 5 Rows):
+----------------------+----------------+-------------------+---------------------------------------------------------+----------------------+
|business_id           |compliment_count|date               |text                                                     |user_id               |
+----------------------+----------------+-------------------+---------------------------------------------------------+----------------------+
|3uLgwr0qeCNMjKenHJwPGQ|0               |2012-05-18 02:17:21|Avengers time with the ladies.                           |AGNUgVwnZUe

In [0]:
tips_df = tip_df.withColumn("ts", F.to_timestamp("date")) \
    .drop("date") \
    .withColumn("tip_id", F.row_number().over(Window.orderBy("ts")))

In [0]:
output_path = "abfss://silver@adlsyelpdata.dfs.core.windows.net/tips.parquet"
write_parquet_to_adls(tips_df, output_path)

Successfully written Parquet file to abfss://silver@adlsyelpdata.dfs.core.windows.net/tips.parquet
