In [0]:
# Turn ANSI SQL mode off for this Spark session. With ANSI mode disabled,
# Spark returns NULL for invalid casts instead of raising an error (see the
# Spark SQL ANSI compliance documentation).
# NOTE(review): presumably required because isnan() is applied to string
# columns further down, which relies on an implicit string-to-double cast
# -- confirm before removing.
spark.conf.set("spark.sql.ansi.enabled", "false")

from pyspark.sql.functions import (
    col, trim, when, lit, lower, isnan,
    year, month, dayofmonth, date_format, count, sum, avg
)
from pyspark.sql.types import StringType, IntegerType, FloatType, TimestampType, DoubleType
import re

def to_snake_case(name):
    """Return *name* converted to lower-case snake_case.

    Handles CamelCase / PascalCase (``"SalesAmount"`` -> ``"sales_amount"``),
    acronym boundaries (``"HTTPResponse"`` -> ``"http_response"``,
    ``"CustomerID"`` -> ``"customer_id"``) and spaces
    (``"Customer Name"`` -> ``"customer_name"``).

    Names that already contain a separator keep a single underscore:
    ``"Sales_Amount"`` -> ``"sales_amount"`` (previously ``"sales__amount"``,
    which broke exact-name lookups such as ``"sales_amount" in df.columns``).

    Args:
        name: Column name to convert.

    Returns:
        The snake_case form of *name*.
    """
    # Insert "_" before a capitalised word, but only when it directly follows a
    # letter or digit ([^\W_] = word character other than "_"). Matching any
    # character here would add a second separator after an existing "_" or " ".
    name = re.sub(r"([^\W_])([A-Z][a-z]+)", r"\1_\2", name)
    # Insert "_" at a lower-case/digit -> upper-case boundary ("rI" in "CustomerID").
    name = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)
    return name.lower().replace(" ", "_")

# Every sub-directory of the silver "Sales" folder is loaded as one Delta table
# by the loop below. Plain files in that folder are skipped, since loading them
# as a Delta table path would fail.
table_names = [
    entry.name.replace("/", "")  # directory names are listed with a trailing "/"
    for entry in dbutils.fs.ls("abfss://silver@forsynapseaccount.dfs.core.windows.net/Sales/")
    if entry.isDir()
]

def _rename_to_snake_case(frame):
    """Rename every column of *frame* via to_snake_case, one column at a time."""
    for original_name in frame.columns:
        frame = frame.withColumnRenamed(original_name, to_snake_case(original_name))
    return frame


def _normalise_column(frame, name):
    """Apply the type-specific clean-up for column *name* and return the new frame.

    * string    -> lower-cased, trimmed; NULL / NaN replaced by "unknown"
    * integer   -> cast to "int"
    * float     -> cast to "double"
    * timestamp -> formatted as "yyyy-MM-dd", plus <name>_year / _month / _day
    * any other type is left untouched
    """
    kind = frame.schema[name].dataType
    value = col(name)

    if isinstance(kind, StringType):
        frame = frame.withColumn(name, trim(lower(value)))
        # NOTE(review): isnan() on a string column relies on an implicit cast
        # to double -- presumably why ANSI mode is switched off; confirm.
        is_missing = value.isNull() | isnan(value)
        return frame.withColumn(
            name, when(is_missing, lit("unknown")).otherwise(value)
        )

    if isinstance(kind, IntegerType):
        return frame.withColumn(name, value.cast("int"))

    if isinstance(kind, FloatType):
        return frame.withColumn(name, value.cast("double"))

    if isinstance(kind, TimestampType):
        # The column is replaced by its formatted string first; the date parts
        # are then read from that formatted column.
        frame = frame.withColumn(name, date_format(value, "yyyy-MM-dd"))
        for suffix, extract in (("year", year), ("month", month), ("day", dayofmonth)):
            frame = frame.withColumn(f"{name}_{suffix}", extract(value))

    return frame


def _customer_sales_summary(frame):
    """Return per-customer transaction count, total and average sales."""
    amount = "sales_amount_numeric"
    with_amount = frame.withColumn(amount, col("sales_amount").cast("double"))
    # `sum` here is the pyspark.sql.functions import, not the Python builtin.
    return with_amount.groupBy("customer_id").agg(
        count("*").alias("total_transactions"),
        sum(amount).alias("total_sales"),
        avg(amount).alias("avg_sales"),
    )


# Silver -> gold: clean each Sales table, de-duplicate it, persist it, and
# write a per-customer aggregate when the required columns are present.
for table in table_names:
    input_path = f"abfss://silver@forsynapseaccount.dfs.core.windows.net/Sales/{table}/"
    output_path = f"abfss://gold@forsynapseaccount.dfs.core.windows.net/Sales/{table}/"

    df = _rename_to_snake_case(spark.read.format("delta").load(input_path))
    for col_name in df.columns:
        df = _normalise_column(df, col_name)
    df = df.dropDuplicates()

    df.write.format("delta").mode("overwrite").save(output_path)
    display(df)

    # Aggregates only make sense for tables carrying both columns.
    if not {"customer_id", "sales_amount"}.issubset(df.columns):
        continue

    agg_df = _customer_sales_summary(df)
    agg_output = f"abfss://gold@forsynapseaccount.dfs.core.windows.net/Sales/{table}_aggregates/"
    agg_df.write.format("delta").mode("overwrite").save(agg_output)
    # display(agg_df)