# Modelo de predicción de esperanza de vida

## Construyendo el *pipeline* (objeto)

In [1]:
import numpy as np

def engineer_features(X):
    """Return a feature-engineered copy of ``X``; the input frame is left untouched.

    Two changes are applied:

    * the columns ``thinness1-19`` and ``thinness5-9`` are replaced by a single
      ``thinness`` column holding their row-wise mean;
    * ``Income`` values equal to 0 are turned into ``NaN``.

    Parameters
    ----------
    X : pandas.DataFrame
        Must contain the columns ``thinness1-19``, ``thinness5-9`` and ``Income``.

    Returns
    -------
    pandas.DataFrame
        A new frame with the transformed columns.
    """
    thinness_cols = ['thinness1-19', 'thinness5-9']
    # Row-wise mean of the two thinness indicators (NaNs are skipped by pandas)
    thinness_mean = X[thinness_cols].mean(axis=1)
    return (
        X.drop(columns=thinness_cols)
        .assign(
            thinness=thinness_mean,
            # Zeros in Income are converted to NaN
            Income=lambda frame: frame['Income'].replace(0, np.nan),
        )
    )

# Wrap the plain function in a scikit-learn transformer so it can be used as a pipeline step.
# NOTE: `feature_engineer_transformer` is rebound to a class-based transformer in the
# next cell, so this function-based instance is shown as an alternative only.
from sklearn.preprocessing import FunctionTransformer
feature_engineer_transformer = FunctionTransformer(engineer_features)

`FunctionTransformer` es una clase de scikit-learn que nos permite crear transformadores a partir de funciones que definamos.

Otra forma de hacerlo sería definiendo directamente el transformador personalizado:

In [2]:
from sklearn.base import BaseEstimator, TransformerMixin

class FeatureEngineerTransformer(BaseEstimator, TransformerMixin):
    """Feature-engineering step for the life-expectancy dataset.

    Expects a pandas DataFrame containing the columns ``thinness1-19``,
    ``thinness5-9`` and ``Income`` and:

    1. replaces the two thinness columns with their row-wise mean, stored in a
       new ``thinness`` column;
    2. turns ``Income`` values equal to 0 into ``NaN``.

    The transformer has no hyper-parameters, so no ``__init__`` is defined
    (scikit-learn expects ``__init__`` to store constructor parameters only).

    Attributes
    ----------
    feature_names_in_ : ndarray of object
        Column names seen during ``fit`` (only set when ``X`` has a ``columns``
        attribute).
    """

    def fit(self, X, y=None):
        """Record the input column names; nothing is learned from the values.

        Parameters
        ----------
        X : pandas.DataFrame
            Training data.
        y : ignored
            Present for scikit-learn API compatibility.

        Returns
        -------
        self
        """
        # Store the names through the public fitted attribute instead of calling the
        # private BaseEstimator._check_feature_names helper, which is not part of the
        # public API and is not available on every scikit-learn release.
        if hasattr(X, 'columns'):
            self.feature_names_in_ = np.asarray(X.columns, dtype=object)
        elif hasattr(self, 'feature_names_in_'):
            # Refit on unnamed data: do not keep stale names from a previous fit
            del self.feature_names_in_
        return self

    def transform(self, X):
        """Return a feature-engineered copy of ``X``; the input frame is not modified.

        Parameters
        ----------
        X : pandas.DataFrame
            Must contain ``thinness1-19``, ``thinness5-9`` and ``Income``.

        Returns
        -------
        pandas.DataFrame
        """
        X = X.copy()

        # 1. Combine thinness features into a single averaged column
        X['thinness'] = X[['thinness1-19', 'thinness5-9']].mean(axis=1)
        X.drop(columns=['thinness1-19', 'thinness5-9'], inplace=True)
        # 2. Convert Income zeros to NaN
        X['Income'] = X['Income'].replace(0, np.nan)

        return X

feature_engineer_transformer = FeatureEngineerTransformer() # Create instance (object) of custom transformer

Hemos decidido utilizar KNNImputer. Podríamos simplemente añadirlo al pipeline:

In [3]:
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.preprocessing import StandardScaler

