In [None]:
from typing import Optional
from uuid import uuid4

import numpy as np
import pandas as pd
from distributed import LocalCluster
from sklearn.base import BaseEstimator
from sklearn.cluster import KMeans
from sklearn.ensemble import ExtraTreesRegressor
from sklearn.metrics import mean_absolute_error
from sklearn.pipeline import Pipeline

from modelforge.experiments.pipeline_builders.pipeline_factory import PipelineFactory
from modelforge.experiments.pipeline_builders.prediction_loss_set_pipeline_builder import \
    PredictionLossSetPipelineBuilder
from modelforge.model_clustering.cluster.grid_search import ModelConsolidationScore
from modelforge.model_clustering.consolidation.retraining_consolidation_strategy import RetrainingConsolidationStrategy
from modelforge.model_clustering.entity.model_dataset import ModelDataSet
from modelforge.model_clustering.entity.model_entity import ModelEntity
from modelforge.model_clustering.evaluator.model_entity_data_merger import PandasModelEntityDataMerger
from modelforge.model_clustering.transformer.sampler.set.criterion_selector.target_selector import TargetSelector
from modelforge.model_clustering.transformer.sampler.set.measure.entropy import Entropy
from modelforge.model_clustering.transformer.sampler.set.uniform_set_sampler import UniformSetSampler


In [None]:
# Adapter that lets a plain sklearn estimator be used where a Pipeline is
# expected. fit/predict accept extra **kwargs and drop them before calling the
# wrapped model, because kwargs are not allowed for sklearn pipelines.
class ModelWrapper(Pipeline, BaseEstimator):
    """Wrap a single estimator so it can be passed where a ``Pipeline`` is expected.

    ``fit`` and ``predict`` accept arbitrary keyword arguments and ignore them,
    forwarding only the data to the wrapped ``model``.

    NOTE(review): sklearn's ``Pipeline`` defines ``__len__`` as the number of
    steps, which is 0 here, so an instance is presumably falsy in a boolean
    context. Avoid ``if wrapper:`` checks; confirm before relying on truthiness.

    Args:
        model: The estimator to wrap. ``fit`` fits this object in place.
    """

    def __init__(self, model: BaseEstimator):
        # No pipeline steps: the wrapped estimator lives in ``self.model`` instead.
        super().__init__(steps=[])
        self.model = model
        self._is_fitted = False

    def fit(self, x_train: pd.DataFrame, y_train: Optional[pd.Series] = None, **kwargs) -> "ModelWrapper":
        """Fit the wrapped model on ``x_train``/``y_train``; extra kwargs are ignored.

        Returns:
            ``self``, following the sklearn ``fit`` convention.
        """
        self.model.fit(x_train, y_train)
        self._is_fitted = True
        return self

    def predict(self, x: pd.DataFrame, **kwargs) -> np.ndarray:
        """Return the wrapped model's predictions for ``x``; extra kwargs are ignored.

        The value is passed through unchanged from ``self.model.predict``. For
        sklearn regressors that is a NumPy array, not a ``pd.Series``.

        Raises:
            ValueError: If called before ``fit``.
        """
        if not self._is_fitted:
            raise ValueError("Model is not fitted yet. Call 'fit' before 'predict'.")
        return self.model.predict(x)

# Generate some random data. Use your own data here
def generate_random_data() -> pd.DataFrame:
    data = {
        "a": np.random.random(100),
        "b": np.random.random(100),
        "c": np.random.random(100),
    }
    return pd.DataFrame(data)


# Train a dummy model
def train_model(x: pd.DataFrame, y: pd.Series, random_state: Optional[int] = None) -> ExtraTreesRegressor:
    """Fit an ``ExtraTreesRegressor`` on ``x``/``y`` and return it.

    Args:
        x: Training features.
        y: Training target.
        random_state: Forwarded to ``ExtraTreesRegressor``. Pass an int for a
            reproducible model. ``None`` (the default) matches the estimator's own
            default.

    Returns:
        The fitted regressor.
    """
    # sklearn's fit() returns the estimator itself, so construct-and-fit chains.
    return ExtraTreesRegressor(random_state=random_state).fit(x, y)


