<a href="https://colab.research.google.com/github/Wiqzard/temp/blob/main/HUK_interview.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install arff

## 0. Data Preprocessing

In [None]:
import pandas as pd
import arff

data_freq = arff.load('freMTPL2freq.arff')
df_freq = pd.DataFrame(data_freq , columns=["IDpol", "ClaimNb", "Exposure", "Area", "VehPower", "VehAge", "DrivAge", "BonusMalus", "VehBrand", "VehGas", "Density", "Region"] )
data_sev = arff.load('freMTPL2sev.arff')
df_sev = pd.DataFrame(data_sev, columns=["IDpol", "ClaimAmount"])
df_sev_agg = df_sev.groupby('IDpol', as_index=False).sum()

In [None]:
df_sev.head()
df_sev.describe()

In [None]:
df_freq.head()
df_freq.describe()

### 0.1 Remove Outlier and Create Features

In [None]:
# Create Features of Interest
df = pd.merge(df_freq, df_sev_agg, on='IDpol', how='left')
df['ClaimFreq'] = df['ClaimNb'] / df['Exposure']
df['ClaimSev'] = df['ClaimAmount']/ df['ClaimNb']
df['AnualClaimAmount'] = df['ClaimAmount'] / df['Exposure']

# Drop Duplicates with Id in
duplicates = df.duplicated()
print(f"Number of duplicate rows: {duplicates.sum()}")
df = df.drop_duplicates()

#Outlier Removal Quantile
def remove_outlier(data: pd.DataFrame, column: str) -> list[pd.DataFrame, int, int]:
    Q1 = data[column][data[column] > 0].quantile(0.25)
    Q3 = data[column][data[column] > 0].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    mask = (data[column] >= lower_bound) & (data[column] <= upper_bound) | data[column].isna()
    removed = len(data[column])-sum(mask)
    print(f"{column}: LowerBound: {lower_bound:.2f} UpperBound: {upper_bound:.2f} Removed: {removed}")
    return data[mask], lower_bound, upper_bound


df_o, a, b = remove_outlier(df, 'ClaimSev')
df_o, a, b = remove_outlier(df_o, 'ClaimFreq')
df_o = df_o.drop(columns=["IDpol"])

# Clip High Vehicle Ages
df_o['VehAge'] = df_o['VehAge'].clip(upper=50)



## 1. Descriptive Analysis

In [None]:
df_o.head()

In [None]:
df_o.describe()

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

#columns = ['Area', 'VehPower', 'VehAge', 'DrivAge', 'BonusMalus', 'VehBrand', 'VehGas', 'Density', 'Region', 'ClaimAmount', 'ClaimFreq', 'ClaimSev']
plt.figure(figsize=(10, 6))

# Creating the subplot figure
plt.subplot(4, 1, 1)
sns.histplot(df_o['ClaimFreq'], kde=False, bins=50)
plt.title('Claim Frequency Distribution')

plt.subplot(4, 1, 2)
sns.histplot(df_o['ClaimFreq'][df_o['ClaimFreq'] > 0], kde=False, bins=50)
plt.title('Claim Frequency Distribution (Excl. 0)')

plt.subplot(4, 1, 3)
sns.histplot(df_o['ClaimSev'], kde=True, bins=50)
plt.title('Claim Severity Distribution')

plt.subplot(4, 1, 4)
sns.histplot(df_o['AnualClaimAmount'], kde=True, bins=50)
plt.title('Claim Severity Distribution')

plt.tight_layout()
plt.show()


## 2. Feature Analysis

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

df_encoded = df_o.copy()
df_encoded['Area'] = df_encoded['Area'].astype('category').cat.codes
df_encoded['VehBrand'] = df_encoded['VehBrand'].astype('category').cat.codes
df_encoded['VehGas'] = df_encoded['VehGas'].astype('category').cat.codes
df_encoded['Region'] = df_encoded['Region'].astype('category').cat.codes

# Calculating correlation matrix
df_encoded = df_encoded.drop(columns=['ClaimNb', 'Exposure', 'AnualClaimAmount', 'ClaimAmount'])
correlation_matrix = df_encoded.corr(method='spearman')

