# Insurance Risk Profiling - Customer Analytics

**Objective**: Create customer risk profiles and segmentation for SecureLife Insurance

**Business Goals:**
- Assess customer risk levels for pricing optimization
- Segment customers for targeted marketing and retention
- Identify cross-selling opportunities
- Analyze temporal patterns in claims and payments

**Data Flow**: Loads from Notebook 0 foundation tables → Creates risk analysis tables for Notebook 2

## Setup and Database Configuration

In [0]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import warnings
warnings.filterwarnings('ignore')

# Initialize Spark session
spark = SparkSession.builder.appName("InsuranceRiskProfiling").getOrCreate()
print("✅ Spark session initialized")

# Database configuration
DATABASE_NAME = "insurance_analytics"
print(f"📊 Using database: {DATABASE_NAME}")

✅ Spark session initialized
📊 Using database: insurance_analytics


## Load Data from Foundation Tables (Notebook 0 Outputs)

In [0]:
# BUSINESS CONTEXT: Data quality and accessibility are fundamental to analytics success
# Using persistent tables ensures reliability and consistency across the pipeline

# TODO: Load foundation tables created by Notebook 0
# 
# Step 1: Load core business entity tables
# TODO: Load customers_df from "insurance_analytics.customers" table using spark.table()
# TODO: Load policies_df from "insurance_analytics.policies" table using spark.table()
# TODO: Load claims_df from "insurance_analytics.claims" table using spark.table()
# TODO: Load payments_df from "insurance_analytics.payments" table using spark.table()
# TODO: Load interactions_df from "insurance_analytics.interactions" table using spark.table()
# 
# Step 2: Validate data loading
# TODO: Print "✅ Foundation tables loaded successfully"
# TODO: Print row counts for each dataset using .count() method
# TODO: Format counts with comma separators (e.g., f"{customers_df.count():,}")
# 
# Step 3: Handle potential errors
# TODO: Wrap loading code in try/except block
# TODO: Print error message if loading fails: "❌ Error loading foundation tables"
# TODO: Print reminder: "💡 Ensure Notebook 0 has been executed successfully"
# TODO: Use "raise" to stop execution if tables cannot be loaded
#
# EXPECTED OUTPUT: All 5 datasets loaded with row counts displayed
# Target: 15K customers, 75K policies, 12K claims, 200K payments, 30K interactions

print("📋 Loading foundation tables from Notebook 0...")

# TODO: Implement data loading logic here
try:
    customers_df = spark.table("insurance_analytics.customers")
    policies_df = spark.table("insurance_analytics.policies")
    claims_df = spark.table("insurance_analytics.claims")
    payments_df = spark.table("insurance_analytics.payments")
    interactions_df = spark.table("insurance_analytics.interactions")
    print("✅ Foundation tables loaded successfully")
    print(f"Customers: {customers_df.count():,}")
    print(f"Policies: {policies_df.count():,}")
    print(f"Claims: {claims_df.count():,}")
    print(f"Payments: {payments_df.count():,}")
except Exception as e:
    print(f"❌ Error loading foundation tables: {e}")
    print("💡 Ensure Notebook 0 has been executed successfully")
    raise

📋 Loading foundation tables from Notebook 0...
✅ Foundation tables loaded successfully
Customers: 15,000
Policies: 75,000
Claims: 10,643
Payments: 178,013


## 1. Customer Risk Assessment

### 1.1 Claims Frequency and Severity Analysis

In [0]:
# BUSINESS CONTEXT: Claims history is the strongest predictor of future risk
# Insurance companies use claims frequency and severity to set premium rates

# TODO: Analyze claims patterns per customer
# 
# Step 1: Calculate claims metrics per customer
# TODO: Group claims_df by "customer_id" 
# TODO: Calculate total_claims using count("claim_id")
# TODO: Calculate total_claim_amount using sum("claim_amount")
# TODO: Calculate avg_claim_amount using avg("claim_amount")
# TODO: Calculate max_claim_amount using max("claim_amount")
# TODO: Calculate policies_with_claims using countDistinct("policy_id")
# TODO: Store result as claims_per_customer
# 
# Step 2: Display claims analysis summary
# TODO: Print "📊 Claims Analysis Summary:"
# TODO: Show descriptive statistics using claims_per_customer.describe().show()
# 
# Step 3: Join claims data with customer profiles
# TODO: Join customers_df with claims_per_customer on "customer_id" using left join
# TODO: Store result as customer_risk_df
# 
# Step 4: Handle customers with no claims
# TODO: Use fillna() to set null values to 0 for: total_claims, total_claim_amount, avg_claim_amount, max_claim_amount, policies_with_claims
# TODO: Print row count: f"✅ Customer risk base created: {customer_risk_df.count():,} customers"
#
# EXPECTED OUTPUT: All customers with claims metrics, nulls filled with 0
# Target: 15K customers with claims data integrated

print("🔍 Analyzing claims patterns...")

# TODO: Implement claims analysis logic here
claims_per_customer = (
    claims_df
    .groupBy("customer_id")
    .agg(
        count("claim_id").alias("total_claims"),
        sum("claim_amount").alias("total_claim_amount"),
        avg("claim_amount").alias("avg_claim_amount"),
        max("claim_amount").alias("max_claim_amount"),
        countDistinct("policy_id").alias("policies_with_claims")
    )
)
print("✅ Claims analysis completed")
print("📊 Claims Analysis Summary:")
claims_per_customer.describe().show()
customer_risk_df = customers_df.join(claims_per_customer, on="customer_id", how="left")
customer_risk_df = customer_risk_df.fillna(0, subset=["total_claims", "total_claim_amount", "avg_claim_amount", "max_claim_amount", "policies_with_claims"])
print(f"✅ Customer risk base created: {customer_risk_df.count():,} customers")

🔍 Analyzing claims patterns...
✅ Claims analysis completed
📊 Claims Analysis Summary:
+-------+-----------+------------------+------------------+------------------+------------------+--------------------+
|summary|customer_id|      total_claims|total_claim_amount|  avg_claim_amount|  max_claim_amount|policies_with_claims|
+-------+-----------+------------------+------------------+------------------+------------------+--------------------+
|  count|       7016|              7016|              7016|              7016|              7016|                7016|
|   mean|       NULL|1.5169612314709235|178509.23731470894|115572.40478289289|147826.10912770836|  1.3691562143671607|
| stddev|       NULL|0.8273635973570767|314826.75338704314|196891.82583189555| 246426.0881660179|  0.6541588435213932|
|    min| CUST000004|                 1|            528.25|            528.25|            528.25|                   1|
|    max| CUST014998|                10|3970436.4699999997|        2418879.07|   

### 1.2 Policy Concentration Risk

In [0]:
# BUSINESS CONTEXT: Customers with multiple policies represent higher value but also concentration risk
# Multi-policy customers typically have lower churn rates but higher total exposure

# TODO: Calculate policy concentration metrics per customer
# 
# Step 1: Aggregate policy data per customer
# TODO: Group policies_df by "customer_id"
# TODO: Calculate total_policies using count("policy_id")
# TODO: Calculate total_premium using sum("premium_amount")
# TODO: Calculate avg_premium using avg("premium_amount")
# TODO: Calculate total_coverage using sum("coverage_amount")
# TODO: Calculate policy_types_count using countDistinct("policy_type")
# TODO: Collect policy_types using collect_list("policy_type")
# TODO: Store result as policy_metrics
# 
# Step 2: Display policy concentration analysis
# TODO: Print "📊 Policy Concentration Analysis:"
# TODO: Show descriptive statistics using policy_metrics.describe().show()
# 
# Step 3: Join policy metrics with customer risk data
# TODO: Join customer_risk_df with policy_metrics on "customer_id" using left join
# TODO: Update customer_risk_df with joined result
# 
# Step 4: Handle customers without policies (edge case protection)
# TODO: Use fillna() to set null values to 0 for: total_policies, total_premium, avg_premium, total_coverage, policy_types_count
# TODO: Print "✅ Policy concentration metrics integrated"
#
# EXPECTED OUTPUT: All customers with policy concentration metrics
# Target: Policy counts ranging from 1-5, premium amounts $500-$15,000

