# Import dependencies

In [67]:
import torch
import torch.nn as nn
import mlflow.pytorch
import torch.optim as optim
from mlflow.tracking import MlflowClient
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score
from safetensors.torch import save_file

# Create dataset

In [68]:
scaler = MinMaxScaler()
X, y = make_classification(n_samples=100_000, n_features=20, n_informative=18, n_redundant=2, n_repeated=0, n_classes=2, random_state=42)
X = scaler.fit_transform(X)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).to(device)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32).to(device)

cuda


# Model


In [69]:
class Model(nn.Module):
    def __init__(self, hidden_layer=64):
        super(Model, self).__init__()
        self.fc1 = nn.Linear(20, hidden_layer)
        self.fc2 = nn.Linear(hidden_layer, hidden_layer)
        self.fc3 = nn.Linear(hidden_layer, hidden_layer)
        self.fc4 = nn.Linear(hidden_layer, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.relu(self.fc3(x))
        x = self.sigmoid(self.fc4(x))
        return x

In [70]:
GRID_CONFIG = {
  'hidden_layer': [64, 128, 512],
  'learning_rate': [0.01, 0.005, 0.001],
}

EPOCH = 30

# Train & Test

In [72]:
for hidden_layer in GRID_CONFIG['hidden_layer']:
  for lr in GRID_CONFIG['learning_rate']:
    with mlflow.start_run():
      # Create model
      model = Model(hidden_layer=hidden_layer).to(device)

      # Loss function and optimizer
      criterion = nn.BCELoss()
      optimizer = optim.AdamW(model.parameters(), lr=lr)

      # Train model
      model.train()
      for epoch in range(EPOCH):
        optimizer.zero_grad()
        y_pred = model(X_train_tensor).squeeze()
        loss = criterion(y_pred, y_train_tensor)
        loss.backward()
        optimizer.step()

      # Evaluate
      model.eval()
      with torch.no_grad():
        y_pred_test = model(X_test_tensor).squeeze()
        y_pred_test = (y_pred_test > 0.5).float()  # Convert sigmoid output to binary (0 or 1)

      # Calculate accuracy
      accuracy = accuracy_score(y_test_tensor.cpu(), y_pred_test.cpu())
      print(f"Hidden Layer: {hidden_layer:>4}   Learning Rate: {lr:<7} Accuracy: {accuracy:<5.4f}")

      # Log parameters
      mlflow.log_param("hidden_layer", hidden_layer)
      mlflow.log_param("learning_rate", lr)

      # Log metric
      mlflow.log_metric("accuracy", accuracy)

      # Log model
      mlflow.pytorch.log_model(model, artifact_path="model")



Hidden Layer:   64   Learning Rate: 0.01    Accuracy: 0.7921




Hidden Layer:   64   Learning Rate: 0.005   Accuracy: 0.7940




Hidden Layer:   64   Learning Rate: 0.001   Accuracy: 0.7412




Hidden Layer:  128   Learning Rate: 0.01    Accuracy: 0.7929




Hidden Layer:  128   Learning Rate: 0.005   Accuracy: 0.8142




Hidden Layer:  128   Learning Rate: 0.001   Accuracy: 0.7782




Hidden Layer:  512   Learning Rate: 0.01    Accuracy: 0.7319




Hidden Layer:  512   Learning Rate: 0.005   Accuracy: 0.7715




Hidden Layer:  512   Learning Rate: 0.001   Accuracy: 0.8457




# Save best run to registry

In [75]:
experiment_name = "Default"
MODEL_NAME = "bestModel"
metric_name = "accuracy"
model_subpath = "model"

client = MlflowClient()

In [76]:
# Get experiment by name
experiment = client.get_experiment_by_name(experiment_name)
if experiment is None:
  raise ValueError(f"Experiment '{experiment_name}' not found.")
experiment_id = experiment.experiment_id

# Search runs, sorted by metric descending
runs = client.search_runs(
  experiment_ids=[experiment_id],
  order_by=[f"metrics.{metric_name} DESC"],
  max_results=1
)

if not runs:
  raise ValueError(f"No runs found in experiment '{experiment_name}'.")

# Best run
best_run = runs[0]
run_id = best_run.info.run_id
best_metric = best_run.data.metrics.get(metric_name)
params = best_run.data.params

print(f"Best run ID: {run_id} with {metric_name}: {best_metric}")
print(f"Best run params: {params}")

Best run ID: 3620c7fec6fd41cda740da97cc8f0c8c with accuracy: 0.8456666666666667
Best run params: {'hidden_layer': '512', 'learning_rate': '0.001'}


In [77]:
try:
  production_model_version = client.get_latest_versions(MODEL_NAME, stages=["Production"])[0]
  production_model_version = production_model_version.version
  print(f"Current production model version: {production_model_version}")

  # Get the production model's accuracy (or other metric)
  current_prod_run_id = client.get_model_version(MODEL_NAME, production_model_version).run_id
  current_prod_run = client.get_run(current_prod_run_id)
  current_prod_accuracy = current_prod_run.data.metrics.get(metric_name)

  print(f"Current production model {metric_name}: {current_prod_accuracy}")

  # Compare and promote if the new model is better
  if best_metric > current_prod_accuracy:
    print(f"\nNew model is better, promoting to production.\n")
    # New production model version
    new_model_version = client.create_model_version(
      name=MODEL_NAME,
      source=f"runs:/{run_id}/{model_subpath}",
      run_id=run_id,
    ).version

    # Archive the current production model
    client.transition_model_version_stage(
      name=MODEL_NAME,
      version=production_model_version,
      stage="Archived"
    )
    # Promote new model to production
    client.transition_model_version_stage(
      name=MODEL_NAME,
      version=new_model_version,
      stage="Production"
    )

    print(f"New production model version: {new_model_version}")
    print(f"New production model {metric_name}: {best_metric}")
  else:
    print(f"\nCurrent production model is the best. Keeping the same.\n")
except:
  # If no production model exists, we can directly set the new model as production
  print(f"No production model exists, setting the new best model as production.")
  client.create_registered_model(MODEL_NAME)
  new_model_version = client.create_model_version(
    name=MODEL_NAME,
    source=f"runs:/{run_id}/{model_subpath}",
    run_id=run_id
  ).version

  client.transition_model_version_stage(
    name=MODEL_NAME,
    version=new_model_version,
    stage="Production"
  )

  print(f"New production model version: {new_model_version}")
  print(f"New production model {metric_name}: {best_metric}")

No production model exists, setting the new best model as production.
New production model version: 1
New production model accuracy: 0.8456666666666667


  production_model_version = client.get_latest_versions(MODEL_NAME, stages=["Production"])[0]
  client.transition_model_version_stage(


# Save best model

In [79]:
# Get the latest production model version
production_model_versions = client.get_latest_versions(MODEL_NAME, stages=["Production"])

if production_model_versions:
  # Get the most recent production version (latest version with stage 'Production')
  latest_production_version = production_model_versions[0].version
  print(f"Latest production model version: {latest_production_version}")

  # Load the model from the model registry
  model_subpath = "model"
  model_uri = f"models:/{MODEL_NAME}/{latest_production_version}"
  model = mlflow.pytorch.load_model(model_uri)

  save_file(model.state_dict(), "local_prod_backup.safetensors")
  torch.save({
    'X_test': X_test_tensor,
    'y_test': y_test_tensor,
    }, "test_data.pt")

  print(f"Model saved successfully")
else:
  print(f"No production model found for {MODEL_NAME}.")

Latest production model version: 1
Model saved successfully


  production_model_versions = client.get_latest_versions(MODEL_NAME, stages=["Production"])