plt.figure(figsize=(12, 10))
heatmap = sns.heatmap(correlation_matrix, annot=True, fmt=".2f", cmap='coolwarm', vmin=-1, vmax=1, linewidths=0.5)
plt.title('Correlation Matrix Heatmap')
plt.xticks(rotation=45, ha='right')
plt.yticks(rotation=0)

plt.show()

### 2.2 Response Variable-Predictor Visualization

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

def calculate_and_plot(freq_df, sev_df, group_col, bins, labels, xlabel,
                       plot_predicted=False, pred_claim_freq_col=None, pred_claim_sev_col=None):
    freq_df_copy = freq_df.copy()
    sev_df_copy = sev_df.copy()

    # Calculate the mean claim frequency and average cost for each group
    mean_claim_freq = freq_df_copy.groupby(group_col)['ClaimFreq'].mean().reset_index()
    mean_avg_cost = sev_df_copy.groupby(group_col)['ClaimSev'].mean().reset_index()

    if plot_predicted:
        # Calculate the mean predicted values for each group
        mean_pred_claim_freq = freq_df_copy.groupby(group_col)[pred_claim_freq_col].mean().reset_index()
        mean_pred_avg_cost = sev_df_copy.groupby(group_col)[pred_claim_sev_col].mean().reset_index()

    # Calculate the aggregated data for the binned groups
    freq_df_copy[f'{group_col}Group'] = pd.cut(freq_df_copy[group_col], bins=bins, labels=labels, right=False)
    sev_df_copy[f'{group_col}Group'] = pd.cut(sev_df_copy[group_col], bins=bins, labels=labels, right=False)

    agg_data_freq = freq_df_copy.groupby(f'{group_col}Group').agg(
        EmpiricalClaimFreq=('ClaimFreq', 'mean'),
        Exposure=('Exposure', 'sum')
    ).reset_index()

    agg_data_sev = sev_df_copy.groupby(f'{group_col}Group').agg(
        EmpiricalAvgCost=('ClaimSev', 'mean'),
        Exposure=('Exposure', 'sum')
    ).reset_index()

    #agg_data = pd.merge(agg_data_freq, agg_data_sev, on=f'{group_col}Group')

    if plot_predicted:
        agg_data_freq['PredictedClaimFreq'] = freq_df_copy.groupby(f'{group_col}Group')[pred_claim_freq_col].mean().values
        agg_data_sev['PredictedAvgCost'] = sev_df_copy.groupby(f'{group_col}Group')[pred_claim_sev_col].mean().values
    # Plot the figures
    fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(13, 3))

    # Top left plot: Empirical (and Predicted if plot_predicted=True) Annual Claim Frequency vs. Group (Mean for each group)
    axes[0].scatter(mean_claim_freq[group_col], mean_claim_freq['ClaimFreq'], color='red', label='Empirical')
    if plot_predicted:
        axes[0].scatter(mean_pred_claim_freq[group_col], mean_pred_claim_freq[pred_claim_freq_col], color='pink', label='Predicted')
    axes[0].set_xlabel(xlabel)
    axes[0].set_ylabel('Annual Claim Frequency')
    axes[0].legend()

    # Top right plot: Empirical (and Predicted if plot_predicted=True) Annual Claim Frequency by Group
    axes[1].bar(agg_data_sev[f'{group_col}Group'], agg_data_sev['Exposure'], color='grey', alpha=0.5)
    axes[1].set_ylabel('Earned Exposure')
    axes[1].set_xlabel(xlabel)
    axes2 = axes[1].twinx()
    axes2.errorbar(agg_data_sev[f'{group_col}Group'], agg_data_sev['EmpiricalAvgCost'], color='blue', marker='o', linestyle='-', label='Empirical')
    if plot_predicted:
        #axes2.errorbar(sev_df_copy["ClaimSev"], sev_df_copy[pred_claim_sev_col], color='blue', marker='x', linestyle='--', label='Predicted')
        axes2.errorbar(agg_data_sev[f'{group_col}Group'], agg_data_sev['PredictedAvgCost'], color='green', marker='x', linestyle='--', label='Predicted')
    axes2.set_ylabel('Annual Claim Severity')
    axes2.legend()
    #print(agg_data_sev)
    plt.tight_layout()
    plt.show()

