In [0]:
import re
from pyspark.sql import DataFrame

# Utility: Convert camelCase or PascalCase to snake_case
def to_snake_case(name: str) -> str:
    """
    Turn a camelCase / PascalCase / mixed-case identifier into snake_case.

    Two word-boundary rules are applied in sequence, each inserting an
    underscore between its two captured groups, and the result is lowercased:

      1. a lowercase letter or digit directly followed by an uppercase letter
         (``storeID`` -> ``store_ID``);
      2. a run of uppercase letters followed by an uppercase letter that starts
         a capitalised word (``IDNumber`` -> ``ID_Number``).

    Args:
        name: The identifier to convert.

    Returns:
        The lowercased identifier with underscores at the detected boundaries.
    """
    # Order matters: the acronym rule runs on the output of the camelCase rule.
    boundary_patterns = (
        r"([a-z0-9])([A-Z])",
        r"([A-Z]+)([A-Z][a-z])",
    )
    converted = name
    for pattern in boundary_patterns:
        converted = re.sub(pattern, r"\1_\2", converted)
    return converted.lower()

# Utility: Clean and rename all column names to snake_case using df.toDF()
def clean_and_snake_case_columns(df: DataFrame) -> DataFrame:
    """
    Return ``df`` with every column renamed to a cleaned snake_case name.

    Each column name is processed as follows:
      1. the characters ``( ) { } ; =``, newline and tab are removed;
      2. leading and trailing whitespace is stripped;
      3. every remaining run of spaces is replaced by a single underscore,
         so ``"first name"`` becomes ``"first_name"`` (not ``"firstname"``);
      4. the result is passed through ``to_snake_case``.

    Args:
        df: DataFrame whose columns should be renamed.

    Returns:
        The result of ``df.toDF(*new_names)``: the columns keep their order
        and receive the cleaned names.
    """
    cleaned_cols = []
    for col_name in df.columns:
        # Spaces are deliberately NOT removed in this pass: they mark word
        # boundaries and are converted to underscores in the next step.
        # NOTE(review): ',' is not in this set although Spark/Delta also
        # reject it in column names — confirm whether it should be stripped.
        without_symbols = re.sub(r"[(){};\n\t=]", "", col_name)
        # Collapse space runs so "a = b" (now "a  b") yields "a_b", not "a__b".
        cleaned_name = re.sub(r" +", "_", without_symbols.strip())
        cleaned_cols.append(to_snake_case(cleaned_name))
    return df.toDF(*cleaned_cols)

# Read delta table and clean + snake_case the column names
def read_delta_with_snake_case(spark, path: str) -> DataFrame:
    """
    Load the Delta table at ``path`` and normalise its column names.

    Args:
        spark: Object exposing ``read.format(...).load(...)`` (used as the
            Spark session).
        path: Location passed to the Delta reader's ``load``.

    Returns:
        The loaded DataFrame after ``clean_and_snake_case_columns`` has been
        applied to it.
    """
    delta_reader = spark.read.format("delta")
    raw_df = delta_reader.load(path)
    return clean_and_snake_case_columns(raw_df)

# Join store and product on store_id
def get_store_product_data(product_df: DataFrame, store_df: DataFrame) -> DataFrame:
    """
    Inner-join the store data with the product data on ``store_id``.

    Args:
        product_df: Right-hand side of the join.
        store_df: Left-hand side of the join.

    Returns:
        ``store_df`` joined to ``product_df`` on the ``store_id`` column;
        only rows with a matching key on both sides are kept.
    """
    join_key = "store_id"
    store_product_df = store_df.join(product_df, on=join_key, how="inner")
    return store_product_df

# Join sales with enriched store-product data on product_id
def enrich_sales_with_store_product(sales_df: DataFrame, store_product_df: DataFrame) -> DataFrame:
    """
    Inner-join the sales data with the store-product data on ``product_id``.

    Args:
        sales_df: Left-hand side of the join.
        store_product_df: Right-hand side of the join.

    Returns:
        ``sales_df`` joined to ``store_product_df`` on the ``product_id``
        column; only rows with a matching key on both sides are kept.
    """
    join_key = "product_id"
    enriched_sales_df = sales_df.join(store_product_df, on=join_key, how="inner")
    return enriched_sales_df


In [0]:
# import re
# from pyspark.sql.functions import udf
# from pyspark.sql.types import StringType

# # UDF to convert camelCase to snake_case
# @udf(StringType())
# def to_snake_case(col_name):
#     """
#     Convert a single column name from camelCase to snake_case using regex.
#     """
#     s1 = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', col_name)
#     return re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', s1).lower()

# # Rename all columns of a DataFrame to snake_case
# def rename_cols_to_snake_case(df):
#     """
#     Rename all column names in the DataFrame to snake_case.
#     """
#     for col_name in df.columns:
#         new_col = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', col_name)
#         new_col = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', new_col).lower()
#         df = df.withColumnRenamed(col_name, new_col)
#     return df

# # Read delta table and rename columns to snake_case
# def read_delta_with_snake_case(spark, path):
#     """
#     Read a Delta table and rename all columns to snake_case.
#     """
#     df = spark.read.format("delta").load(path)
#     return rename_cols_to_snake_case(df)

# # Join store and product on store_id
# def get_store_product_data(product_df, store_df):
#     """
#     Join product_df and store_df on 'store_id' using inner join.
#     """
#     return store_df.join(product_df, on="store_id", how="inner")

# # Join sales with enriched store-product data on product_id
# def enrich_sales_with_store_product(sales_df, store_product_df):
#     """
#     Join sales_df and store_product_df on 'product_id' using inner join.
#     """
#     return sales_df.join(store_product_df, on="product_id", how="inner")


