In [0]:
# Volume directory that holds the anonymised CSV exports.
base = "/Volumes/advisory_demo_catalog/ai_bi_reporting/anon_files"

# Load each CSV family (glob patterns) with the first row treated as the header.
df_acct = spark.read.option("header", "true").csv(f"{base}/ANON BST ACCT*.csv")
df_opp = spark.read.option("header", "true").csv(f"{base}/ANON BST OPPS*.csv")
df_cust = spark.read.option("header", "true").csv(f"{base}/ANON  Cust_*.csv")

# Persist every frame to its table, replacing both data and schema on each run.
# The same writer options are applied to all three tables, in the same order
# (acct, opp, cust) as the reads above.
for _frame, _table in (
    (df_acct, "advisory_demo_catalog.ai_bi_reporting.anon_bst_acct_bz"),
    (df_opp, "advisory_demo_catalog.ai_bi_reporting.anon_bst_opp_bz"),
    (df_cust, "advisory_demo_catalog.ai_bi_reporting.anon_cust_intelligence_opportunity_bz"),
):
    _writer = _frame.write.mode("overwrite")
    _writer = _writer.option("delta.columnMapping.mode", "name")
    _writer = _writer.option("overwriteSchema", "true")
    _writer.saveAsTable(_table)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType, IntegerType, TimestampType
from pyspark.sql.types import DoubleType

# ==== 0) Base configuration ====
# Target namespace for every table written by this cell.
CATALOG = "advisory_demo_catalog"
SCHEMA  = "ai_bi_reporting"

# Make the target catalog/schema the session defaults.
spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"USE SCHEMA  {SCHEMA}")

# Source CSV glob patterns inside the volume directory.
base = "/Volumes/advisory_demo_catalog/ai_bi_reporting/anon_files"
acct_path, opp_path, cust_path = (
    f"{base}/ANON BST ACCT*.csv",
    f"{base}/ANON BST OPPS*.csv",
    f"{base}/ANON  Cust_*.csv",
)

# Fully qualified destination table names (catalog.schema.table).
tbl_acct, tbl_opp, tbl_cust = (
    f"{CATALOG}.{SCHEMA}.anon_bst_acct_bz_2",
    f"{CATALOG}.{SCHEMA}.anon_bst_opp_bz_2",
    f"{CATALOG}.{SCHEMA}.anon_cust_intelligence_opportunity_bz_2",
)

# ==== 1) Helper functions for cleaning/casting ====

def clean_number(col):
    """
    Build a string column expression that strips formatting noise from `col`.

    Two regex passes are applied to the named column:
      1. a leading run of characters that are not digits, '-', '.' or ','
         (with any whitespace around it) is removed;
      2. every remaining comma and whitespace character is removed.

    The result is still a string column; casting is left to the caller.
    """
    # Pass 1: drop a leading symbol prefix (e.g. a currency sign).
    without_prefix = F.regexp_replace(F.col(col), r"^\s*[^\d\-\.,]+\s*", "")
    # Pass 2: drop thousand separators and stray whitespace.
    return F.regexp_replace(without_prefix, r"[,\s]", "")

def to_decimal(col, precision=18, scale=2):
    """
    Return `col` cleaned by clean_number and cast to DecimalType(precision, scale).
    """
    target_type = DecimalType(precision, scale)
    cleaned = clean_number(col)
    return cleaned.cast(target_type)

def to_int(col):
    """
    Build an IntegerType column expression from the string column `col`.

    A value is converted only when it looks like a plain number (optional
    leading '-', digits, optional fractional part, optional surrounding
    whitespace). It is cast to double first and then to int, so "3.7"
    becomes 3 rather than NULL.

    Values that do not match the pattern yield NULL. Values that match but do
    not fit in a 32-bit integer also yield NULL: a plain double -> int cast
    cannot represent them, so the stored number would not be the real value
    (or the cast would raise, depending on the session's ANSI setting).
    """
    raw = F.col(col)
    as_double = raw.cast(DoubleType())
    is_numeric_text = raw.rlike(r"^\s*-?\d+(\.\d+)?\s*$")

    # The double -> int cast drops the fractional part, so any value strictly
    # between these bounds ends up inside [-2147483648, 2147483647].
    fits_int32 = (as_double > F.lit(-2147483649.0)) & (as_double < F.lit(2147483648.0))

    # Nested WHENs keep the range check and the cast behind the regex guard;
    # a WHEN without an OTHERWISE already evaluates to NULL.
    return F.when(
        is_numeric_text,
        F.when(fits_int32, as_double.cast(IntegerType())),
    )