# Assuming df is the original dataframe you provided
df_copy = df_o.copy()

# Create separate dataframes for frequency and severity
freq_df = df_copy[['DrivAge', 'BonusMalus', 'VehAge', 'ClaimFreq', 'Exposure']].copy()
sev_df = df_copy[['DrivAge', 'BonusMalus', 'VehAge', 'ClaimSev', 'Exposure']].copy()

groups = [
    {'group_col': 'DrivAge', 'bins': [18, 30, 40, 50, 60, 70, 100], 'labels': ['[18,30[', '[30,40[', '[40,50[', '[50,60[', '[60,70[', '[70-['], 'xlabel': 'Driver Age'},
    {'group_col': 'BonusMalus', 'bins': [50, 75, 100, 125, 150, 175, 200, 225], 'labels': ['[50,75[', '[75,100[', '[100,125[', '[125,150[', '[150,175[', '[175,200[', '[200,225['], 'xlabel': 'BonusMalus'},
    {'group_col': 'VehAge', 'bins': [0, 10, 20, 30, 40, 50], 'labels': ['[0,10[', '[10,20[', '[20,30[', '[30,40[', '[40,50['], 'xlabel': 'Vehicle Age'}
]
for group in groups:
    calculate_and_plot(freq_df, sev_df, group['group_col'], group['bins'], group['labels'], group['xlabel'])


## 3. Modeling

### 3.1 Helper Functions

In [None]:
import numpy as np
import pandas as pd
from tqdm import tqdm
import statsmodels.api as sm
from statsmodels.genmod.generalized_linear_model import GLMResults
from statsmodels.genmod.families import Poisson, NegativeBinomial, Tweedie, Gamma
from statsmodels.discrete.count_model import ZeroInflatedPoisson
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor


def fit_and_evaluate_glm(model_family, X_train, y_train, exposure_train, X_test, y_test, exposure_test):
    model = sm.GLM(y_train, X_train, family=model_family, var_weights=exposure_train)
    results = model.fit()

    train_predictions = results.predict(X_train)
    test_predictions = results.predict(X_test)

    train_mae = mean_absolute_error(y_train, train_predictions)
    test_mae = mean_absolute_error(y_test, test_predictions)
    train_mse = mean_squared_error(y_train, train_predictions)
    test_mse = mean_squared_error(y_test, test_predictions)

    metrics = {
        "train_mae": train_mae,
        "test_mae": test_mae,
        "train_mse": train_mse,
        "test_mse": test_mse
    }

    return results, metrics, test_predictions

def fit_and_evaluate_zip(X_train, y_train, exposure_train, X_test, y_test, exposure_test):
    model = ZeroInflatedPoisson(y_train, X_train, exposure=exposure_train)
    results = model.fit(maxiter=70)

    train_predictions = results.predict(X_train)
    test_predictions = results.predict(X_test)

    train_mae = mean_absolute_error(y_train, train_predictions)
    test_mae = mean_absolute_error(y_test, test_predictions)
    train_mse = mean_squared_error(y_train, train_predictions)
    test_mse = mean_squared_error(y_test, test_predictions)

    metrics = {
        "train_mae": train_mae,
        "test_mae": test_mae,
        "train_mse": train_mse,
        "test_mse": test_mse
    }

    return results, metrics, test_predictions

def fit_and_evaluate_rf(X_train, y_train, X_test, y_test):
    model = RandomForestRegressor(n_estimators=100, random_state=69)
    model.fit(X_train, y_train)

    train_predictions = model.predict(X_train)
    test_predictions = model.predict(X_test)

    train_mae = mean_absolute_error(y_train, train_predictions)
    test_mae = mean_absolute_error(y_test, test_predictions)
    train_mse = mean_squared_error(y_train, train_predictions)
    test_mse = mean_squared_error(y_test, test_predictions)

    metrics = {
        "train_mae": train_mae,
        "test_mae": test_mae,
        "train_mse": train_mse,
        "test_mse": test_mse
    }

    return model, metrics, test_predictions

