In [35]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, GridSearchCV, RandomizedSearchCV
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
from xgboost import XGBRegressor, XGBClassifier
from sklearn.metrics import (mean_squared_error, r2_score, roc_auc_score, 
                           accuracy_score, classification_report, make_scorer)
import shap
import matplotlib.pyplot as plt
import warnings
from scipy.stats import randint, uniform



In [None]:
# Load your data
try:
    df = pd.read_csv('file.csv')
except Exception as load_error:
    # Report the failure, then let the original exception propagate so the
    # notebook stops here instead of continuing without data.
    print(f"Error loading file: {load_error}")
    raise
else:
    print("Data loaded successfully. Shape:", df.shape)


In [None]:

# Data Cleaning and Preparation --------------------------------------------

def clean_and_prepare_data(df):
    """Clean the raw policy data and derive modelling features.

    Steps, in order:
      1. Drop the columns excluded from modelling (only those present).
      2. Coerce known numeric columns with ``pd.to_numeric(errors='coerce')``;
         values that cannot be parsed become NaN.
      3. Create the binary ``HasClaim`` flag from ``TotalClaims`` if absent.
      4. Cast known categorical columns to strings while keeping missing
         values as NaN.
      5. Derive ``VehicleAge`` and ``PremiumToSumInsuredRatio`` when their
         input columns exist.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw data. All work happens on the new frame returned by ``drop``,
        so the caller's frame is not modified.

    Returns
    -------
    pandas.DataFrame
        The cleaned frame.

    Raises
    ------
    KeyError
        If ``HasClaim`` has to be created but ``TotalClaims`` is missing.
    """
    # Remove specified columns (drop returns a new frame)
    columns_to_drop = ['CrossBorder', 'Citizenship', 'MaritalStatus',
                       'CustomValueEstimate', 'NumberOfVehiclesInFleet']
    df = df.drop(columns=[col for col in columns_to_drop if col in df.columns])

    # Convert known numeric columns
    numeric_cols = ['TotalClaims', 'CalculatedPremiumPerTerm', 'SumInsured',
                    'RegistrationYear', 'Cylinders', 'cubiccapacity', 'kilowatts',
                    'NumberOfDoors', 'CapitalOutstanding', 'VehicleAge']

    for col in numeric_cols:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')

    # Create claim indicator if not exists. This runs after the numeric
    # coercion above so that a TotalClaims column read as text can still be
    # compared with 0.
    if 'HasClaim' not in df.columns:
        df['HasClaim'] = (df['TotalClaims'] > 0).astype(int)

    # Convert categorical columns to strings
    categorical_cols = ['IsVATRegistered', 'LegalType', 'Title',
                        'Language', 'Bank', 'AccountType', 'Gender',
                        'Country', 'Province', 'PostalCode', 'MainCrestaZone',
                        'SubCrestaZone', 'ItemType', 'mmcode', 'VehicleType',
                        'make', 'Model', 'bodytype', 'AlarmImmobiliser', 'TrackingDevice',
                        'NewVehicle', 'WrittenOff', 'Rebuilt', 'Converted',
                        'TermFrequency', 'ExcessSelected', 'CoverCategory', 'CoverType',
                        'CoverGroup', 'Section', 'Product', 'StatutoryClass', 'StatutoryRiskType']

    for col in categorical_cols:
        if col in df.columns:
            # A plain astype(str) would turn missing values into the literal
            # text 'nan'; restore them as NaN so missing-value handling
            # further down the workflow can still see them.
            present = df[col].notna()
            df[col] = df[col].astype(str).where(present, np.nan)

    # Feature engineering
    current_year = pd.Timestamp.now().year
    if 'RegistrationYear' in df.columns:
        df['VehicleAge'] = current_year - df['RegistrationYear']
    if all(col in df.columns for col in ['CalculatedPremiumPerTerm', 'SumInsured']):
        ratio = df['CalculatedPremiumPerTerm'] / df['SumInsured']
        # A zero SumInsured yields +/-inf; store NaN instead so the ratio is
        # treated as missing rather than as an infinite value.
        df['PremiumToSumInsuredRatio'] = ratio.replace([np.inf, -np.inf], np.nan)

    return df

