# Modeling and Evaluation

## Objectives
Predict medical insurance charges using customer profile information.

## Inputs
- Processed customer dataset with feature engineering.

## Outputs
- Trained ML regression model.
- Feature importance ranking.

# Change working directory

In [None]:
import os
current_dir = os.getcwd()
current_dir

We want to make the parent of the current directory the new current directory
* os.path.dirname() gets the parent directory
* os.chdir() defines the new current directory

In [None]:
os.chdir(os.path.dirname(current_dir))
print("You set a new current directory")

Confirm the new current directory

In [None]:
current_dir = os.getcwd()
current_dir

---

# Load Engineered Dataset

In [None]:
import pandas as pd
df_path = 'outputs/datasets/cleaned/insurance_cleaned.csv'
df = pd.read_csv(df_path)
df.head()

Remove warning messages

In [None]:
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

## Split the Dataset

In [None]:
from sklearn.model_selection import train_test_split
# Hold out 20% of the rows as a test set. 'charges' is the regression target
# and every remaining column is used as a predictor; random_state fixes the
# split so the notebook is reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    df.drop(columns=['charges']),
    df['charges'],
    test_size=0.2,
    random_state=0,
)

# Shapes are printed in the order: X_train, y_train, X_test, y_test.
print(*(part.shape for part in (X_train, y_train, X_test, y_test)))

In [None]:
print(f"{X_test.head()}\n\n {X_train.head()}")

---

# ML Pipeline for Data Cleaning and Feature Engineering

Based on the last notebook, we will create our ML pipeline for data cleaning and feature engineering.

In [None]:
from sklearn.pipeline import Pipeline

# Feature Engineering
from feature_engine.encoding import OrdinalEncoder
from sklearn.preprocessing import StandardScaler
from feature_engine.wrappers import SklearnTransformerWrapper

def PipelineDataCleaningAndFeatureEngineering():
    """Build the unfitted data cleaning and feature engineering pipeline.

    Steps, in order:
      1. 'ordinal_encoder' - arbitrary ordinal encoding of the categorical
         columns 'sex', 'smoker' and 'region'.
      2. 'scaler' - StandardScaler applied (through the feature_engine
         wrapper) to 'age' and 'bmi' only.

    Returns:
        A new sklearn Pipeline instance; call fit/fit_transform on it.
    """
    encoder = OrdinalEncoder(
        encoding_method='arbitrary',
        variables=['sex', 'smoker', 'region'],
    )
    scaler = SklearnTransformerWrapper(
        transformer=StandardScaler(),
        variables=['age', 'bmi'],
    )
    return Pipeline([('ordinal_encoder', encoder), ('scaler', scaler)])

PipelineDataCleaningAndFeatureEngineering()

**Fit Pipeline**

In [None]:
# Fresh, unfitted feature-engineering pipeline.
pipeline_data_cleaning_feat_eng = PipelineDataCleaningAndFeatureEngineering()

# The pipeline is fitted on the train set only; the test set is transformed
# with the parameters learned from train.
X_train = pipeline_data_cleaning_feat_eng.fit_transform(X_train)
X_test = pipeline_data_cleaning_feat_eng.transform(X_test)

# Shapes are printed in the order: X_train, y_train, X_test, y_test.
print(*(part.shape for part in (X_train, y_train, X_test, y_test)))


Check if the pipeline does the feature engineering correctly.

In [None]:
print(f"{X_test.head()}\n\n {X_train.head()}")

# ML Pipeline for Modelling and Hyperparameter Optimisation

- **SmartCorrelatedSelection:** removes features that are highly correlated with each other, to avoid multicollinearity.
- **model:** the ML algorithm

In [None]:

# Feat Selection
from feature_engine.selection import SmartCorrelatedSelection