### 3.2 Dataset Creation

In [None]:
# Preprocessing and data set preparation
model_data = df_o.copy()

# Encode Categorical Variables with 1 Hot
categorical_columns = ['Area', 'VehBrand', 'VehGas', 'Region']
model_data = pd.get_dummies(model_data, columns=categorical_columns, drop_first=True)
model_data.columns = model_data.columns.str.replace("'", "", regex=True)
model_data.columns = model_data.columns.str.replace(" ", "_", regex=True)

# Bin Integer Numerical Values
model_data["VehAge"] = pd.cut(model_data["VehAge"], bins=10, labels=False)
model_data["DrivAge"] = pd.cut(model_data["DrivAge"], bins=10, labels=False)
model_data["BonusMalus"] = pd.cut(model_data["BonusMalus"], bins=10, labels=False)

model_data["ClaimFreq"] = model_data["ClaimFreq"].astype(float)  # Ensure target is float
model_data["Exposure"] = model_data["Exposure"].astype(float)    # Ensure weights are float if needed

X = model_data.copy().drop(columns=['ClaimSev', 'ClaimFreq', 'ClaimAmount', 'AnualClaimAmount', 'ClaimNb', 'Exposure'])
X = X.astype(int)
train_data, test_data, X_train, X_test = train_test_split(model_data, X, test_size=0.2, random_state=69)

### 3.3 Claim Frequency Modeling

In [None]:
# Prepare data for ClaimFreq modeling
X_train = sm.add_constant(X_train)
X_test = sm.add_constant(X_test)
y_train_freq = np.asarray(train_data["ClaimFreq"])
y_test_freq = np.asarray(test_data["ClaimFreq"])
exposure_train = np.asarray(train_data["Exposure"])
exposure_test = np.asarray(test_data["Exposure"])

# Model families for ClaimFreq
model_families = {
    "Poisson": Poisson(),
    "Negative Binomial": NegativeBinomial(alpha=1.0),
    "Tweedie": Tweedie(var_power=1.5)
}

# Fit GLM models for ClaimFreq and store results
models = {}
predictions = {}
results_summary = {}
metrics_summary = {}

for name, family in tqdm(model_families.items()):
    results, metrics, predictions = fit_and_evaluate_glm(family, X_train, y_train_freq, exposure_train, X_test, y_test_freq, exposure_test)
    models[name] = results
    predictions[name] = predictions
    results_summary[name] = results.summary()
    metrics_summary[name] = metrics

# Fit ZIP model for ClaimFreq and store results
zip_results, zip_metrics, pred = fit_and_evaluate_zip(X_train, y_train_freq, exposure_train, X_test, y_test_freq, exposure_test)
results_summary["Zero-Inflated Poisson"] = zip_results.summary()
metrics_summary["Zero-Inflated Poisson"] = zip_metrics

# Fit Random Forest model for ClaimFreq and store results
rf_model_freq, rf_metrics_freq, rf_prediction = fit_and_evaluate_rf(X_train, y_train_freq, X_test, y_test_freq)
metrics_summary["Random Forest"] = rf_metrics_freq

In [None]:
# Display results for ClaimFreq
for name, summary in results_summary.items():
    print(f"Model: {name}\n")
    print(summary)
    print("\n")

# Create a DataFrame to display the metrics for ClaimFreq
metrics_df = pd.DataFrame(metrics_summary).T
metrics_df.columns = ["Train MAE", "Test MAE", "Train MSE", "Test MSE"]

# Display the metrics DataFrame for ClaimFreq
print("Claim Frequency Metrics:")
print(metrics_df)

### 3.4 Claim Severity Modeling

In [None]:
# Prepare data for ClaimSev modeling
train_mask = (train_data['ClaimSev'] > 0) & (~train_data['ClaimSev'].isna())
test_mask = (test_data['ClaimSev'] > 0) & (~test_data['ClaimSev'].isna())