print("🔍 Analyzing policy concentration...")

# TODO: Implement policy concentration analysis logic here
policy_metrics = (
    policies_df
    .groupBy("customer_id")
    .agg(
        count("policy_id").alias("total_policies"),
        sum("premium_amount").alias("total_premium"),
        avg("premium_amount").alias("avg_premium"),
        sum("coverage_amount").alias("total_coverage"),
        countDistinct("policy_type").alias("policy_types_count"),
        collect_list("policy_type").alias("policy_types")
    )
)
print("✅ Policy concentration analysis completed")
print("📊 Policy Concentration Analysis:")
policy_metrics.describe().show()
customer_risk_df = customer_risk_df.join(policy_metrics, on="customer_id", how="left")
customer_risk_df = customer_risk_df.fillna(0, subset=["total_policies", "total_premium", "avg_premium", "total_coverage", "policy_types_count"])
print("✅ Policy concentration metrics integrated")

🔍 Analyzing policy concentration...
✅ Policy concentration analysis completed
📊 Policy Concentration Analysis:
+-------+-----------+------------------+------------------+-----------------+-----------------+------------------+
|summary|customer_id|    total_policies|     total_premium|      avg_premium|   total_coverage|policy_types_count|
+-------+-----------+------------------+------------------+-----------------+-----------------+------------------+
|  count|      14905|             14905|             14905|            14905|            14905|             14905|
|   mean|       NULL| 5.031868500503187|28289.118016773045|5658.338106322954|1201268.250318685|3.0864139550486414|
| stddev|       NULL|2.2066158280630606|27800.390976173603|5810.436355811093|874976.2352750759|1.0537328155711343|
|    min| CUST000001|                 1|            264.42|           264.42|            10000|                 1|
|    max| CUST015000|                16|269471.41000000003|         104000.0|       

### 1.3 Payment Behavior Integration

In [0]:
# BUSINESS CONTEXT: Payment behavior indicates customer reliability and financial stability
# Late payments and failures correlate with higher risk and potential churn

# TODO: Analyze customer payment behavior patterns
# 
# Step 1: Calculate payment behavior metrics
# TODO: Group payments_df by "customer_id"
# TODO: Calculate total_payments using count("payment_id")
# TODO: Calculate total_payment_amount using sum("payment_amount")
# TODO: Calculate avg_payment_amount using avg("payment_amount")
# TODO: Calculate late_payments_count using sum(when(col("late_payment_flag") == True, 1).otherwise(0))
# TODO: Calculate failed_payments_count using sum(when(col("payment_status") == "Failed", 1).otherwise(0))
# TODO: Store result as payment_behavior
# 
# Step 2: Calculate payment reliability score
# TODO: Add payment_reliability_score column to payment_behavior
# TODO: Use when/otherwise logic:
#       - If total_payments == 0, set score to 1.0
#       - Otherwise: 1.0 - (late_payments_count + failed_payments_count * 2) / total_payments
# TODO: This formula penalizes failed payments more heavily than late payments
# 
# Step 3: Join payment behavior with customer risk data
# TODO: Join customer_risk_df with payment_behavior on "customer_id" using left join
# TODO: Update customer_risk_df with joined result
# 
# Step 4: Handle customers without payment history
# TODO: Use fillna() to set null values to 0 for: total_payments, total_payment_amount, avg_payment_amount, late_payments_count, failed_payments_count
# TODO: Set payment_reliability_score to 1.0 for customers without payment history
# TODO: Print "✅ Payment behavior metrics integrated"
#
# EXPECTED OUTPUT: All customers with payment reliability scores between 0.0-1.0
# Target: Most customers should have scores 0.85-1.0 (high reliability)

print("🔍 Analyzing payment behavior...")

# TODO: Implement payment behavior analysis logic here
payment_behavior = (
    payments_df
    .groupBy("customer_id")
    .agg(
        count("payment_id").alias("total_payments"),
        sum("payment_amount").alias("total_payment_amount"),
        avg("payment_amount").alias("avg_payment_amount"),
        sum(when(col("late_payment_flag") == True, 1).otherwise(0)).alias("late_payments_count"),
        sum(when(col("payment_status") == "Failed", 1).otherwise(0)).alias("failed_payments_count")
    )
)
payment_behavior = (
    payment_behavior
    .withColumn("payment_reliability_score", 
    when(col("total_payments") == 0, 1.0)
    .otherwise(1.0 - (col("late_payments_count") + col("failed_payments_count") * 2) / col("total_payments"))
))
customer_risk_df = customer_risk_df.join(payment_behavior, on="customer_id", how="left")
customer_risk_df = customer_risk_df.fillna(0, subset=["total_payments", "total_payment_amount", "avg_payment_amount", "late_payments_count", "failed_payments_count"])
customer_risk_df = customer_risk_df.fillna(1.0, subset=["payment_reliability_score"])
print("✅ Payment behavior metrics integrated")

🔍 Analyzing payment behavior...
✅ Payment behavior metrics integrated


### 1.4 Calculate Comprehensive Risk Scores

In [0]:
# BUSINESS CONTEXT: Risk scoring enables personalized pricing and targeted risk management
# Multiple risk factors provide more accurate assessment than single indicators

# TODO: Calculate comprehensive risk scores using multiple risk factors
# 
# Step 1: Calculate claims risk score
# TODO: Create claims_risk_score column using when/otherwise logic:
#       - 0 claims = 1.0 (lowest risk)
#       - 1 claim = 2.0 (medium risk)
#       - 2 claims = 3.0 (higher risk)
#       - 3+ claims = 4.0 (highest risk)
# TODO: Default to 1.0 for any other cases
# 
# Step 2: Calculate credit risk score
# TODO: Create credit_risk_score column based on credit_score:
#       - >=750 = 1.0 (excellent credit)
#       - >=700 = 2.0 (good credit)
#       - >=650 = 3.0 (fair credit)
#       - >=600 = 4.0 (poor credit)
#       - <600 = 5.0 (very poor credit)
# 
# Step 3: Calculate age risk score
# TODO: First calculate age column: floor(datediff(current_date(), col("birth_date")) / 365.25)
# TODO: Create age_risk_score column:
#       - <25 = 3.0 (young drivers, higher risk)
#       - 25-65 = 1.0 (prime age, lowest risk)
#       - 66-75 = 2.0 (senior, moderate risk)
#       - >75 = 3.0 (very senior, higher risk)
# 
# Step 4: Calculate premium and payment risk scores
# TODO: Create premium_risk_score based on total_premium:
#       - >=5000 = 3.0 (high premium concentration)
#       - >=2000 = 2.0 (medium concentration)
#       - <2000 = 1.0 (low concentration)
# TODO: Create payment_risk_score based on payment_reliability_score:
#       - >=0.95 = 1.0 (excellent reliability)
#       - >=0.85 = 2.0 (good reliability)
#       - >=0.75 = 3.0 (fair reliability)
#       - <0.75 = 4.0 (poor reliability)
# 
# Step 5: Calculate composite risk score
# TODO: Create composite_risk_score as weighted average:
#       - claims_risk_score * 0.30 (30% weight)
#       - credit_risk_score * 0.25 (25% weight)
#       - age_risk_score * 0.15 (15% weight)
#       - premium_risk_score * 0.15 (15% weight)
#       - payment_risk_score * 0.15 (15% weight)
# TODO: Print "✅ Comprehensive risk scores calculated"
# TODO: Show risk score distribution using customer_risk_df.select("composite_risk_score").describe().show()
#
# EXPECTED OUTPUT: All customers with composite_risk_score between 1.0-4.0
# Target: Bell curve distribution with most customers between 1.5-2.5

