# Single-table transformation: rename columns to snake_case

In [0]:
# List the silver-layer sales folder: each entry is a sub-directory holding one table.
dbutils.fs.ls('mnt/silver/sales/')

[FileInfo(path='dbfs:/mnt/silver/sales/customers/', name='customers/', size=0, modificationTime=1729255720000),
 FileInfo(path='dbfs:/mnt/silver/sales/order_items/', name='order_items/', size=0, modificationTime=1729255721000),
 FileInfo(path='dbfs:/mnt/silver/sales/orders/', name='orders/', size=0, modificationTime=1729255723000),
 FileInfo(path='dbfs:/mnt/silver/sales/staffs/', name='staffs/', size=0, modificationTime=1729255725000),
 FileInfo(path='dbfs:/mnt/silver/sales/stores/', name='stores/', size=0, modificationTime=1729255726000)]

In [0]:
# List the gold layer; note the existing folder is spelled 'ratail_inventory' on disk.
dbutils.fs.ls('mnt/gold/')

[FileInfo(path='dbfs:/mnt/gold/ratail_inventory/', name='ratail_inventory/', size=0, modificationTime=1729255865000)]

In [0]:
# Load the silver `orders` Delta table to trial the column renaming on a single table.
df = spark.read.load('/mnt/silver/sales/orders/', format='delta')

In [0]:
# Preview the orders table before renaming its columns.
display(df)

order_id,customer_id,order_status,order_date,required_date,store_id,staff_id,shipped_date
1,259,4,2016-01-01,2016-01-03,1,2,2016-01-03
2,1212,4,2016-01-01,2016-01-04,2,6,2016-01-03
3,523,4,2016-01-02,2016-01-05,2,7,2016-01-03
4,175,4,2016-01-03,2016-01-04,1,3,2016-01-05
5,1324,4,2016-01-03,2016-01-06,2,6,2016-01-06
6,94,4,2016-01-04,2016-01-07,2,6,2016-01-05
7,324,4,2016-01-04,2016-01-07,2,6,2016-01-05
8,1204,4,2016-01-04,2016-01-05,2,7,2016-01-05
9,60,4,2016-01-05,2016-01-08,1,2,2016-01-08
10,442,4,2016-01-05,2016-01-06,2,6,2016-01-06


In [0]:
from pyspark.sql.functions import col

def rename_columns_to_snake_case(df):
    """
    Convert column names from PascalCase or camelCase to snake_case in a PySpark DataFrame.

    Rules applied to each column name:
      * An uppercase letter that follows a non-uppercase character starts a new word
        ("orderId" -> "order_id"); a run of capitals stays together ("OrderID" -> "order_id").
      * Whitespace and hyphens are word separators ("Order Date" -> "order_date").
      * No doubled underscore is introduced where "_" already separates words
        ("Customer_ID" -> "customer_id").
      * Leading underscores are removed.

    Args:
        df (DataFrame): The input DataFrame with columns to be renamed.

    Returns:
        DataFrame: A DataFrame whose column names are converted to snake_case. If no
        column name changes, the input DataFrame is returned as-is.

    Raises:
        ValueError: If two columns map to the same snake_case name.
    """
    old_names = list(df.columns)
    new_names = [_to_snake_case(old) for old in old_names]

    # Detect collisions (e.g. "fooBar" and "foo_bar") before touching the DataFrame.
    source_of = {}
    for old, new in zip(old_names, new_names):
        if new in source_of:
            raise ValueError(
                f"Duplicate column name found after renaming: '{new}' "
                f"(from '{source_of[new]}' and '{old}')"
            )
        source_of[new] = old

    if new_names == old_names:
        return df

    # Rename positionally in one projection instead of one withColumnRenamed call per column.
    return df.toDF(*new_names)


def _to_snake_case(name):
    """Return the snake_case form of one column name (rules as in rename_columns_to_snake_case)."""
    pieces = []
    pending_separator = False  # saw whitespace/hyphen since the last emitted character
    for idx, char in enumerate(name):
        if char.isspace() or char == "-":
            # Defer the underscore so leading/trailing separators leave no stray "_".
            pending_separator = True
            continue
        starts_word = char.isupper() and idx > 0 and not name[idx - 1].isupper()
        # Only insert "_" when the output doesn't already end with one (avoids "customer__id").
        if (pending_separator or starts_word) and pieces and pieces[-1] != "_":
            pieces.append("_")
        pending_separator = False
        pieces.append(char.lower())
    return "".join(pieces).lstrip("_")

# Example usage
# df = rename_columns_to_snake_case(df)



In [0]:
# Apply the renaming; this rebinds `df`, which the next cell displays.
df = rename_columns_to_snake_case(df)

In [0]:
# Preview after renaming; the silver orders columns were already snake_case, so they are unchanged.
display(df)