def PipelineClf(model):
    """Wrap ``model`` in the two-step modelling pipeline.

    The first step, 'correlation_filter', is a SmartCorrelatedSelection
    configured with Pearson correlation, a 0.8 threshold and the 'variance'
    selection method. The second step, 'model', is the estimator passed in.

    Args:
        model: estimator placed as the final pipeline step.

    Returns:
        A new, unfitted sklearn Pipeline.
    """
    correlation_filter = SmartCorrelatedSelection(
        method='pearson',
        threshold=0.8,
        selection_method='variance',
    )
    steps = [('correlation_filter', correlation_filter), ('model', model)]
    return Pipeline(steps)

---

## Hyperparameter Optimisation

**Hyperparameter Optimisation**

This is the process of tuning the hyperparameters of a machine learning model to improve its performance. It involves searching for the best combination of hyperparameters that yield the highest performance on a validation set.

In [None]:
from sklearn.model_selection import GridSearchCV
import numpy as np

class HyperparameterOptimizationSearch:
    """Run one GridSearchCV per candidate model and summarise the CV scores.

    Custom class provided by CI.

    Args:
        models: dict mapping a display name to an estimator instance.
        params: dict mapping the same names to a GridSearchCV parameter grid.
    """

    def __init__(self, models, params):
        self.models = models
        self.params = params
        self.keys = models.keys()
        # Fitted GridSearchCV objects, keyed by model name; filled by fit().
        self.grid_searches = {}

    def fit(self, X, y, cv, n_jobs, verbose=1, scoring=None, refit=False):
        """Fit a GridSearchCV for every model and store it by name.

        Each estimator is wrapped with PipelineClf before the search.

        Args:
            X: training features.
            y: training target.
            cv: cross-validation setting forwarded to GridSearchCV.
            n_jobs: number of parallel jobs forwarded to GridSearchCV.
            verbose: verbosity forwarded to GridSearchCV.
            scoring: scoring forwarded to GridSearchCV.
            refit: accepted for backward compatibility but NOT forwarded,
                so GridSearchCV's own ``refit`` default applies.
        """
        for key in self.keys:
            print(f"\nRunning GridSearchCV for {key} \n")

            model = PipelineClf(self.models[key])
            params = self.params[key]
            gs = GridSearchCV(model, params, cv=cv, n_jobs=n_jobs,
                              verbose=verbose, scoring=scoring)
            gs.fit(X, y)
            self.grid_searches[key] = gs

    def score_summary(self, sort_by='mean_score'):
        """Summarise per-split test scores of every fitted grid search.

        fit() must have been called first, otherwise there is nothing to
        summarise.

        Args:
            sort_by: summary column used to sort rows in descending order.

        Returns:
            A tuple ``(summary, grid_searches)``: ``summary`` is a DataFrame
            with one row per (estimator, parameter combination), led by the
            columns estimator/min_score/mean_score/max_score/std_score and
            followed by the hyperparameter columns; ``grid_searches`` is the
            dict of fitted GridSearchCV objects keyed by model name.
        """
        def row(key, scores, params):
            d = {
                'estimator': key,
                'min_score': min(scores),
                'max_score': max(scores),
                'mean_score': np.mean(scores),
                'std_score': np.std(scores),
            }
            return pd.Series({**params, **d})

        rows = []
        for name, gs in self.grid_searches.items():
            params = gs.cv_results_['params']
            # Use the fitted n_splits_ rather than the raw ``cv`` argument:
            # ``cv`` may be None or a splitter object, which cannot be passed
            # to range().
            split_scores = [
                gs.cv_results_[f"split{i}_test_score"]
                for i in range(gs.n_splits_)
            ]
            # Shape (n_param_combinations, n_splits): one row per combination.
            all_scores = np.column_stack(split_scores)
            for p, s in zip(params, all_scores):
                rows.append(row(name, s, p))

        df = pd.concat(rows, axis=1).T.sort_values([sort_by], ascending=False)
        columns = ['estimator', 'min_score',
                   'mean_score', 'max_score', 'std_score']
        columns = columns + [c for c in df.columns if c not in columns]
        return df[columns], self.grid_searches

## Grid Search CV - Sklearn

In [None]:
from sklearn.linear_model import LinearRegression
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from xgboost import XGBRegressor

