# Task
Re-implement the SHIA pipeline from scratch in the notebook, ensuring it is correctly structured and includes steps for data loading, preprocessing, model training (base models with Dask, meta-learners, and a final blender), generating meta-features, simulating incremental data arrival, incremental training of the blender, evaluation, saving models, and logging results with MLflow. The pipeline should utilize Spark and Dask for distributed processing and parallel training where appropriate.

In [None]:
# ----------------------------
# IMPORTS
# ----------------------------
# !pip install pyspark dask distributed pandas scikit-learn joblib mlflow requests --quiet

import os
import requests
import joblib
import numpy as np
import pandas as pd
from pathlib import Path

# Spark
# from pyspark.sql import SparkSession # Not needed in this subtask

# Dask
# from dask.distributed import Client, LocalCluster # Not needed in this subtask

# ML
# from sklearn.model_selection import train_test_split # Not needed in this subtask
# from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier # Not needed in this subtask
# from sklearn.svm import SVC # Not needed in this subtask
# from sklearn.linear_model import LogisticRegression # Not needed in this subtask
# from sklearn.metrics import accuracy_score, f1_score # Not needed in this subtask
# from sklearn.preprocessing import StandardScaler # Not needed in this subtask

# Stacking / SHIA-level
# from sklearn.ensemble import StackingClassifier # Not needed in this subtask
# from xgboost import XGBClassifier # Not needed in this subtask

import mlflow

# Fixed seed for reproducibility
RND = 42
np.random.seed(RND)

# ----------------------------
# 1) Download the CSV locally (Spark does not read https directly)
# ----------------------------
url = "https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv"
local_path = "/tmp/titanic.csv"
os.makedirs("/tmp", exist_ok=True)

# The file is only fetched once; later runs reuse the cached copy.
if not os.path.exists(local_path):
    print("Descargando dataset:", url)
    # Without a timeout, requests waits forever on a stalled connection.
    resp = requests.get(url, timeout=30)
    resp.raise_for_status()
    with open(local_path, "wb") as f:
        f.write(resp.content)
    print("Descarga completa ->", local_path)
else:
    print("Archivo ya existe:", local_path)

# ----------------------------
# 2) Load with pandas (quick inspection)
# ----------------------------
pdf = pd.read_csv(local_path)
print("Pandas df shape:", pdf.shape)
display(pdf.head(3))

Archivo ya existe: /tmp/titanic.csv
Pandas df shape: (891, 12)


Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S


**Reasoning**:
Initialize a Spark session and convert the Pandas DataFrame to a Spark DataFrame for potential future distributed processing steps.



In [None]:
# ----------------------------
# 3) Start Spark
# ----------------------------
from pyspark.sql import SparkSession

# Reuse an existing session if there is one; otherwise build it with
# 4 GB of driver memory and 4 shuffle partitions.
spark = (
    SparkSession.builder
    .appName("SHIA_Enterprise_Hybrid")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.shuffle.partitions", "4")
    .getOrCreate()
)
print("Spark versión:", spark.version)

# pandas -> Spark DataFrame
spark_df = spark.createDataFrame(pdf)
print("Spark DF creado:")
spark_df.show(3)

Spark versión: 3.5.1
Spark DF creado:
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|  NaN|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925|  NaN|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
only showing top 3 rows



**Reasoning**:
Convert the Pandas DataFrame to a Dask DataFrame for potential future parallel processing steps with Dask.



In [None]:
# ----------------------------
# 4) Optional: wrap the pandas frame as a Dask DataFrame
# ----------------------------
import dask.dataframe as dd

# Four partitions of the in-memory pandas frame.
ddf = dd.from_pandas(
    pdf,
    npartitions=4,
)
print("Dask DataFrame particionado en:", ddf.npartitions)

Dask DataFrame particionado en: 4


**Reasoning**:
Perform basic preprocessing steps on the Spark DataFrame (filling missing values and encoding 'Sex') and then convert the cleaned Spark DataFrame back to a Pandas DataFrame for subsequent steps.



In [None]:
# ----------------------------
# 5) Preprocessing (Spark, then pandas)
#    - fill missing values, encode sex, select features
# ----------------------------
from pyspark.sql.functions import when, col

# The transformations run in Spark to mimic a pipeline at scale.
# NOTE(review): createDataFrame(pdf) may carry pandas NaN in string columns as
# a NaN value rather than null; if so the 'Embarked' fill is a no-op — confirm.
# 'Embarked' is not among the selected features below.
spark_df = (
    spark_df
    .fillna({'Age': 30, 'Embarked': 'S'})
    # Binary encoding: "male" -> 1, anything else -> 0.
    .withColumn("Sex", when(col("Sex") == "male", 1).otherwise(0))
)

# Bring the cleaned data back to pandas for the rest of the flow (meta/SHIA)
pdf_clean = spark_df.toPandas()

target = "Survived"
features = ["Pclass", "Sex", "Age", "SibSp", "Parch", "Fare"]

y = pdf_clean[target].copy()
X = pdf_clean[features].copy()
print("Features shape:", X.shape)

Features shape: (891, 6)


**Reasoning**:
Split the data into training, validation, and test sets to prepare for model training and evaluation.



In [None]:
# ----------------------------
# 6) Train/Val/Test split (pandas)
# ----------------------------
from sklearn.model_selection import train_test_split

# First cut: 70% train, 30% hold-out, stratified on the target.
X_train, X_hold, y_train, y_hold = train_test_split(
    X,
    y,
    test_size=0.30,
    stratify=y,
    random_state=RND,
)
# Second cut: the hold-out is halved into validation and test.
X_val, X_test, y_val, y_test = train_test_split(
    X_hold,
    y_hold,
    test_size=0.5,
    stratify=y_hold,
    random_state=RND,
)
print("Train/Val/Test shapes:", X_train.shape, X_val.shape, X_test.shape)

Train/Val/Test shapes: (623, 6) (134, 6) (134, 6)


**Reasoning**:
Initialize a Dask LocalCluster and Client to set up the environment for parallel model training.



In [None]:
# ----------------------------
# 7) Start Dask (LocalCluster)
# ----------------------------
from dask.distributed import Client, LocalCluster

# Four single-threaded workers, each limited to 2 GB.
cluster = LocalCluster(
    n_workers=4,
    threads_per_worker=1,
    memory_limit="2GB",
)
client = Client(cluster)
print("Dask dashboard:", client.dashboard_link)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 40623 instead
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:35843
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:40623/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:45777'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:32995'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:44823'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:36443'
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:44493 name: 3
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:44493
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:42024
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:37567 name: 1
INFO:distributed.scheduler:Starting worker compute stream, tcp://

Dask dashboard: http://127.0.0.1:40623/status


**Reasoning**:
Define and train the base models in parallel using Dask, then collect the trained models and save them locally.



In [None]:
# ----------------------------
# 8) Entrenamiento de modelos base en paralelo con Dask
#    - Función que entrena y devuelve el modelo (se recoge en el driver)
# ----------------------------

from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression

def train_model_return(name, model, X_tr, y_tr):
    """Fit ``model`` on ``(X_tr, y_tr)`` and return it paired with ``name``.

    The model is fitted in place by calling ``model.fit(X_tr, y_tr)``; the
    return value of ``fit`` is ignored.

    Returns:
        A ``(name, model)`` tuple holding the same model object that was
        passed in.
    """
    model.fit(X_tr, y_tr)
    labelled = (name, model)
    return labelled

# Base learners, keyed by the name used for their saved file
base_models = {
    "M1_rf": RandomForestClassifier(n_estimators=100, random_state=RND),
    "M2_gb": GradientBoostingClassifier(random_state=RND),
    "M3_svc": SVC(probability=True, kernel='rbf', C=2, random_state=RND),
    "M4_lr": LogisticRegression(max_iter=500, random_state=RND),
}

# One Dask task per model (X_train and y_train are serialized to the workers)
futures = [
    client.submit(train_model_return, label, estimator, X_train, y_train)
    for label, estimator in base_models.items()
]

# Collect the fitted models back on the driver
trained = client.gather(futures)
models_trained = dict(trained)

# Persist each fitted model locally (driver side) with joblib
models_dir = Path("models")
models_dir.mkdir(exist_ok=True)
for label, fitted in models_trained.items():
    target_file = models_dir / f"{label}.joblib"
    joblib.dump(fitted, target_file)
    print("Guardado:", target_file)

print("Modelos base entrenados:", list(models_trained.keys()))

Guardado: models/M1_rf.joblib
Guardado: models/M2_gb.joblib
Guardado: models/M3_svc.joblib
Guardado: models/M4_lr.joblib
Modelos base entrenados: ['M1_rf', 'M2_gb', 'M3_svc', 'M4_lr']


**Reasoning**:
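Generate out-of-fold predictions for the training set using stratified cross-validation. For each base model, a fresh unfitted copy with the same hyperparameters is refit on every fold, and its predictions on the held-out rows become the meta-features for the meta-learners.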



In [None]:
# ----------------------------
# 9) Build meta-features with cross-validation (training input for the
#    meta-learners)
# ----------------------------
from sklearn.base import clone
from sklearn.model_selection import StratifiedKFold

# Stratified, shuffled folds with a fixed seed
n_splits = 5
skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=RND)

# model name -> out-of-fold (OOF) prediction for every training row
meta_train_oof = {}

for name, model in models_trained.items():
    print(f"Generating OOF predictions for {name}...")
    # Each training row is filled exactly once, by the fold that held it out.
    oof_preds = np.zeros(X_train.shape[0])

    for train_idx, val_idx in skf.split(X_train, y_train):
        # clone() yields an unfitted estimator with the same hyperparameters.
        # Rebuilding via model.__class__(**model.get_params()) passes deep
        # parameters to the constructor, which fails for composite estimators.
        model_copy = clone(model)
        model_copy.fit(X_train.iloc[train_idx], y_train.iloc[train_idx])

        # Prefer the positive-class probability; fall back to hard labels.
        X_fold_val = X_train.iloc[val_idx]
        if hasattr(model_copy, 'predict_proba'):
            predictions = model_copy.predict_proba(X_fold_val)[:, 1]
        else:
            predictions = model_copy.predict(X_fold_val)

        oof_preds[val_idx] = predictions

    meta_train_oof[name] = oof_preds
    print(f"Finished OOF predictions for {name}.")

print("\nMeta-features (OOF predictions) generated:", meta_train_oof.keys())

Generating OOF predictions for M1_rf...
Finished OOF predictions for M1_rf.
Generating OOF predictions for M2_gb...
Finished OOF predictions for M2_gb.
Generating OOF predictions for M3_svc...
Finished OOF predictions for M3_svc.
Generating OOF predictions for M4_lr...
Finished OOF predictions for M4_lr.

Meta-features (OOF predictions) generated: dict_keys(['M1_rf', 'M2_gb', 'M3_svc', 'M4_lr'])


## Generación de meta-features (validación y prueba)

### Subtask:
Generar las meta-features para los conjuntos de validación (`X_val`) y prueba (`X_test`) utilizando las predicciones de los modelos base *completamente entrenados* (en todo el conjunto de entrenamiento, disponibles en `models_trained`).


**Reasoning**:
Iterate through the trained base models, predict probabilities on the validation and test sets, and store these predictions in dictionaries. Then, stack these predictions into NumPy arrays and print their shapes.



In [None]:
# Score the validation and test sets with every fitted base model.
meta_val_preds, meta_test_preds = {}, {}

for name, model in models_trained.items():
    print(f"Generating meta-features for validation and test sets using {name}...")
    if not hasattr(model, 'predict_proba'):
        # No probability output available: use hard class predictions.
        val_scores = model.predict(X_val)
        test_scores = model.predict(X_test)
    else:
        # Positive-class probability (column 1)
        val_scores = model.predict_proba(X_val)[:, 1]
        test_scores = model.predict_proba(X_test)[:, 1]
    meta_val_preds[name] = val_scores
    meta_test_preds[name] = test_scores
    print(f"Finished meta-feature generation for {name}.")

# One column per base model, in dictionary insertion order
meta_val = np.column_stack(tuple(meta_val_preds.values()))
meta_test = np.column_stack(tuple(meta_test_preds.values()))

# Shape check
print("\nShape of stacked meta_val:", meta_val.shape)
print("Shape of stacked meta_test:", meta_test.shape)

Generating meta-features for validation and test sets using M1_rf...
Finished meta-feature generation for M1_rf.
Generating meta-features for validation and test sets using M2_gb...
Finished meta-feature generation for M2_gb.
Generating meta-features for validation and test sets using M3_svc...
Finished meta-feature generation for M3_svc.
Generating meta-features for validation and test sets using M4_lr...
Finished meta-feature generation for M4_lr.

Shape of stacked meta_val: (134, 4)
Shape of stacked meta_test: (134, 4)


## Entrenamiento de meta-learners (h1 y h2)

### Subtask:
Entrenar los modelos H1 y H2 utilizando las meta-features generadas a partir de la validación cruzada en el conjunto de entrenamiento (`meta_train_oof`) y el target de entrenamiento (`y_train`).


**Reasoning**:
Train the meta-learners H1 and H2 using the generated meta-features from the training set.



In [None]:
# ----------------------------
# Train meta-learners H1 and H2 (pandas / scikit-learn)
#    - inputs: OOF meta-features (meta_train) and the training target (y_train)
# ----------------------------
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression

# Accept either the dict of per-model OOF arrays or an already-stacked array.
meta_train = (
    np.column_stack(list(meta_train_oof.values()))
    if isinstance(meta_train_oof, dict)
    else meta_train_oof
)

# Column split point: H1 gets columns [:mid], H2 gets columns [mid:].
mid = max(1, meta_train.shape[1] // 2)
print(f"Dividing meta-features at column index: {mid}")

# Target as a plain array when it arrives as a pandas Series
y_train_np = y_train.values if isinstance(y_train, pd.Series) else y_train

# Meta-learners
H1 = GradientBoostingClassifier(random_state=RND)
H2 = LogisticRegression(max_iter=1000, random_state=RND)

print("Training H1...")
H1.fit(meta_train[:, :mid], y_train_np)
print("H1 trained.")

print("Training H2...")
H2.fit(meta_train[:, mid:], y_train_np)
print("H2 trained.")

print("Meta-learners H1 and H2 trained.")

Dividing meta-features at column index: 2
Training H1...
H1 trained.
Training H2...
H2 trained.
Meta-learners H1 and H2 trained.


## Entrenamiento del blender final (inicial)

### Subtask:
Entrenar el modelo `StackingClassifier` utilizando las predicciones probabilísticas de H1 y H2 sobre el conjunto de validación (`meta_val`) como features de entrada, y el target de validación (`y_val`).


**Reasoning**:
Instantiate and fit the StackingClassifier using the predictions of H1 and H2 on the validation set as input features and the validation target.



In [None]:
from sklearn.ensemble import StackingClassifier
from xgboost import XGBClassifier

# ----------------------------
# 10) Train the initial final blender Hf
# ----------------------------

# Accept either a dict of per-model arrays or an already-stacked array.
if isinstance(meta_val, dict):
    meta_val = np.column_stack(list(meta_val.values()))

# Same column split that is used when fitting H1 ([:mid]) and H2 ([mid:]).
mid = max(1, meta_val.shape[1] // 2)

# Target as a plain array when it arrives as a pandas Series
y_val_np = y_val.values if isinstance(y_val, pd.Series) else y_val

# Blender inputs: positive-class probabilities of H1 and H2 on validation
meta_for_Hf_val = np.column_stack([
    H1.predict_proba(meta_val[:, :mid])[:, 1],
    H2.predict_proba(meta_val[:, mid:])[:, 1]
])

# NOTE(review): StackingClassifier.fit clones its estimators and refits the
# clones on the data it is given, so the already-fitted H1/H2 objects are not
# reused inside the blender; the clones are trained on the two probability
# columns above. Confirm this double stacking is intended.
# `use_label_encoder` is no longer passed: the installed XGBoost ignores it
# and emits a "Parameters ... are not used" warning.
final_blender = StackingClassifier(
    estimators=[('h1', H1), ('h2', H2)],
    final_estimator=XGBClassifier(eval_metric='logloss', random_state=RND),
    cv=3,
    n_jobs=1
)

print("Training final blender (StackingClassifier)...")
final_blender.fit(meta_for_Hf_val, y_val_np)
print("Final blender (StackingClassifier) trained.")

Training final blender (StackingClassifier)...


Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)


Final blender (StackingClassifier) trained.


**Reasoning**:
Evaluate the trained StackingClassifier on the test set using the predictions of H1 and H2 on meta_test as input features.



In [None]:
from sklearn.metrics import accuracy_score, f1_score

# Accept either a dict of per-model arrays or an already-stacked array.
if isinstance(meta_test, dict):
    meta_test = np.column_stack(list(meta_test.values()))

# Column split point: H1 reads [:mid], H2 reads [mid:].
mid = max(1, meta_test.shape[1] // 2)

# Blender inputs on the test set: positive-class probabilities of H1 and H2
h1_test_proba = H1.predict_proba(meta_test[:, :mid])[:, 1]
h2_test_proba = H2.predict_proba(meta_test[:, mid:])[:, 1]
meta_for_Hf_test = np.column_stack([h1_test_proba, h2_test_proba])

# Predictions of the initial blender
y_pred_test = final_blender.predict(meta_for_Hf_test)

# Target as a plain array when it arrives as a pandas Series
y_test_np = y_test.values if isinstance(y_test, pd.Series) else y_test

# Test-set metrics
acc = accuracy_score(y_test_np, y_pred_test)
f1 = f1_score(y_test_np, y_pred_test)

print("SHIA Final - Test Accuracy:", acc, "F1:", f1)

SHIA Final - Test Accuracy: 0.7164179104477612 F1: 0.6274509803921569


**Reasoning**:
Generate a synthetic dataset by adding noise to a subset of the training data, create a corresponding target variable by sampling from the training target, and generate meta-features for the new data using the trained base models. Then, stack the meta-features into a NumPy array and print the shapes of the new data, target, and meta-features.



In [None]:
# ----------------------------
# 11) Simulate the arrival of new data (input for the blender retrain)
#     - build noisy synthetic rows
#     - compute their meta-features
# ----------------------------

# 1. Resample 20% of the training rows (with replacement).
# NOTE(review): these rows come from X_train, which the base models were
# fitted on, so their meta-features are likely optimistic — confirm this is
# acceptable for the simulation.
new_data_size = int(0.20 * X_train.shape[0])
X_new = X_train.sample(
    n=new_data_size, random_state=RND, replace=True
).reset_index(drop=True)

# Gaussian noise per column, scaled to 1% of that column's standard deviation
noise_level = 0.01
std_dev = X_train.std()
noise = np.random.normal(0, noise_level * std_dev, size=X_new.shape)
# Add the noise and keep the columns in the `features` order.
X_new = (X_new + noise)[features]

# 2. Labels for the simulated rows. Same n / random_state / replace as the
# feature sample; presumably this draws the same row positions so labels line
# up with X_new — TODO confirm.
y_new = y_train.sample(
    n=new_data_size, random_state=RND, replace=True
).reset_index(drop=True)

# 3. Meta-features: positive-class probability when available, else labels
meta_new_preds = {
    name: (
        model.predict_proba(X_new)[:, 1]
        if hasattr(model, 'predict_proba')
        else model.predict(X_new)
    )
    for name, model in models_trained.items()
}

# 4. One column per base model
meta_new = np.column_stack(list(meta_new_preds.values()))

# 5. Shape report
print("Shape of simulated new data (X_new):", X_new.shape)
print("Shape of simulated new target (y_new):", y_new.shape)
print("Shape of new meta-features (meta_new):", meta_new.shape)

Shape of simulated new data (X_new): (124, 6)
Shape of simulated new target (y_new): (124,)
Shape of new meta-features (meta_new): (124, 4)


**Reasoning**:
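Concatenate the original validation meta-features and target with the new simulated data, then fit a new StackingClassifier from scratch on the combined set. This simulates an incremental update by retraining on old plus new data; no partial-fit or warm-start mechanism is used.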



In [None]:
# 1. Stack the original validation meta-features on top of the simulated ones.
# Accept either a dict of per-model arrays or an already-stacked array.
if isinstance(meta_val, dict):
    meta_val = np.column_stack(list(meta_val.values()))

combined_meta = np.vstack([meta_val, meta_new])
print("Shape of combined meta-features:", combined_meta.shape)

# 2. Concatenate the matching targets (as plain arrays when they are Series).
y_val_np = y_val.values if isinstance(y_val, pd.Series) else y_val
y_new_np = y_new.values if isinstance(y_new, pd.Series) else y_new

combined_y = np.concatenate([y_val_np, y_new_np])
print("Shape of combined target:", combined_y.shape)


# 3. New blender with the same configuration as the initial one.
# `use_label_encoder` is no longer passed: the installed XGBoost ignores it
# and emits a "Parameters ... are not used" warning.
# NOTE(review): StackingClassifier.fit refits clones of H1/H2 on the data it
# receives; the fitted H1/H2 objects are only used below to build that data.
Hf_new = StackingClassifier(
    estimators=[('h1', H1), ('h2', H2)],
    final_estimator=XGBClassifier(eval_metric='logloss', random_state=RND),
    cv=3,
    n_jobs=1
)

# 4. Fit Hf_new on the combined data. This is a full refit on old + new rows,
# not a partial/warm-start update.
print("Training new StackingClassifier (Hf_new) incrementally...")

# Blender inputs are the H1/H2 positive-class probabilities, recomputed on the
# combined meta-features with the usual column split.
mid = max(1, combined_meta.shape[1] // 2)

meta_for_Hf_combined = np.column_stack([
    H1.predict_proba(combined_meta[:, :mid])[:, 1],
    H2.predict_proba(combined_meta[:, mid:])[:, 1]
])

Hf_new.fit(meta_for_Hf_combined, combined_y)
print("New StackingClassifier (Hf_new) trained.")

Shape of combined meta-features: (258, 4)
Shape of combined target: (258,)
Training new StackingClassifier (Hf_new) incrementally...
New StackingClassifier (Hf_new) trained.


Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)


**Reasoning**:
Evaluate the incrementally trained StackingClassifier (Hf_new) on the test set using the specified steps.



In [None]:
# Accept either a dict of per-model arrays or an already-stacked array.
if isinstance(meta_test, dict):
    meta_test = np.column_stack(list(meta_test.values()))

# Column split point: H1 reads [:mid], H2 reads [mid:].
mid = max(1, meta_test.shape[1] // 2)

# Blender inputs on the test set: positive-class probabilities of H1 and H2
h1_test_proba = H1.predict_proba(meta_test[:, :mid])[:, 1]
h2_test_proba = H2.predict_proba(meta_test[:, mid:])[:, 1]
meta_for_Hf_test = np.column_stack([h1_test_proba, h2_test_proba])

# Predictions of the retrained blender
y_pred_test_inc = Hf_new.predict(meta_for_Hf_test)

# Target as a plain array when it arrives as a pandas Series
y_test_np = y_test.values if isinstance(y_test, pd.Series) else y_test

# Test-set metrics for the retrained blender
acc_inc = accuracy_score(y_test_np, y_pred_test_inc)
f1_inc = f1_score(y_test_np, y_pred_test_inc)

print("SHIA Incremental - Test Accuracy:", acc_inc, "F1:", f1_inc)

SHIA Incremental - Test Accuracy: 0.7388059701492538 F1: 0.6728971962616822


**Reasoning**:
Save the trained models and log metrics and artifacts using MLflow as per the instructions.



In [None]:
# ----------------------------
# 12) Save models and log to MLflow (local store)
# ----------------------------
models_dir = Path("models")
models_dir.mkdir(exist_ok=True)

# Everything to persist, keyed by file stem: base models first, then the
# meta-learners and both blenders. Hf_final is not saved because it was not
# defined in previous steps.
to_persist = dict(models_trained)
to_persist["H1"] = H1
to_persist["H2"] = H2
to_persist["Hf_initial"] = final_blender
to_persist["Hf_new"] = Hf_new

for stem, fitted in to_persist.items():
    joblib.dump(fitted, models_dir / f"{stem}.joblib")
print("Modelos guardados en", models_dir.resolve())

# Metrics of the initial and the retrained blender
run_metrics = {
    "shia_initial_acc": float(acc),
    "shia_initial_f1": float(f1),
    "shia_incremental_acc": float(acc_inc),
    "shia_incremental_f1": float(f1_inc),
}

mlflow.set_experiment("SHIA_Enterprise_Hybrid")
with mlflow.start_run(run_name="shia_hybrid_run"):
    for metric_name, metric_value in run_metrics.items():
        mlflow.log_metric(metric_name, metric_value)
    # Attach every saved model file under the "models" artifact folder
    for file in models_dir.glob("*.joblib"):
        mlflow.log_artifact(str(file), artifact_path="models")
print("Registro MLflow completado. Revisa la carpeta ./mlruns por detalles.")

Modelos guardados en /content/models
Registro MLflow completado. Revisa la carpeta ./mlruns por detalles.


**Reasoning**:
Close the Dask client and cluster, and stop the Spark session to release resources.



In [None]:
# ----------------------------
# 13) Release resources
# ----------------------------
# Shutdown order: Dask client, then its cluster, then the Spark session.
for shutdown in (client.close, cluster.close, spark.stop):
    shutdown()
print("Recursos liberados. Fin del pipeline SHIA.")

INFO:distributed.scheduler:Remove client Client-d4edeb42-b065-11f0-80ea-0242ac1c000c
INFO:distributed.core:Received 'close-stream' from tcp://127.0.0.1:42064; closing.
INFO:distributed.scheduler:Remove client Client-d4edeb42-b065-11f0-80ea-0242ac1c000c
INFO:distributed.scheduler:Close client connection: Client-d4edeb42-b065-11f0-80ea-0242ac1c000c
INFO:distributed.scheduler:Retire worker addresses (stimulus_id='retire-workers-1761261351.5833685') (0, 1, 2, 3)
INFO:distributed.nanny:Closing Nanny at 'tcp://127.0.0.1:45777'. Reason: nanny-close
INFO:distributed.nanny:Nanny asking worker to close. Reason: nanny-close
INFO:distributed.nanny:Closing Nanny at 'tcp://127.0.0.1:32995'. Reason: nanny-close
INFO:distributed.nanny:Nanny asking worker to close. Reason: nanny-close
INFO:distributed.nanny:Closing Nanny at 'tcp://127.0.0.1:44823'. Reason: nanny-close
INFO:distributed.nanny:Nanny asking worker to close. Reason: nanny-close
INFO:distributed.nanny:Closing Nanny at 'tcp://127.0.0.1:36443'

Recursos liberados. Fin del pipeline SHIA.


## Summary:

## Summary of Solving Process

This task involved implementing the initial steps of a SHIA (Stacking Hybrid Incremental Adaptation) pipeline, including data loading, preprocessing, distributed base model training using Dask, generating meta-features, training meta-learners (H1 and H2), training the initial final blender, simulating incremental data, training an incremental blender, evaluating both blenders, saving models, and logging results with MLflow.

### Data Analysis Key Findings

*   The dataset was successfully downloaded and loaded into Pandas, Spark, and Dask DataFrames.
*   Basic preprocessing (handling missing values and encoding) was applied to the data using Spark.
*   The data was split into training, validation, and test sets.
*   Base models (Random Forest, Gradient Boosting, SVC, Logistic Regression) were trained in parallel using Dask.
*   Out-of-fold predictions were generated from base models on the training set to create meta-features for training meta-learners.
*   Meta-features were generated for the validation and test sets using the fully trained base models.
*   Two meta-learners (H1 and H2) were trained using the OOF meta-features from the training set, splitting the meta-features at the midpoint (using the first 2 for H1 and the remaining for H2).
*   The initial final blender (`StackingClassifier`) was trained using the predicted probabilities of H1 and H2 on the validation set as input features.
*   Simulated incremental data was generated by adding noise to a subset of the training data, and meta-features were generated for this new data using the base models.
*   An incremental blender was trained on the combined original validation data and the simulated new data.
*   Evaluation on the test set showed the incremental blender achieved a higher F1 score (0.673) compared to the initial blender (0.627).
*   All trained models were saved locally, and evaluation metrics (accuracy and F1 for both initial and incremental blenders) and model artifacts were logged using MLflow.

### Insights or Next Steps

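*   The incremental training step coincided with a higher test F1 score (0.673 vs. 0.627), which suggests the SHIA approach could help when adapting to new data streams. Note that the simulated new data is a noisy resample of the training set and the test set has only 134 rows, so this difference should be confirmed on genuinely new data.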
*   Further steps would involve evaluating the performance of the combined base models and meta-learners on the incremental data and potentially implementing a more sophisticated strategy for updating the meta-learners or base models in an incremental setting.


In [None]:
# ----------------------------
# 1) Configuración inicial y carga de datos
# ----------------------------
# Imports
import os
import requests
import joblib
import numpy as np
import pandas as pd
from pathlib import Path
import mlflow

# Fixed seed for reproducibility
RND = 42
np.random.seed(RND)

# Download the CSV locally (Spark does not read https directly)
url = "https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv"
local_path = "/tmp/titanic.csv"
os.makedirs("/tmp", exist_ok=True)

# The file is only fetched once; later runs reuse the cached copy.
if not os.path.exists(local_path):
    print("Descargando dataset:", url)
    # Without a timeout, requests waits forever on a stalled connection.
    resp = requests.get(url, timeout=30)
    resp.raise_for_status()
    with open(local_path, "wb") as f:
        f.write(resp.content)
    print("Descarga completa ->", local_path)
else:
    print("Archivo ya existe:", local_path)

# Load with pandas (quick inspection)
pdf = pd.read_csv(local_path)
print("Pandas df shape:", pdf.shape)
display(pdf.head(3))

Archivo ya existe: /tmp/titanic.csv
Pandas df shape: (891, 12)


Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S


In [None]:
# ----------------------------
# 3) Start Dask
# ----------------------------
import warnings

from dask.distributed import Client, LocalCluster, default_client

# Close any client/cluster left over from an earlier run before starting a
# fresh one. default_client() raises ValueError when no client exists, which
# is the normal first-run case.
try:
    previous_client = default_client()
except ValueError:
    previous_client = None

if previous_client is not None:
    # Take the cluster handle before closing the client; it is None when the
    # client was not created from a cluster object.
    previous_cluster = previous_client.cluster
    try:
        previous_client.close()
        if previous_cluster is not None:
            previous_cluster.close()
    except Exception as exc:
        # Best-effort cleanup: report the failure instead of hiding it, but
        # do not block the new cluster from starting.
        warnings.warn(f"Could not close the previous Dask client/cluster: {exc!r}")

# Four single-threaded workers, each limited to 2 GB.
cluster = LocalCluster(n_workers=4, threads_per_worker=1, memory_limit="2GB")
client = Client(cluster)
print("Dask dashboard:", client.dashboard_link)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 34537 instead
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:39643
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:34537/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:39273'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:41453'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:40913'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:32867'
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:35447 name: 0
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:35447
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:53016
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:37041 name: 1
INFO:distributed.scheduler:Starting worker compute stream, tcp://

Dask dashboard: http://127.0.0.1:34537/status


In [None]:
# ----------------------------
# 2) Start Spark
# ----------------------------
from pyspark.sql import SparkSession

# Reuse an existing session if there is one; otherwise build it with
# 4 GB of driver memory and 4 shuffle partitions.
spark = (
    SparkSession.builder
    .appName("SHIA_Enterprise_Hybrid")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.shuffle.partitions", "4")
    .getOrCreate()
)
print("Spark versión:", spark.version)

# pandas -> Spark DataFrame (so Spark can be used for preprocessing if needed)
spark_df = spark.createDataFrame(pdf)
print("Spark DF creado:")
spark_df.show(3)

Spark versión: 3.5.1
Spark DF creado:
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|  NaN|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925|  NaN|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
only showing top 3 rows

