In [None]:
# Databricks notebook source
# COMMAND ----------
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, max, avg, datediff, current_date, when, lit, broadcast

# Configure logging.
# basicConfig() does nothing when the root logger already has handlers, which a
# hosted notebook runtime may have installed before this cell runs. Setting the
# level on this module's logger as well ensures the INFO progress messages
# emitted by the pipeline steps are not filtered out in that situation.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# COMMAND ----------
def load_data():
    """Read the five source tables of the ``catalog.source_db`` schema.

    The ``spark`` session is not created in this file; it is expected to be
    supplied as a global by the notebook runtime.

    Returns:
        A 5-tuple of DataFrames ordered as
        ``(policy, claims, demographics, scores, aiml_insights)``.

    Raises:
        Exception: Any error raised while resolving a table is logged and
            re-raised unchanged.
    """
    # Order matters: callers unpack the result positionally.
    source_tables = (
        "catalog.source_db.policy",
        "catalog.source_db.claims",
        "catalog.source_db.demographics",
        "catalog.source_db.scores",
        "catalog.source_db.aiml_insights",
    )
    try:
        frames = []
        for table_name in source_tables:
            frames.append(spark.table(table_name))
        logger.info("Data loaded successfully from Unity Catalog tables.")
        return tuple(frames)
    except Exception as e:
        logger.error(f"Error loading data from Unity Catalog tables: {e}")
        raise

# COMMAND ----------
def select_and_filter_data(demographics_df):
    """Project the demographics DataFrame onto the customer-profile columns.

    Only a column selection is performed; no rows are filtered.

    Args:
        demographics_df: DataFrame that contains every column listed below.

    Returns:
        A DataFrame with exactly the fourteen selected columns, in the order
        they are listed.

    Raises:
        Exception: Any error raised by the selection is logged and re-raised.
    """
    profile_columns = (
        "Customer_ID",
        "Customer_Name",
        "Email",
        "Phone_Number",
        "Address",
        "City",
        "State",
        "Postal_Code",
        "Date_of_Birth",
        "Gender",
        "Marital_Status",
        "Occupation",
        "Income_Level",
        "Customer_Segment",
    )
    try:
        profile_df = demographics_df.select(*profile_columns)
        logger.info("Demographics data selected successfully.")
        return profile_df
    except Exception as e:
        logger.error(f"Error selecting demographics data: {e}")
        raise

# COMMAND ----------
def integrate_data(selected_demographics_df, policy_df, claims_df):
    """Join demographics, policies and claims into one claim-level DataFrame.

    Demographics are inner-joined to policies on the customer id, and the
    result is inner-joined to claims on the policy id, so the output has one
    row per matching (customer, policy, claim) combination.

    After each join the right-hand copy of the join key is dropped. Spark
    resolves column names case-insensitively by default, so keeping both
    ``Customer_ID``/``customer_id`` and ``policy_id``/``Policy_ID`` would make
    later by-name references to those keys ambiguous.

    Args:
        selected_demographics_df: Customer-level DataFrame with ``Customer_ID``.
        policy_df: DataFrame with ``customer_id`` and ``policy_id``.
        claims_df: DataFrame with ``Policy_ID``.

    Returns:
        The joined DataFrame containing a single ``Customer_ID`` column (from
        demographics) and a single ``policy_id`` column (from policies).

    Raises:
        Exception: Any error raised while building the joins is logged and
            re-raised.
    """
    # NOTE(review): both joins are inner joins, so customers without a policy
    # or without any claim do not appear in the result - confirm this is the
    # intended population.
    # NOTE(review): if claims_df or policy_df share further column names with
    # the left side (other than the join keys), those remain duplicated.
    try:
        customer_policies_df = selected_demographics_df.join(
            policy_df,
            selected_demographics_df.Customer_ID == policy_df.customer_id,
            "inner",
        ).drop(policy_df.customer_id)

        joined_df = customer_policies_df.join(
            claims_df,
            policy_df.policy_id == claims_df.Policy_ID,
            "inner",
        ).drop(claims_df.Policy_ID)

        logger.info("Data integration completed successfully.")
        return joined_df
    except Exception as e:
        logger.error(f"Error during data integration: {e}")
        raise

