<a href="https://colab.research.google.com/github/fjadidi2001/Insurance/blob/main/Nov21Insur.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install pytorch_tabnet

Collecting pytorch_tabnet
  Downloading pytorch_tabnet-4.1.0-py3-none-any.whl.metadata (15 kB)
Downloading pytorch_tabnet-4.1.0-py3-none-any.whl (44 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.5/44.5 kB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pytorch_tabnet
Successfully installed pytorch_tabnet-4.1.0


In [4]:
from google.colab import drive
# Mount Google Drive at /content/drive so the dataset path used by the later
# cell (/content/drive/My Drive/...) can be resolved.
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
import warnings
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import torch
from imblearn.over_sampling import SMOTE
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.multiclass import OneVsRestClassifier
from xgboost import XGBClassifier
from pytorch_tabnet.tab_model import TabNetClassifier
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, matthews_corrcoef, log_loss, confusion_matrix, roc_curve, auc
)
from sklearn.base import clone

# Ignore warnings
# NOTE(review): this silences every warning category for the rest of the
# session, including deprecation notices from the ML libraries.
warnings.filterwarnings('ignore')

# Load dataset
# The CSV is read from the mounted Google Drive. The recorded run of this cell
# stopped here with FileNotFoundError, so the file has to exist at this path
# before anything below can execute.
file_path = '/content/drive/My Drive/telematics_syn.csv'
df = pd.read_csv(file_path)

# Split the dataset into features and target.
# AMT_Claim is treated as the second (regression) target later in this
# notebook; leaving it among the features would leak the outcome into the
# classifier, so it is dropped together with NB_Claim.
X = df.drop(columns=['NB_Claim', 'AMT_Claim'])
y = df['NB_Claim']

# Identify categorical and numerical columns
categorical_cols = X.select_dtypes(include=['object', 'category']).columns
numerical_cols = X.select_dtypes(include=['int64', 'float64']).columns

# Preprocessing pipelines for numerical and categorical data
numerical_transformer = Pipeline(steps=[
    ('scaler', StandardScaler())
])

categorical_transformer = Pipeline(steps=[
    ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])

# Combine preprocessing steps
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numerical_transformer, numerical_cols),
        ('cat', categorical_transformer, categorical_cols)
    ])

# Split the data.
# stratify=y keeps the class proportions of NB_Claim identical in both splits,
# so rare claim counts are represented in the test set and the multiclass
# metrics computed later see every class.
# NOTE(review): stratification needs at least two rows per class.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Fit the preprocessing on the training split only, then apply it to the test split
X_train = preprocessor.fit_transform(X_train)
X_test = preprocessor.transform(X_test)

# Balance the training dataset using SMOTE (the test split is left untouched)
smote = SMOTE(random_state=42)
X_train_res, y_train_res = smote.fit_resample(X_train, y_train)

# Candidate classifiers keyed by display name.
# Insertion order is the order in which they are trained and reported.
models = {}

models['Logistic Regression'] = OneVsRestClassifier(
    LogisticRegression(max_iter=1000)
)

models['XGBoost'] = XGBClassifier(
    objective='multi:softprob',
    eval_metric='mlogloss',
    use_label_encoder=False,
)

models['TabNet'] = TabNetClassifier(
    mask_type='entmax',
    optimizer_params={'lr': 2e-2},
    scheduler_fn=torch.optim.lr_scheduler.StepLR,
    scheduler_params=dict(step_size=10, gamma=0.9),
    verbose=0,  # Set to 1 if you want to see training progress
)