print("🔍 Calculating comprehensive risk scores...")

# TODO: Implement comprehensive risk scoring logic here
customer_risk_df = (customer_risk_df.withColumn("claims_risk_score", when(col("total_claims") == 0, 1.0)
.when(col("total_claims") == 1, 2.0)
.when(col("total_claims") == 2, 3.0)
.when(col("total_claims") >= 3, 4.0)
.otherwise(1.0))
.withColumn("credit_risk_score", when(col("credit_score") >= 750, 1.0)
.when(col("credit_score") >= 700, 2.0)
.when(col("credit_score") >= 650, 3.0)
.when(col("credit_score") >= 600, 4.0)
.otherwise(5.0))
.withColumn("age", floor(datediff(current_date(), col("birth_date")) / 365.25))
.withColumn("age_risk_score", when(col("age") < 25, 3.0)
.when((col("age") >= 25) & (col("age") <= 65), 1.0)
.when((col("age") >= 66) & (col("age") <= 75), 2.0)
.otherwise(3.0))
.withColumn("premium_risk_score", when(col("total_premium") >= 5000, 3.0)
.when(col("total_premium") >= 2000, 2.0)
.otherwise(1.0))
.withColumn("payment_reliability_score", col("total_payments") / col("total_payment_amount"))
.withColumn("payment_risk_score", when(col("payment_reliability_score") >= 0.95, 1.0)
.when(col("payment_reliability_score") >= 0.85, 2.0)
.when(col("payment_reliability_score") >= 0.75, 3.0)
.otherwise(4.0))
.withColumn("composite_risk_score", (col("claims_risk_score") * 0.30) + (col("credit_risk_score") * 0.25) + (col("age_risk_score") * 0.15) + (col("premium_risk_score") * 0.15) + (col("payment_risk_score") * 0.15)))
print("✅ Comprehensive risk scores calculated")
customer_risk_df.select("composite_risk_score").describe().show()

🔍 Calculating comprehensive risk scores...
✅ Comprehensive risk scores calculated
+-------+--------------------+
|summary|composite_risk_score|
+-------+--------------------+
|  count|               15000|
|   mean|  2.2077399999998497|
| stddev|  0.3899084007439871|
|    min|  1.4500000000000002|
|    max|  3.9500000000000006|
+-------+--------------------+



## 2. Risk-Based Customer Segmentation

### 2.1 Risk Category Assignment

In [0]:
# BUSINESS CONTEXT: Risk categorization enables targeted pricing and risk management strategies
# Industry standard practice divides customers into Low/Medium/High risk segments

# TODO: Assign risk categories based on composite risk scores
# 
# Step 1: Create risk categories
# TODO: Create risk_category column using when/otherwise logic:
#       - composite_risk_score <= 1.75 = "Low"
#       - composite_risk_score <= 2.75 = "Medium"
#       - composite_risk_score > 2.75 = "High"
# 
# Step 2: Analyze risk category distribution
# TODO: Group by risk_category and count customers
# TODO: Order by count descending
# TODO: Store result as risk_distribution
# TODO: Print "📊 Risk Category Distribution:"
# TODO: Show risk_distribution
# 
# Step 3: Calculate risk category percentages
# TODO: Get total_customers using customer_risk_df.count()
# TODO: Add percentage column to risk_distribution: (count / total_customers) * 100
# TODO: Round to 2 decimal places
# TODO: Print "📊 Risk Category Percentages:"
# TODO: Show risk_distribution with percentages
#
# EXPECTED OUTPUT: Risk categories with counts and percentages
# Target: Approximately 60% Low, 30% Medium, 10% High risk distribution

print("🎯 Assigning risk categories...")

# TODO: Implement risk category assignment logic here
customer_risk_df = customer_risk_df.withColumn("risk_category", when(col("composite_risk_score") <= 1.75, "Low")
.when(col("composite_risk_score") <= 2.75, "Medium")
.otherwise("High"))
risk_distribution = customer_risk_df.groupBy("risk_category").count().orderBy(col("count").desc())
total_customers = customer_risk_df.count()
risk_distribution = risk_distribution.withColumn("percentage", round((col("count") / total_customers) * 100, 2))
print("📊 Risk Category Distribution:")
risk_distribution.show()

🎯 Assigning risk categories...
📊 Risk Category Distribution:
+-------------+-----+----------+
|risk_category|count|percentage|
+-------------+-----+----------+
|       Medium|10464|     69.76|
|          Low| 2985|      19.9|
|         High| 1551|     10.34|
+-------------+-----+----------+



### 2.2 Value Segment Assignment

In [0]:
# BUSINESS CONTEXT: Value segmentation enables customer prioritization and resource allocation
# High-value customers receive premium service and retention efforts

# TODO: Create customer value segments based on premium amounts
# 
# Step 1: Calculate value segment thresholds
# TODO: Use approxQuantile on total_premium column with quantiles [0.7, 0.9] and accuracy 0.05
# TODO: Store results as premium_percentiles
# TODO: Extract medium_value_threshold = premium_percentiles[0] (70th percentile)
# TODO: Extract high_value_threshold = premium_percentiles[1] (90th percentile)
# 
# Step 2: Print value segment thresholds
# TODO: Print "📊 Value Segment Thresholds:"
# TODO: Print f"   High Value (top 10%): ${high_value_threshold:,.2f}+"
# TODO: Print f"   Medium Value (70-90%): ${medium_value_threshold:,.2f} - ${high_value_threshold:,.2f}"
# TODO: Print f"   Low Value (bottom 70%): < ${medium_value_threshold:,.2f}"
# 
# Step 3: Assign value segments
# TODO: Create value_segment column using when/otherwise logic:
#       - total_premium >= high_value_threshold = "High Value"
#       - total_premium >= medium_value_threshold = "Medium Value"
#       - total_premium < medium_value_threshold = "Low Value"
# 
# Step 4: Analyze value segment distribution
# TODO: Group by value_segment and count customers
# TODO: Order by count descending
# TODO: Store result as value_distribution
# TODO: Print "📊 Value Segment Distribution:"
# TODO: Show value_distribution
#
# EXPECTED OUTPUT: Value segments with approximately 10% High, 20% Medium, 70% Low
# Target: Thresholds around $3,000 (medium) and $8,000 (high)

print("💰 Assigning value segments...")

# TODO: Implement value segment assignment logic here
premium_percentiles = customer_risk_df.approxQuantile("total_premium", [0.7, 0.9], 0.05)
medium_value_threshold = premium_percentiles[0]
high_value_threshold = premium_percentiles[1]
print(f"📊 Value Segment Thresholds:")
print(f"   High Value (top 10%): ${high_value_threshold:,.2f}+")
print(f"   Medium Value (70-90%): ${medium_value_threshold:,.2f} - ${high_value_threshold:,.2f}")
print(f"   Low Value (bottom 70%): < ${medium_value_threshold:,.2f}")
customer_risk_df = customer_risk_df.withColumn("value_segment", when(col("total_premium") >= high_value_threshold, "High Value")
.when(col("total_premium") >= medium_value_threshold, "Medium Value")
.otherwise("Low Value"))