def to_ts(col):
    """
    Build a timestamp column expression from the string column `col`.

    Each explicit pattern below is tried in order, followed by a format-less
    to_timestamp call; the first non-null result wins (via coalesce).
    Extend `explicit_formats` to support more layouts.
    """
    source = F.col(col)
    explicit_formats = (
        "M/d/yyyy H:mm",
        "M/d/yyyy HH:mm",
        "yyyy-MM-dd HH:mm:ss",
        "yyyy/MM/dd HH:mm:ss",
    )
    attempts = [F.to_timestamp(source, fmt) for fmt in explicit_formats]
    # Last resort: let Spark parse without an explicit pattern.
    attempts.append(F.to_timestamp(source))
    return F.coalesce(*attempts)

def to_date(col):
    """
    Build a date column expression from the string column `col`.

    Each explicit pattern below is tried in order, followed by a format-less
    to_date call; the first non-null result wins (via coalesce).
    """
    source = F.col(col)
    explicit_formats = ("M/d/yyyy", "yyyy-MM-dd", "yyyy/MM/dd", "dd-MMM-yyyy")
    attempts = [F.to_date(source, fmt) for fmt in explicit_formats]
    # Last resort: let Spark parse without an explicit pattern.
    attempts.append(F.to_date(source))
    return F.coalesce(*attempts)

# ==== 2) Read CSV files as strings ====

# Schema inference is switched off, so every column is loaded as a string
# and typed explicitly in the sections that follow.
rd_opts = dict(
    header="true",
    multiLine="false",
    escape='"',
    quote='"',
    inferSchema="false",
)


def _read_raw_csv(path):
    """Load the CSV file(s) matching `path` using the shared reader options."""
    return spark.read.options(**rd_opts).csv(path)


df_acct_raw, df_opp_raw, df_cust_raw = (
    _read_raw_csv(p) for p in (acct_path, opp_path, cust_path)
)

# ==== 3) ACCT: cast detected columns ====
# Column name -> typed replacement expression, applied in this order.
# Columns not listed here stay as strings.
_acct_casts = {
    "Id": to_int("Id"),
    "FY_Revenue_Target": to_decimal("FY_Revenue_Target", 18, 2),
    "CVA_NPS": to_int("CVA_NPS"),
    "CVA_CSAT": to_int("CVA_CSAT"),
}

df_acct = df_acct_raw
for _name, _expr in _acct_casts.items():
    df_acct = df_acct.withColumn(_name, _expr)

# ==== 4) OPPS: cast detected columns ====
int_cols_opp = ["ID", "AccountID", "Auto_Number", "ContractRenewalMonths"]
decimal_cols_opp = [
    "Amount_ACV",
    "CX_Ops_Overall_ACV",
    "CX_Ops_Overall_ECR",
    "Amount_ACV_Static",
    "ECR_Converted",
    "Amount_ACV_USD",
]

# (column names, converter) pairs, processed in order: ints, decimals, timestamps.
_opp_cast_plan = (
    (int_cols_opp, to_int),
    (decimal_cols_opp, lambda name: to_decimal(name, 18, 2)),
    (["CloseDate", "LastModifiedDate"], to_ts),
)

df_opp = df_opp_raw
for _names, _convert in _opp_cast_plan:
    for _name in _names:
        # Columns missing from the file are skipped rather than failing.
        if _name not in df_opp.columns:
            continue
        df_opp = df_opp.withColumn(_name, _convert(_name))

# ==== 5) Cust Intelligence: cast detected columns ====
# Only "ID" is typed here, and only when the file actually has that column;
# everything else is kept as loaded.
df_cust = (
    df_cust_raw.withColumn("ID", to_int("ID"))
    if "ID" in df_cust_raw.columns
    else df_cust_raw
)

# ==== 6) Write to Delta using column mapping to preserve special characters ====

write_opts = dict([
    ("delta.columnMapping.mode", "name"),   # allows spaces/special characters in column names
    ("overwriteSchema", "true"),
])

# Each typed frame replaces its destination table, in acct -> opp -> cust order.
for _frame, _table in (
    (df_acct, tbl_acct),
    (df_opp, tbl_opp),
    (df_cust, tbl_cust),
):
    _writer = _frame.write.options(**write_opts)
    _writer.mode("overwrite").saveAsTable(_table)

# ==== 7) Quick checks ====
# For every table just written: print a banner, then its extended description,
# then a five-row sample, all without truncating cell contents.
for table_name in (tbl_acct, tbl_opp, tbl_cust):
    print(f"\n===== {table_name} =====")
    description = spark.sql(f"DESCRIBE EXTENDED {table_name}")
    description.show(truncate=False)
    sample_rows = spark.sql(f"SELECT * FROM {table_name} LIMIT 5")
    sample_rows.show(truncate=False)