# Function to evaluate models
def evaluate_model(model, X_train, X_test, y_train, y_test):
    """Score an already-fitted classifier on the held-out split.

    Parameters
    ----------
    model : fitted estimator exposing ``predict`` and, optionally,
        ``predict_proba`` / ``classes_``.
    X_train, y_train : accepted for call-site compatibility; not used here.
    X_test, y_test : held-out features and true labels.

    Returns
    -------
    dict with keys ``accuracy``, ``precision``, ``recall``, ``f1``,
    ``roc_auc``, ``mcc``, ``loss`` (log loss) and ``conf_matrix``.
    Precision, recall, F1 and ROC AUC are weighted averages over classes.
    """
    y_pred = model.predict(X_test)

    # Column order of predict_proba. Passing it to the metrics keeps them
    # aligned with the probability matrix even when the test split happens to
    # miss a class the model was trained on.
    labels = getattr(model, "classes_", None)

    if hasattr(model, "predict_proba"):
        y_prob = model.predict_proba(X_test)
    else:
        # No probabilities available: fall back to one-hot "probabilities"
        # built from the hard predictions (assumes labels are 0..k-1).
        y_prob = np.eye(len(np.unique(y_test)))[y_pred]
        labels = None

    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred, average='weighted')
    recall = recall_score(y_test, y_pred, average='weighted')
    f1 = f1_score(y_test, y_pred, average='weighted')

    if y_prob.shape[1] == 2:
        # Binary target: roc_auc_score expects the positive-class score only.
        roc_auc = roc_auc_score(y_test, y_prob[:, 1])
    else:
        roc_auc = roc_auc_score(
            y_test, y_prob, multi_class='ovr', average='weighted', labels=labels
        )

    mcc = matthews_corrcoef(y_test, y_pred)
    loss = log_loss(y_test, y_prob, labels=labels)

    conf_matrix = confusion_matrix(y_test, y_pred, labels=labels)

    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'roc_auc': roc_auc,
        'mcc': mcc,
        'loss': loss,
        'conf_matrix': conf_matrix
    }

# Function to plot confusion matrix
def plot_confusion_matrix(conf_matrix, title):
    """Render ``conf_matrix`` as an annotated heatmap.

    Rows are labelled "Actual", columns "Predicted"; ``title`` is appended to
    the figure heading. The figure is shown immediately.
    """
    fig, ax = plt.subplots(figsize=(10, 7))
    sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', ax=ax)
    ax.set_title(f'Confusion Matrix - {title}')
    ax.set_ylabel('Actual')
    ax.set_xlabel('Predicted')
    plt.show()