# Candidate regressors for the quick search, keyed by display name. Every
# estimator that accepts a seed gets random_state=0 for reproducibility.
models_quick_search = {
    "LinearRegression": LinearRegression(),
    "DecisionTreeRegressor": DecisionTreeRegressor(random_state=0),
    "RandomForestRegressor": RandomForestRegressor(random_state=0),
    "GradientBoostingRegressor": GradientBoostingRegressor(random_state=0),
    "XGBRegressor": XGBRegressor(random_state=0),
}

# Parameter grids, keyed by the same names as models_quick_search. The
# 'model__' prefix targets the pipeline step named 'model'. An empty grid
# means no hyperparameters are searched for that estimator.
params_quick_search = {
    "LinearRegression": {},
    "DecisionTreeRegressor": {
        'model__max_depth': [None, 10, 20],
        'model__min_samples_split': [2, 5],
        'model__min_samples_leaf': [1, 2]
    },
    "RandomForestRegressor": {
        'model__n_estimators': [100, 200],
        'model__max_depth': [None, 10, 20],
        'model__min_samples_split': [2, 5],
        'model__min_samples_leaf': [1, 2],
        'model__bootstrap': [True]
    },
    "GradientBoostingRegressor": {
        'model__n_estimators': [100, 200],
        'model__learning_rate': [0.1, 0.2],
        'model__max_depth': [3, 5],
        'model__subsample': [0.8, 1.0],
        'model__min_samples_split': [2, 5],
        'model__min_samples_leaf': [1, 2]
    },
    "XGBRegressor": {
        'model__n_estimators': [100, 200],
        'model__learning_rate': [0.1, 0.2],
        'model__max_depth': [3, 5],
        'model__subsample': [0.8, 1.0],
        'model__colsample_bytree': [0.8, 1.0]
    },
}

**Run Grid Search CV**

In [None]:
# Grid Search
search = HyperparameterOptimizationSearch(
    models=models_quick_search,
    params=params_quick_search
)
search.fit(X_train, y_train, scoring='r2', n_jobs=-1, cv=5)

Check the results

In [None]:
grid_search_summary, grid_search_pipelines = search.score_summary(sort_by='mean_score')
print(grid_search_summary)

**Evaluate the Best Model**

In [None]:
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error

def regression_performance(X_train, y_train, X_test, y_test, pipeline):
    """Print regression metrics for ``pipeline`` on the train and test sets.

    Args:
        X_train, y_train: features and target of the train set.
        X_test, y_test: features and target of the test set.
        pipeline: fitted estimator exposing ``predict``.
    """
    print("Model Evaluation \n")
    splits = (
        ("* Train Set", X_train, y_train),
        ("* Test Set", X_test, y_test),
    )
    for heading, features, target in splits:
        print(heading)
        regression_evaluation(features, target, pipeline)

def regression_evaluation(X, y, pipeline):
    """Print R2, MAE, MSE and RMSE of ``pipeline`` predictions on (X, y).

    Args:
        X: feature matrix passed to ``pipeline.predict``.
        y: true target values.
        pipeline: fitted estimator exposing ``predict``.
    """
    prediction = pipeline.predict(X)
    # Computed once and reused for both the MSE and RMSE lines.
    mse = mean_squared_error(y, prediction)
    # Built-in round() works for NumPy scalars and plain Python floats alike;
    # the ``.round()`` method only exists on NumPy scalars, so calling it on
    # a metric that returns a Python float raises AttributeError.
    print('R2 Score:', round(r2_score(y, prediction), 3))
    print('Mean Absolute Error:', round(mean_absolute_error(y, prediction), 3))
    print('Mean Squared Error:', round(mse, 3))
    print('Root Mean Squared Error:', round(np.sqrt(mse), 3))
    print("\n")

best_model = grid_search_summary.iloc[0]['estimator']
print("Best Model:", best_model)
best_regressor_pipeline = grid_search_pipelines[best_model].best_estimator_

regression_performance(X_train, y_train, X_test, y_test, best_regressor_pipeline)

Parameters for best model

