In [0]:
# Import necessary libraries
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructType, StructField, TimestampType, LongType, IntegerType
from pyspark.sql.utils import AnalysisException
from datetime import datetime

# COMMAND ----------

# DBTITLE 1,Configuration and Setup
# Source: Bronze-layer tables, addressed by catalog and schema name.
source_catalog = "bronze"
source_schema = "e-commerce-sales"
print(f"Source Path: '{source_catalog}.{source_schema}'")

# Prefix to which a table name is appended to address a Bronze table.
# The schema name contains hyphens, so it is wrapped in backticks.
source_path = f"{source_catalog}.`{source_schema}`."

# Target: Silver-layer catalog and schema, plus the audit log table name.
target_catalog = "silver"
target_schema = "e-commerce-sales"
audit_log_table = "audit_logs"
print(f"Destination Schema: '{target_catalog}.{target_schema}'")

# COMMAND ----------

# DBTITLE 1,Audit Logging Function
# Fully qualified name of the audit log table. Schema and table are
# backtick-quoted; the schema name contains hyphens.
audit_log_table_full_name = ".".join(
    (target_catalog, f"`{target_schema}`", f"`{audit_log_table}`")
)

def log_audit(table_name, status, initial_count=None, final_count=None, message=None):
    """Append one audit record to the audit log table.

    The record is stamped with ``datetime.now()`` and written in append mode
    to the Delta table named by ``audit_log_table_full_name``. Logging is
    best-effort: any ``Exception`` raised while building or writing the
    record is caught and reported with ``print`` instead of being propagated.

    Args:
        table_name: Name of the table (or job) the record refers to.
        status: Status label stored with the record, e.g. "Started".
        initial_count: Optional row count before processing.
        final_count: Optional row count after processing.
        message: Optional free-text detail.

    Returns:
        None.
    """
    try:
        # One (name, type, nullable) spec per audit column, in table order.
        entry_schema = StructType([
            StructField(*column_spec)
            for column_spec in (
                ("timestamp", TimestampType(), False),
                ("table_name", StringType(), False),
                ("status", StringType(), False),
                ("initial_count", LongType(), True),
                ("final_count", LongType(), True),
                ("message", StringType(), True),
            )
        ])

        # Single-row payload; values line up with the schema above.
        entry = (datetime.now(), table_name, status, initial_count, final_count, message)

        audit_writer = spark.createDataFrame([entry], schema=entry_schema).write
        audit_writer.format("delta").mode("append").saveAsTable(audit_log_table_full_name)
        print(f"Logged '{status}' for table '{table_name}'.")

    except Exception as e:
        # Deliberately swallowed so that an audit failure does not abort the caller.
        print(f"FATAL: Could not write to audit log table. Error: {e}")

# Record in the audit log that the overall Silver job has begun.
log_audit(
    table_name="Silver Layer Job",
    status="Started",
    message="Silver layer processing job initiated.",
)

# COMMAND ----------

# DBTITLE 1,Process 'distribution_centers'
# Bronze -> Silver for 'distribution_centers': fill nulls, trim strings,
# de-duplicate on id, overwrite the Silver table, and audit the outcome.
table_name = "distribution_centers"
bronze_path = source_path + table_name
silver_table_full_name = f"{target_catalog}.`{target_schema}`.`{table_name}`"

try:
    # Read the Bronze table and record how many rows came in.
    df = spark.table(bronze_path)
    initial_count = df.count()
    log_audit(table_name, "Started", initial_count=initial_count)

    # Replace nulls in the listed columns with placeholder values.
    defaults = {"id": 0, "name": "Unknown", "latitude": 0.0, "longitude": 0.0}
    df_filled = df.fillna(defaults)

    # Trim spaces from both ends of every string column in ONE projection.
    # Spark's withColumn documentation advises select() over calling
    # withColumn in a loop, because each call adds another projection.
    string_columns = {f.name for f in df_filled.schema.fields if isinstance(f.dataType, StringType)}
    df_trimmed = df_filled.select([
        F.trim(F.col(c)).alias(c) if c in string_columns else F.col(c)
        for c in df_filled.columns
    ])

    # Keep one row per id.
    df_deduplicated = df_trimmed.dropDuplicates(["id"])
    final_count = df_deduplicated.count()

    # Replace the Silver table, including its schema.
    (
        df_deduplicated.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(silver_table_full_name)
    )
    log_audit(table_name, "Success", initial_count, final_count, f"Successfully processed and saved to {silver_table_full_name}")

except Exception as e:
    # Best-effort: surface the failure in the notebook output and the audit
    # log, then let the remaining tables run.
    print(f"ERROR: Processing of table '{table_name}' failed: {e}")
    log_audit(table_name, "Failed", message=str(e))

