# Price Analytics & Elasticity Measurement

This notebook cleans insurance contract and claims data, then examines how price relates to customer retention as a proxy for price elasticity of demand: a tuned Random Forest classifier ranks the drivers of retention, and retention rates are plotted across annual-premium deciles.

Data Source: [Insurance Dataset](https://www.kaggle.com/datasets/kpoviesistphane/insurance-dataset-for-data-engineering-practice)

## 0. Load Libraries

In [None]:
import pandas as pd
import numpy as np
import re
import plotly.express as px
import plotly.graph_objects as go
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.metrics import classification_report, accuracy_score, roc_auc_score
from datetime import datetime
import os

# Display configuration: never elide columns and allow wide console output,
# so wide frames are shown in full when inspected.
pd.options.display.max_columns = None
pd.options.display.width = 1000

## 1. Data Loading

In [None]:
# Locate the two CSV files relative to the working directory so the notebook
# runs from either the repository root or the Analysis directory. A custom
# location can be supplied through the PRICE_ANALYTICS_DATA_DIR environment
# variable; this replaces the former machine-specific absolute-path fallback.
data_dir_candidates = [
    os.environ.get("PRICE_ANALYTICS_DATA_DIR"),
    "data",
    os.path.join("Analysis", "data"),
]

# Pick the first directory that contains BOTH files (the old check only
# looked for claims.csv and could fail later on a missing contracts.csv).
data_dir = next(
    (
        candidate
        for candidate in data_dir_candidates
        if candidate
        and os.path.exists(os.path.join(candidate, "claims.csv"))
        and os.path.exists(os.path.join(candidate, "contracts.csv"))
    ),
    None,
)

if data_dir is None:
    # Raise instead of calling exit(): exit() ends the interpreter/kernel
    # session, whereas an exception stops this cell with a visible error.
    searched = [candidate for candidate in data_dir_candidates if candidate]
    raise FileNotFoundError(
        "Error: Data files not found. Looked for claims.csv and contracts.csv in: "
        + ", ".join(searched)
        + ". Set PRICE_ANALYTICS_DATA_DIR to the directory that contains them."
    )

claims = pd.read_csv(os.path.join(data_dir, "claims.csv"))
contracts = pd.read_csv(os.path.join(data_dir, "contracts.csv"))
print("Data loaded successfully.")

Data loaded successfully.


## 2. Data Cleaning Functions

In [None]:
def clean_price(price_str):
    """Convert a raw price value to a float.

    Values that are already numeric are converted directly. Routing them
    through ``str()`` would corrupt scientific-notation reprs: ``str(1e16)``
    is ``"1e+16"``, which the digit filter below turns into ``116``.

    Strings keep only digits and dots before conversion, so currency symbols,
    thousands separators and surrounding text are dropped. A minus sign is
    dropped as well, so the result is always a magnitude; numeric inputs follow
    the same rule to stay consistent with the string path.

    Args:
        price_str: Raw value (string, number, or missing).

    Returns:
        The parsed price as a float, or ``np.nan`` when the value is missing,
        empty, non-finite, or cannot be parsed (e.g. ``"abc"``, ``"1.2.3"``).
    """
    if pd.isna(price_str) or price_str == "":
        return np.nan

    # bool is a subclass of int; keep treating it as non-numeric text.
    is_number = isinstance(price_str, (int, float, np.integer, np.floating))
    if is_number and not isinstance(price_str, bool):
        value = float(price_str)
        # inf / -inf were unparseable before and stay NaN.
        return abs(value) if np.isfinite(value) else np.nan

    s = str(price_str).strip()
    clean_s = re.sub(r"[^0-9.]", "", s)
    try:
        return float(clean_s)
    except ValueError:
        return np.nan


def parse_date(date_str):
    """Parse a raw date value into a pandas Timestamp.

    Explicit formats are tried in order: ``%Y-%m-%d``, ``%d/%m/%Y``,
    ``%m/%d/%Y``, ``%d-%m-%Y``. Because the day-first slash format is tried
    before the month-first one, an ambiguous value such as ``03/04/2023`` is
    read as 3 April. If no explicit format matches, pandas' general parser is
    used with ``dayfirst=True``.

    Args:
        date_str: Raw value (string or missing).

    Returns:
        A ``pd.Timestamp``, or ``pd.NaT`` when the value is missing, empty,
        or cannot be parsed.
    """
    if pd.isna(date_str) or date_str == "":
        return pd.NaT
    s = str(date_str).strip()
    formats = ["%Y-%m-%d", "%d/%m/%Y", "%m/%d/%Y", "%d-%m-%Y"]
    for fmt in formats:
        try:
            return pd.to_datetime(s, format=fmt)
        except (ValueError, TypeError):
            continue
    try:
        return pd.to_datetime(s, dayfirst=True)
    except (ValueError, TypeError, OverflowError):
        # Only parse failures map to NaT; a bare ``except`` would also
        # swallow KeyboardInterrupt/SystemExit during a long ``apply``.
        return pd.NaT


def clean_gender(g):
    """Normalise a raw gender value to ``"M"``, ``"F"`` or ``"Unknown"``.

    Matching ignores case and surrounding whitespace; ``m``/``male`` map to
    ``"M"`` and ``f``/``female`` map to ``"F"``. Missing, empty or any other
    value yields ``"Unknown"``.
    """
    if pd.isna(g) or g == "":
        return "Unknown"
    gender_codes = {"m": "M", "male": "M", "f": "F", "female": "F"}
    token = str(g).strip().lower()
    return gender_codes.get(token, "Unknown")

## 3. Applying Cleaning Logic

In [None]:
# Contracts: normalise dates, premium and gender with the cleaning helpers.
contracts["start_date_clean"] = contracts["start_date"].apply(parse_date)
contracts["end_date_clean"] = contracts["end_date"].apply(parse_date)
contracts["annual_premium_clean"] = contracts["annual_premium"].apply(clean_price)
contracts["gender_clean"] = contracts["gender"].apply(clean_gender)

# Coerce age to numeric BEFORE building the missing-age flag, so values that
# are present but non-numeric (coerced to NaN and then median-imputed) are
# flagged as well. Flagging from the raw column would miss them.
contracts["client_age_clean"] = pd.to_numeric(contracts["client_age"], errors="coerce")
contracts["age_missing"] = contracts["client_age_clean"].isna().astype(int)
contracts["client_age_clean"] = contracts["client_age_clean"].fillna(
    contracts["client_age_clean"].median()
)

# Missing categorical values become an explicit "Unknown" level.
contracts["csp_clean"] = contracts["csp"].fillna("Unknown")
contracts["channel_clean"] = contracts["channel"].fillna("Unknown")
contracts["product_clean"] = contracts["product"].fillna("Unknown")

# Keep only contracts with a recognised status, then derive the binary
# target: Active/Renewed -> 1 (retained), Cancelled/Expired -> 0.
valid_status_mask = contracts["status"].isin(
    ["Active", "Renewed", "Cancelled", "Expired"]
)
contracts_model = contracts[valid_status_mask].copy()
contracts_model["retained"] = (
    contracts_model["status"].isin(["Active", "Renewed"]).astype(int)
)
# Rows without a usable premium cannot be used in the price analysis.
contracts_model = contracts_model.dropna(subset=["annual_premium_clean"])

# Claims: clean the date and amount columns with the shared helpers.
claims["occurrence_date_clean"] = claims["occurrence_date"].apply(parse_date)
claims["declaration_date_clean"] = claims["declaration_date"].apply(parse_date)
claims["damage_amount_clean"] = claims["damage_amount"].apply(clean_price)
claims["indemnified_amount_clean"] = claims["indemnified_amount"].apply(clean_price)

# Number of claims per contract, one column per claim type (0 where none).
claim_counts = claims.pivot_table(
    index="contract_id",
    columns="claim_type",
    values="claim_id",
    aggfunc="count",
    fill_value=0,
)
claim_counts.columns = [f"num_claims_{c}" for c in claim_counts.columns]
claim_counts = claim_counts.reset_index()

# Per-contract totals: claim count, indemnified cost and damage amount.
claim_totals_spec = {
    "num_claims": ("claim_id", "count"),
    "total_claim_cost": ("indemnified_amount_clean", "sum"),
    "total_damage_amount": ("damage_amount_clean", "sum"),
}
claim_amounts = claims.groupby("contract_id").agg(**claim_totals_spec).reset_index()

# One row per contract combining the totals with the per-type counts.
claims_agg = claim_amounts.merge(claim_counts, on="contract_id", how="left")

## 4. Merging Data

In [None]:
# Attach the per-contract claim aggregates; contracts without claims get NaN.
df_final = contracts_model.merge(claims_agg, on="contract_id", how="left")

# Contracts with no claims: replace the NaN aggregates with zero.
cols_to_fill = ["num_claims", "total_claim_cost", "total_damage_amount"]
cols_to_fill = cols_to_fill + [c for c in claims_agg.columns if "num_claims_" in c]
for col in cols_to_fill:
    if col not in df_final.columns:
        continue
    df_final[col] = df_final[col].fillna(0)

# Make sure the model inputs are numeric, re-impute age and drop rows that
# still lack a premium.
df_final["annual_premium_clean"] = pd.to_numeric(
    df_final["annual_premium_clean"], errors="coerce"
)
df_final["client_age_clean"] = pd.to_numeric(
    df_final["client_age_clean"], errors="coerce"
)
df_final["num_claims"] = pd.to_numeric(df_final["num_claims"], errors="coerce")
age_median = df_final["client_age_clean"].median()
df_final["client_age_clean"] = df_final["client_age_clean"].fillna(age_median)
df_final = df_final.dropna(subset=["annual_premium_clean"])

print(f"Final Dataset Shape: {df_final.shape}")

Final Dataset Shape: (12282, 34)


## 5. Elasticity Analysis & Feature Importance

In [None]:
print("\n--- Starting Random Forest Optimization ---")

# One-hot encode the categorical predictors (first level dropped per column).
categorical_cols = [
    "risk_zone",
    "gender_clean",
    "channel_clean",
    "csp_clean",
    "product_clean",
]
df_model = pd.get_dummies(df_final, columns=categorical_cols, drop_first=True)

# Feature groups: numeric base variables, per-type claim counts, and the
# dummy columns generated from the categorical predictors above.
base_vars = ["annual_premium_clean", "client_age_clean", "total_damage_amount"]
claim_type_vars = [c for c in df_model.columns if "num_claims_" in c]
dummy_vars = [
    c for c in df_model.columns if any(f"{col}_" in c for col in categorical_cols)
]

independent_vars = base_vars + claim_type_vars + dummy_vars

# Design matrix as floats; binary retention target as ints.
X = df_model[independent_vars].astype(float)
y = df_model["retained"].astype(int)

# 1. Hold out 20% of the rows for testing, stratified on the target.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
print(f"Train Clean: {X_train.shape}, Test Clean: {X_test.shape}")

# 2. Search space for the randomized hyperparameter search.
param_dist = dict(
    n_estimators=[50, 100, 200, 300],
    max_depth=[None, 10, 20, 30],
    min_samples_split=[2, 5, 10],
    min_samples_leaf=[1, 2, 4],
    max_features=["sqrt", "log2", None],
)

rf = RandomForestClassifier(random_state=42)

# 3. Randomized search: 20 sampled configurations, 5-fold CV, scored on
#    accuracy, using all available cores.
search_settings = dict(
    estimator=rf,
    param_distributions=param_dist,
    n_iter=20,
    cv=5,
    scoring="accuracy",
    n_jobs=-1,
    random_state=42,
    verbose=1,
)
random_search = RandomizedSearchCV(**search_settings)

print("Tuning hyperparameters...")
random_search.fit(X_train, y_train)

# Best configuration found by the search.
best_rf = random_search.best_estimator_

print("\n--- Best Hyperparameters ---")
print(random_search.best_params_)

# 4. Score the tuned model on the held-out test set.
y_pred = best_rf.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)