# Function to plot ROC curve
def plot_roc_curve(y_test, y_prob, title):
    """Plot one-vs-rest ROC curves, one per column of ``y_prob``.

    Parameters
    ----------
    y_test : true labels; column ``i`` of ``y_prob`` is compared against
        ``y_test == i``, so labels are expected to be the integers 0..n-1.
    y_prob : 2-D array of per-class scores, shape (n_samples, n_classes).
    title : text appended to the figure heading.
    """
    n_classes = y_prob.shape[1]
    fpr = dict()
    tpr = dict()
    roc_auc = dict()
    for i in range(n_classes):
        fpr[i], tpr[i], _ = roc_curve(y_test == i, y_prob[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    plt.figure(figsize=(10, 7))
    colors = ['blue', 'red', 'green', 'orange']
    for i in range(n_classes):
        # Cycle through the palette so classes beyond the fourth are still
        # drawn instead of being silently dropped.
        plt.plot(fpr[i], tpr[i], color=colors[i % len(colors)], lw=2,
                 label=f'ROC curve of class {i} (area = {roc_auc[i]:.2f})')

    # Chance-level diagonal for reference
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(f'Receiver Operating Characteristic - {title}')
    plt.legend(loc="lower right")
    plt.show()

# Training and evaluation loop
# For every model, record the test-set metrics after each of n_epochs rounds
# and keep the model from the last round.
n_epochs = 10
results = {name: {'metrics': [], 'final_model': None} for name in models.keys()}

for name, model in models.items():
    print(f"\nTraining {name} for {n_epochs} epochs...")

    for epoch in range(n_epochs):
        print(f"Epoch {epoch + 1}/{n_epochs}")

        if name == 'TabNet':
            # warm_start=True keeps the network weights from the previous
            # call; without it every fit() re-initialises the network and the
            # loop would just train ten independent one-epoch models.
            # Targets are handed over as NumPy arrays, which is what
            # pytorch-tabnet works with.
            model.fit(
                X_train_res, np.asarray(y_train_res),
                eval_set=[(X_test, np.asarray(y_test))],
                max_epochs=1,  # Train for 1 epoch at a time
                patience=n_epochs,  # Prevent early stopping
                warm_start=True
            )
        else:
            # For non-TabNet models, we'll retrain from scratch each "epoch" to simulate epoch-based training
            model_clone = clone(model)
            model_clone.fit(X_train_res, y_train_res)
            model = model_clone

        # Evaluate the model after each epoch
        metrics = evaluate_model(model, X_train_res, X_test, y_train_res, y_test)
        results[name]['metrics'].append(metrics)

        print(f"Accuracy: {metrics['accuracy']:.4f}, F1 Score: {metrics['f1']:.4f}")

    # Store the final model
    results[name]['final_model'] = model

# Report the last-epoch metrics and the diagnostic plots for every model
for name, result in results.items():
    final_model = result['final_model']
    final_metrics = result['metrics'][-1]  # metrics recorded after the last epoch

    print(f"\nFinal results for {name}:")
    for metric, value in final_metrics.items():
        # The confusion matrix is an array, not a scalar; it is plotted below.
        if metric == 'conf_matrix':
            continue
        print(f"  {metric}: {value:.4f}")

    plot_confusion_matrix(final_metrics['conf_matrix'], name)

    # ROC curves need class probabilities
    if not hasattr(final_model, "predict_proba"):
        continue
    plot_roc_curve(y_test, final_model.predict_proba(X_test), name)

# Accuracy per epoch, one line per model
fig, ax = plt.subplots(figsize=(12, 8))
for name, result in results.items():
    accuracies = [m['accuracy'] for m in result['metrics']]
    ax.plot(range(1, n_epochs + 1), accuracies, label=name)

ax.set_xlabel('Epoch')
ax.set_ylabel('Accuracy')
ax.set_title('Learning Curves')
ax.legend()
plt.show()

# Combine all results into a final DataFrame for comparison.
# Maps each output column to the key used in the per-epoch metrics dicts;
# only the metrics from the last epoch of every model are reported.
metric_columns = {
    'Accuracy': 'accuracy',
    'Precision': 'precision',
    'Recall': 'recall',
    'F1 Score': 'f1',
    'ROC AUC': 'roc_auc',
    'MCC': 'mcc',
    'Loss': 'loss',
}

final_results = pd.DataFrame({
    'Model': list(results.keys()),
    **{
        column: [results[name]['metrics'][-1][key] for name in results]
        for column, key in metric_columns.items()
    },
})

print("\nFinal Comparative Analysis Results:")
print(final_results)

FileNotFoundError: [Errno 2] No such file or directory: '/content/drive/My Drive/telematics_syn.csv'

In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

# Load the dataset
# NOTE(review): nothing is loaded in this cell; `df` must already exist from
# the earlier cell that reads the telematics CSV.

# 1. Feature engineering
# Bucket driver age and vehicle age into labelled ranges. pd.cut returns
# 'category' columns; values outside the bins become NaN.
df['Age_Group'] = pd.cut(df['Insured.age'], bins=[0, 25, 35, 45, 55, 65, 100], labels=['18-25', '26-35', '36-45', '46-55', '56-65', '65+'])
df['Car_Age_Group'] = pd.cut(df['Car.age'], bins=[-1, 0, 3, 5, 10, 20, 100], labels=['New', '1-3', '4-5', '6-10', '11-20', '20+'])

# 2. Separate the features from the two targets
X = df.drop(['NB_Claim', 'AMT_Claim'], axis=1)  # Features
y_nb = df['NB_Claim']  # Target for classification
y_amt = df['AMT_Claim']  # Target for regression

# 3. Column groups, taken from X (not df) so the target columns are never
# requested from the transformer, and computed after feature engineering with
# 'category' included so the new *_Group columns are encoded too.
numeric_features = X.select_dtypes(include=['int64', 'float64']).columns
categorical_features = X.select_dtypes(include=['object', 'category']).columns

# 4. Create preprocessing pipelines (missing values are handled here)
# Numerical columns: median imputation, then standardisation.
numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

# Categorical columns: missing entries become their own 'missing' level.
# sparse_output=False keeps the result dense so it can be wrapped in a
# DataFrame below.
categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])