# Apply the cleaning step; `df` is rebound to the frame returned by the function.
df = clean_and_prepare_data(df)


In [None]:

# Define Features and Targets ----------------------------------------------

# Columns excluded from the predictor set.
non_features = ['PolicyID', 'UnderwrittenCoverID', 'TransactionMonth',
                'TotalClaims', 'CalculatedPremiumPerTerm', 'HasClaim']
features = df.columns[~df.columns.isin(non_features)].tolist()

# Split the predictors by dtype: numeric ones are imputed and scaled,
# object/string ones are imputed and one-hot encoded.
numeric_features = list(
    df.loc[:, features].select_dtypes(include=np.number).columns)
categorical_features = list(
    df.loc[:, features].select_dtypes(include=['object', 'string']).columns)

print("Numeric features:", numeric_features)
print("Categorical features:", categorical_features)

# Preprocessing Pipeline --------------------------------------------------

# Median imputation followed by standardisation.
numeric_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
])

# Missing categories become the constant 'missing'; categories not seen
# during fit are ignored by the encoder at transform time.
categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('onehot', OneHotEncoder(handle_unknown='ignore')),
])

# The step names ('num', 'cat', 'onehot') are looked up again by later cells.
preprocessor = ColumnTransformer([
    ('num', numeric_transformer, numeric_features),
    ('cat', categorical_transformer, categorical_features),
])


In [None]:

# Parameter Grids for Tuning ----------------------------------------------

def _xgb_search_space(step):
    """Return the randomized-search distributions for an XGBoost pipeline step.

    ``step`` is the pipeline step name used as the parameter prefix.
    ``randint(low, high)`` draws integers from [low, high) and
    ``uniform(loc, scale)`` draws from [loc, loc + scale].
    """
    return {
        f'{step}__n_estimators': randint(100, 500),
        f'{step}__max_depth': randint(3, 10),
        f'{step}__learning_rate': uniform(0.01, 0.3),
        f'{step}__subsample': uniform(0.6, 0.4),
        f'{step}__colsample_bytree': uniform(0.6, 0.4),
        f'{step}__gamma': uniform(0, 0.5),
    }


def _rf_search_space(step):
    """Return the candidate value lists for a random-forest pipeline step named ``step``."""
    return {
        f'{step}__n_estimators': [100, 200, 300],
        f'{step}__max_depth': [None, 5, 10, 15],
        f'{step}__min_samples_split': [2, 5, 10],
        f'{step}__min_samples_leaf': [1, 2, 4],
    }


# Regressors live under the 'regressor' step, classifiers under 'classifier'.
xgb_reg_params = _xgb_search_space('regressor')
rf_reg_params = _rf_search_space('regressor')
xgb_clf_params = _xgb_search_space('classifier')
rf_clf_params = _rf_search_space('classifier')


In [None]:

# Model 1: Claim Severity Prediction with Tuning --------------------------

print("\n=== Claim Severity Model (Policies with Claims) ===")
# Severity is fitted only on rows flagged as having a claim.
df_claims = df.loc[df['HasClaim'] == 1].copy()

if len(df_claims) <= 10:
    # Too few claim rows: skip modelling and leave an empty result so the
    # later cells can test `results_sev` for truthiness.
    print(f"Not enough claims ({len(df_claims)}) to build severity model")
    results_sev = {}