# Preprocessing chain with a fixed number of neighbours for the KNN imputer.
# Steps run in order: feature engineering -> median imputation of selected
# columns -> standardisation -> KNN imputation of whatever is still missing.
preprocessor = Pipeline(steps=[
    ('feature_engineering', feature_engineer_transformer),
    (
        'initial_imputer',
        ColumnTransformer(
            transformers=[
                # Median-impute these three columns; every other column passes through
                (
                    'median_imputer',
                    SimpleImputer(strategy='median'),
                    ['Schooling', 'Income', 'Total expenditure'],
                ),
            ],
            remainder='passthrough',
        ),
    ),
    # KNN imputation is distance based, so features are scaled first
    ('scaler', StandardScaler()),
    ('knn_imputer', KNNImputer(n_neighbors=5)),
])

Sin embargo, como hemos decidido calcular el número k de vecinos más cercanos de forma dinámica, necesitamos crear un transformador personalizado.

In [4]:
class DynamicKNNImputer(BaseEstimator, TransformerMixin):
    """KNN imputer whose neighbour count is derived from the training-set size.

    During ``fit`` the number of neighbours is set to ``int(sqrt(n_samples))``
    and a ``KNNImputer`` configured with it is fitted on the data; ``transform``
    delegates to that fitted imputer.
    """

    def __init__(self):
        # Filled in by fit(); stays None until then
        self.knn = None

    def fit(self, X, y=None):
        """Build and fit the underlying ``KNNImputer`` on ``X``; returns ``self``."""
        n_rows = X.shape[0]
        # k grows with the square root of the number of training rows
        neighbours = int(np.sqrt(n_rows))
        imputer = KNNImputer(n_neighbors=neighbours)
        self.knn = imputer
        imputer.fit(X)
        return self

    def transform(self, X):
        """Impute missing values in ``X`` with the imputer built in ``fit``."""
        imputer = self.knn
        return imputer.transform(X)

Definimos ahora el pipeline completo de preprocesamiento:

In [5]:
# Full preprocessing chain. Identical to the fixed-k version except for the last
# step, where the number of neighbours is computed from the training data.
preprocessor = Pipeline(steps=[
    ('feature_engineering', feature_engineer_transformer),
    (
        'initial_imputer',
        ColumnTransformer(
            transformers=[
                # Median-impute these three columns; every other column passes through
                (
                    'median_imputer',
                    SimpleImputer(strategy='median'),
                    ['Schooling', 'Income', 'Total expenditure'],
                ),
            ],
            remainder='passthrough',
        ),
    ),
    # KNN imputation is distance based, so features are scaled first
    ('scaler', StandardScaler()),
    ('knn_imputer', DynamicKNNImputer()),
])


y lo integramos con dos modelos para comparar su rendimiento.

In [6]:
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor

from sklearn.base import clone

# Each candidate model gets its OWN unfitted copy of the preprocessing chain.
# Passing the very same `preprocessor` object to both pipelines would make them
# share fitted state: fitting one pipeline would silently refit the
# preprocessing used by the other.
pipeline1 = Pipeline(steps=[
    ('preprocessor', clone(preprocessor)),
    ('regressor', LinearRegression())
])

pipeline2 = Pipeline(steps=[
    ('preprocessor', clone(preprocessor)),
    # Fixed seed so the forest is reproducible between runs
    ('regressor', RandomForestRegressor(n_estimators=100, random_state=42))
])


## *Pipeline* de entrenamiento (proceso)

### Carga y preparación de datos

In [7]:
import pandas as pd
# Load the raw dataset and keep only the rows that have a target value
df = (
    pd.read_csv("../data/life_expectancy.csv")
    .dropna(subset=['LifeExpectancy'])
)

Normalmente eliminaríamos sin más las columnas con las que no vamos a trabajar ('Country', 'Year' y 'Status'), porque no las consideramos parte del modelo. Sin embargo, en este caso guardamos antes 'Country' en una variable para poder separar los conjuntos de entrenamiento, validación y test garantizando que los datos de un mismo país no se repartan entre conjuntos distintos, lo que evita la fuga de datos.