# COMMAND ----------
def aggregate_and_summarize_data(joined_df):
    """Roll the claim-level join result up to one row per customer.

    Output columns:
        * ``Customer_ID`` - grouping key.
        * ``Total_Claims`` - number of non-null ``Claim_ID`` values.
        * ``Policy_Count`` - number of *distinct* ``policy_id`` values. A plain
          count would count one per claim row and therefore always equal
          ``Total_Claims``.
        * ``Recent_Claim_Date`` - latest ``Claim_Date``.
        * ``Average_Claim_Amount`` - mean ``Claim_Amount``.
        * ``Date_of_Birth`` - carried through when present in the input.
        * ``total_premium_paid`` - carried through when present in the input,
          summed over the customer's distinct policies.

    The last two columns are kept because the downstream calculation step in
    this file reads them by name; a bare group-by would discard them.

    Args:
        joined_df: Claim-level DataFrame with ``Customer_ID``, ``policy_id``,
            ``Claim_ID``, ``Claim_Date`` and ``Claim_Amount``.

    Returns:
        A DataFrame with one row per ``Customer_ID``.

    Raises:
        Exception: Any error raised while building the aggregation is logged
            and re-raised.
    """
    # Function-scope import: the module-level import list does not provide
    # countDistinct/first/sum, and a bare ``sum`` would shadow the builtin.
    from pyspark.sql import functions as F

    try:
        # Lower-cased because Spark matches column names case-insensitively
        # by default.
        available = {name.lower() for name in joined_df.columns}

        metrics = [
            F.count("Claim_ID").alias("Total_Claims"),
            F.countDistinct("policy_id").alias("Policy_Count"),
            F.max("Claim_Date").alias("Recent_Claim_Date"),
            F.avg("Claim_Amount").alias("Average_Claim_Amount"),
        ]
        if "date_of_birth" in available:
            # Assumes one date of birth per Customer_ID, so any non-null value
            # in the group is the customer's value - TODO confirm uniqueness.
            metrics.append(
                F.first("Date_of_Birth", ignorenulls=True).alias("Date_of_Birth")
            )

        summarized_df = joined_df.groupBy("Customer_ID").agg(*metrics)

        if "total_premium_paid" in available:
            # NOTE(review): presumably a per-policy amount from the policy
            # table (it follows that table's lower_snake_case naming) - verify.
            # It repeats on every claim row of a policy, so collapse to one row
            # per policy before summing to avoid counting it once per claim.
            premium_df = (
                joined_df.select("Customer_ID", "policy_id", "total_premium_paid")
                .dropDuplicates(["Customer_ID", "policy_id"])
                .groupBy("Customer_ID")
                .agg(F.sum("total_premium_paid").alias("total_premium_paid"))
            )
            summarized_df = summarized_df.join(premium_df, "Customer_ID", "left")
        else:
            logger.warning(
                "Column total_premium_paid not found in joined data; "
                "it will be absent from the summary."
            )

        logger.info("Data aggregation and summarization completed successfully.")
        return summarized_df
    except Exception as e:
        logger.error(f"Error during data aggregation and summarization: {e}")
        raise