💰 Assigning value segments...
📊 Value Segment Thresholds:
   High Value (top 10%): $54,753.91+
   Medium Value (70-90%): $29,979.52 - $54,753.91
   Low Value (bottom 70%): < $29,979.52


### 2.3 Risk vs Value Matrix

In [0]:
# BUSINESS CONTEXT: Risk-value matrix enables strategic customer management
# Different combinations require different strategies (retain, price, manage risk)

# TODO: Create comprehensive risk-value matrix analysis
# 
# Step 1: Create risk-value matrix
# TODO: Group by risk_category and value_segment
# TODO: Calculate customer_count using count("*")
# TODO: Calculate avg_risk_score using avg("composite_risk_score")
# TODO: Calculate total_premium_segment using sum("total_premium")
# TODO: Calculate avg_premium_segment using avg("total_premium")
# TODO: Order by risk_category, value_segment
# TODO: Store result as risk_value_matrix
# 
# Step 2: Display risk-value matrix
# TODO: Print "📊 Risk vs Value Matrix Analysis:"
# TODO: Show risk_value_matrix
# 
# Step 3: Calculate segment percentages
# TODO: Add segment_percentage column to risk_value_matrix
# TODO: Calculate as (customer_count / total_customers) * 100
# TODO: Round to 2 decimal places
# TODO: Print "📊 Risk-Value Matrix with Percentages:"
# TODO: Show matrix with percentages
#
# EXPECTED OUTPUT: 9-cell matrix showing customer distribution across risk-value combinations
# Target: High Value-Low Risk should be priority segment for retention

print("📊 Risk vs Value Matrix Analysis:")
# TODO: Implement risk-value matrix analysis logic here
risk_value_matrix = customer_risk_df.groupBy("risk_category", "value_segment").agg(count("*").alias("customer_count"), avg("composite_risk_score").alias("avg_risk_score"), sum("total_premium").alias("total_premium_segment"), avg("total_premium").alias("avg_premium_segment")).orderBy("risk_category", "value_segment")

display(risk_value_matrix)

print("📊 Risk-Value Matrix with Percentages:")

total_customers = customer_risk_df.count()
risk_value_matrix = risk_value_matrix.withColumn("segment_percentage", round((col("customer_count") / total_customers) * 100, 2))
total_premium_all = customer_risk_df.select(sum("total_premium")).first()[0]

display(risk_value_matrix)

📊 Risk vs Value Matrix Analysis:


risk_category,value_segment,customer_count,avg_risk_score,total_premium_segment,avg_premium_segment
High,High Value,377,2.99217506631299,33068322.57000001,87714.38347480108
High,Low Value,800,2.9614999999999942,11936305.16000001,14920.381450000012
High,Medium Value,374,2.981016042780743,15200660.51,40643.47729946524
Low,High Value,292,1.75,23210822.69,79489.11880136986
Low,Low Value,2212,1.7056509945750282,23078249.13000002,10433.204850813752
Low,Medium Value,481,1.75,19283567.589999992,40090.57711018709
Medium,High Value,1524,2.275229658792665,123557843.62999988,81074.70054461934
Medium,Low Value,6825,2.2142490842490266,85917440.00999984,12588.635898901077
Medium,Medium Value,2115,2.268841607565047,86396092.74999985,40849.21643025998


📊 Risk-Value Matrix with Percentages:


risk_category,value_segment,customer_count,avg_risk_score,total_premium_segment,avg_premium_segment,segment_percentage
High,High Value,377,2.99217506631299,33068322.57000001,87714.38347480108,2.51
High,Low Value,800,2.9614999999999942,11936305.16000001,14920.381450000012,5.33
High,Medium Value,374,2.981016042780743,15200660.51,40643.47729946524,2.49
Low,High Value,292,1.75,23210822.69,79489.11880136986,1.95
Low,Low Value,2212,1.7056509945750282,23078249.13000002,10433.204850813752,14.75
Low,Medium Value,481,1.75,19283567.589999992,40090.57711018709,3.21
Medium,High Value,1524,2.275229658792665,123557843.62999988,81074.70054461934,10.16
Medium,Low Value,6825,2.2142490842490266,85917440.00999984,12588.635898901077,45.5
Medium,Medium Value,2115,2.268841607565047,86396092.74999985,40849.21643025998,14.1


## 3. Policy Portfolio Analysis

### 3.1 Policy Type Distribution Analysis

In [0]:
# BUSINESS CONTEXT: Policy mix analysis reveals portfolio composition and growth opportunities
# Understanding policy preferences helps optimize product offerings and pricing

# TODO: Analyze policy type distribution across the portfolio
# 
# Step 1: Calculate policy type metrics
# TODO: Group policies_df by "policy_type"
# TODO: Calculate policy_count using count("*")
# TODO: Calculate total_premium using sum("premium_amount")
# TODO: Calculate avg_premium using avg("premium_amount")
# TODO: Calculate total_coverage using sum("coverage_amount")
# TODO: Calculate unique_customers using countDistinct("customer_id")
# TODO: Order by policy_count descending
# TODO: Store result as policy_type_dist
# 
# Step 2: Display policy type distribution
# TODO: Print "📊 Policy Type Distribution:"
# TODO: Show policy_type_dist
# 
# Step 3: Calculate policy type percentages
# TODO: Get total_policies using policies_df.count()
# TODO: Add policy_percentage column: (policy_count / total_policies) * 100
# TODO: Calculate total_premium_all = sum of all total_premium values
# TODO: Add premium_percentage column: (total_premium / total_premium_all) * 100
# TODO: Round both percentages to 2 decimal places
# TODO: Print "📊 Policy Type Analysis with Percentages:"
# TODO: Show policy_type_dist with percentages
#
# EXPECTED OUTPUT: Policy types ranked by count with premium percentages
# Target: Auto (30%), Home (25%), Life (20%), Health (15%), Other (10%)

print("📋 Analyzing policy portfolio...")

# TODO: Implement policy type distribution analysis logic here
policy_type_dist = (
    policies_df
    .groupBy("policy_type")
    .agg(
        count("*").alias("policy_count"),
        sum("premium_amount").alias("total_premium"),
        avg("premium_amount").alias("avg_premium"),
        sum("coverage_amount").alias("total_coverage"),
        countDistinct("customer_id").alias("unique_customers")
    )
    .orderBy(desc("policy_count"))
)

print("📊 Policy Type Distribution:")
display(policy_type_dist)

total_policies = policies_df.count()

policy_type_dist = policy_type_dist.withColumn(
    "policy_percentage",
    round((col("policy_count") / total_policies) * 100, 2)
)

total_premium_all = (
    policy_type_dist
    .select(sum("total_premium").alias("sum_premium"))
    .collect()[0]["sum_premium"]
)

policy_type_dist = policy_type_dist.withColumn(
    "premium_percentage",
    round((col("total_premium") / total_premium_all) * 100, 2)
)

print("📊 Policy Type Analysis with Percentages:")
display(policy_type_dist)

📋 Analyzing policy portfolio...
📊 Policy Type Distribution:


policy_type,policy_count,total_premium,avg_premium,total_coverage,unique_customers
Auto,22294,62011500.0,2781.533147932179,3124625000,11522
Home,18891,25136027.79000001,1330.5821708750204,3263139841,10732
Life,15164,36057095.92000006,2377.809016090745,7521759620,9474
Health,11745,279175200.0,23769.70625798212,3554100000,8136
Travel,3535,16036275.0,4536.428571428572,108640000,3137
Business,3371,3233205.3300000075,959.1235034114528,332638810,3002