print("-" * 50)

# COMMAND ----------

# DBTITLE 1,Process 'events'
# Bronze -> Silver for 'events': fill nulls, trim strings, de-duplicate on
# id, overwrite the Silver table, and audit the outcome.
table_name = "events"
bronze_path = source_path + table_name
silver_table_full_name = f"{target_catalog}.`{target_schema}`.`{table_name}`"

try:
    # Read the Bronze table and record how many rows came in.
    df = spark.table(bronze_path)
    initial_count = df.count()
    log_audit(table_name, "Started", initial_count=initial_count)

    # Replace nulls in the listed columns with placeholder values.
    defaults = {
        "id": 0, "user_id": 0, "sequence_number": 0, "session_id": "0",
        "ip_address": "Unknown", "city": "Unknown", "state": "Unknown",
        "postal_code": "Unknown", "browser": "Unknown", "traffic_source": "Unknown",
        "uri": "Unknown", "event_type": "Unknown"
    }
    df_filled = df.fillna(defaults)

    # A missing creation time is replaced with the processing time.
    df_filled = df_filled.withColumn("created_at", F.when(F.col("created_at").isNull(), F.current_timestamp()).otherwise(F.col("created_at")))

    # Trim spaces from both ends of every string column in ONE projection.
    # Spark's withColumn documentation advises select() over calling
    # withColumn in a loop, because each call adds another projection.
    string_columns = {f.name for f in df_filled.schema.fields if isinstance(f.dataType, StringType)}
    df_trimmed = df_filled.select([
        F.trim(F.col(c)).alias(c) if c in string_columns else F.col(c)
        for c in df_filled.columns
    ])

    # Keep one row per id.
    df_deduplicated = df_trimmed.dropDuplicates(["id"])
    final_count = df_deduplicated.count()

    # Replace the Silver table, including its schema.
    (
        df_deduplicated.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(silver_table_full_name)
    )
    log_audit(table_name, "Success", initial_count, final_count, f"Successfully processed and saved to {silver_table_full_name}")

except Exception as e:
    # Best-effort: surface the failure in the notebook output and the audit
    # log, then let the remaining tables run.
    print(f"ERROR: Processing of table '{table_name}' failed: {e}")
    log_audit(table_name, "Failed", message=str(e))

print("-" * 50)

# COMMAND ----------

# DBTITLE 1,Process 'inventory_items'
# Bronze -> Silver for 'inventory_items': fill nulls, trim strings,
# de-duplicate on id, overwrite the Silver table, and audit the outcome.
table_name = "inventory_items"
bronze_path = source_path + table_name
silver_table_full_name = f"{target_catalog}.`{target_schema}`.`{table_name}`"

try:
    # Read the Bronze table and record how many rows came in.
    df = spark.table(bronze_path)
    initial_count = df.count()
    log_audit(table_name, "Started", initial_count=initial_count)

    # Replace nulls in the listed columns with placeholder values.
    defaults = {"id": 0, "product_id": 0, "cost": 0.0, "product_category": "Unknown"}
    df_filled = df.fillna(defaults)

    # Missing timestamps are replaced with the processing time.
    # NOTE(review): a null sold_at presumably means the item has not been
    # sold; filling it makes every item look sold - confirm this is intended.
    df_filled = df_filled.withColumn("created_at", F.when(F.col("created_at").isNull(), F.current_timestamp()).otherwise(F.col("created_at")))
    df_filled = df_filled.withColumn("sold_at", F.when(F.col("sold_at").isNull(), F.current_timestamp()).otherwise(F.col("sold_at")))

    # Trim spaces from both ends of every string column in ONE projection.
    # Spark's withColumn documentation advises select() over calling
    # withColumn in a loop, because each call adds another projection.
    string_columns = {f.name for f in df_filled.schema.fields if isinstance(f.dataType, StringType)}
    df_trimmed = df_filled.select([
        F.trim(F.col(c)).alias(c) if c in string_columns else F.col(c)
        for c in df_filled.columns
    ])

    # Keep one row per id.
    df_deduplicated = df_trimmed.dropDuplicates(["id"])
    final_count = df_deduplicated.count()

    # Replace the Silver table, including its schema.
    (
        df_deduplicated.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(silver_table_full_name)
    )
    log_audit(table_name, "Success", initial_count, final_count, f"Successfully processed and saved to {silver_table_full_name}")

except Exception as e:
    # Best-effort: surface the failure in the notebook output and the audit
    # log, then let the remaining tables run.
    print(f"ERROR: Processing of table '{table_name}' failed: {e}")
    log_audit(table_name, "Failed", message=str(e))

print("-" * 50)

# COMMAND ----------

