In [None]:
import os

# Container names come from one comma-separated environment variable, in the
# order bronze, silver, gold. Whitespace around each name is dropped so that
# a value such as "bronze, silver, gold" does not leak spaces into the
# abfss:// URIs built from these names.
CONTAINER_NAMES = [name.strip() for name in os.environ["container_names"].split(",")]
if len(CONTAINER_NAMES) < 3:
    # Fail with an actionable message instead of a bare IndexError.
    raise ValueError(
        "container_names must list at least three comma-separated containers "
        f"(bronze, silver, gold); got {len(CONTAINER_NAMES)}"
    )

# NOTE: the "BRONZ" spelling is kept on purpose; other cells reference this name.
BRONZ_CONTAINER, SILVER_CONTAINER, GOLD_CONTAINER = CONTAINER_NAMES[:3]

STORAGE_ACCOUNT_NAME = os.environ["storage_account_name"]

# Any "?" at either end of the token is removed (presumably the leading "?"
# of a copied SAS query string -- confirm against how the token is provisioned).
SAS_TOKEN = os.environ["sas_token"].strip('?')

In [None]:
# Spark session settings that make the ABFS driver authenticate against the
# storage account with the fixed SAS token read from the environment.
_sas_settings = (
    (f"fs.azure.account.auth.type.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net", "SAS"),
    (
        f"fs.azure.sas.token.provider.type.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net",
        "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider",
    ),
    (f"fs.azure.sas.fixed.token.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net", SAS_TOKEN),
)

# Applied in the order listed: auth type, token provider class, token value.
for _conf_key, _conf_value in _sas_settings:
    spark.conf.set(_conf_key, _conf_value)

In [None]:
# Root abfss:// URI of each container; every URI ends with a trailing "/".
BRONZE_FS, SILVER_FS, GOLD_FS = (
    f"abfss://{container}@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/"
    for container in (BRONZ_CONTAINER, SILVER_CONTAINER, GOLD_CONTAINER)
)

Get the list of tables to process

In [None]:
# Names of every entry found at the root of the bronze container.
filenames = [item.name for item in dbutils.fs.ls(BRONZE_FS)]

# Display the collected names as the cell output.
filenames

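Normalize column names: insert `_` at every lower-to-upper case boundary (e.g. `AddressID` → `Address_ID`) and write each table to the gold container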

In [None]:
from pyspark.sql.functions import from_utc_timestamp, date_format
from pyspark.sql.types import TimestampType

def format_column(column_name: str) -> str:
    """Insert an underscore at each lower-to-upper boundary of a column name.

    A boundary is a position where the current character equals its own
    upper-cased form and the previous character equals its own lower-cased
    form. Caseless characters such as digits satisfy both tests, so
    ``AddressLine1`` becomes ``Address_Line_1``. Letter case is never changed.

    No underscore is inserted next to an existing underscore, so names that
    are already separated are returned unchanged and the function is
    idempotent.

    Args:
        column_name: Original column name; may be empty.

    Returns:
        The name with underscores inserted, or ``""`` for empty input.
    """
    if not column_name:
        # Nothing to split; indexing column_name[0] below would raise IndexError.
        return column_name

    res = [column_name[0]]
    # Walk adjacent (previous, current) character pairs.
    for prev, cur in zip(column_name, column_name[1:]):
        at_boundary = cur == cur.upper() and prev == prev.lower()
        # '_' is caseless, so without this guard it counts as a boundary on
        # both of its sides and "Address_ID" would turn into "Address___ID".
        if at_boundary and '_' not in (prev, cur):
            res.append('_')
        res.append(cur)
    return ''.join(res)
    

# Copy every listed table from the silver container to the gold container,
# renaming its columns with format_column on the way.
for filename in filenames:
    # Table folder name: the listing entry name up to its first ".".
    table_name = filename.split('.')[0]

    # The container roots may already end with "/"; strip it before joining
    # so the path never contains a redundant "//".
    silver_path = f"{SILVER_FS.rstrip('/')}/{table_name}"
    gold_path = f"{GOLD_FS.rstrip('/')}/{table_name}"

    silver_df = spark.read.format("delta").load(silver_path)

    # Rename all columns positionally in one projection instead of chaining
    # one withColumnRenamed call per column. `df` stays bound to the renamed
    # frame because a later cell displays it.
    df = silver_df.toDF(*[format_column(name) for name in silver_df.columns])

    # Overwrite mode replaces whatever is already stored at the gold path.
    df.write.format("delta").mode("overwrite").save(gold_path)

In [None]:
# Preview the table handled by the final iteration of the loop above: `df` is
# whatever that iteration left bound (show() prints the first 20 rows by default).
df.show()