else:
    X_sev, y_sev = df_claims[features], df_claims['TotalClaims']
    X_train_sev, X_test_sev, y_train_sev, y_test_sev = train_test_split(
        X_sev, y_sev, test_size=0.2, random_state=42)

    # Settings shared by both hyper-parameter searches below.
    sev_search_options = dict(cv=3, scoring='neg_root_mean_squared_error',
                              random_state=42, n_jobs=-1)

    # Baseline: linear regression, fitted once without tuning.
    lr_model = Pipeline([
        ('preprocessor', preprocessor),
        ('regressor', LinearRegression()),
    ])
    lr_model.fit(X_train_sev, y_train_sev)
    y_pred_lr = lr_model.predict(X_test_sev)
    rmse_lr = np.sqrt(mean_squared_error(y_test_sev, y_pred_lr))
    r2_lr = r2_score(y_test_sev, y_pred_lr)

    # Random forest tuned with a randomized search over the value lists
    # in rf_reg_params (20 sampled combinations).
    rf_model = Pipeline([
        ('preprocessor', preprocessor),
        ('regressor', RandomForestRegressor(random_state=42)),
    ])
    rf_search = RandomizedSearchCV(rf_model, rf_reg_params, n_iter=20,
                                   **sev_search_options)
    rf_search.fit(X_train_sev, y_train_sev)
    y_pred_rf = rf_search.predict(X_test_sev)
    rmse_rf = np.sqrt(mean_squared_error(y_test_sev, y_pred_rf))
    r2_rf = r2_score(y_test_sev, y_pred_rf)

    # XGBoost tuned with a randomized search over the distributions in
    # xgb_reg_params (50 sampled combinations).
    xgb_model = Pipeline([
        ('preprocessor', preprocessor),
        ('regressor', XGBRegressor(random_state=42)),
    ])
    xgb_search = RandomizedSearchCV(xgb_model, xgb_reg_params, n_iter=50,
                                    **sev_search_options)
    xgb_search.fit(X_train_sev, y_train_sev)
    y_pred_xgb = xgb_search.predict(X_test_sev)
    rmse_xgb = np.sqrt(mean_squared_error(y_test_sev, y_pred_xgb))
    r2_xgb = r2_score(y_test_sev, y_pred_xgb)

    # Test-set scores per model; tuned models also record their best parameters.
    results_sev = {
        'Linear Regression': {'RMSE': rmse_lr, 'R2': r2_lr},
        'Random Forest': {'RMSE': rmse_rf, 'R2': r2_rf,
                          'best_params': rf_search.best_params_},
        'XGBoost': {'RMSE': rmse_xgb, 'R2': r2_xgb,
                    'best_params': xgb_search.best_params_},
    }

    print("\nModel Performance:")
    for model_name, scores in results_sev.items():
        print(f"\n{model_name}:")
        print(f"RMSE = {scores['RMSE']:.2f}, R2 = {scores['R2']:.4f}")
        tuned_params = scores.get('best_params')
        if tuned_params is None:
            continue
        print("Best Parameters:")
        for param_name, param_value in tuned_params.items():
            print(f"  {param_name}: {param_value}")


In [None]:

# Model 2: Claim Probability Prediction with Tuning -----------------------

print("\n=== Claim Probability Model ===")
X_prob = df[features]
y_prob = df['HasClaim']

# Stratify so both splits keep the same claim / no-claim proportions.
X_train_prob, X_test_prob, y_train_prob, y_test_prob = train_test_split(
    X_prob, y_prob, test_size=0.2, random_state=42, stratify=y_prob)

# Handle class imbalance: number of negatives per positive in the training
# split, passed to XGBoost as scale_pos_weight. max(1, ...) avoids a
# division by zero when the split holds no positive rows.
if len(y_train_prob) > 0:
    n_positive = y_train_prob.sum()
    class_ratio = (len(y_train_prob) - n_positive) / max(1, n_positive)
else:
    class_ratio = 1

# Random Forest tuned with a randomized search (20 sampled combinations)
rf_clf_model = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier(random_state=42, class_weight='balanced'))])

rf_clf_search = RandomizedSearchCV(
    rf_clf_model, rf_clf_params, n_iter=20, cv=3,
    scoring='roc_auc', random_state=42, n_jobs=-1)
rf_clf_search.fit(X_train_prob, y_train_prob)
# Column 1 of predict_proba is the probability of the positive class.
y_pred_rf = rf_clf_search.predict_proba(X_test_prob)[:, 1]
auc_rf = roc_auc_score(y_test_prob, y_pred_rf)
accuracy_rf = accuracy_score(y_test_prob, (y_pred_rf > 0.5).astype(int))