# DBTITLE 1,Process 'order_items'
# Bronze -> Silver for 'order_items': fill nulls, trim strings, de-duplicate
# on id, overwrite the Silver table, and audit the outcome.
table_name = "order_items"
bronze_path = source_path + table_name
silver_table_full_name = f"{target_catalog}.`{target_schema}`.`{table_name}`"

try:
    # Read the Bronze table and record how many rows came in.
    df = spark.table(bronze_path)
    initial_count = df.count()
    log_audit(table_name, "Started", initial_count=initial_count)

    # Replace nulls in the listed columns with placeholder values.
    defaults = {"id": 0, "order_id": 0, "user_id": 0, "product_id": 0, "inventory_item_id": 0, "status": "Unknown", "sale_price": 0.0}
    df_filled = df.fillna(defaults)

    # Missing timestamps are replaced with the processing time. The list is
    # short and fixed, so a withColumn loop is acceptable here.
    # NOTE(review): null shipped_at / delivered_at / returned_at presumably
    # mean the event has not happened; filling them makes every row look
    # shipped, delivered and returned - confirm this is intended.
    timestamp_cols = ["created_at", "shipped_at", "delivered_at", "returned_at"]
    for col_name in timestamp_cols:
        df_filled = df_filled.withColumn(col_name, F.when(F.col(col_name).isNull(), F.current_timestamp()).otherwise(F.col(col_name)))

    # Trim spaces from both ends of every string column in ONE projection.
    # Spark's withColumn documentation advises select() over calling
    # withColumn in a loop, because each call adds another projection.
    string_columns = {f.name for f in df_filled.schema.fields if isinstance(f.dataType, StringType)}
    df_trimmed = df_filled.select([
        F.trim(F.col(c)).alias(c) if c in string_columns else F.col(c)
        for c in df_filled.columns
    ])

    # Keep one row per id.
    df_deduplicated = df_trimmed.dropDuplicates(["id"])
    final_count = df_deduplicated.count()

    # Replace the Silver table, including its schema.
    (
        df_deduplicated.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(silver_table_full_name)
    )
    log_audit(table_name, "Success", initial_count, final_count, f"Successfully processed and saved to {silver_table_full_name}")

except Exception as e:
    # Best-effort: surface the failure in the notebook output and the audit
    # log, then let the remaining tables run.
    print(f"ERROR: Processing of table '{table_name}' failed: {e}")
    log_audit(table_name, "Failed", message=str(e))

print("-" * 50)

# COMMAND ----------

# DBTITLE 1,Process 'orders'
# Bronze -> Silver for 'orders': fill nulls, trim strings, de-duplicate on
# order_id, overwrite the Silver table, and audit the outcome.
table_name = "orders"
bronze_path = source_path + table_name
silver_table_full_name = f"{target_catalog}.`{target_schema}`.`{table_name}`"

try:
    # Read the Bronze table and record how many rows came in.
    df = spark.read.table(bronze_path)
    initial_count = df.count()
    log_audit(table_name, "Started", initial_count=initial_count)

    # Replace nulls in the listed columns with placeholder values.
    defaults = {"order_id": 0, "user_id": 0, "status": "Unknown", "gender": "Unknown", "num_of_item": 0}
    df_filled = df.fillna(defaults)

    # Missing timestamps are replaced with the processing time. The list is
    # short and fixed, so a withColumn loop is acceptable here.
    # NOTE(review): null shipped_at / delivered_at / returned_at presumably
    # mean the event has not happened; filling them makes every order look
    # shipped, delivered and returned - confirm this is intended.
    timestamp_cols = ["created_at", "shipped_at", "delivered_at", "returned_at"]
    for col_name in timestamp_cols:
        df_filled = df_filled.withColumn(col_name, F.when(F.col(col_name).isNull(), F.current_timestamp()).otherwise(F.col(col_name)))

    # Trim spaces from both ends of every string column in ONE projection.
    # Spark's withColumn documentation advises select() over calling
    # withColumn in a loop, because each call adds another projection.
    string_columns = {f.name for f in df_filled.schema.fields if isinstance(f.dataType, StringType)}
    df_trimmed = df_filled.select([
        F.trim(F.col(c)).alias(c) if c in string_columns else F.col(c)
        for c in df_filled.columns
    ])

    # Keep one row per order_id.
    df_deduplicated = df_trimmed.dropDuplicates(["order_id"])
    final_count = df_deduplicated.count()

    # Replace the Silver table, including its schema.
    (
        df_deduplicated.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(silver_table_full_name)
    )
    log_audit(table_name, "Success", initial_count, final_count, f"Successfully processed and saved to {silver_table_full_name}")

except Exception as e:
    # Best-effort: surface the failure in the notebook output and the audit
    # log, then let the remaining tables run.
    print(f"ERROR: Processing of table '{table_name}' failed: {e}")
    log_audit(table_name, "Failed", message=str(e))