📊 Policy Type Analysis with Percentages:


policy_type,policy_count,total_premium,avg_premium,total_coverage,unique_customers,policy_percentage,premium_percentage
Auto,22294,62011500.0,2781.533147932179,3124625000,11522,29.73,14.71
Home,18891,25136027.79000001,1330.5821708750204,3263139841,10732,25.19,5.96
Life,15164,36057095.92000006,2377.809016090745,7521759620,9474,20.22,8.55
Health,11745,279175200.0,23769.70625798212,3554100000,8136,15.66,66.21
Travel,3535,16036275.0,4536.428571428572,108640000,3137,4.71,3.8
Business,3371,3233205.3300000075,959.1235034114528,332638810,3002,4.49,0.77


### 3.2 Cross-Selling Opportunity Analysis

In [0]:
# BUSINESS CONTEXT: Cross-selling increases customer lifetime value and reduces churn
# Single-policy customers represent the highest cross-selling potential

# TODO: Identify cross-selling opportunities based on policy diversity
# 
# Step 1: Analyze customers by policy diversity
# TODO: Group customer_risk_df by "policy_types_count"
# TODO: Calculate customer_count using count("*")
# TODO: Calculate avg_premium using avg("total_premium")
# TODO: Calculate avg_risk_score using avg("composite_risk_score")
# TODO: Order by policy_types_count
# TODO: Store result as policy_diversity
# 
# Step 2: Display policy diversity analysis
# TODO: Print "📊 Customer Distribution by Policy Diversity:"
# TODO: Show policy_diversity
# 
# Step 3: Identify cross-selling opportunities
# TODO: Filter customer_risk_df for policy_types_count == 1, store as single_policy_customers
# TODO: Filter customer_risk_df for policy_types_count > 1, store as multi_policy_customers
# TODO: Calculate cross_sell_opportunity = single_policy_customers.count()
# TODO: Calculate cross_sell_percentage = (cross_sell_opportunity / total_customers) * 100
# 
# Step 4: Display cross-selling insights
# TODO: Print "🎯 Cross-Selling Opportunities:"
# TODO: Print f"   Single Policy Customers: {cross_sell_opportunity:,} ({cross_sell_percentage:.1f}%)"
# TODO: Print f"   Multi-Policy Customers: {multi_policy_customers.count():,}"
# 
# Step 5: Identify high-value cross-selling targets
# TODO: Filter single_policy_customers for value_segment != "Low Value"
# TODO: Select: customer_id, risk_category, value_segment, total_premium, policy_types_count
# TODO: Store as high_value_single_policy
# TODO: Print f"🎯 High-Value Single Policy Customers (Priority Targets): {high_value_single_policy.count():,}"
#
# EXPECTED OUTPUT: Cross-selling opportunity analysis with priority targets identified
# Target: 60% single-policy customers with 15-20% being high-value targets

print("🎯 Identifying cross-selling opportunities...")

# TODO: Implement cross-selling opportunity analysis logic here
policy_diversity = (
    customer_risk_df
    .groupBy("policy_types_count")
    .agg(
        count("*").alias("customer_count"),
        avg("total_premium").alias("avg_premium"),
        avg("composite_risk_score").alias("avg_risk_score")
    )
    .orderBy("policy_types_count")
)
print("📊 Customer Distribution by Policy Diversity:")
display(policy_diversity)
single_policy_customers = customer_risk_df.filter(col("policy_types_count") == 1)
multi_policy_customers = customer_risk_df.filter(col("policy_types_count") > 1)
cross_sell_opportunity = single_policy_customers.count()
cross_sell_percentage = (cross_sell_opportunity / customer_risk_df.count()) * 100
print(f"🎯 Cross-Selling Opportunities:")
print(f"   Single Policy Customers: {cross_sell_opportunity:,} ({cross_sell_percentage:.1f}%)")
print(f"   Multi-Policy Customers: {multi_policy_customers.count():,}")
high_value_single_policy = single_policy_customers.filter(col("value_segment") != "Low Value").select("customer_id", "risk_category", "value_segment", "total_premium", "policy_types_count")
print(f"🎯 High-Value Single Policy Customers (Priority Targets): {high_value_single_policy.count():,}")


🎯 Identifying cross-selling opportunities...
📊 Customer Distribution by Policy Diversity:


policy_types_count,customer_count,avg_premium,avg_risk_score
0,95,0.0,1.700526315789473
1,925,7673.173297297296,1.96762162162162
2,3403,17445.977660887453,2.129782544813431
3,5426,26825.77469037956,2.2211666052340338
4,3879,38646.89687548336,2.279453467388536
5,1154,46244.94351819761,2.3480502599653423
6,118,53799.36855932205,2.399576271186441


🎯 Cross-Selling Opportunities:
   Single Policy Customers: 925 (6.2%)
   Multi-Policy Customers: 13,980
🎯 High-Value Single Policy Customers (Priority Targets): 58


### 3.3 Portfolio Risk Concentration

In [0]:
# BUSINESS CONTEXT: Risk concentration analysis helps identify portfolio vulnerabilities
# Geographic and policy type concentration can amplify losses during adverse events

# TODO: Analyze portfolio risk concentration by policy type and geography
# 
# Step 1: Analyze risk concentration by policy type
# TODO: Join policies_df with customer_risk_df on "customer_id"
# TODO: Select customer_id, risk_category, composite_risk_score from customer_risk_df
# TODO: Group by policy_type and risk_category
# TODO: Calculate policy_count using count("*")
# TODO: Calculate total_premium using sum("premium_amount")
# TODO: Calculate avg_premium using avg("premium_amount")
# TODO: Order by policy_type, risk_category
# TODO: Store result as policy_risk_analysis
# 
# Step 2: Display policy risk analysis
# TODO: Print "📊 Portfolio Risk Distribution by Policy Type:"
# TODO: Show policy_risk_analysis
# 
# Step 3: Analyze geographic risk concentration
# TODO: Group customer_risk_df by "state"
# TODO: Calculate customer_count using count("*")
# TODO: Calculate avg_risk_score using avg("composite_risk_score")
# TODO: Calculate total_premium_state using sum("total_premium")
# TODO: Order by customer_count descending
# TODO: Store result as geographic_risk
# 
# Step 4: Display geographic risk concentration
# TODO: Print "📊 Geographic Risk Concentration (Top 10 States):"
# TODO: Show top 10 states using geographic_risk.show(10)
#
# EXPECTED OUTPUT: Risk concentration analysis by policy type and state
# Target: Identify states/policy types with high risk concentration

print("⚖️ Analyzing portfolio risk concentration...")

# TODO: Implement portfolio risk concentration analysis logic here
policy_risk_analysis = (
    policies_df
    .join(customer_risk_df, "customer_id")
    .select("customer_id", "risk_category", "composite_risk_score", "policy_type", "premium_amount")
    .groupBy("policy_type", "risk_category")
    .agg(
        count("*").alias("policy_count"),
        sum("premium_amount").alias("total_premium"),
        avg("premium_amount").alias("avg_premium")
    )
    .orderBy("policy_type", "risk_category")
)
print("📊 Portfolio Risk Distribution by Policy Type:")

⚖️ Analyzing portfolio risk concentration...
📊 Portfolio Risk Distribution by Policy Type:


## 4. Temporal Pattern Analysis

### 4.1 Seasonal Claims Patterns