# 5. Combine preprocessing steps
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ])

# 6. Fit and transform the data
X_preprocessed = preprocessor.fit_transform(X)

# 7. Convert to DataFrame for better interpretability
# get_feature_names_out is the current scikit-learn API (get_feature_names was removed).
feature_names = (numeric_features.tolist() +
                 preprocessor.named_transformers_['cat'].named_steps['onehot'].get_feature_names_out(categorical_features).tolist())
X_preprocessed_df = pd.DataFrame(X_preprocessed, columns=feature_names, index=X.index)

# 8. Print info about the preprocessed data
print("Preprocessed data shape:", X_preprocessed_df.shape)
print("\nFirst few rows of preprocessed data:")
print(X_preprocessed_df.head())

# 9. Handle target variables
# For NB_Claim (classification)
y_nb = (y_nb > 0).astype(int)  # Convert to binary (0 for no claim, 1 for claim)

# For AMT_Claim (regression)
# We'll keep it as is, but you might want to consider log transformation if the distribution is skewed

print("\nUnique values in NB_Claim (after binarization):", y_nb.unique())
print("AMT_Claim statistics:")
print(y_amt.describe())

In [None]:
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import StratifiedKFold
from pytorch_tabnet.tab_model import TabNetClassifier
import matplotlib.pyplot as plt
import seaborn as sns

# Read the three preprocessed partitions from the working directory
train_data, val_data, test_data = (
    pd.read_csv(csv_name)
    for csv_name in ('train_data.csv', 'val_data.csv', 'test_data.csv')
)

# Stack them row-wise (train, then validation, then test) for the analysis below
all_data = pd.concat([train_data, val_data, test_data])

# Function to plot feature distributions
def plot_feature_distributions(data, feature, target):
    """Show a box plot of ``feature`` grouped by the values of ``target``.

    ``data`` is the frame holding both columns; ``feature`` goes on the y-axis
    and ``target`` on the x-axis. The figure is displayed immediately.
    """
    fig, ax = plt.subplots(figsize=(10, 6))
    sns.boxplot(data=data, x=target, y=feature, ax=ax)
    ax.set_title(f'Distribution of {feature} by {target}')
    plt.show()

# Analyze top features
# One box plot per listed column, grouped by Risk_Category, over the combined
# train/validation/test rows.
# NOTE(review): the list is hard-coded; confirm these column names exist in
# the preprocessed CSVs and that they are the intended "top" features.
top_features = ['num__NB_Claim', 'num__AMT_Claim', 'cat__Region_0']
for feature in top_features:
    plot_feature_distributions(all_data, feature, 'Risk_Category')

# Correlation of every column with the target, largest value first
correlation = all_data.corr()['Risk_Category'].sort_values(ascending=False)
print("Top 10 correlations with Risk_Category:")
print(correlation.iloc[:10])

# Columns whose absolute correlation with the target exceeds the threshold
# are excluded from the feature matrix.
threshold = 0.8
exceeds_threshold = correlation.abs() > threshold
high_corr_features = correlation.loc[exceeds_threshold].index.tolist()
# The target correlates perfectly with itself; take it out of the list here,
# it is separated from the features explicitly below.
high_corr_features.remove('Risk_Category')
print(f"Removing highly correlated features: {high_corr_features}")

columns_to_exclude = high_corr_features + ['Risk_Category']
X = all_data.drop(columns=columns_to_exclude)
y = all_data['Risk_Category']

# Recover the original partitions by position: the first
# len(train_data) + len(val_data) rows are train+validation, the rest is test.
n_train_val = len(train_data) + len(val_data)
X_train_val, X_test = X.iloc[:n_train_val], X.iloc[n_train_val:]
y_train_val, y_test = y.iloc[:n_train_val], y.iloc[n_train_val:]

