**Single table column transformation (column names)**

In [None]:
# List the SalesLT folder in the silver layer to verify our source tables
dbutils.fs.ls('mnt/silver/SalesLT/')

In [None]:
# List the root of the gold layer to verify our destination
dbutils.fs.ls('mnt/gold/')

In [None]:
# Read the Address table from the silver layer (stored in delta format) into `df`
df = spark.read.format('delta').load('/mnt/silver/SalesLT/Address/')

In [None]:
# Display the Address DataFrame with its original column names, before renaming
display(df)

In [None]:
# Import the PySpark column helper
from pyspark.sql.functions import col


# Define function to convert column names to snake_case
def _to_snake_case(name):
    """Convert one PascalCase or camelCase identifier to snake_case."""
    pieces = []
    for idx, char in enumerate(name):
        if char.isupper() and idx > 0:
            prev_char = name[idx - 1]
            # A new word starts at an uppercase letter that follows anything
            # other than another uppercase letter. Skip the separator when the
            # name already has an underscore there, so 'Order_Date' becomes
            # 'order_date' rather than 'order__date'.
            if not prev_char.isupper() and prev_char != "_":
                pieces.append("_")
        pieces.append(char.lower())

    # Remove any leading underscore
    return "".join(pieces).lstrip("_")


def rename_columns_to_snake_case(df):
    """
    Convert column names from PascalCase or camelCase to snake_case in a PySpark DataFrame.
    Example: 'CustomerID' -> 'customer_id', 'firstName' -> 'first_name'

    Conversion rules:
        * An underscore is inserted before an uppercase letter that follows a
          non-uppercase character, unless an underscore is already there.
        * Runs of consecutive uppercase letters are kept together as one word
          ('CustomerID' -> 'customer_id'); consequently a word that directly
          follows such a run is not split off ('HTTPResponse' -> 'httpresponse').
        * All letters are lowercased and leading underscores are removed.

    Args:
        df (DataFrame): The input DataFrame with columns to be renamed.

    Returns:
        DataFrame: A DataFrame with column names converted to snake_case. The
        input DataFrame itself is returned when no name needs to change.

    Raises:
        ValueError: If two columns would end up with the same snake_case name.
    """
    old_names = list(df.columns)

    new_names = []
    seen = set()  # constant-time duplicate detection
    for old_col_name in old_names:
        new_col_name = _to_snake_case(old_col_name)

        # Avoid renaming to an existing column name
        if new_col_name in seen:
            raise ValueError(f"Duplicate column name found after renaming: '{new_col_name}'")

        seen.add(new_col_name)
        new_names.append(new_col_name)

    # Nothing to do (already snake_case, or no columns at all)
    if new_names == old_names:
        return df

    # Rename every column positionally in a single projection instead of
    # chaining one withColumnRenamed call per column.
    return df.toDF(*new_names)



In [None]:
# Convert the Address DataFrame's column names to snake_case; `df` now refers to the renamed result
df = rename_columns_to_snake_case(df)

In [None]:
# Display the DataFrame again to confirm the column names are now snake_case
display(df)

**All table columns transformation (column names)**

In [None]:
# Keep the raw entries returned by ls as-is, to show what each one looks like
# (for debugging/understanding)
table_name_temp = list(dbutils.fs.ls('mnt/silver/SalesLT'))



In [None]:
# Inspect the raw ls entries stored in table_name_temp
table_name_temp

In [None]:
# Build the list of table names: for each entry in the silver SalesLT folder,
# keep only the part of its name before the first '/'
table_name = [
    entry.name.split('/')[0]
    for entry in dbutils.fs.ls('mnt/silver/SalesLT')
]


In [None]:
# Inspect the cleaned table names stored in table_name
table_name

In [None]:
# Copy every silver table to the gold layer with snake_case column names
for name in table_name:

    # Where this table lives in the silver layer
    path = f'/mnt/silver/SalesLT/{name}'
    print(path)

    # Load the silver delta table
    df = spark.read.format('delta').load(path)

    # Normalise its column names to snake_case
    df = rename_columns_to_snake_case(df)

    # Where the renamed table goes in the gold layer
    output_path = f'/mnt/gold/SalesLT/{name}/'

    # Replace any existing gold copy with the renamed data, stored as delta
    (
        df.write
        .format('delta')
        .mode('overwrite')
        .save(output_path)
    )


In [None]:
# Display the final transformed DataFrame: `df` is reassigned on every loop
# iteration, so only the last table processed is shown here
display(df)