In [0]:
# BUSINESS CONTEXT: Seasonal patterns help predict claims volume and adjust reserves
# Weather-related claims typically spike in certain months (winter storms, summer hail)

# TODO: Analyze seasonal patterns in claims data
# 
# Step 1: Add temporal components to claims data
# TODO: Create claims_temporal by adding these columns to claims_df:
#       - claim_month using month("claim_date")
#       - claim_quarter using quarter("claim_date")
#       - claim_year using year("claim_date")
# 
# Step 2: Calculate monthly claims pattern
# TODO: Group claims_temporal by "claim_month"
# TODO: Calculate claims_count using count("claim_id")
# TODO: Calculate total_claim_amount using sum("claim_amount")
# TODO: Calculate avg_claim_amount using avg("claim_amount")
# TODO: Order by claim_month
# TODO: Store result as monthly_claims
# 
# Step 3: Display monthly claims pattern
# TODO: Print "📊 Monthly Claims Pattern:"
# TODO: Show all 12 months using monthly_claims.show(12)
# 
# Step 4: Create seasonal analysis
# TODO: Add season column to claims_temporal using when/otherwise logic:
#       - Months 12, 1, 2 = "Winter"
#       - Months 3, 4, 5 = "Spring"
#       - Months 6, 7, 8 = "Summer"
#       - Otherwise = "Fall"
# TODO: Group by season and calculate: claims_count, total_claim_amount, avg_claim_amount
# TODO: Order by claims_count descending
# TODO: Store result as seasonal_claims
# 
# Step 5: Display seasonal analysis
# TODO: Print "📊 Seasonal Claims Analysis:"
# TODO: Show seasonal_claims
#
# EXPECTED OUTPUT: Monthly and seasonal claims patterns
# Target: Identify peak months/seasons for claims activity

print("📅 Analyzing seasonal claims patterns...")

# TODO: Implement seasonal claims pattern analysis logic here
claims_df = (
    claims_df
    .withColumn("claim_month", month("claim_date"))
    .withColumn("claim_quarter", quarter("claim_date"))
    .withColumn("claim_year", year("claim_date"))
)
monthly_claims = (
    claims_df
    .groupBy("claim_month")
    .agg(
        count("*").alias("claims_count"),
        sum("claim_amount").alias("total_claim_amount"),
        avg("claim_amount").alias("avg_claim_amount")
    )
    .orderBy("claim_month")
)
print("📊 Monthly Claims Pattern:")
monthly_claims.show(12)
seasonal_claims = (
    claims_df
    .withColumn("season", 
        when(month("claim_date").isin([12, 1, 2]), "Winter")
        .when(month("claim_date").isin([3, 4, 5]), "Spring")
        .when(month("claim_date").isin([6, 7, 8]), "Summer")
        .otherwise("Fall")
    )
    .groupBy("season")
    .agg(
        count("*").alias("claims_count"),
        sum("claim_amount").alias("total_claim_amount"),
        avg("claim_amount").alias("avg_claim_amount")
    )
    .orderBy(desc("claims_count"))
)
print("📊 Seasonal Claims Analysis:")
seasonal_claims.show()

📅 Analyzing seasonal claims patterns...
📊 Monthly Claims Pattern:
+-----------+------------+--------------------+------------------+
|claim_month|claims_count|  total_claim_amount|  avg_claim_amount|
+-----------+------------+--------------------+------------------+
|          1|         849| 1.038257335800001E8|122291.79455830401|
|          2|         832|1.0437907649999997E8|125455.62079326919|
|          3|         972|1.2181834916000003E8|125327.51971193419|
|          4|         982|1.1091925937000005E8|112952.40261710799|
|          5|        1182|1.4780195363999993E8|125043.95401015223|
|          6|        1337|1.6001307719000006E8|119680.68600598359|
|          7|         791| 8.153647679000004E7|103080.24878634645|
|          8|         711| 8.079758383999991E7|113639.35842475375|
|          9|         697|       8.136679335E7| 116738.5844332855|
|         10|         770| 9.467202950000004E7|122950.68766233772|
|         11|         738| 8.108266671999998E7|109868.112086720

### 4.2 Payment Behavior Patterns

In [0]:
# BUSINESS CONTEXT: Payment timing patterns reveal customer cash flow cycles
# Understanding payment preferences helps optimize billing and collection strategies

# TODO: Analyze payment behavior patterns over time
# 
# Step 1: Add temporal components to payments data
# TODO: Create payments_temporal by adding these columns to payments_df:
#       - payment_month using month("payment_date")
#       - payment_quarter using quarter("payment_date")
# 
# Step 2: Calculate monthly payment patterns
# TODO: Group payments_temporal by "payment_month"
# TODO: Calculate payment_count using count("payment_id")
# TODO: Calculate total_payments using sum("payment_amount")
# TODO: Calculate avg_payment using avg("payment_amount")
# TODO: Calculate late_payments using sum(when(col("late_payment_flag") == True, 1).otherwise(0))
# TODO: Order by payment_month
# TODO: Store result as monthly_payments
# 
# Step 3: Display monthly payment patterns
# TODO: Print "📊 Monthly Payment Patterns:"
# TODO: Show all 12 months using monthly_payments.show(12)
# 
# Step 4: Analyze payment methods
# TODO: Group payments_df by "payment_method"
# TODO: Calculate payment_count using count("payment_id")
# TODO: Calculate total_amount using sum("payment_amount")
# TODO: Calculate avg_amount using avg("payment_amount")
# TODO: Calculate late_payments using sum(when(col("late_payment_flag") == True, 1).otherwise(0))
# TODO: Calculate failed_payments using sum(when(col("payment_status") == "Failed", 1).otherwise(0))
# TODO: Order by payment_count descending
# TODO: Store result as payment_method_analysis
# 
# Step 5: Display payment method analysis
# TODO: Print "📊 Payment Method Analysis:"
# TODO: Show payment_method_analysis
#
# EXPECTED OUTPUT: Monthly payment patterns and payment method analysis
# Target: Identify preferred payment methods and seasonal payment patterns

print("💳 Analyzing payment behavior patterns...")

# TODO: Implement payment behavior pattern analysis logic here
payments_temporal = (
    payments_df
    .withColumn("payment_month", month("payment_date"))
    .withColumn("payment_quarter", quarter("payment_date"))
)
monthly_payments = (
    payments_temporal
    .groupBy("payment_month")
    .agg(
        count("*").alias("payment_count"),
        sum("payment_amount").alias("total_payments"),
        avg("payment_amount").alias("avg_payment"),
        sum(when(col("late_payment_flag") == True, 1).otherwise(0)).alias("late_payments"),
    )
    .orderBy(desc("payment_month"))
)
payment_method_analysis = (
    payments_df
    .groupBy("payment_method")
    .agg(
        count("*").alias("payment_count"),
        sum("payment_amount").alias("total_amount"),
        avg("payment_amount").alias("avg_amount"),
        sum(when(col("late_payment_flag") == True, 1).otherwise(0)).alias("late_payments"),
        sum(when(col("payment_status") == "Failed", 1).otherwise(0)).alias("failed_payments"),
    )
    .orderBy(desc("payment_count"))
)
print("📊 Monthly Payment Patterns:")
monthly_payments.show(12)
print("📊 Payment Method Analysis:")
payment_method_analysis.show(12)

