# In [0]:
import logging
import os
import re
from datetime import datetime, timezone

import dateutil.parser
import yaml
from pyspark.sql.functions import (
    col,
    coalesce,
    lit,
    trim,
    regexp_replace,
    to_timestamp,
    try_to_timestamp,
    when,
    from_unixtime,
    current_timestamp,
    udf,
)
from pyspark.sql.types import DateType, StringType

# Configure the root logger: INFO and above, rendered as
# "<time>-<logger name>-<level>-<message>".
logging.basicConfig(
    format='%(asctime)s-%(name)s-%(levelname)s-%(message)s',
    level=logging.INFO,
)

def safe_load_yaml(file_path):
    """Load a YAML configuration file and return its parsed content.

    Args:
        file_path: Path of the YAML file to read.

    Returns:
        Whatever ``yaml.safe_load`` produces for the file's content.

    Raises:
        FileNotFoundError: If ``file_path`` is not an existing regular file.
        Exception: Any error raised while opening or parsing the file is
            logged and then re-raised unchanged.
    """
    # The existence check lives outside the try block so that a missing file
    # is logged exactly once instead of being caught and logged a second time.
    if not os.path.isfile(file_path):
        logging.error(f"Configuration file not found: {file_path}")
        raise FileNotFoundError(f"Missing configuration file: {file_path}")

    try:
        # Explicit encoding keeps decoding independent of the host locale.
        with open(file_path, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)
    except Exception as e:
        logging.error(f"Error loading YAML file: {e}")
        raise

def load_bronze_table(catalog, source_schema, source_table):
    """Read ``catalog.source_schema.source_table`` into a DataFrame.

    Relies on a ``spark`` session object being available as a global name
    (it is not defined in this file).

    Args:
        catalog: Catalog name, first part of the table identifier.
        source_schema: Schema name, second part of the table identifier.
        source_table: Table name, last part of the table identifier.

    Returns:
        The object returned by ``spark.read.table`` for that identifier.

    Raises:
        RuntimeError: If reading the table fails for any reason; the original
            exception is attached as ``__cause__``.
    """
    try:
        df = spark.read.table(f"{catalog}.{source_schema}.{source_table}")
        logging.info(f"bronze table {source_table} loaded successfully into dataframe")
        return df
    except Exception as e:
        # Chain explicitly so the underlying failure stays in the traceback.
        raise RuntimeError(f"Error loading bronze table: {e}") from e

def write_silver_table(df, catalog, target_schema, target_table):
    """Overwrite ``catalog.target_schema.target_table`` with ``df``.

    The write goes through ``insertInto``, which is given no column names
    here, so the DataFrame's column order is expected to line up with the
    target table (NOTE(review): confirm against the table definition).

    Args:
        df: DataFrame to write.
        catalog: Catalog name, first part of the table identifier.
        target_schema: Schema name, second part of the table identifier.
        target_table: Table name, last part of the table identifier.

    Raises:
        RuntimeError: If the write fails for any reason; the original
            exception is attached as ``__cause__``.
    """
    try:
        df.write.mode("overwrite").insertInto(f"{catalog}.{target_schema}.{target_table}", overwrite=True)
        logging.info(f"silver table {target_table} written successfully")
    except Exception as e:
        # Chain explicitly so the underlying failure stays in the traceback.
        raise RuntimeError(f"Error writing silver table {target_table}: {e}") from e

def _parse_dirty_date_py(s):
    if s is None:
        return None, "missing"
    s0 = str(s).strip()
    if s0 == "":
        return None, "missing"

    # strip surrounding quotes
    s0 = re.sub(r"^[\"']|[\"']$", "", s0)

    # epoch detection (10 or 13 digits)
    digits = re.sub(r"\D", "", s0)
    try:
        if re.fullmatch(r"\d{13}", digits):
            ts = int(digits) / 1000.0
            dt = datetime.utcfromtimestamp(ts)
            return dt.date(), "epoch_ms"
        if re.fullmatch(r"\d{10}", digits):
            ts = int(digits)
            dt = datetime.utcfromtimestamp(ts)
            return dt.date(), "epoch_s"
    except Exception:
        pass

    # normalize ISO-ish strings
    s1 = re.sub(r"Z$", "", s0)          # strip trailing Z
    s1 = s1.replace("T", " ")           # replace T with space
    s1 = re.sub(r"\.\d{1,6}", "", s1)   # strip fractional seconds

    try:
        dt = dateutil.parser.parse(s1, yearfirst=True, dayfirst=False)
        return dt.date(), "parsed_yearfirst"
    except Exception:
        try:
            dt = dateutil.parser.parse(s1, dayfirst=True, yearfirst=False)
            return dt.date(), "parsed_dayfirst"
        except Exception:
            return None, "unparsed"

# Spark-facing wrappers around _parse_dirty_date_py: one UDF per element of
# the (date, status) pair it returns.
_parse_date_udf = udf(
    lambda raw: _parse_dirty_date_py(raw)[0],
    returnType=DateType(),
)
_parse_status_udf = udf(
    lambda raw: _parse_dirty_date_py(raw)[1],
    returnType=StringType(),
)