print("-" * 50)

# COMMAND ----------

# DBTITLE 1,Process 'products'
# Bronze -> Silver for 'products': fill nulls, trim strings, de-duplicate on
# id, overwrite the Silver table, and audit the outcome.
table_name = "products"
bronze_path = source_path + table_name
silver_table_full_name = f"{target_catalog}.`{target_schema}`.`{table_name}`"

try:
    # Read the Bronze table and record how many rows came in.
    df = spark.table(bronze_path)
    initial_count = df.count()
    log_audit(table_name, "Started", initial_count=initial_count)

    # Replace nulls in the listed columns with placeholder values.
    defaults = {"id": 0, "cost": 0.0, "category": "Unknown"}
    df_filled = df.fillna(defaults)

    # Trim spaces from both ends of every string column in ONE projection.
    # Spark's withColumn documentation advises select() over calling
    # withColumn in a loop, because each call adds another projection.
    string_columns = {f.name for f in df_filled.schema.fields if isinstance(f.dataType, StringType)}
    df_trimmed = df_filled.select([
        F.trim(F.col(c)).alias(c) if c in string_columns else F.col(c)
        for c in df_filled.columns
    ])

    # Keep one row per id.
    df_deduplicated = df_trimmed.dropDuplicates(["id"])
    final_count = df_deduplicated.count()

    # Replace the Silver table, including its schema.
    (
        df_deduplicated.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(silver_table_full_name)
    )
    log_audit(table_name, "Success", initial_count, final_count, f"Successfully processed and saved to {silver_table_full_name}")

except Exception as e:
    # Best-effort: surface the failure in the notebook output and the audit
    # log, then let the remaining tables run.
    print(f"ERROR: Processing of table '{table_name}' failed: {e}")
    log_audit(table_name, "Failed", message=str(e))

print("-" * 50)

# COMMAND ----------

# DBTITLE 1,Process 'users'
# Bronze -> Silver for 'users': fill nulls, trim strings, de-duplicate on id,
# overwrite the Silver table, audit the outcome, then preview the table.
table_name = "users"
bronze_path = source_path + table_name
silver_table_full_name = f"{target_catalog}.`{target_schema}`.`{table_name}`"

try:
    # Read the Bronze table and record how many rows came in.
    df = spark.read.table(bronze_path)
    initial_count = df.count()
    log_audit(table_name, "Started", initial_count=initial_count)

    # Replace nulls in the listed columns with placeholder values.
    defaults = {
        "id": 0, "first_name": "Unknown", "last_name": "Unknown", "email": "Unknown",
        "age": 0, "gender": "Unknown", "state": "Unknown", "street_address": "Unknown",
        "postal_code": "Unknown", "city": "Unknown", "country": "Unknown",
        "latitude": 0.0, "longitude": 0.0, "traffic_source": "Unknown"
    }
    df_filled = df.fillna(defaults)

    # A missing creation time is replaced with the processing time.
    df_filled = df_filled.withColumn("created_at", F.when(F.col("created_at").isNull(), F.current_timestamp()).otherwise(F.col("created_at")))

    # Trim spaces from both ends of every string column in ONE projection.
    # Spark's withColumn documentation advises select() over calling
    # withColumn in a loop, because each call adds another projection.
    string_columns = {f.name for f in df_filled.schema.fields if isinstance(f.dataType, StringType)}
    df_trimmed = df_filled.select([
        F.trim(F.col(c)).alias(c) if c in string_columns else F.col(c)
        for c in df_filled.columns
    ])

    # Keep one row per id.
    df_deduplicated = df_trimmed.dropDuplicates(["id"])
    final_count = df_deduplicated.count()

    # Replace the Silver table, including its schema.
    (
        df_deduplicated.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(silver_table_full_name)
    )
    log_audit(table_name, "Success", initial_count, final_count, f"Successfully processed and saved to {silver_table_full_name}")

except Exception as e:
    # Best-effort: surface the failure in the notebook output and the audit
    # log, then let the rest of the notebook run.
    print(f"ERROR: Processing of table '{table_name}' failed: {e}")
    log_audit(table_name, "Failed", message=str(e))

print("-" * 50)

# Preview the Silver 'users' table.
display(spark.table(silver_table_full_name))
# COMMAND ----------

# DBTITLE 1,Finalize Job
# Record in the audit log that the overall Silver job has ended.
log_audit(
    table_name="Silver Layer Job",
    status="Finished",
    message="Silver layer processing job completed.",
)


# Preview the `audit_logs` table to check the results.

# COMMAND ----------

# Show the audit log with the most recent entries first.
display(
    spark.table(audit_log_table_full_name).orderBy("timestamp", ascending=False)
)