# COMMAND ----------
def apply_custom_calculations(summarized_df):
    """Add derived per-customer metrics to the summarized DataFrame.

    Added columns:
        * ``Age`` - years between ``Date_of_Birth`` and today, computed from
          calendar months so it is a whole number on each birthday. Dividing a
          day count by 365 ignores leap days and drifts upward with age.
        * ``Claim_To_Premium_Ratio`` - ``Average_Claim_Amount`` divided by
          ``total_premium_paid``; 0 when the premium is 0 or null.
        * ``Claims_Per_Policy`` - ``Total_Claims`` divided by ``Policy_Count``;
          0 when the policy count is 0 or null.
        * ``Retention_Rate``, ``Cross_Sell_Opportunities``,
          ``Upsell_Potential`` - fixed literals, identical on every row.

    Args:
        summarized_df: DataFrame containing ``Date_of_Birth``,
            ``total_premium_paid``, ``Average_Claim_Amount``, ``Total_Claims``
            and ``Policy_Count``.

    Returns:
        ``summarized_df`` with the six columns above appended.

    Raises:
        Exception: Any error raised while building the columns (for example a
            missing input column) is logged and re-raised.
    """
    # Function-scope import: months_between is not in the module-level imports.
    from pyspark.sql.functions import months_between

    try:
        # Fractional years; stays a double like the previous day-based formula.
        age_years = months_between(current_date(), col("Date_of_Birth")) / 12

        # A null denominator makes the `!= 0` test null, which also falls
        # through to otherwise(0).
        claim_to_premium = when(
            col("total_premium_paid") != 0,
            col("Average_Claim_Amount") / col("total_premium_paid"),
        ).otherwise(0)
        claims_per_policy = when(
            col("Policy_Count") != 0,
            col("Total_Claims") / col("Policy_Count"),
        ).otherwise(0)

        final_df = (
            summarized_df.withColumn("Age", age_years)
            .withColumn("Claim_To_Premium_Ratio", claim_to_premium)
            .withColumn("Claims_Per_Policy", claims_per_policy)
            # NOTE(review): the three values below are hard-coded placeholders,
            # not computed from the data.
            .withColumn("Retention_Rate", lit(0.85))
            .withColumn(
                "Cross_Sell_Opportunities",
                lit("Multi-Policy Discount, Home Coverage Add-on"),
            )
            .withColumn("Upsell_Potential", lit("Premium Vehicle Coverage"))
        )
        logger.info("Custom calculations applied successfully.")
        return final_df
    except Exception as e:
        logger.error(f"Error during custom calculations: {e}")
        raise

# COMMAND ----------
def consolidate_data(final_df, aiml_insights_df, scores_df):
    """Attach AI/ML insight and score columns to the per-customer metrics.

    Each lookup DataFrame is wrapped in a broadcast hint and inner-joined on
    ``Customer_ID``, insights first and scores second.

    Args:
        final_df: Per-customer DataFrame with a ``Customer_ID`` column.
        aiml_insights_df: DataFrame with a ``Customer_ID`` column.
        scores_df: DataFrame with a ``Customer_ID`` column.

    Returns:
        The DataFrame resulting from both joins.

    Raises:
        Exception: Any error raised while building the joins is logged and
            re-raised.
    """
    # NOTE(review): the broadcast hint is applied regardless of table size, and
    # inner joins drop customers missing from either lookup - confirm both are
    # intended.
    try:
        consolidated_df = final_df
        for lookup_df in (aiml_insights_df, scores_df):
            consolidated_df = consolidated_df.join(
                broadcast(lookup_df), "Customer_ID", "inner"
            )
        logger.info("Comprehensive data consolidation completed successfully.")
        return consolidated_df
    except Exception as e:
        logger.error(f"Error during data consolidation: {e}")
        raise

# COMMAND ----------
def write_output(consolidated_df):
    """Persist the consolidated DataFrame as the Customer 360 Delta table.

    The table ``catalog.target_db.customer_360`` is written in ``overwrite``
    mode using the ``delta`` format.

    Args:
        consolidated_df: DataFrame to persist.

    Returns:
        None.

    Raises:
        Exception: Any error raised by the write is logged and re-raised.
    """
    target_table = "catalog.target_db.customer_360"
    try:
        delta_writer = consolidated_df.write.format("delta")
        delta_writer = delta_writer.mode("overwrite")
        delta_writer.saveAsTable(target_table)
        logger.info("Customer 360 data written successfully to Unity Catalog target table.")
    except Exception as e:
        logger.error(f"Error writing Customer 360 data to Unity Catalog target table: {e}")
        raise

# COMMAND ----------
def main():
    """Run the Customer 360 pipeline end to end.

    Steps, in order: load the source tables, select the demographics columns,
    join with policies and claims, aggregate per customer, add derived
    metrics, join insights and scores, and write the result table.
    """
    policies, claims, demographics, scores, insights = load_data()

    customer_profiles = select_and_filter_data(demographics)
    claim_level = integrate_data(customer_profiles, policies, claims)
    per_customer = aggregate_and_summarize_data(claim_level)
    enriched = apply_custom_calculations(per_customer)

    write_output(consolidate_data(enriched, insights, scores))

# Run the pipeline only when this file is executed as the entry point,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
