**SINGLE TABLE COLUMN TRANSFORMATION**

In [0]:
# List the contents of the silver-layer SalesLT folder.
# NOTE(review): this path has no leading '/', unlike the '/mnt/...' paths
# passed to spark.read further down — confirm both resolve to the same mount.
dbutils.fs.ls('mnt/silver/SalesLT')

In [0]:
# List what currently exists under the gold folder.
# NOTE(review): relative path (no leading '/') — confirm it resolves to the
# same location as the '/mnt/gold/...' paths used when writing.
dbutils.fs.ls('mnt/gold/')

In [0]:
# Read the silver-layer Address table (Delta format) to try the column
# renaming on a single table first.
df = (
    spark.read
    .format('delta')
    .load('/mnt/silver/SalesLT/Address/')
)

In [0]:
# Preview the DataFrame before any column is renamed.
display(df)

`rename_columns_to_snake_case(df)` converts the column names of a PySpark DataFrame from PascalCase or camelCase to snake_case.

    Args:
        df (DataFrame): The input DataFrame with columns to be renamed.

    Returns:
        DataFrame: A new DataFrame with column names converted to snake_case.

In [0]:
from pyspark.sql.functions import col

def rename_columns_to_snake_case(df):
    """Return ``df`` with every column name converted to snake_case.

    An underscore is inserted before each uppercase character that is not
    the first character of the name and whose preceding character is not
    uppercase. The name is then lower-cased and any leading underscores are
    stripped. Runs of capitals therefore stay together, e.g.
    ``AddressID`` -> ``address_id`` and ``ModifiedDate`` -> ``modified_date``.

    Args:
        df (DataFrame): The input DataFrame with columns to be renamed.

    Returns:
        DataFrame: A DataFrame with column names converted to snake_case.
            ``df`` itself is returned when no name needs to change.

    Raises:
        ValueError: If two columns end up with the same snake_case name.
    """
    original_names = list(df.columns)

    new_names = []
    seen = set()  # constant-time duplicate lookup instead of scanning a dict's values

    for old_col_name in original_names:
        # Convert one name from PascalCase or camelCase to snake_case.
        pieces = []
        for idx, char in enumerate(old_col_name):
            starts_new_word = (
                char.isupper()
                and idx > 0
                and not old_col_name[idx - 1].isupper()
            )
            pieces.append("_" + char.lower() if starts_new_word else char.lower())
        new_col_name = "".join(pieces).lstrip("_")

        # Refuse to produce two columns with the same name.
        if new_col_name in seen:
            raise ValueError(f"Duplicate column name found after renaming: '{new_col_name}'")

        seen.add(new_col_name)
        new_names.append(new_col_name)

    # Nothing to rename (also covers a DataFrame without columns).
    if new_names == original_names:
        return df

    # Rename every column positionally in one call rather than chaining one
    # withColumnRenamed call per column.
    return df.toDF(*new_names)

In [0]:
# Rebind df to the version whose column names are snake_case.
df = rename_columns_to_snake_case(df)

In [0]:
# Preview the DataFrame again to check the renamed columns.
display(df)

**All tables: column-name transformation**

In [0]:
# Collect one entry per item listed under the silver SalesLT folder, keeping
# only the text before the first '/' of each item's name.
table_name = [
    entry.name.partition('/')[0]
    for entry in dbutils.fs.ls('mnt/silver/SalesLT')
]

table_name

In [0]:
# Write a snake_case copy of every silver SalesLT table into the gold layer.
for name in table_name:
    path = '/mnt/silver/SalesLT/' + name
    print(path)
    df = spark.read.format('delta').load(path)

    df = rename_columns_to_snake_case(df)

    # The '/' after 'SalesLT' is required: without it each table is written to
    # a sibling folder such as '/mnt/gold/SalesLTAddress/' instead of
    # '/mnt/gold/SalesLT/Address/'.
    output_path = '/mnt/gold/SalesLT/' + name + '/'
    df.write.format('delta').mode('overwrite').save(output_path)

In [0]:
# df holds whatever the preceding loop assigned last, so this previews only
# the final table that was processed.
display(df)

In [0]:
# Copy each silver SalesLT table to the gold layer with snake_case columns.
for name in table_name:
    # Where this table lives in the silver layer.
    path = f'/mnt/silver/SalesLT/{name}'
    print(path)

    # Load the table, then normalise its column names.
    df = spark.read.format('delta').load(path)
    df = rename_columns_to_snake_case(df)

    # Overwrite the gold copy stored under the same table name.
    output_path = f'/mnt/gold/SalesLT/{name}/'
    (
        df.write
        .format('delta')
        .mode('overwrite')
        .save(output_path)
    )