In [0]:
# import re
# from pyspark.sql import DataFrame
# from pyspark.sql.functions import udf
# from pyspark.sql.types import StringType

# # UDF to convert camelCase to snake_case
# @udf(StringType())
# def to_snake_case(col_name: str) -> str:
#     """
#     Convert a single column name from camelCase to snake_case using regex.
#     """
#     s1 = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', col_name)
#     return re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', s1).lower()

# # Rename all columns of a DataFrame to snake_case
# def rename_cols_to_snake_case(df: DataFrame) -> DataFrame:
#     """
#     Rename all column names in the DataFrame to snake_case.
#     """
#     for col_name in df.columns:
#         new_col = to_snake_case(col_name)  # Use the UDF for consistency
#         df = df.withColumnRenamed(col_name, new_col)
#     return df

# # Read delta table and rename columns to snake_case
# def read_delta_with_snake_case(spark, path: str) -> DataFrame:
#     """
#     Read a Delta table and rename all columns to snake_case.
#     """
#     df = spark.read.format("delta").load(path)
#     return rename_cols_to_snake_case(df)

# # Join store and product on store_id
# def get_store_product_data(product_df: DataFrame, store_df: DataFrame) -> DataFrame:
#     """
#     Join product_df and store_df on 'store_id' using inner join.
#     """
#     return store_df.join(product_df, on="store_id", how="inner")

# # Join sales with enriched store-product data on product_id
# def enrich_sales_with_store_product(sales_df: DataFrame, store_product_df: DataFrame) -> DataFrame:
#     """
#     Join sales_df and store_product_df on 'product_id' using inner join.
#     """
#     return sales_df.join(store_product_df, on="product_id", how="inner")


In [0]:
# import re
# from pyspark.sql import DataFrame
# from pyspark.sql.functions import col, split, lower, date_format, when, regexp_extract
# from pyspark.sql.types import StringType
# from pyspark.sql import functions as F

# # Plain Python helper (not a Spark UDF) to convert camelCase to snake_case
# def to_snake_case(col_name: str) -> str:
#     """
#     Convert a single column name from camelCase to snake_case using regex.
#     """
#     s1 = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', col_name)
#     return re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', s1).lower()

# # Function to rename columns in a DataFrame to snake_case
# def rename_cols_to_snake_case(df: DataFrame) -> DataFrame:
#     """
#     Rename all column names in the DataFrame to snake_case.
#     """
#     for col_name in df.columns:
#         new_col = to_snake_case(col_name)
#         df = df.withColumnRenamed(col_name, new_col)
#     return df

# # Function to transform customer DataFrame
# def transform_customer_data(df: DataFrame) -> DataFrame:
#     df = rename_cols_to_snake_case(df)
    
#     # Split Name into first_name and last_name
#     df = df.withColumn("first_name", split(col("name"), " ").getItem(0)) \
#            .withColumn("last_name", split(col("name"), " ").getItem(1))
    
#     # Extract domain from email
#     df = df.withColumn("domain", regexp_extract(col("email"), r'@(.+)', 1))
    
#     # Map gender
#     df = df.withColumn("gender", when(col("gender") == "Male", "M").otherwise("F"))
    
#     # Split Joining date into date and time
#     df = df.withColumn("date", date_format(split(col("joining_date"), " ").getItem(0), "yyyy-MM-dd")) \
#            .withColumn("time", split(col("joining_date"), " ").getItem(1))
    
#     # Create expenditure-status
#     df = df.withColumn("expenditure_status", when(col("spent") < 200, "MINIMUM").otherwise("MAXIMUM"))
    
#     return df

# # Function to transform product DataFrame
# def transform_product_data(df: DataFrame) -> DataFrame:
#     df = rename_cols_to_snake_case(df)
    
#     # Create sub_category based on category_id
#     df = df.withColumn("sub_category", 
#                        when(col("category_id") == 1, "phone")
#                        .when(col("category_id") == 2, "laptop")
#                        .when(col("category_id") == 3, "playstation")
#                        .when(col("category_id") == 4, "e-device"))
    
#     return df

# # Function to transform store DataFrame
# def transform_store_data(df: DataFrame) -> DataFrame:
#     df = rename_cols_to_snake_case(df)
    
#     # Extract store category from email
#     df = df.withColumn("store_category", regexp_extract(col("email"), r'@(.+?)\.', 1))
    
#     # Format created_at and updated_at
#     df = df.withColumn("created_at", date_format(col("created_at"), "yyyy-MM-dd")) \
#            .withColumn("updated_at", date_format(col("updated_at"), "yyyy-MM-dd"))
    
#     return df

# # Function to transform sales DataFrame
# def transform_sales_data(df: DataFrame) -> DataFrame:
#     df = rename_cols_to_snake_case(df)
#     return df

# # Function to create the final gold layer DataFrame
# def create_gold_layer(sales_df: DataFrame, product_df: DataFrame, store_df: DataFrame) -> DataFrame:
#     # Join sales with product and store data
#     joined_df = sales_df.join(product_df, on="product_id", how="inner") \
#                          .join(store_df, on="store_id", how="inner")
    
#     # Select required columns for the gold layer
#     gold_df = joined_df.select(
#         col("order_date"),
#         col("category"),
#         col("city"),
#         col("customer_id"),
#         col("order_id"),
#         col("product_id"),
#         col("profit"),
#         col("region"),
#         col("sales"),
#         col("segment"),
#         col("ship_date"),
#         col("ship_mode"),
#         col("latitude"),
#         col("longitude"),
#         col("store_name"),
#         col("location"),
#         col("manager_name"),
#         col("product_name"),
#         col("price"),
#         col("stock_quantity"),
#         col("image_url")
#     )
    
#     return gold_df