def normalize_date_column_safe(df, raw_col, out_col="OrderDate", status_col="date_status"):
    """Add a parsed-date column and a parse-status column derived from ``raw_col``.

    The raw column is cast to string and trimmed, then passed to
    ``_parse_date_udf`` and ``_parse_status_udf``.

    Args:
        df: Input DataFrame.
        raw_col: Name of the column holding the raw date values.
        out_col: Name of the column that receives the parsed date.
        status_col: Name of the column that receives the parse status.

    Returns:
        ``df`` with ``out_col`` and ``status_col`` added (or replaced when a
        column of that name already exists). ``raw_col`` itself is kept.
    """
    # One shared expression replaces the former "_raw_for_date" scratch
    # column, so an input column that happens to carry that name is no longer
    # overwritten and dropped.
    raw_text = trim(col(raw_col).cast("string"))

    # Both outputs are computed against the input DataFrame in one projection.
    return df.withColumns({
        out_col: _parse_date_udf(raw_text),
        status_col: _parse_status_udf(raw_text),
    })


# Pipeline: read the bronze sales table, clean it column by column, and
# overwrite the silver table with the result.

# load configs
global_config = safe_load_yaml('/Workspace/Users/hritikraj143@gmail.com/Retail-Analytics/Config/global_config.yaml')
catalog = global_config['catalog']
silver_config = safe_load_yaml('/Workspace/Users/hritikraj143@gmail.com/Retail-Analytics/Config/silver_config.yaml')
_sales_cfg = silver_config['silver']['sales_transformed']
source_schema = _sales_cfg['source_schema']
source_table = _sales_cfg['source_table']
target_schema = _sales_cfg['target_schema']
target_table = _sales_cfg['target_table']
logging.info('Configs loaded successfully')

# load bronze table
sales_landing = load_bronze_table(catalog, source_schema, source_table)

# remove invalid or null orderid rows
# The cast runs BEFORE the null filter: an OrderID that does not survive the
# BIGINT cast (it comes out as NULL when ANSI mode is off) is then dropped
# together with the rows that were NULL to begin with.
sales_landing = sales_landing.withColumn('OrderID', col('OrderID').cast('BIGINT'))
logging.info('OrderID: typecasted to BIGINT')
sales_landing = sales_landing.filter(col('OrderID').isNotNull())
logging.info('OrderID: Nulls rows removed')

# CustomerID / ProductID: replace NULLs with sentinel values
sales_landing = sales_landing.withColumn('CustomerID', coalesce(col('CustomerID'), lit('UNKNOWN_CUSTOMER')))
logging.info('CustomerID: Nulls replaced with UNKNOWN_CUSTOMER')
sales_landing = sales_landing.withColumn('ProductID', coalesce(col('ProductID'), lit('UNKNOWN_PRODUCT')))
logging.info('ProductID: Nulls replaced with UNKNOWN_PRODUCT')

# Date: parse into OrderDate, then discard the raw column and the status column
sales_landing = normalize_date_column_safe(df=sales_landing, raw_col="Date")
cols_to_drop = ["Date", "date_status"]
sales_landing = sales_landing.drop(*cols_to_drop)
logging.info('Date: Date format cleaned and column name replaced to OrderDate')

# Quantity: go through a floating-point type first, then to INT.
# DOUBLE is used rather than FLOAT because FLOAT cannot represent every
# integer above 2**24 exactly, which would silently alter large quantities.
sales_landing = sales_landing.withColumn('Quantity', col('Quantity').cast('DOUBLE'))
logging.info('Quantity: typecasted to DOUBLE')
sales_landing = sales_landing.withColumn('Quantity', col('Quantity').cast('INT'))
logging.info('Quantity: typecasted to INT')
# The flag must be computed before the NULLs are replaced with 0 below.
sales_landing = sales_landing.withColumn('Is_Quantity_Missing', when(col('Quantity').isNull(), lit(True)).otherwise(lit(False)))
logging.info('New Column Added: Is_Quantity_Missing')
sales_landing = sales_landing.withColumn('Quantity', coalesce(col('Quantity'), lit(0)))
logging.info('Quantity: Nulls replaced with 0')

# Price: keep only digits, '.' and '-', then cast; an empty remainder becomes NULL
sales_landing = sales_landing.withColumn('Price', regexp_replace(col('Price'), "[^0-9.-]", ""))
logging.info('Price: strings removed from values')
sales_landing = sales_landing.withColumn('Price', when(col('Price') == "", None).otherwise(col('Price').cast('DOUBLE')))
logging.info('Price: typecasted to DOUBLE')
sales_landing = sales_landing.withColumn('Is_Price_Missing', when(col('Price').isNull(), lit(True)).otherwise(lit(False)))
logging.info('New Column Added: Is_Price_Missing')

# write silver table
sales_transformed = sales_landing
write_silver_table(sales_transformed, catalog, target_schema, target_table)
# display(sales_transformed)