💳 Analyzing payment behavior patterns...
📊 Monthly Payment Patterns:
+-------------+-------------+--------------------+------------------+-------------+
|payment_month|payment_count|      total_payments|       avg_payment|late_payments|
+-------------+-------------+--------------------+------------------+-------------+
|           12|        13449|2.0151222230000053E7|1498.3435370659568|         6070|
|           11|        12739|  2.00566850299999E7|   1574.4316688908|         5742|
|           10|        12938| 1.980258771000004E7|1530.5756461586054|         5759|
|            9|        12233| 1.902085370999994E7|1554.8805452464596|         5358|
|            8|        12144|1.8616936820000134E7|1533.0152190382191|         5560|
|            7|        29317|3.1470215220000166E7|1073.4459603642995|         5791|
|            6|        14714|2.1181855019999962E7|1439.5714978931603|         6656|
|            5|        14974| 2.200196107999994E7|1469.3442687324655|         6677|
|      

## 5. Save Risk Analysis Results to Database Tables

In [0]:
# BUSINESS CONTEXT: Persistent tables enable reliable data pipeline execution
# Downstream notebooks depend on these results for advanced analytics

# TODO: Save risk analysis results to database tables for downstream consumption
# 
# Step 1: Create comprehensive customer risk profiles table
# TODO: Select the following columns from customer_risk_df:
#       - customer_id, first_name, last_name, email, birth_date, age, gender, marital_status
#       - income, credit_score, employment_status, education, state, zip_code, acquisition_date
#       - risk_category, value_segment, composite_risk_score, claims_risk_score, credit_risk_score
#       - age_risk_score, premium_risk_score, payment_risk_score, payment_reliability_score
#       - total_policies, total_premium, avg_premium, total_coverage, policy_types_count, policy_types
#       - total_claims, total_claim_amount, avg_claim_amount, max_claim_amount, policies_with_claims
#       - total_payments, total_payment_amount, avg_payment_amount, late_payments_count, failed_payments_count
# TODO: Store selection as customer_risk_profiles_final
# 
# Step 2: Save customer risk profiles table
# TODO: Write customer_risk_profiles_final to database table "customer_risk_profiles"
# TODO: Use mode("overwrite") and saveAsTable(f"{DATABASE_NAME}.customer_risk_profiles")
# TODO: Print f"✅ Saved customer_risk_profiles table: {customer_risk_profiles_final.count():,} records"
# 
# Step 3: Save risk-value matrix table
# TODO: Write risk_value_matrix to database table "risk_value_matrix"
# TODO: Use mode("overwrite") and saveAsTable(f"{DATABASE_NAME}.risk_value_matrix")
# TODO: Print f"✅ Saved risk_value_matrix table: {risk_value_matrix.count():,} records"
# 
# Step 4: Save cross-selling opportunities table
# TODO: Select relevant columns from single_policy_customers: customer_id, first_name, last_name, risk_category, value_segment, total_premium, policy_types_count, policy_types, composite_risk_score
# TODO: Order by total_premium descending
# TODO: Store as cross_sell_opportunities
# TODO: Write to database table "cross_sell_opportunities"
# TODO: Print f"✅ Saved cross_sell_opportunities table: {cross_sell_opportunities.count():,} records"
# 
# Step 5: Save additional analysis tables
# TODO: Save policy_type_dist as "policy_portfolio_analysis" table
# TODO: Save seasonal_claims as "seasonal_claims_analysis" table
# TODO: Save monthly_payments as "monthly_payment_patterns" table
# TODO: Print "✅ Saved temporal analysis tables"
#
# EXPECTED OUTPUT: 6 database tables created successfully
# Target: All tables available for Notebook 2 consumption

print("💾 Saving risk analysis results to database tables...")

# TODO: Implement database table creation logic here

# TODO: Select the following columns from customer_risk_df:
#       - customer_id, first_name, last_name, email, birth_date, age, gender, marital_status
#       - income, credit_score, employment_status, education, state, zip_code, acquisition_date
#       - risk_category, value_segment, composite_risk_score, claims_risk_score, credit_risk_score
#       - age_risk_score, premium_risk_score, payment_risk_score, payment_reliability_score
#       - total_policies, total_premium, avg_premium, total_coverage, policy_types_count, policy_types
#       - total_claims, total_claim_amount, avg_claim_amount, max_claim_amount, policies_with_claims
#       - total_payments, total_payment_amount, avg_payment_amount, late_payments_count, failed_payments_count
customer_risk_profiles_final = customer_risk_df.select(
    "customer_id",
    "first_name",
    "last_name",
    "email",
    "birth_date",
    "gender",
    "marital_status",
    "income",
    "credit_score",
    "employment_status",
    "education",
    "state",
    "zip_code",
    "acquisition_date",
    "risk_category",
    "value_segment",
    "composite_risk_score",
    "claims_risk_score",
    "credit_risk_score",
    "age_risk_score",
    "premium_risk_score",
    "payment_risk_score",
    "payment_reliability_score",
    "total_policies",
    "total_premium",
    "avg_premium",
    "total_coverage",
    "policy_types_count",
    "policy_types",
    "total_claims",
    "total_claim_amount",
    "avg_claim_amount",
    "max_claim_amount",
    "policies_with_claims",
    "total_payments",
    "total_payment_amount",
    "avg_payment_amount",
    "late_payments_count",
    "failed_payments_count",
)
customer_risk_profiles_final.write.mode("overwrite").saveAsTable(f"{DATABASE_NAME}.customer_risk_profiles")
print(f"✅ Saved customer_risk_profiles table: {customer_risk_profiles_final.count():,} records")

risk_value_matrix.write.mode("overwrite").saveAsTable(f"{DATABASE_NAME}.risk_value_matrix")
print(f"✅ Saved risk_value_matrix table: {risk_value_matrix.count():,} records")

single_policy_customers = customer_risk_profiles_final.filter(col("total_policies") == 1)
cross_sell_opportunities = single_policy_customers.select(
    "customer_id",
    "first_name",
    "last_name",
    "risk_category",
    "value_segment",
    "total_premium",
    "policy_types_count",
    "policy_types",
    "composite_risk_score")
cross_sell_opportunities.write.mode("overwrite").saveAsTable(f"{DATABASE_NAME}.cross_sell_opportunities")
print(f"✅ Saved cross_sell_opportunities table: {cross_sell_opportunities.count():,} records")

policy_type_dist = policies_df.groupBy("policy_type").count()
policy_type_dist.write.mode("overwrite").saveAsTable(f"{DATABASE_NAME}.policy_portfolio_analysis")
print(f"✅ Saved policy_type_dist table: {policy_type_dist.count():,} records")

seasonal_claims = claims_df.groupBy("claim_month").agg(count("*").alias("total_claims"))
seasonal_claims.write.mode("overwrite").saveAsTable(f"{DATABASE_NAME}.seasonal_claims_analysis")
print(f"✅ Saved seasonal_claims table: {seasonal_claims.count():,} records")

monthly_payment_patterns = payments_df.groupBy("payment_date").agg(count("*").alias("total_payments"))
monthly_payment_patterns.write.mode("overwrite").saveAsTable(f"{DATABASE_NAME}.monthly_payment_patterns")
print(f"✅ Saved monthly_payment_patterns table: {monthly_payment_patterns.count():,} records")

💾 Saving risk analysis results to database tables...
✅ Saved customer_risk_profiles table: 15,000 records
✅ Saved risk_value_matrix table: 9 records
✅ Saved cross_sell_opportunities table: 471 records
✅ Saved policy_type_dist table: 6 records
✅ Saved seasonal_claims table: 12 records
✅ Saved monthly_payment_patterns table: 1,991 records


## 6. Business Insights Summary

In [0]:
# BUSINESS CONTEXT: Executive summary provides key insights for strategic decision-making
# Quantified insights enable data-driven business planning and resource allocation