def create_model_entity(save_dir: str = "save_dir", train_fraction: float = 0.8) -> ModelEntity:
    """Train a toy model on random data and package it as a ``ModelEntity``.

    Args:
        save_dir: Passed to ``ModelEntity`` as ``path``; ModelEntity caches the
            data there. Change it to a location of your choice.
        train_fraction: Share of rows, taken from the start, used for training.
            The remainder becomes the test split. The default 0.8 gives an 80/20
            split of the 100 generated rows.

    Returns:
        A ``ModelEntity`` holding the fitted model, both data splits, the MAE loss,
        the feature names and a description in its metadata.

    Raises:
        ValueError: If ``train_fraction`` leaves the train or test split empty.
    """
    # Generate random data; the target's length follows the features so they cannot drift apart.
    x = generate_random_data()
    y = pd.Series(np.random.random(len(x)))

    # Positional train/test split.
    n_train = int(len(x) * train_fraction)
    if not 0 < n_train < len(x):
        raise ValueError(
            f"train_fraction={train_fraction} gives {n_train} training rows out of {len(x)}; "
            "both the train and test splits must be non-empty."
        )
    x_train, x_test = x.iloc[:n_train], x.iloc[n_train:]
    y_train, y_test = y.iloc[:n_train], y.iloc[n_train:]

    model = train_model(x_train, y_train)
    # Save model, training data and loss function to a ModelEntity.
    return ModelEntity(
        path=save_dir,
        id=str(uuid4()),
        pipeline=model,
        train_x=x_train,
        train_y=y_train,
        test_x=x_test,
        test_y=y_test,
        loss=mean_absolute_error,
        # Taken from the data itself instead of a hand-maintained copy.
        feature_list=list(x.columns),
        # Add any other metadata you want to store
        metadata={"description": "Test model"},
    )

# Create a dataset of ModelEntity objects.
# Seed NumPy's global RNG so the generated data is identical on every
# Restart & Run All. sklearn estimators built with random_state=None also draw
# from this global state. Model ids still come from uuid4 and stay unique.
N_MODELS = 100
SEED = 42
np.random.seed(SEED)
dataset = ModelDataSet.from_iterable([create_model_entity() for _ in range(N_MODELS)])

In [None]:
# Start a local Dask cluster with a single single-threaded worker. Keeping the
# cluster in its own variable lets you shut it down explicitly
# (client.close(); cluster.close()) once you are done.
cluster = LocalCluster(n_workers=1, threads_per_worker=1)
client = cluster.get_client()

In [None]:
# Tunable knobs for this experiment. Change these to your needs.
N_CLUSTERS = 10    # number of KMeans clusters
EMBEDDING_DIM = 3  # embedding dimension, passed as UniformSetSampler's first argument
RANDOM_STATE = 0   # seeds KMeans and the retraining model so runs are reproducible

pipeline_factory = PipelineFactory(
    dataset,
    client,
    # We want to retrain for consolidation and merge the models' data
    RetrainingConsolidationStrategy(
        ModelWrapper(ExtraTreesRegressor(random_state=RANDOM_STATE)),
        PandasModelEntityDataMerger(),
    ),
)
# Create the embedding pipeline
pipeline = PredictionLossSetPipelineBuilder(
    pipeline_factory,
    KMeans(n_clusters=N_CLUSTERS, random_state=RANDOM_STATE),
    UniformSetSampler(EMBEDDING_DIM, TargetSelector(), Entropy()),
).build_pipeline()
pipeline.fit(dataset)

In [None]:
# Score the fitted pipeline on the same dataset and wrap the raw dict result.
# (ModelConsolidationScore is imported in the top import cell.)
score: dict = pipeline.score(dataset)
consolidation_score = ModelConsolidationScore.from_dict(score)

print("Cluster loss", consolidation_score.cluster_loss)
# to_dataframe() returns two frames: show the embeddings here, the clustering in the next cell.
clustering_df, embedding_df = consolidation_score.clustering.to_dataframe()
embedding_df

In [None]:
# Clustering table returned together with embedding_df by
# consolidation_score.clustering.to_dataframe() in the previous cell.
clustering_df