print(f"\nTest Set Accuracy: {accuracy:.4f}")
print("\nClassification Report:")
print(classification_report(y_test, y_pred))

# 5. Rank the features by the forest's importance scores, highest first.
print("\n--- Feature Importance (Top 10) ---")
importances = pd.DataFrame(
    {"Feature": X.columns, "Importance": best_rf.feature_importances_}
)
importances = importances.sort_values("Importance", ascending=False)

print(importances.head(10))



--- Starting Random Forest Optimization ---
Train Clean: (9825, 27), Test Clean: (2457, 27)
Tuning hyperparameters...
Fitting 5 folds for each of 20 candidates, totalling 100 fits

--- Best Hyperparameters ---
{'n_estimators': 300, 'min_samples_split': 5, 'min_samples_leaf': 2, 'max_features': None, 'max_depth': 10}

Test Set Accuracy: 0.4986

Classification Report:
              precision    recall  f1-score   support

           0       0.51      0.69      0.59      1259
           1       0.48      0.30      0.37      1198

    accuracy                           0.50      2457
   macro avg       0.49      0.49      0.48      2457
weighted avg       0.49      0.50      0.48      2457


--- Feature Importance (Top 10) ---
                 Feature  Importance
0   annual_premium_clean    0.487102
1       client_age_clean    0.212664
12        gender_clean_M    0.027592
15   channel_clean_Phone    0.024320
2    total_damage_amount    0.022961
16     channel_clean_Web    0.022367
13  gen