In [None]:
best_parameters = grid_search_pipelines[best_model].best_params_
best_parameters

The best clf pipeline

In [None]:
pipeline_clf = grid_search_pipelines[best_model].best_estimator_
pipeline_clf

In [None]:
pipeline_clf.steps

The XGBRegressor model has demonstrated strong predictive performance on both the training and test datasets.
- **R² Score: 0.891:** The model explains 89.1% of the variance in insurance costs on the training data, indicating a strong fit. 
- Low gap between train/test scores — not overfitting
- Low error metrics — strong predictions
- Stable RMSE on train/test — very balanced model

XGBRegressor is a strong choice for this regression task based on both accuracy and stability. With these evaluation scores, a more extensive hyperparameter search is not necessary.

---

## Assess feature importance

In [None]:
# Run the train set through every pipeline step except the final estimator.
# NOTE(review): X_transformed is not used again in the visible code — confirm
# whether it is still needed.
X_transformed = pipeline_clf[:-1].transform(X_train)

# Get the feature mask from the fitted correlation filter step
model_selector = pipeline_clf.named_steps['correlation_filter']
selected_mask = model_selector.get_support()

# Apply the mask to the column names the modelling pipeline was fed
all_features = X_train.columns
selected_features = all_features[selected_mask]

# Get feature importances from the fitted final estimator.
# NOTE(review): assumes the selected estimator exposes feature_importances_
# (tree-based models do) — confirm if another model type can win the search.
model = pipeline_clf.named_steps['model']
importances = model.feature_importances_

# Pair each kept feature with its importance, most important first
importance_df = pd.DataFrame({
    'Feature': selected_features,
    'Importance': importances
}).sort_values(by='Importance', ascending=False)


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

plt.figure(figsize=(10, 6))
sns.barplot(data=importance_df, x='Importance', y='Feature', palette='viridis')
plt.title(f'Feature Importance - {model.__class__.__name__}')
plt.xlabel('Importance Score')
plt.ylabel('Feature')
plt.tight_layout()
plt.show()
print(importance_df)

---

## Evaluate Pipeline on Train and Test Sets

In [None]:
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
import numpy as np

def regression_evaluation(X, y, pipeline):
    """Print R2, MAE, MSE and RMSE of ``pipeline`` predictions on (X, y).

    Args:
        X: feature matrix passed to ``pipeline.predict``.
        y: true target values.
        pipeline: fitted estimator exposing ``predict``.
    """
    prediction = pipeline.predict(X)
    # Computed once and reused for both the MSE and RMSE lines.
    mse = mean_squared_error(y, prediction)
    # Built-in round() works for NumPy scalars and plain Python floats alike;
    # the ``.round()`` method only exists on NumPy scalars, so calling it on
    # a metric that returns a Python float raises AttributeError.
    print('R2 Score:', round(r2_score(y, prediction), 3))
    print('Mean Absolute Error:', round(mean_absolute_error(y, prediction), 3))
    print('Mean Squared Error:', round(mse, 3))
    print('Root Mean Squared Error:', round(np.sqrt(mse), 3))
    print("\n")

def regression_performance(X_train, y_train, X_test, y_test, pipeline):
    """Print regression metrics for ``pipeline`` on the train and test sets.

    Args:
        X_train, y_train: features and target of the train set.
        X_test, y_test: features and target of the test set.
        pipeline: fitted estimator exposing ``predict``.
    """
    print("Model Evaluation \n")
    splits = (
        ("* Train Set", X_train, y_train),
        ("* Test Set", X_test, y_test),
    )
    for heading, features, target in splits:
        print(heading)
        regression_evaluation(features, target, pipeline)


In [None]:
regression_performance(X_train, y_train, X_test, y_test, pipeline_clf)


We used R2 Score, MAE and RMSE to evaluate the model performance, since Confusion Matrix and Accuracy are not suitable for regression problems.

**Evaluation Conclusion**