In [8]:
# Columns considered not useful for the model.
# NOTE(review): this list is never referenced again in the visible notebook — only
# 'Country', 'Year' and 'Status' are actually dropped below, so 'InfantDeaths',
# 'Population', 'HepatitisB' and 'Measles' remain in the feature matrix. Confirm
# whether they were meant to be dropped as well.
non_used_features = ['Country', 'Year', 'Status',
            'InfantDeaths', # highly correlated with 'infantDeaths' (NOTE(review): presumably another column was meant — confirm)
            'Population', # low correlation and lots of missing values
            'HepatitisB', # low correlation and lots of missing values
            'Measles', # odd distribution and low correlation
]

groups = df['Country']  # kept aside to split the data sets by country
# In-place drop: re-running this cell without reloading `df` fails because
# 'Country' is no longer present.
df.drop(columns=["Country", "Year", "Status"], inplace=True)


### Separación de conjuntos de entrenamiento y test

In [9]:
# Target: the column the models have to predict
y = df['LifeExpectancy']
# Features: every remaining column
X = df.drop(columns='LifeExpectancy')

In [10]:
from sklearn.model_selection import GroupShuffleSplit

# Single group-aware split: all rows sharing a `groups` value land on the same side
gss = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=42)

# n_splits=1, so the splitter yields exactly one (train, test) pair of positions
train_idx, test_idx = next(gss.split(X, y, groups))

X_train, y_train = X.iloc[train_idx], y.iloc[train_idx]
X_test, y_test = X.iloc[test_idx], y.iloc[test_idx]

### Elección de modelo mediante *cross-validation*

Realizamos una validación cruzada usando como *fold* la separación por países.

In [11]:
from sklearn.model_selection import cross_val_score, GroupKFold

# Folds are built per group, so a group never appears in both the training and
# the validation part of a fold
group_kfold = GroupKFold(n_splits=5)

# Group labels restricted to the training rows
groups_train = groups.iloc[train_idx]

# Same cross-validation setup for both candidates (evaluated in this order)
scores1, scores2 = (
    cross_val_score(
        candidate,
        X_train,
        y_train,
        cv=group_kfold,
        groups=groups_train,
        scoring='neg_mean_absolute_error',
    )
    for candidate in (pipeline1, pipeline2)
)

# The scorer returns negated MAE; flip the sign to get positive errors
mae1, mae2 = -scores1, -scores2

# Report mean and spread of the per-fold MAE
print("Linear Regression CV Results:")
print(f"  Mean MAE: {mae1.mean():.2f} ± {mae1.std():.2f}")
print("\nRandom Forest CV Results:")
print(f"  Mean MAE: {mae2.mean():.2f} ± {mae2.std():.2f}")

Linear Regression CV Results:
  Mean MAE: 2.82 ± 0.33

Random Forest CV Results:
  Mean MAE: 2.07 ± 0.14


### Evaluación final del modelo elegido

Finalmente, tras la evaluación de los modelos, seleccionamos el mejor y lo entrenamos con todos los datos de entrenamiento para obtener el modelo final.

In [12]:
from sklearn.metrics import mean_absolute_error

# Keep the candidate with the lower mean cross-validated MAE
if mae1.mean() < mae2.mean():
    best_model = pipeline1
else:
    best_model = pipeline2

# Refit the winner on the whole training set, then score it once on the test set
best_model.fit(X_train, y_train)
y_pred = best_model.predict(X_test)
test_mae = mean_absolute_error(y_test, y_pred)

print("\nTest Set Results:")
print(f"Best Model: {'Linear Regression' if best_model is pipeline1 else 'Random Forest'}")
print(f"Test MAE: {test_mae:.2f}")


Test Set Results:
Best Model: Random Forest
Test MAE: 2.12


## Guardando el modelo

In [13]:
import joblib
# Persist the fitted best pipeline (preprocessing + regressor) under ../production/.
# NOTE(review): the pipeline holds instances of FeatureEngineerTransformer and
# DynamicKNNImputer, which are defined in this notebook. Pickle-based persistence
# stores classes by reference, so whatever loads this file must be able to resolve
# both classes under the same module path — confirm the loading code does.
joblib.dump(best_model, "../production/life_expectancy.joblib")

['../production/life_expectancy.joblib']