order_id,customer_id,order_status,order_date,required_date,store_id,staff_id,shipped_date
1,259,4,2016-01-01,2016-01-03,1,2,2016-01-03
2,1212,4,2016-01-01,2016-01-04,2,6,2016-01-03
3,523,4,2016-01-02,2016-01-05,2,7,2016-01-03
4,175,4,2016-01-03,2016-01-04,1,3,2016-01-05
5,1324,4,2016-01-03,2016-01-06,2,6,2016-01-06
6,94,4,2016-01-04,2016-01-07,2,6,2016-01-05
7,324,4,2016-01-04,2016-01-07,2,6,2016-01-05
8,1204,4,2016-01-04,2016-01-05,2,7,2016-01-05
9,60,4,2016-01-05,2016-01-08,1,2,2016-01-08
10,442,4,2016-01-05,2016-01-06,2,6,2016-01-06


# All-table transformation: rename columns to snake_case and write each table to gold


In [0]:
# To show the basic format of ls: one FileInfo (path, name, size, modificationTime) per table folder
table_name = list(dbutils.fs.ls('mnt/silver/sales'))

table_name

[FileInfo(path='dbfs:/mnt/silver/sales/customers/', name='customers/', size=0, modificationTime=1729255720000),
 FileInfo(path='dbfs:/mnt/silver/sales/order_items/', name='order_items/', size=0, modificationTime=1729255721000),
 FileInfo(path='dbfs:/mnt/silver/sales/orders/', name='orders/', size=0, modificationTime=1729255723000),
 FileInfo(path='dbfs:/mnt/silver/sales/staffs/', name='staffs/', size=0, modificationTime=1729255725000),
 FileInfo(path='dbfs:/mnt/silver/sales/stores/', name='stores/', size=0, modificationTime=1729255726000)]

In [0]:
# Keep only the bare table names: FileInfo.name ends with '/', so take the part before it.
table_name = [entry.name.split('/')[0] for entry in dbutils.fs.ls('mnt/silver/sales')]

In [0]:
# Show the sales table names that the copy loop below will iterate over.
table_name

['customers', 'order_items', 'orders', 'staffs', 'stores']

In [0]:
# # If you get a schema error when overwriting a gold table, run this to compare schemas.
# # Check schema of Silver data (silver tables live under /mnt/silver/<schema>/<table>, e.g. sales/orders)
# silver_df = spark.read.format("delta").load("/mnt/silver/<schema>/<table>")
# silver_df.printSchema()  # NOTE(review): printSchema() prints the schema; use silver_df.schema if you need the object

# # Check schema of Gold data (gold tables are written to /mnt/gold/ratail_inventory/<table>)
# gold_df = spark.read.format("delta").load("/mnt/gold/ratail_inventory/<table>")
# gold_df.printSchema()


In [0]:
# Copy every silver sales table to the gold layer with snake_case column names.
# `name`, `path`, `df` and `output_path` stay bound after the loop (last table processed).
for name in table_name:
    path = f'/mnt/silver/sales/{name}'
    print(path)

    df = spark.read.load(path, format='delta')
    df = rename_columns_to_snake_case(df)

    # 'ratail_inventory' matches the existing gold folder name, so the spelling is kept.
    output_path = f'/mnt/gold/ratail_inventory/{name}/'
    df.write.save(output_path, format='delta', mode='overwrite')

/mnt/silver/sales/customers
/mnt/silver/sales/order_items
/mnt/silver/sales/orders
/mnt/silver/sales/staffs
/mnt/silver/sales/stores


In [0]:
# To show the basic format of ls: one FileInfo (path, name, size, modificationTime) per table folder.
# The trailing expression displays the listing, as the equivalent sales cell does.
table_name = list(dbutils.fs.ls('mnt/silver/production'))

table_name

In [0]:
# Keep only the bare production table names (strip the trailing '/' from FileInfo.name).
table_name = [entry.name.split('/')[0] for entry in dbutils.fs.ls('mnt/silver/production')]

In [0]:
# Copy every silver production table to the gold layer with snake_case column names.
# `df` stays bound to the last table processed; the display cell below relies on it.
for name in table_name:
    path = f'/mnt/silver/production/{name}'
    print(path)

    df = spark.read.load(path, format='delta')
    df = rename_columns_to_snake_case(df)

    # NOTE(review): sales and production tables share this gold folder, so a table name
    # present in both schemas would be overwritten by whichever loop runs last.
    output_path = f'/mnt/gold/ratail_inventory/{name}/'
    df.write.save(output_path, format='delta', mode='overwrite')

/mnt/silver/production/categories
/mnt/silver/production/inventory_levels
/mnt/silver/production/products
/mnt/silver/production/suppliers


In [0]:
# Preview the last table written by the production loop above (suppliers, per the printed paths).
display(df)

supplier_id,supplier_name
1,"""Electra"""
2,"""Haro"""
3,"""Heller"""
4,"""Pure Cycles"""
5,"""Ritchey"""
6,"""Strider"""
7,"""Sun Bicycles"""
8,"""Surly"""
9,"""Trek"""