The XGBRegressor demonstrates strong predictive performance and generalization capability in estimating medical insurance costs. Its test set R² score of 0.891 indicates that the model explains approximately 89% of the variance in insurance charges for unseen data, which is excellent for a real-world regression task. Low and consistent error across training and test sets shows good generalization with minimal overfitting. Feature importance analysis confirms which variables most influence the cost, helping the business better understand risk drivers.


# Push files to Repo

We will generate the following files:
* Train set
* Test set
* Data cleaning and Feature Engineering pipeline
* Modeling pipeline
* features importance plot

In [None]:
import joblib
import os

# Versioned output folder for every artefact saved below.
version = "v1"
file_path = f"outputs/ml_pipelines/{version}"

# exist_ok=True makes re-running this cell a no-op when the folder already
# exists, instead of raising FileExistsError and printing it every time.
try:
    os.makedirs(file_path, exist_ok=True)
except OSError as e:
    # Still best effort: report real filesystem problems (e.g. permissions)
    # without stopping the notebook.
    print(e)

## Train Set

In [None]:
print(X_train.shape)
X_train.head()

In [None]:
X_train.to_csv(f"{file_path}/X_train.csv", index=False)

In [None]:
y_train

In [None]:
y_train.to_csv(f"{file_path}/y_train.csv", index=False)

## Test Set

In [None]:
print(X_test.shape)
X_test.head()

In [None]:
X_test.to_csv(f"{file_path}/X_test.csv", index=False)

In [None]:
y_test

In [None]:
y_test.to_csv(f"{file_path}/y_test.csv", index=False)

## ML Pipelines

### Feature Engineering Pipeline

In [None]:
joblib.dump(pipeline_data_cleaning_feat_eng, 
            f"{file_path}/pipeline_data_cleaning_feat_eng.pkl")

Pipeline responsible to transform the predicted target back to the original scale.

In [None]:
from sklearn.preprocessing import PowerTransformer

# NOTE(review): this PowerTransformer is persisted without ever being fitted,
# and the visible code applies no transformation to the target ('charges').
# An unfitted transformer cannot inverse-transform predictions — confirm
# whether this artefact is needed, or fit it before dumping.
pt = PowerTransformer(method='yeo-johnson')

joblib.dump(pt, f"{file_path}/power_transformer.pkl")

### Modeling Pipeline

In [None]:
joblib.dump(pipeline_clf, f"{file_path}/clf_pipeline_model.pkl")

## Feature Importance Plot and CSV

In [None]:
# Re-draw the feature importance chart so it can be written to disk.
plt.figure(figsize=(10, 6))
sns.barplot(data=importance_df, x='Importance', y='Feature', palette='viridis')
# Title uses the class name of the estimator actually selected by the search
# (same as the on-screen plot), so it stays correct if another model wins.
plt.title(f'Feature Importance - {model.__class__.__name__}')
plt.xlabel('Importance Score')
plt.ylabel('Feature')
plt.tight_layout()

# Save to PNG before show(). The file name is kept unchanged so anything
# that loads this path keeps working.
plt.savefig(f"{file_path}/feature_importance_xgb.png", dpi=300)
plt.show()

In [None]:
importance_df.to_csv(f"{file_path}/feature_importance_xgb.csv", index=False)

---

Testing the prediction on a single raw value

In [None]:
# Draw one reproducible row from the full dataset.
df_sample = df.sample(n=1, random_state=421)
print("Sample Input Data:\n", df_sample)

# Keep only the predictors; 'charges' is the value being predicted.
predict_sample = df_sample.drop(columns=['charges'])

# Apply the already fitted feature-engineering pipeline and re-wrap the
# result with the sample's own column names and index.
fe_df = pd.DataFrame(
    pipeline_data_cleaning_feat_eng.transform(predict_sample),
    columns=predict_sample.columns,
    index=predict_sample.index,
)
print("Transformed Sample Input Data:\n", fe_df)

# Predict with the fitted modelling pipeline and compare to the true value.
predicted_charges = pipeline_clf.predict(fe_df)
print("Predicted Charges:\n", predicted_charges)
print("Actual Charges:\n", df_sample['charges'].values)