# Configure the TabNet classifier used for cross-validation and the final fit
tabnet_model = TabNetClassifier(
    # network architecture
    n_d=16,
    n_a=16,
    n_steps=3,
    gamma=1.5,
    n_independent=2,
    n_shared=2,
    mask_type="entmax",
    # regularisation
    lambda_sparse=1e-3,
    # optimiser and learning-rate schedule
    optimizer_fn=torch.optim.Adam,
    optimizer_params={"lr": 2e-2},
    scheduler_fn=torch.optim.lr_scheduler.ReduceLROnPlateau,
    scheduler_params={"mode": "min", "patience": 5, "min_lr": 1e-5, "factor": 0.9},
    # reproducibility and logging
    seed=42,
    verbose=10,
)

# Perform stratified k-fold cross-validation on the train+validation rows
n_splits = 5
skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)

cv_scores = []

for fold, (train_idx, val_idx) in enumerate(skf.split(X_train_val, y_train_val), 1):
    print(f"Fold {fold}")

    # pytorch-tabnet works on NumPy arrays and rejects pandas DataFrames,
    # so each fold is converted after the positional selection.
    X_train = X_train_val.iloc[train_idx].to_numpy()
    X_val = X_train_val.iloc[val_idx].to_numpy()
    y_train = y_train_val.iloc[train_idx].to_numpy()
    y_val = y_train_val.iloc[val_idx].to_numpy()

    tabnet_model.fit(
        X_train=X_train, y_train=y_train,
        eval_set=[(X_train, y_train), (X_val, y_val)],
        eval_name=['train', 'valid'],
        eval_metric=['accuracy'],
        max_epochs=100,
        patience=10,
        batch_size=1024,
        virtual_batch_size=128,
        num_workers=0,
        drop_last=False
    )

    y_pred = tabnet_model.predict(X_val)
    fold_accuracy = accuracy_score(y_val, y_pred)
    cv_scores.append(fold_accuracy)
    print(f"Fold {fold} Validation Accuracy: {fold_accuracy:.4f}")

print(f"Mean CV Accuracy: {np.mean(cv_scores):.4f} (+/- {np.std(cv_scores):.4f})")

# Retrain on full training set (train + validation rows).
# pytorch-tabnet works on NumPy arrays and rejects pandas DataFrames, so the
# frames are converted before fitting. Only the training data is monitored
# here, so early stopping is driven by training accuracy.
X_train_val_np = X_train_val.to_numpy()
y_train_val_np = y_train_val.to_numpy()

tabnet_model.fit(
    X_train=X_train_val_np, y_train=y_train_val_np,
    eval_set=[(X_train_val_np, y_train_val_np)],
    eval_name=['train'],
    eval_metric=['accuracy'],
    max_epochs=100,
    patience=10,
    batch_size=1024,
    virtual_batch_size=128,
    num_workers=0,
    drop_last=False
)

# Make predictions on the test set.
# pytorch-tabnet works on NumPy arrays, so the DataFrame is converted first.
y_pred = tabnet_model.predict(X_test.to_numpy())

# Calculate accuracy and (class-weighted) F1 score
accuracy = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred, average='weighted')

print(f"Test Accuracy: {accuracy:.4f}")
print(f"Test F1 Score: {f1:.4f}")

# Rank the input columns by the fitted model's feature importances and keep the ten largest
feature_importances = tabnet_model.feature_importances_
importance_df = (
    pd.DataFrame({'feature': X.columns, 'importance': feature_importances})
    .sort_values('importance', ascending=False)
    .head(10)
)
print("Top 10 Important Features:")
print(importance_df)

# Persist the fitted model; save_model returns the path it actually wrote to
saving_path_name = "tabnet_model"
saved_filepath = tabnet_model.save_model(saving_path_name)
print("Model saved to: {}".format(saved_filepath))