# XGBoost tuned with a randomized search (50 sampled combinations)
xgb_clf_model = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('classifier', XGBClassifier(random_state=42,
                                 objective='binary:logistic',
                                 scale_pos_weight=class_ratio))])

xgb_clf_search = RandomizedSearchCV(
    xgb_clf_model, xgb_clf_params, n_iter=50, cv=3,
    scoring='roc_auc', random_state=42, n_jobs=-1)
xgb_clf_search.fit(X_train_prob, y_train_prob)
y_pred_xgb = xgb_clf_search.predict_proba(X_test_prob)[:, 1]
auc_xgb = roc_auc_score(y_test_prob, y_pred_xgb)
accuracy_xgb = accuracy_score(y_test_prob, (y_pred_xgb > 0.5).astype(int))

results_prob = {
    'Random Forest': {
        'AUC': auc_rf,
        'Accuracy': accuracy_rf,
        'best_params': rf_clf_search.best_params_
    },
    'XGBoost': {
        'AUC': auc_xgb,
        'Accuracy': accuracy_xgb,
        'best_params': xgb_clf_search.best_params_
    }
}

# Test-set claim probabilities per model. Kept outside results_prob so that
# dict still holds only scalar scores and the best parameters.
test_probabilities = {
    'Random Forest': y_pred_rf,
    'XGBoost': y_pred_xgb,
}

print("\nModel Performance:")
for name, metrics in results_prob.items():
    print(f"\n{name}:")
    print(f"AUC = {metrics['AUC']:.4f}, Accuracy = {metrics['Accuracy']:.4f}")
    print("Best Parameters:")
    for param, value in metrics['best_params'].items():
        print(f"  {param}: {value}")
    print("Classification Report:")
    # The report needs one predicted label per test row: threshold this
    # model's predicted probabilities at 0.5 (the AUC is a single scalar and
    # cannot be used for this).
    y_pred = (test_probabilities[name] > 0.5).astype(int)
    print(classification_report(y_test_prob, y_pred))


In [None]:

# Premium Optimization ----------------------------------------------------

if results_sev and results_prob:
    print("\n=== Premium Optimization ===")
    try:
        # Choose between the tuned models by their test-set scores (lowest
        # RMSE for severity, highest AUC for probability). Checking only
        # whether the 'XGBoost' key exists would always pick XGBoost.
        sev_candidates = {
            'Random Forest': rf_search.best_estimator_,
            'XGBoost': xgb_search.best_estimator_,
        }
        prob_candidates = {
            'Random Forest': rf_clf_search.best_estimator_,
            'XGBoost': xgb_clf_search.best_estimator_,
        }
        best_sev_name = min(
            (model for model in sev_candidates if model in results_sev),
            key=lambda model: results_sev[model]['RMSE'])
        best_prob_name = max(
            (model for model in prob_candidates if model in results_prob),
            key=lambda model: results_prob[model]['AUC'])
        best_sev_model = sev_candidates[best_sev_name]
        best_prob_model = prob_candidates[best_prob_name]
        print(f"Severity model: {best_sev_name}; probability model: {best_prob_name}")

        prob_claims = best_prob_model.predict_proba(X_test_prob)[:, 1]
        # The severity model was fitted on positive claim amounts only, so a
        # negative prediction is an artefact; floor it at 0 to avoid
        # negative premiums.
        sev_claims = np.clip(best_sev_model.predict(X_test_prob), 0, None)

        # Expected loss = P(claim) * expected claim amount, then loaded.
        risk_based_premium = prob_claims * sev_claims
        expense_loading = 0.2  # 20% for expenses
        profit_margin = 0.1    # 10% profit
        risk_based_premium = risk_based_premium * (1 + expense_loading + profit_margin)

        # 'Actual Premium' carries the test-row index; the prediction array
        # is in the same row order as X_test_prob.
        comparison = pd.DataFrame({
            'Actual Premium': df.loc[X_test_prob.index, 'CalculatedPremiumPerTerm'],
            'Risk-Based Premium': risk_based_premium
        }).dropna()

        if len(comparison) > 0:
            print("\nPremium Comparison (Sample):")
            # Seeded so the displayed sample is the same on every run.
            print(comparison.sample(min(5, len(comparison)), random_state=42))
            corr = comparison.corr().iloc[0, 1]
            print(f"\nCorrelation: {corr:.3f}")
        else:
            print("No valid premium comparisons available")
    except Exception as e:
        # Best-effort section: report the problem and let the notebook continue.
        print(f"Premium optimization error: {str(e)}")