## 6. Visualization

In [None]:
try:
    # 1. Price Elasticity Plot: mean retention per annual-premium quantile bin.
    # duplicates="drop" merges bins whose quantile edges coincide (many equal
    # premiums); without it qcut raises and the whole plot is skipped.
    df_final["premium_bin"] = pd.qcut(
        df_final["annual_premium_clean"], q=10, duplicates="drop"
    )
    retention_by_price = (
        df_final.groupby("premium_bin", observed=False)["retained"].mean().reset_index()
    )
    # premium_bin is categorical, and mapping a categorical column can return
    # a categorical result. Cast the bin midpoints to float so the x-axis and
    # the OLS trendline work on plain numbers.
    retention_by_price["price_mid"] = (
        retention_by_price["premium_bin"].apply(lambda x: x.mid).astype(float)
    )

    # NOTE(review): trendline="ols" relies on the optional statsmodels
    # package; if it is missing the error is reported by the handler below.
    fig1 = px.scatter(
        retention_by_price,
        x="price_mid",
        y="retained",
        title="Retention Rate vs Annual Premium (Deciles)",
        labels={"price_mid": "Annual Premium", "retained": "Retention Rate"},
        trendline="ols",
    )
    fig1.show()

    # 2. Feature Importance Plot (only when the modelling cell has been run).
    if "importances" in locals():
        fig2 = px.bar(
            importances.head(15),
            x="Importance",
            y="Feature",
            orientation="h",
            title="Top 15 Drivers of Customer Retention (Random Forest)",
            labels={"Importance": "Importance Score", "Feature": "Factor"},
        )
        # Largest importance at the top of the horizontal bar chart.
        fig2.update_layout(yaxis={"categoryorder": "total ascending"})
        fig2.show()

except Exception as e:
    # Best-effort plotting: report the failure instead of aborting the run.
    print("Visualization failed:", e)
