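# Regular usage of collie

This notebook wires a toy MLP classifier on synthetic data through a complete collie pipeline (Transformer → Tuner → Trainer → Evaluator → Pusher) and runs it with the `Orchestrator`, which logs each stage to MLflow.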

In [1]:
import sys
import os
sys.path.append("../..")

import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
from torch.utils.data import DataLoader, TensorDataset

from collie import (
    Transformer,
    Trainer,
    Evaluator,
    Pusher,
    TrainerPayload,
    TransformerPayload,
    TunerPayload,
    Tuner,
    EvaluatorPayload,
    PusherPayload,
    Orchestrator
)

from collie import Event

  from .autonotebook import tqdm as notebook_tqdm


In [4]:
# Synthetic-dataset dimensions shared by the transformer, the model and the evaluator.
num_samples, input_dim, num_classes = 1000, 20, 4

## Transformer

In [2]:
class MLPTransformer(Transformer):
    """Transformer stage that fabricates a random classification dataset.

    Features are standard-normal draws, and labels are uniform random class ids
    drawn independently of the features. The data therefore carries no
    learnable signal; it only exercises the pipeline. Sizes come from the
    notebook-level ``num_samples``, ``input_dim`` and ``num_classes`` constants.
    Torch's global RNG is used (no seed is set here).
    """

    def __init__(self) -> None:
        super().__init__(description="MLP Transformer for generating synthetic data")

    def handle(self, event) -> Event:
        """Return an ``Event`` whose payload carries only the training frame.

        ``event`` is not inspected. The frame has columns ``feature_0`` ...
        ``feature_{input_dim-1}`` followed by ``label``. The validation and test
        splits are passed on as ``None``.
        """
        feature_tensor = torch.randn(num_samples, input_dim)
        label_tensor = torch.randint(0, num_classes, (num_samples,))

        column_names = [f"feature_{col}" for col in range(input_dim)]
        frames = [
            pd.DataFrame(feature_tensor.numpy(), columns=column_names),
            pd.DataFrame(label_tensor.numpy(), columns=["label"]),
        ]
        payload = TransformerPayload(
            train_data=pd.concat(frames, axis=1),
            validation_data=None,
            test_data=None,
        )
        return Event(payload=payload)

## Tuner

In [3]:
class MLPTuner(Tuner):
    """Tuner stage that returns a fixed hyperparameter set.

    No search is performed; the values are placeholders. The data splits from
    the upstream payload are forwarded unchanged so later stages still see them.
    """

    def __init__(self) -> None:
        super().__init__(description="MLP Tuner for hyperparameter optimization")

    def handle(self, event: Event) -> Event:
        """Wrap the static hyperparameters together with the incoming data splits."""
        upstream = event.payload
        # NOTE(review): static placeholder values; a real tuner would search here.
        chosen = {
            "learning_rate": 0.001,
            "batch_size": 32,
        }
        payload = TunerPayload(
            hyperparameters=chosen,
            train_data=upstream.train_data,
            validation_data=upstream.validation_data,
            test_data=upstream.test_data,
        )
        return Event(payload=payload)

## Trainer

In [5]:
class SimpleClassifier(nn.Module):
    """Two-layer perceptron producing ``num_classes`` logits.

    Layout: ``input_dim`` -> 64 hidden units -> ReLU -> ``num_classes``. The
    output is raw logits (no softmax), as expected by ``nn.CrossEntropyLoss``.
    """

    def __init__(self):
        super().__init__()
        hidden_units = 64
        # Submodule names/order are kept stable: they define state_dict keys.
        self.fc1 = nn.Linear(input_dim, hidden_units)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_units, num_classes)

    def forward(self, x):
        return self.fc2(self.relu(self.fc1(x)))


class MLPTrainer(Trainer):
    """Trainer stage that fits ``SimpleClassifier`` with Adam and cross-entropy.

    ``learning_rate`` and ``batch_size`` are read from the incoming payload's
    ``hyperparameters``. The per-epoch learning rate and mean batch loss are
    logged through ``self.mlflow``.
    """

    def __init__(self, epochs: int = 10):
        """
        Args:
            epochs: number of passes over the training data (default 10).

        Raises:
            ValueError: if ``epochs`` is smaller than 1.
        """
        if epochs < 1:
            raise ValueError(f"epochs must be >= 1, got {epochs}")
        description = "MLP Trainer for training the model"
        super().__init__(description=description)
        # Choose the device first and put the model on it, so parameters live
        # where the batches are sent. Previously the model stayed on CPU while
        # batches moved to CUDA whenever it was available.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = SimpleClassifier().to(self.device)
        self.criterion = nn.CrossEntropyLoss()
        self.epochs = epochs
        # Built in ``handle`` because they depend on the tuned hyperparameters.
        self.optimizer = None
        self.scheduler = None

    def handle(self, event):
        """Train on ``event.payload.train_data`` and return a ``TrainerPayload``.

        ``train_data`` must be a DataFrame with a ``label`` column; every other
        column is treated as a float feature.

        Raises:
            ValueError: if ``learning_rate`` or ``batch_size`` is missing, or the
                training data is empty.
        """
        hyperparameters = event.payload.hyperparameters
        learning_rate = hyperparameters.get("learning_rate")
        batch_size = hyperparameters.get("batch_size")
        if learning_rate is None or batch_size is None:
            # batch_size=None would silently make DataLoader yield unbatched samples.
            raise ValueError(
                "MLPTrainer requires 'learning_rate' and 'batch_size' hyperparameters, "
                f"got {hyperparameters!r}"
            )
        print(f"learning_rate: {learning_rate}, batch_size: {batch_size}")
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        self.scheduler = optim.lr_scheduler.StepLR(self.optimizer, step_size=10, gamma=0.1)

        dataloader = self._build_dataloader(event.payload.train_data, batch_size)
        if len(dataloader) == 0:
            raise ValueError("MLPTrainer received empty training data")

        for epoch in range(1, self.epochs + 1):
            mean_loss = self._train_one_epoch(dataloader)
            # Log the LR that was in effect during this epoch, then advance the schedule.
            self.mlflow.log_metric("learning rate", self.scheduler.get_last_lr()[0], step=epoch)
            self.mlflow.log_metric("loss", round(mean_loss, 3), step=epoch)
            self.scheduler.step()

        return Event(
            payload=TrainerPayload(
                model=self.model,
                train_loss=mean_loss,
                val_loss=None
            )
        )

    @staticmethod
    def _build_dataloader(train_data, batch_size):
        # Split the frame into float32 features and int64 class targets; shuffle every epoch.
        features = torch.tensor(train_data.drop("label", axis=1).values, dtype=torch.float32)
        targets = torch.tensor(train_data["label"].values, dtype=torch.long)
        return DataLoader(TensorDataset(features, targets), batch_size=batch_size, shuffle=True)

    def _train_one_epoch(self, dataloader):
        # One optimization pass over the data; returns the mean of the per-batch losses.
        self.model.train()
        total_loss = 0.0
        for xb, yb in dataloader:
            xb, yb = xb.to(self.device), yb.to(self.device)
            self.optimizer.zero_grad()
            loss = self.criterion(self.model(xb), yb)
            loss.backward()
            self.optimizer.step()
            total_loss += loss.item()
        return total_loss / len(dataloader)

## Evaluator

In [6]:
import torch.nn.functional as F
import numpy as np
from sklearn.metrics import accuracy_score
from collie.core.enums.ml_models import ModelFlavor

class MLPEvaluator(Evaluator):
    """Evaluator stage comparing the freshly trained model with the production one.

    Both models are scored on the same seeded synthetic validation set. The
    experiment model is reported as better when its accuracy is greater than or
    equal to the production model's, or when no production model exists.

    NOTE(review): validation labels are drawn independently of the features, so
    any model's accuracy here is expected to sit near chance level.
    """

    def __init__(self, validation_samples: int = 200, seed: int = 42) -> None:
        """
        Args:
            validation_samples: number of synthetic validation rows (default 200).
            seed: seed for the private RNG that generates the validation set (default 42).
        """
        description = "MLP Evaluator for evaluating the model"
        super().__init__(description=description)
        self._validation_samples = validation_samples
        self._validation_seed = seed

    def handle(self, event):
        """Score experiment vs. production models and return an ``EvaluatorPayload``."""
        from collie.core.enums.ml_models import MLflowModelStage

        experiment_model = event.payload.model

        production_model = self.load_latest_model(
            model_name=self.registered_model_name,
            stage=MLflowModelStage.PRODUCTION,
            flavor=ModelFlavor.PYTORCH
        )
        print(f"Production model: {production_model}")

        validation_data = self._generate_validation_data()

        experiment_accuracy = self._calculate_accuracy(experiment_model, validation_data)

        # Score production on the same data. This was previously hard-coded to 0,
        # which made every experiment compare as "better than production".
        if production_model is not None:
            production_accuracy = self._calculate_accuracy(production_model, validation_data)
        else:
            production_accuracy = 0.0

        is_better = experiment_accuracy >= production_accuracy

        return Event(
            payload=EvaluatorPayload(
                metrics=[
                    {
                        "experiment_accuracy": experiment_accuracy,
                        "production_accuracy": production_accuracy,
                        "accuracy_improvement": experiment_accuracy - production_accuracy
                    }
                ],
                is_better_than_production=is_better
            )
        )

    def _generate_validation_data(self):
        # A private generator keeps the validation set reproducible without
        # reseeding torch's global RNG. Reseeding it would affect every later
        # random draw in the process.
        generator = torch.Generator().manual_seed(self._validation_seed)
        X_val = torch.randn(self._validation_samples, input_dim, generator=generator)
        y_val = torch.randint(0, num_classes, (self._validation_samples,), generator=generator)

        return {
            'features': X_val,
            'labels': y_val
        }

    def _calculate_accuracy(self, model, validation_data):
        # Fraction of validation rows whose highest-scoring class matches the label.
        was_training = model.training
        model.eval()
        try:
            with torch.no_grad():
                device = next(model.parameters()).device
                logits = model(validation_data['features'].to(device))
                # argmax(logits) == argmax(softmax(logits)): softmax is monotonic.
                predicted_labels = torch.argmax(logits, dim=1).cpu().numpy()
            true_labels = validation_data['labels'].numpy()
            return float(accuracy_score(true_labels, predicted_labels))
        finally:
            # Restore the caller's mode so scoring does not leave the model in eval().
            model.train(was_training)
    
    

## Pusher

In [7]:
from collie.core.enums.ml_models import MLflowModelStage
class MLPPusher(Pusher):
    """Pusher stage configured with ``MLflowModelStage.PRODUCTION`` as its target stage.

    NOTE(review): ``handle`` reports the placeholder string ``"mlp_model_uri"``
    as ``model_uri``, not a real model location. Replace it before relying on
    the payload downstream.
    """

    def __init__(self) -> None:
        super().__init__(
            target_stage=MLflowModelStage.PRODUCTION,
            description="MLP Pusher for pushing the model to registry",
        )

    def handle(self, event):
        payload = PusherPayload(model_uri="mlp_model_uri")
        return Event(payload=payload)

## Main

In [9]:
# Build the five-stage pipeline and run it against an MLflow tracking server.
# Set MLFLOW_TRACKING_URI to point at a different server instead of editing
# this cell; the default keeps the original local endpoint.
orchestrator = Orchestrator(
    tracking_uri=os.environ.get("MLFLOW_TRACKING_URI", "http://localhost:5001"),
    components=[
        MLPTransformer(),
        MLPTuner(),
        MLPTrainer(),
        MLPEvaluator(),
        MLPPusher()
    ],
    mlflow_tags={"Example": "MLP"},
    experiment_name="MLP",
    registered_model_name="MLPClassifier"
)
orchestrator.run()

2025/10/20 21:35:40 INFO mlflow.system_metrics.system_metrics_monitor: Skip logging GPU metrics. Set logger level to DEBUG for more details.
2025/10/20 21:35:40 INFO mlflow.system_metrics.system_metrics_monitor: Started monitoring system metrics.
  return _dataset_source_registry.resolve(
  return _dataset_source_registry.resolve(
2025/10/20 21:35:40 INFO mlflow.system_metrics.system_metrics_monitor: Stopping system metrics monitoring...
2025/10/20 21:35:40 INFO mlflow.system_metrics.system_metrics_monitor: Successfully terminated system metrics monitoring!
2025/10/20 21:35:40 INFO mlflow.system_metrics.system_metrics_monitor: Skip logging GPU metrics. Set logger level to DEBUG for more details.
2025/10/20 21:35:40 INFO mlflow.system_metrics.system_metrics_monitor: Started monitoring system metrics.
2025/10/20 21:35:40 INFO mlflow.system_metrics.system_metrics_monitor: Stopping system metrics monitoring...
2025/10/20 21:35:40 INFO mlflow.system_metrics.system_metrics_monitor: Successfu

🏃 View run Transformer at: http://localhost:5001/#/experiments/1/runs/f23df0c47fca4cb5902ad184f13dba2f
🧪 View experiment at: http://localhost:5001/#/experiments/1
🏃 View run Tuner at: http://localhost:5001/#/experiments/1/runs/a6aba244fce443f4bb37f6b841ec6818
🧪 View experiment at: http://localhost:5001/#/experiments/1
learning_rate: 0.001, batch_size: 32


2025/10/20 21:35:48 INFO mlflow.system_metrics.system_metrics_monitor: Stopping system metrics monitoring...
2025/10/20 21:35:48 INFO mlflow.system_metrics.system_metrics_monitor: Successfully terminated system metrics monitoring!
2025/10/20 21:35:48 INFO mlflow.system_metrics.system_metrics_monitor: Skip logging GPU metrics. Set logger level to DEBUG for more details.
2025/10/20 21:35:48 INFO mlflow.system_metrics.system_metrics_monitor: Started monitoring system metrics.
  latest_versions = self._mlflow_client.get_latest_versions(model_name, stages=[stage])


🏃 View run Trainer at: http://localhost:5001/#/experiments/1/runs/2872f14f4549455cb654827df4ac8665
🧪 View experiment at: http://localhost:5001/#/experiments/1
Production model: SimpleClassifier(
  (fc1): Linear(in_features=20, out_features=64, bias=True)
  (relu): ReLU()
  (fc2): Linear(in_features=64, out_features=4, bias=True)
)


Registered model 'MLPClassifier' already exists. Creating a new version of this model...
2025/10/20 21:35:49 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: MLPClassifier, version 7
Created version '7' of model 'MLPClassifier'.
  latest_versions = self._mlflow_client.get_latest_versions(model_name, stages=stages)
  self._mlflow_client.transition_model_version_stage(
2025/10/20 21:35:49 INFO mlflow.system_metrics.system_metrics_monitor: Stopping system metrics monitoring...
2025/10/20 21:35:49 INFO mlflow.system_metrics.system_metrics_monitor: Successfully terminated system metrics monitoring!


🏃 View run Evaluator at: http://localhost:5001/#/experiments/1/runs/0f2d8da658f14478a44f161414f2b03f
🧪 View experiment at: http://localhost:5001/#/experiments/1
🏃 View run Orchestrator at: http://localhost:5001/#/experiments/1/runs/c2b80ebe7e094b92be954f38f8d556c9
🧪 View experiment at: http://localhost:5001/#/experiments/1


OrchestratorError: [Orchestrator] Component error in orchestration: [Evaluator] Evaluator failed: [MLflow Operation] Failed to transition model 'MLPClassifier' v4 to Staging: RESOURCE_DOES_NOT_EXIST: Model Version (name=MLPClassifier, version=4) not found

In [2]:
from typing import (
    Dict, 
    Any,
    Optional,
    List,
)
from pydantic import BaseModel, ConfigDict

import pandas as pd

class TrainerPayload(BaseModel):
    """Scratch pydantic model with a single free-form ``model`` field.

    NOTE(review): this redefinition shadows the ``TrainerPayload`` imported from
    ``collie`` in the first cell. ``MLPTrainer.handle`` looks the name up at call
    time, so any later pipeline run would build this class instead. Under
    pydantic's default ``extra`` handling it would presumably drop
    ``train_loss``/``val_loss`` (verify). Consider renaming or deleting this cell.
    """

    # Let ``model`` hold objects pydantic has no validator for (e.g. torch modules).
    model_config = ConfigDict(arbitrary_types_allowed=True)

    model: Any = None


In [3]:
# NOTE(review): scratch experiment that attaches an ad-hoc class attribute ``a``
# to TrainerPayload. Nothing visible in this notebook reads it; consider deleting.
setattr(TrainerPayload, "a", 1)