In [None]:

# Feature Importance ------------------------------------------------------

def _encoded_feature_names(fitted_preprocessor):
    """Return the output column names of a fitted preprocessor.

    The order follows the ColumnTransformer definition: numeric features
    first, then the one-hot columns produced by that preprocessor's encoder.
    """
    onehot = fitted_preprocessor.named_transformers_['cat'].named_steps['onehot']
    return list(numeric_features) + list(onehot.get_feature_names_out(categorical_features))


print("\n=== Feature Importance ===")
try:
    # Each tuned pipeline carries its own fitted preprocessor. Use that one
    # rather than refitting the shared `preprocessor`: the severity model was
    # fitted on X_train_sev, so its one-hot columns can differ from those of
    # a preprocessor fitted on X_train_prob, and refitting the shared object
    # would also change the preprocessor inside any pipeline that holds it.
    if 'XGBoost' in results_prob:
        clf_pipeline = xgb_clf_search.best_estimator_
        clf_preprocessor = clf_pipeline.named_steps['preprocessor']
        xgb_classifier = clf_pipeline.named_steps['classifier']
        all_feature_names = _encoded_feature_names(clf_preprocessor)
        # Transform once and reuse for both the SHAP values and the plot.
        X_train_prob_encoded = clf_preprocessor.transform(X_train_prob)

        plt.figure(figsize=(12, 8))
        explainer = shap.TreeExplainer(xgb_classifier)
        shap_values = explainer.shap_values(X_train_prob_encoded)
        shap.summary_plot(shap_values,
                          X_train_prob_encoded,
                          feature_names=all_feature_names,
                          plot_type='bar',
                          show=False)
        plt.title("Claim Probability - Feature Importance")
        plt.tight_layout()
        plt.show()

    if 'XGBoost' in results_sev:
        sev_pipeline = xgb_search.best_estimator_
        sev_preprocessor = sev_pipeline.named_steps['preprocessor']
        xgb_regressor = sev_pipeline.named_steps['regressor']
        sev_feature_names = _encoded_feature_names(sev_preprocessor)
        X_train_sev_encoded = sev_preprocessor.transform(X_train_sev)

        plt.figure(figsize=(12, 8))
        explainer = shap.TreeExplainer(xgb_regressor)
        shap_values = explainer.shap_values(X_train_sev_encoded)
        shap.summary_plot(shap_values,
                          X_train_sev_encoded,
                          feature_names=sev_feature_names,
                          plot_type='bar',
                          show=False)
        plt.title("Claim Severity - Feature Importance")
        plt.tight_layout()
        plt.show()
except Exception as e:
    # Best-effort section: report the problem and let the notebook continue.
    print(f"Feature importance error: {str(e)}")


In [None]:

# Final Report ------------------------------------------------------------

print("\n=== Final Results ===")

# Severity models: RMSE / R2, plus the best parameters for tuned models only.
if results_sev:
    print("\nClaim Severity Models:")
    for model_name, scores in results_sev.items():
        print(f"\n{model_name}:")
        print(f"RMSE = {scores['RMSE']:.2f}, R2 = {scores['R2']:.4f}")
        if 'best_params' not in scores:
            continue
        print("Best Parameters:")
        for param_name, param_value in scores['best_params'].items():
            print(f"  {param_name}: {param_value}")

# Probability models: every entry is expected to carry best parameters.
if results_prob:
    print("\nClaim Probability Models:")
    for model_name, scores in results_prob.items():
        print(f"\n{model_name}:")
        print(f"AUC = {scores['AUC']:.4f}, Accuracy = {scores['Accuracy']:.4f}")
        print("Best Parameters:")
        for param_name, param_value in scores['best_params'].items():
            print(f"  {param_name}: {param_value}")