X_train_sev = sm.add_constant(X_train[train_mask])
X_test_sev = sm.add_constant(X_test[test_mask])
y_train_sev = np.asarray(train_data[train_mask]["ClaimSev"])
y_test_sev = np.asarray(test_data[test_mask]["ClaimSev"])
exposure_train_sev = np.asarray(train_data[train_mask]["Exposure"])
exposure_test_sev = np.asarray(test_data[test_mask]["Exposure"])

# Model families for ClaimSev
sev_model_families = {
    "Poisson": Poisson(),
    "Normal": sm.families.Gaussian(),
    "Negative Binomial": NegativeBinomial(),
    "Gamma": Gamma()
}

# Fit GLM models for ClaimSev and store results
sev_models = {}
sev_predictions = {}
sev_results_summary = {}
sev_metrics_summary = {}

for name, family in tqdm(sev_model_families.items()):
    results, metrics, predictions = fit_and_evaluate_glm(family, X_train_sev, y_train_sev, exposure_train_sev, X_test_sev, y_test_sev, exposure_test_sev)
    sev_models[name] = results
    sev_predictions[name] = predictions
    sev_results_summary[name] = results.summary()
    sev_metrics_summary[name] = metrics

## Fit Random Forest model for ClaimSev and store results
rf_model_sev, rf_metrics_sev, rf_predictions = fit_and_evaluate_rf(X_train_sev, y_train_sev, X_test_sev, y_test_sev)
sev_metrics_summary["Random Forest"] = rf_metrics_sev



In [None]:
# Display results for ClaimSev
for name, summary in sev_results_summary.items():
    print(f"Model: {name}\n")
    print(summary)
    print("\n")


# Create a DataFrame to display the metrics for ClaimSev
sev_metrics_df = pd.DataFrame(sev_metrics_summary).T
sev_metrics_df.columns = ["Train MAE", "Test MAE", "Train MSE", "Test MSE"]

# Display the metrics DataFrame for ClaimSev
print("Claim Severity Metrics:")
print(sev_metrics_df)

### 3.5 Visualization of Results

In [None]:
poisson_results, poisson_metrics, freq_predictions = fit_and_evaluate_glm(Poisson(), X_train, y_train_freq, exposure_train, X_test, y_test_freq, exposure_test)
gaussian_results, gaussian_metrics, sev_predictions = fit_and_evaluate_glm(sm.families.Gaussian(), X_train_sev, y_train_sev, exposure_train_sev, X_test_sev, y_test_sev, exposure_test_sev)


In [None]:
combined_freq_data = test_data.copy() #.drop(columns='const')
combined_sev_data = test_data.copy().dropna() #.drop(columns='const')

combined_freq_data['PredClaimFreq'] = freq_predictions
combined_sev_data['PredClaimSev'] = sev_predictions
#combined_sev_data = combined_sev_data.dropna()


groups = [
    {'group_col': 'DrivAge', 'bins': [0, 1, 2, 3, 4, 5, 6], 'labels': ['[18,30[', '[30,40[', '[40,50[', '[50,60[', '[60,70[', '[70-['], 'xlabel': 'Driver Age'},
    {'group_col': 'BonusMalus', 'bins': [0, 1, 2, 3, 4, 5, 6, 7], 'labels': ['[50,75[', '[75,100[', '[100,125[', '[125,150[', '[150,175[', '[175,200[', '[200,225['], 'xlabel': 'BonusMalus'},
    {'group_col': 'VehAge', 'bins': [0, 1, 2, 3, 4, 5], 'labels': ['[0,10[', '[10,20[', '[20,30[', '[30,40[', '[40,50['], 'xlabel': 'Vehicle Age'}
]
for group in groups:
    calculate_and_plot(combined_freq_data, combined_sev_data, group['group_col'], group['bins'], group['labels'], group['xlabel'],
                       plot_predicted=True, pred_claim_freq_col='PredClaimFreq', pred_claim_sev_col='PredClaimSev')


