In [1]:
import mlflow 
from mlflow.tracking import MlflowClient

In [2]:
# Point the MLflow library at the tracking server this notebook talks to.
mlflow.set_tracking_uri("http://localhost:5080")
# Shared client object; the registry helper functions in this notebook call it as `client`.
client = MlflowClient()


In [3]:
def load_model_by_version(model_name: str, version: str):
    """Load one registered model version through ``mlflow.pyfunc``.

    The model is addressed with the URI ``models:/<model_name>/<version>``.

    Args:
        model_name: Name of the registered model.
        version: Version identifier placed in the URI.

    Returns:
        Whatever ``mlflow.pyfunc.load_model`` returns for that URI.
    """
    return mlflow.pyfunc.load_model("models:/{}/{}".format(model_name, version))

In [4]:
def load_challenger_model(model_name):
    """Find and load the newest challenger version of a registered model.

    A version counts as a challenger when its ``current_stage`` is the string
    ``"None"`` and its ``candidate`` tag equals ``"challenger"``. Among those,
    the one with the highest integer version number is loaded.

    Args:
        model_name: Name of the registered model to search.

    Returns:
        A dict with the keys ``model`` (the loaded model), ``model_name``,
        ``model_version`` and ``run_id``.

    Raises:
        RuntimeError: If no version matches the challenger criteria.
    """
    query = f"name='{model_name}'"

    eligible = []
    for model_version in client.search_model_versions(query):
        is_unstaged = model_version.current_stage == "None"
        if is_unstaged and model_version.tags.get("candidate") == "challenger":
            eligible.append(model_version)

    if len(eligible) == 0:
        raise RuntimeError("No challenger model found")

    # Versions are compared as integers so that e.g. "10" ranks above "2".
    newest = max(eligible, key=lambda mv: int(mv.version))
    loaded = load_model_by_version(model_name, newest.version)

    return dict(
        model=loaded,
        model_name=model_name,
        model_version=newest.version,
        run_id=newest.run_id,
    )


In [5]:
def load_production_model(model_name):
    """Load the Production-stage version of a registered model, if any.

    Asks the registry client for the latest versions in the ``"Production"``
    stage and loads the first one returned.

    Args:
        model_name: Name of the registered model.

    Returns:
        A dict with the keys ``model`` (the loaded model), ``model_name``,
        ``model_version`` and ``run_id``; or ``None`` (after printing an
        info message) when no Production version exists.
    """
    in_production = client.get_latest_versions(name=model_name, stages=["Production"])

    if in_production:
        current = in_production[0]
        return dict(
            model=load_model_by_version(model_name, current.version),
            model_name=model_name,
            model_version=current.version,
            run_id=current.run_id,
        )

    print(f"[INFO] No Production model found for '{model_name}'.")
    return None


In [6]:
from sklearn.metrics import precision_score

def evaluate(model, X_test, y_test):
    """Score a model on held-out data using ``precision_score``.

    Args:
        model: Object exposing ``predict(X_test)``.
        X_test: Inputs passed to ``model.predict``.
        y_test: True labels, passed as the first argument to ``precision_score``.

    Returns:
        The value returned by ``precision_score(y_test, predictions)``.
    """
    return precision_score(y_test, model.predict(X_test))


In [7]:
from pprint import pprint
# Load the newest challenger version of the registered model and pretty-print the returned dict.
challenger_model_data = load_challenger_model(model_name="eur_usd_direction_model")
pprint(challenger_model_data)

Downloading artifacts:   0%|          | 0/5 [00:00<?, ?it/s]

 - cloudpickle (current: 3.1.2, required: cloudpickle==3.1.1)
 - numpy (current: 2.3.5, required: numpy==2.4.1)
 - pyarrow (current: 22.0.0, required: pyarrow==18.1.0)
To fix the mismatches, call `mlflow.pyfunc.get_model_dependencies(model_uri)` to fetch the model's environment and install dependencies using the resulting environment file.


{'model': mlflow.pyfunc.loaded_model:
  artifact_path: mlflow-artifacts:/0/models/m-bb67f34ce1a74004b765e7167f26905e/artifacts
  flavor: mlflow.sklearn
  run_id: 16b4d325e0924c03a145dd1f413f106a
,
 'model_name': 'eur_usd_direction_model',
 'model_version': '1',
 'run_id': '16b4d325e0924c03a145dd1f413f106a'}


In [9]:
# Load the current Production version; load_production_model returns None when there is none.
production_model_data = load_production_model(model_name="eur_usd_direction_model")
pprint(production_model_data)

[INFO] No Production model found for 'eur_usd_direction_model'.
None


  prod_versions = client.get_latest_versions(name=model_name, stages=["Production"])