# TODO: Generate comprehensive business insights summary
# 
# Step 1: Calculate risk distribution insights
# TODO: Collect risk_distribution results using .collect()
# TODO: Print "🎯 KEY BUSINESS INSIGHTS FROM RISK ANALYSIS"
# TODO: Print "=" * 60
# TODO: Print "📊 RISK DISTRIBUTION:"
# TODO: For each risk category, calculate percentage and print formatted results
# TODO: Use format: f"   {risk_category} Risk: {count:,} customers ({percentage:.1f}%)"
# 
# Step 2: Calculate value distribution insights
# TODO: Collect value_distribution results using .collect()
# TODO: Print "\n💰 VALUE DISTRIBUTION:"
# TODO: For each value segment, calculate percentage and print formatted results
# 
# Step 3: Calculate portfolio insights
# TODO: Print f"\n🎯 PORTFOLIO INSIGHTS:"
# TODO: Print f"   Total Customers: {total_customers:,}"
# TODO: Print f"   Total Policies: {total_policies:,}"
# TODO: Print f"   Cross-selling Opportunity: {cross_sell_percentage:.1f}% ({cross_sell_opportunity:,} customers)"
# 
# Step 4: Calculate financial insights
# TODO: Calculate total_premium_portfolio using sum("total_premium") from customer_risk_df
# TODO: Calculate avg_premium_customer = total_premium_portfolio / total_customers
# TODO: Print f"   Total Premium Portfolio: ${total_premium_portfolio:,.2f}"
# TODO: Print f"   Average Premium per Customer: ${avg_premium_customer:,.2f}"
# 
# Step 5: Calculate operational insights
# TODO: Calculate total_claims_count using claims_df.count()
# TODO: Calculate claims_rate = (total_claims_count / total_policies) * 100
# TODO: Calculate late_payment_rate from payments data
# TODO: Print claims and payment statistics
# 
# Step 6: List database tables created
# TODO: Print "\n📊 DATABASE TABLES CREATED FOR DOWNSTREAM ANALYSIS:"
# TODO: List all 6 tables with brief descriptions
# TODO: Print "\n🚀 READY FOR NOTEBOOK 2: CLPV AND RETENTION ANALYSIS"
#
# EXPECTED OUTPUT: Comprehensive business insights with quantified metrics
# Target: Executive-ready summary with actionable insights

print("🎯 KEY BUSINESS INSIGHTS FROM RISK ANALYSIS")
print("=" * 60)

# TODO: Implement business insights summary logic here
risk_distribution = customer_risk_profiles_final.groupBy("risk_category").count()
risk_distribution.collect()
for risk_category, count in risk_distribution.collect():
  percentage = (count / total_customers) * 100
  print(f"   {risk_category} Risk: {count:,} customers ({percentage:.1f}%)")

value_distribution = customer_risk_profiles_final.groupBy("value_segment").count()
value_distribution.collect()
print("\n💰 VALUE DISTRIBUTION:")
for value_segment, count in value_distribution.collect():
  percentage = (count / total_customers) * 100
  print(f"   {value_segment}: {count:,} customers ({percentage:.1f}%)")

print(f"\n🎯 PORTFOLIO INSIGHTS:")
print(f"   Total Customers: {total_customers:,}")
print(f"   Total Policies: {total_policies:,}")

cross_sell_percentage = (cross_sell_opportunity / total_customers) * 100
print(f"   Cross-selling Opportunity: {cross_sell_percentage:.1f}% ({cross_sell_opportunity:,} customers)")

total_premium_portfolio = customer_risk_df.select(sum("total_premium")).first()[0]
avg_premium_customer = total_premium_portfolio / total_customers
print(f"   Total Premium Portfolio: ${total_premium_portfolio:,.2f}")
print(f"   Average Premium per Customer: ${avg_premium_customer:,.2f}")

total_claims_count = claims_df.count()
claims_rate = (total_claims_count / total_policies) * 100
late_payment_rate = payments_df.filter(col("late_payment_flag") == 1).count() / total_policies * 100

print(f"   Claims Rate: {claims_rate:.1f}%")
print(f"   Late Payment Rate: {late_payment_rate:.1f}%")
print("\n📊 DATABASE TABLES CREATED FOR DOWNSTREAM ANALYSIS:")
print("   - claims_per_customer: Summary of claims data per customer")
print("   - customer_risk_df: Comprehensive risk analysis for each customer")
print("   - claims_df: Detailed claims data")
print("   - customers_df: Customer demographic and profile data")
print("   - interactions_df: Customer interactions data")
print("   - payments_df: Payment data")
print("   - policies_df: Policy data")
print("\n🚀 READY FOR NOTEBOOK 2: CLPV AND RETENTION ANALYSIS")

🎯 KEY BUSINESS INSIGHTS FROM RISK ANALYSIS
   High Risk: 1,551 customers (10.3%)
   Low Risk: 2,985 customers (19.9%)
   Medium Risk: 10,464 customers (69.8%)

💰 VALUE DISTRIBUTION:
   Medium Value: 2,970 customers (19.8%)
   High Value: 2,193 customers (14.6%)
   Low Value: 9,837 customers (65.6%)

🎯 PORTFOLIO INSIGHTS:
   Total Customers: 15,000
   Total Policies: 75,000
   Cross-selling Opportunity: 6.2% (925 customers)
   Total Premium Portfolio: $421,649,304.04
   Average Premium per Customer: $28,109.95
   Claims Rate: 14.2%
   Late Payment Rate: 96.6%

📊 DATABASE TABLES CREATED FOR DOWNSTREAM ANALYSIS:
   - claims_per_customer: Summary of claims data per customer
   - customer_risk_df: Comprehensive risk analysis for each customer
   - claims_df: Detailed claims data
   - customers_df: Customer demographic and profile data
   - interactions_df: Customer interactions data
   - payments_df: Payment data
   - policies_df: Policy data

🚀 READY FOR NOTEBOOK 2: CLPV AND RETENTION ANAL

## Summary and Next Steps

### ✅ Risk Analysis Objectives Achieved:
1. **Customer Risk Scoring** - Comprehensive risk assessment using 5 risk factors
2. **Customer Segmentation** - Risk categories (Low/Medium/High) and value segments
3. **Portfolio Analysis** - Policy distribution and cross-selling opportunities
4. **Temporal Patterns** - Seasonal claims and payment behavior analysis
5. **Database Integration** - All results saved to persistent tables for pipeline reliability

### 🎯 Key Analytical Outputs:
- **15,000 customers** scored and segmented by risk and value
- **Cross-selling opportunities** identified for single-policy customers
- **Seasonal patterns** documented for claims and payments
- **Portfolio risk concentration** analyzed by geography and policy type

### 🗄️ Database Tables Created:
- `customer_risk_profiles` - Core customer risk and segmentation data
- `risk_value_matrix` - Strategic customer matrix for business decisions
- `cross_sell_opportunities` - Priority customers for cross-selling
- `policy_portfolio_analysis` - Portfolio composition and performance
- `seasonal_claims_analysis` - Seasonal trend analysis
- `monthly_payment_patterns` - Payment behavior insights

### 🚀 Next Steps:
- **Notebook 2**: Load risk profiles and calculate Customer Lifetime Premium Value (CLPV)
- **Notebook 2**: Build renewal prediction models using risk segmentation
- **Notebook 2**: Develop pricing optimization based on risk analysis
- **Notebook 3**: Create executive dashboards using all accumulated insights

### 💡 Business Value:
This analysis provides the foundation for data-driven customer management, enabling SecureLife to optimize pricing, target retention efforts, and identify growth opportunities based on customer risk profiles and value segments.