### 3.6 Combine Models for Modeling the Average Anual Claim Amount

In [None]:
test_data_amount = test_data.copy()
test_data_amount.loc[test_data_amount['ClaimNb'] == 0, 'AnualClaimAmount'] = 0
test_data_amount = test_data_amount.dropna(subset=['AnualClaimAmount'])
X_test_amount = X_test.copy()
X_test_amount = X_test_amount.loc[test_data_amount.index]

freq_predictions = poisson_results.predict(X_test_amount)
sev_predictions = gaussian_results.predict(X_test_amount)

predicted_annual_amount = freq_predictions * sev_predictions
predicted_annual_amount.describe()

In [None]:
from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error

# This is now only on the ones that already have a claim, we need all and if ClaimNb=0 fill NaN with 0
test_data_amount['PredClaimFreq'] = freq_predictions
test_data_amount['PredClaimSev'] = sev_predictions
test_data_amount['PredAnnualClaimAmount'] = predicted_annual_amount

mae = mean_absolute_error(test_data_amount['AnualClaimAmount'], test_data_amount['PredAnnualClaimAmount'])
mse = mean_squared_error(test_data_amount['AnualClaimAmount'], test_data_amount['PredAnnualClaimAmount'])
mean = (test_data_amount['AnualClaimAmount'] - test_data_amount['PredAnnualClaimAmount']).mean()

print(f'MAE: {mae:.2f}')
print(f'MSE: {mse:.2f}')
print(f'Mean: {mean:.2f}')

groups = [
    {'group_col': 'DrivAge', 'bins': [0, 1, 2, 3, 4, 5, 6], 'labels': ['[18,30[', '[30,40[', '[40,50[', '[50,60[', '[60,70[', '[70-['], 'xlabel': 'Driver Age'},
    {'group_col': 'BonusMalus', 'bins': [0, 1, 2, 3, 4, 5, 6, 7], 'labels': ['[50,75[', '[75,100[', '[100,125[', '[125,150[', '[150,175[', '[175,200[', '[200,225['], 'xlabel': 'BonusMalus'},
    {'group_col': 'VehAge', 'bins': [0, 1, 2, 3, 4, 5], 'labels': ['[0,10[', '[10,20[', '[20,30[', '[30,40[', '[40,50['], 'xlabel': 'Vehicle Age'}
]



In [None]:
fig, axes = plt.subplots(len(groups), 1, figsize=(8, 10))

for idx, group in enumerate(groups):
    group_col = group['group_col']
    bins = group['bins']
    labels = group['labels']
    xlabel = group['xlabel']

    test_data_amount[f'{group_col}Group'] = pd.cut(test_data_amount[group_col], bins=bins, labels=labels, right=False)

    agg_data_sev = test_data_amount.groupby(f'{group_col}Group').agg({
        'PredAnnualClaimAmount': 'mean',
        'AnualClaimAmount': 'mean',
        'Exposure': 'sum'
    }).reset_index()

    agg_data_sev['EmpiricalAvgCost'] = agg_data_sev['AnualClaimAmount'] / agg_data_sev['Exposure']
    agg_data_sev['PredictedAvgCost'] = agg_data_sev['PredAnnualClaimAmount'] / agg_data_sev['Exposure']

    ax1 = axes[idx]
    ax1.bar(agg_data_sev[f'{group_col}Group'], agg_data_sev['Exposure'], color='grey', alpha=0.5)
    ax1.set_ylabel('Earned Exposure')
    ax1.set_xlabel(xlabel)

    ax2 = ax1.twinx()
    ax2.errorbar(agg_data_sev[f'{group_col}Group'], agg_data_sev['EmpiricalAvgCost'], color='blue', marker='o', linestyle='-', label='Empirical')
    ax2.errorbar(agg_data_sev[f'{group_col}Group'], agg_data_sev['PredictedAvgCost'], color='green', marker='x', linestyle='--', label='Predicted')
    ax2.set_ylabel('Annual Claim Amount')
    ax2.legend()

plt.tight_layout()
plt.show()