In [10]:
def promote(model_name, challenger, production):
    """Move the challenger into the Production stage and retire the old one.

    Steps, in order:
      1. If ``production`` is truthy, transition that version to ``"Archived"``.
      2. Transition the challenger version to ``"Production"``.
      3. Set the challenger's ``candidate`` tag to ``"champion"``.

    Args:
        model_name: Not used by the body; the registry name is read from the
            ``model_name`` key of each dict. Kept so the signature is unchanged.
        challenger: Dict with at least ``model_name`` and ``model_version``.
        production: Dict with the same keys, or a falsy value (e.g. ``None``)
            when there is no current Production version to archive.

    Returns:
        None. A confirmation line is printed.
    """
    if production:
        client.transition_model_version_stage(
            name=production["model_name"],
            version=production["model_version"],
            stage="Archived"
        )

    challenger_name = challenger["model_name"]
    challenger_version = challenger["model_version"]

    client.transition_model_version_stage(
        name=challenger_name,
        version=challenger_version,
        stage="Production"
    )

    client.set_model_version_tag(
        name=challenger_name,
        version=challenger_version,
        key="candidate",
        value="champion"
    )
    # The version is interpolated from a local name: reusing double quotes inside a
    # double-quoted f-string is a SyntaxError on Python versions before 3.12.
    print(f"Promoted version {challenger_version} to Production")

In [11]:
def archive_challenger(model_name, challenger):
    """Transition a rejected challenger version to the ``"Archived"`` stage.

    Args:
        model_name: Not used by the body; the registry name is read from
            ``challenger["model_name"]``. Kept so the signature is unchanged.
        challenger: Dict with at least ``model_name`` and ``model_version``.

    Returns:
        None. A confirmation line is printed.
    """
    challenger_version = challenger["model_version"]

    client.transition_model_version_stage(
        name=challenger["model_name"],
        version=challenger_version,
        stage="Archived"
    )
    # The version is interpolated from a local name: reusing double quotes inside a
    # double-quoted f-string is a SyntaxError on Python versions before 3.12.
    print(f"Archived challenger version {challenger_version}")

In [13]:
from mlflow.data import from_pandas

def promote_if_better(model_name, X_test, y_test):
    """Compare the challenger with the Production model and act on the result.

    Both models are scored with ``evaluate`` on the same test data inside one
    MLflow run named ``"model_promotion_evaluation"``, which records the
    scores, the versions compared, the test dataset and the decision.
    The challenger is promoted when there is no Production model or when its
    score is strictly higher; otherwise it is archived.

    Args:
        model_name: Name of the registered model to evaluate.
        X_test: Test features. Must support ``.copy()`` and item assignment;
            presumably a pandas DataFrame, since it is passed to ``from_pandas``.
        y_test: Test labels, stored in the logged dataset as column ``"target"``.

    Returns:
        ``"challenger promoted '<model_name>'"`` when the challenger was
        promoted, otherwise ``"production model retained"``.

    Raises:
        RuntimeError: Propagated from ``load_challenger_model`` when no
            challenger version exists.
    """
    challenger = load_challenger_model(model_name)
    production = load_production_model(model_name)

    with mlflow.start_run(run_name="model_promotion_evaluation"):
        challenger_score = evaluate(challenger["model"], X_test, y_test)
        mlflow.log_metric("challenger_test_precision_score", challenger_score)

        # Work on a copy so the caller's X_test does not gain a "target" column.
        test_df = X_test.copy()
        test_df["target"] = y_test

        test_dataset = from_pandas(
            test_df,
            source="eur_usd_test"
        )
        mlflow.log_input(
            test_dataset,
            context="testing"
        )

        if production:
            production_score = evaluate(production["model"], X_test, y_test)
            # Only log when a score exists: there is no numeric value to record
            # when the registry has no Production version.
            mlflow.log_metric("production_test_precision_score", production_score)
        else:
            production_score = None

        mlflow.log_param("challenger_version", challenger["model_version"])
        if production:
            mlflow.log_param("production_version", production["model_version"])

        print(f"Challenger score: {challenger_score}")
        print(f"Production score: {production_score}")

        # Ties keep the current Production model (strict comparison).
        decision = (
            "promote_challenger"
            if production_score is None or challenger_score > production_score
            else "retain_production"
        )
        mlflow.log_param("promotion_decision", decision)

    if decision == "promote_challenger":
        # promote() takes model_name as its first positional argument.
        promote(model_name, challenger, production)
        promoted_name = challenger["model_name"]
        return f"challenger promoted '{promoted_name}'"

    # archive_challenger() also takes model_name first.
    archive_challenger(model_name, challenger)
    return "production model retained"
