In [1]:
import os
import yaml

import emoji
import tempfile

import mlflow
from mlflow import MlflowClient
from mlflow.entities import ViewType
from mlflow.models import infer_signature
from mlflow.artifacts import download_artifacts

from pyarrow import fs

import sklearn.datasets
from sklearn.metrics import accuracy_score, recall_score
from sklearn.model_selection import train_test_split

import xgboost as xgb

import ray
from ray import train, tune
from ray.air.integrations.mlflow import setup_mlflow

## Glossary

* Ds: Data Scientist

In [5]:
def write_best_run_spec(
    experiment_id: str, run_id: str, artifact_path: str, mode: str = "best_run"
) -> None:
    """
    Creates (or overwrites) a file named '.env.{mode}' with the specified content.
    This is necessary for CI/CD.

    Parameters
    ==========
    experiment_id: str
      Mlflow experiment id to which Ray tune trials have been logged.
    run_id: str
        A run id in the 'experiment_id'.
    mode: str
        mode for the .env file to be created or overwritten (e.g., .env.mode).
    """

    with open(f"best_model_artifacts/.env.{mode}", "w") as f:
        f.write(f"EXPERIMENT_ID={experiment_id}\n")
        f.write(f"RUN_ID={run_id}\n")
        f.write(f"ARTIFACT_PATH={artifact_path}\n")


def get_best_model_artifacts(s3_path: str) -> None:
    """
    Saves the best mlflow model's requirements.txt, conda.yaml, and
    .python-version in the best_model_artifacts directory.
    This action is optional.


    Parameters
    ==========
    s3_path: str
        S3 path where the best mlflow model's artifactories are located.
    """

    download_artifacts(
        artifact_uri=s3_path + "requirements.txt", dst_path="./best_model_artifacts"
    )
    print("Saved requirements.txt in /best_model_artifacts")

    download_artifacts(
        artifact_uri=s3_path + "conda.yaml", dst_path="./best_model_artifacts"
    )
    print("Saved conda.yaml in /best_model_artifacts")

    with tempfile.TemporaryDirectory() as td:
        download_artifacts(artifact_uri=s3_path + "python_env.yaml", dst_path=td)
        python_env_file_path = os.path.join(td, "python_env.yaml")
        with open(python_env_file_path, "r") as f:
            data = yaml.safe_load(f)
            python_version = data.get("python")
            if not python_version:
                print("Error: No 'python' key found in the YAML file.")
            else:
                python_version_file_path = os.path.join(
                    "./best_model_artifacts", ".python-version"
                )
                with open(python_version_file_path, "w") as f:
                    f.write(str(python_version) + "\n")
    print("Saved .python-version in /best_model_artifacts")

In [None]:
# URI for the mlflow tracking server
TRACKING_URI = "http://0.0.0.0:5000"
EXP_NAME = "ml_platform"
# Name for the mlflow registered model
MODEL_NAME = "iris-classifer"
# Alias for the mlflow registered model
ALIAS = "champion"
# Minimum accuracy before registering a new version of a model
MINIMUM_REQUIRED_ACCURACY = 0.95
# Model artifactory path
ARTIFACT_PATH = "iris_xgb"

In [4]:
mlflow.set_tracking_uri(TRACKING_URI)
# Ds sets the experiment to which the mlflow runs' metrics and artifacts should be logged
mlflow.set_experiment(experiment_name=EXP_NAME)

client = MlflowClient()

2025/02/08 23:11:36 INFO mlflow.tracking.fluent: Experiment with name 'ml_platform' does not exist. Creating a new experiment.


In [5]:
# Start a local ray cluster
ray.init(num_cpus=6)

2025-02-08 23:11:40,168	INFO worker.py:1841 -- Started a local Ray instance.


0,1
Python version:,3.9.1
Ray version:,2.41.0


In [6]:
# Ds reads the preprocessed iris data from s3 (make sure the spark job is over before executing this)
filesystem = fs.S3FileSystem(
    region="eu-central-1",  # Same region as for the Spark job
    endpoint_override=os.environ[
        "S3_ENDPOINT_URL"
    ],  # Same endpoint as for the spark job
)

# Concurrent read from the target s3 path (Spark may have written multiple csv files in this path)
ds = ray.data.read_csv(
    "s3://customerintelligence/ml_platform/etl/preprocessed/", filesystem=filesystem
)

# Ds uses this pandas df for hyperparameter tuning and training
df = ds.to_pandas()

2025-02-08 23:11:42,049	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-02-08_23-11-39_465253_212928/logs/ray-data
2025-02-08 23:11:42,049	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV]


Running 0: 0.00 row [00:00, ? row/s]

- ReadCSV->SplitBlocks(12) 1: 0.00 row [00:00, ? row/s]

In [None]:
# Ds sets the dependencies for the logged artifacts
conda_env = {
    "name": "mlflow-env",
    "channels": ["conda-forge"],
    "dependencies": [
        "python=3.9.1",
        "pip<=24.0",
        {
            "pip": ["xgboost==2.1.3", "scikit-learn==1.5.2"]
        },  # Minimum dependencies for xgboost model
    ],
}


# Training for the selected hyperparameters happens here (this func will be passed to ray.remote)
def train_function_mlflow(config: dict) -> None:
    setup_mlflow(
        config,
        experiment_name=EXP_NAME,
        tracking_uri=TRACKING_URI,
    )

    train_x, test_x, train_y, test_y = train_test_split(
        df.drop("target", axis=1), df.target, test_size=0.2
    )

    # Pass the trial's config (hyperparameters for the xgb classifier)
    model = xgb.XGBClassifier(**config)
    model.fit(train_x, train_y)
    predictions = model.predict(test_x)

    # Measure accuracy and recall for this trial
    accuracy = accuracy_score(test_y, predictions)
    recall = recall_score(test_y, predictions, average="micro")

    signature = infer_signature(train_x, model.predict(train_x))

    # Log the metrics as well as artifacts for this trial
    mlflow.log_metrics({"recall": recall, "accuracy": accuracy})
    mlflow.xgboost.log_model(
        model,
        artifact_path=ARTIFACT_PATH,
        conda_env=conda_env,
        signature=signature,
        model_format="json",
    )

    # Get the best result later based on the following metrics
    train.report({"accuracy": accuracy, "recall": recall})


def tune_with_setup() -> tune.ResultGrid:
    """Concurrent hyperparameter tunning starts here"""
    # Each trial uses 2 cpus. Therfore, we have at most 3 trials running concurrently (there are 6 cpus available in the local cluster)
    trainable_with_resources = tune.with_resources(train_function_mlflow, {"cpu": 2})
    tuner = tune.Tuner(
        trainable_with_resources,
        tune_config=tune.TuneConfig(
            num_samples=20,  # Total number of trials
        ),
        run_config=train.RunConfig(
            name="mlflow",
        ),
        param_space={
            "objective": "multi:softmax",  # Multi-class classification
            "eval_metric": ["logloss", "error"],
            "max_depth": tune.randint(1, 9),
            "min_child_weight": tune.choice([1, 2, 3]),
            "subsample": tune.uniform(0.5, 1.0),
            "eta": tune.loguniform(1e-4, 1e-1),
        },
    )

    # Start concurrent hyperparameter tunning
    results = tuner.fit()
    # Return the ray train results for later inspection
    return results

In [8]:
results = tune_with_setup()

0,1
Current time:,2025-02-08 23:12:13
Running for:,00:00:27.80
Memory:,8.1/62.5 GiB

Trial name,status,loc,eta,max_depth,min_child_weight,subsample,iter,total time (s),accuracy,recall
train_function_mlflow_af404_00000,TERMINATED,10.23.68.39:213647,0.000271381,6,2,0.911532,1,1.38923,0.966667,0.966667
train_function_mlflow_af404_00001,TERMINATED,10.23.68.39:213646,0.0145442,7,1,0.553902,1,1.37193,0.966667,0.966667
train_function_mlflow_af404_00002,TERMINATED,10.23.68.39:213645,0.00116733,6,1,0.816308,1,1.18987,1.0,1.0
train_function_mlflow_af404_00003,TERMINATED,10.23.68.39:214060,0.00410377,6,1,0.549572,1,0.900568,0.966667,0.966667
train_function_mlflow_af404_00004,TERMINATED,10.23.68.39:214066,0.00643781,4,1,0.59454,1,0.881835,0.933333,0.933333
train_function_mlflow_af404_00005,TERMINATED,10.23.68.39:214072,0.000778958,7,3,0.583762,1,0.819847,1.0,1.0
train_function_mlflow_af404_00006,TERMINATED,10.23.68.39:214473,0.0980219,3,3,0.678861,1,1.4206,0.933333,0.933333
train_function_mlflow_af404_00007,TERMINATED,10.23.68.39:214475,0.0250209,4,2,0.926443,1,1.7856,0.966667,0.966667
train_function_mlflow_af404_00008,TERMINATED,10.23.68.39:214474,0.00286612,1,1,0.641153,1,1.51822,0.9,0.9
train_function_mlflow_af404_00009,TERMINATED,10.23.68.39:214824,0.000663911,6,2,0.705037,1,0.960829,0.966667,0.966667


2025-02-08 23:12:13,628	INFO tune.py:1009 -- Wrote the latest version of all result files and experiment state to '/home/ssafarveisi/ray_results/mlflow' in 0.0053s.
2025-02-08 23:12:13,633	INFO tune.py:1041 -- Total run time: 27.84 seconds (27.79 seconds for the tuning loop).


In [9]:
# Best run after tunning
best_result = results.get_best_result(metric="accuracy", mode="max")

In [10]:
# Hyperparameters for the best run
best_result.config

{'objective': 'multi:softmax',
 'eval_metric': ['logloss', 'error'],
 'max_depth': 6,
 'min_child_weight': 1,
 'subsample': 0.8163075503837736,
 'eta': 0.0011673332791750281}

In [11]:
# Best run metrics (e.g., accuracy and recall)
best_result.metrics_dataframe

Unnamed: 0,accuracy,recall,timestamp,checkpoint_dir_name,done,training_iteration,trial_id,date,time_this_iter_s,time_total_s,...,hostname,node_ip,time_since_restore,iterations_since_restore,config/objective,config/eval_metric,config/max_depth,config/min_child_weight,config/subsample,config/eta
0,1.0,1.0,1739052709,,False,1,af404_00002,2025-02-08_23-11-49,1.189866,1.189866,...,LXKA-J9SYDX3,10.23.68.39,1.189866,1,multi:softmax,"[logloss, error]",6,1,0.816308,0.001167


In [12]:
# Stop the local ray cluster
ray.shutdown()

In [13]:
# Ds gathers all runs in the experiment
experiment_id = mlflow.get_experiment_by_name(name=EXP_NAME).experiment_id
runs = mlflow.search_runs(
    experiment_ids=[experiment_id], run_view_type=ViewType.ACTIVE_ONLY
)

In [None]:
# Ds finds the run id that maximizes the accuracy (performance metric)
best_run = runs.loc[runs["metrics.accuracy"].idxmax()]
best_run_id = best_run.run_id
best_run_accuracy = best_run["metrics.accuracy"]
model_uri = f"runs:/{best_run_id}/{ARTIFACT_PATH}"

In [15]:
# Ds registers the run id model if it meets the minimum required accuracy
if best_run_accuracy >= MINIMUM_REQUIRED_ACCURACY:
    print(emoji.emojize("Model accuracy met the required minimum accuracy :fire:"))
    result = mlflow.register_model(model_uri, MODEL_NAME)
else:
    print(
        emoji.emojize(
            "Best run did not meet the required minimum accuracy :sad_but_relieved_face:"
        )
    )

Successfully registered model 'iris-classifer'.
2025/02/08 23:12:31 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: iris-classifer, version 1


Model accuracy met the required minimum accuracy 🔥


Created version '1' of model 'iris-classifer'.


In [16]:
print(
    f"Updated the version of the registered model '{MODEL_NAME}' to '{result.version}'"
)

Updated the version of the registered model 'iris-classifer' to '1'


In [17]:
# Ds selects an alias for the latest model's version (optional as the model deployment is based on the artifact s3 path)
client.set_registered_model_alias(MODEL_NAME, ALIAS, result.version)

In [27]:
# Ds updates the best model's dependencies (mandatory for the model deployment into Kserve cluster)
mlflow.models.update_model_requirements(
    model_uri=model_uri,
    operation="add",
    requirement_list=[
        "mlserver==1.6.1",
        "mlserver-mlflow==1.6.1",
        "pydantic==2.7.1",
        "conda-pack==0.8.1",
    ],
)

2025/02/08 23:25:42 INFO mlflow.models.model: Retrieving model requirements files from mlflow-artifacts:/1/40f3ca3e3717432b8032255ba01032e6/artifacts/iris_xgb...


Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]

Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]

2025/02/08 23:25:43 INFO mlflow.models.model: Done updating requirements!

Old requirements:
['mlflow==2.20.0', 'xgboost==2.1.3', 'scikit-learn==1.5.2']

Updated requirements:
['mlflow==2.20.0',
 'xgboost==2.1.3',
 'scikit-learn==1.5.2',
 'mlserver==1.6.1',
 'mlserver-mlflow==1.6.1',
 'pydantic==2.7.1',
 'conda-pack==0.8.1']

2025/02/08 23:25:43 INFO mlflow.models.model: Uploading updated requirements files to mlflow-artifacts:/1/40f3ca3e3717432b8032255ba01032e6/artifacts/iris_xgb...


In [28]:
# Ds reloads the data to do a sanity check
_, test_x, _, test_y = train_test_split(
    df.drop("target", axis=1), df.target, test_size=0.2
)

# Ds validates the model before deployment
mlflow.models.predict(
    model_uri=model_uri,
    input_data=test_x,
    env_manager="uv",
    install_mlflow=False,
)

Downloading artifacts:   0%|          | 0/5 [00:00<?, ?it/s]

2025/02/08 23:25:52 INFO mlflow.models.flavor_backend_registry: Selected backend for flavor 'python_function'


Downloading artifacts:   0%|          | 0/5 [00:00<?, ?it/s]

2025/02/08 23:25:52 INFO mlflow.utils.virtualenv: Environment /tmp/tmpeincv6hw/envs/virtualenv_envs/mlflow-d717a062c77bc52456092013ae112c7d3d814986 already exists
2025/02/08 23:25:52 INFO mlflow.utils.environment: === Running command '['bash', '-c', 'source /tmp/tmpeincv6hw/envs/virtualenv_envs/mlflow-d717a062c77bc52456092013ae112c7d3d814986/bin/activate && python -c ""']'
2025/02/08 23:25:52 INFO mlflow.utils.environment: === Running command '['bash', '-c', 'source /tmp/tmpeincv6hw/envs/virtualenv_envs/mlflow-d717a062c77bc52456092013ae112c7d3d814986/bin/activate && python /home/ssafarveisi/Desktop/Projects/ML_PLATFORM/.venv/lib/python3.9/site-packages/mlflow/pyfunc/_mlflow_pyfunc_backend_predict.py --model-uri file:///tmp/tmp9sl7k8cs/iris_xgb --content-type json --input-path /tmp/tmposx8xu_3/input.json']'


{"predictions": [0, 0, 1, 1, 0, 2, 0, 0, 2, 0, 0, 1, 2, 0, 2, 0, 1, 1, 1, 0, 2, 1, 0, 2, 1, 1, 0, 1, 0, 1]}

Ds downloads the best model's artifacts (e.g., `conda.yaml`). This step is optional and is done as a sanity check (e.g., checking the dependencies for the model).

In [None]:
# Ds writes the spec for the best run into a designated directory
s3_path = f"s3://customerintelligence/ml_platform/mlartifacts/{experiment_id}/{best_run_id}/artifacts/{ARTIFACT_PATH}/"
print(f"Artifactory path: {s3_path}")
get_best_model_artifacts(s3_path=s3_path)

Artifactory path: s3://customerintelligence/ml_platform/mlartifacts/1/40f3ca3e3717432b8032255ba01032e6/artifacts/iris_xgb/


Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]

Saved requirements.txt in /best_model_artifacts


Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]

Saved conda.yaml in /best_model_artifacts


Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]

Saved .python-version in /best_model_artifacts


In [32]:
# Ds checks the artifacts for the best run (recommended)
mlflow.artifacts.list_artifacts(artifact_uri=s3_path)

[<FileInfo: file_size=987, is_dir=False, path='MLmodel'>,
 <FileInfo: file_size=240, is_dir=False, path='conda.yaml'>,
 <FileInfo: file_size=678265889, is_dir=False, path='environment.tar.gz'>,
 <FileInfo: file_size=203217, is_dir=False, path='model.json'>,
 <FileInfo: file_size=111, is_dir=False, path='python_env.yaml'>,
 <FileInfo: file_size=122, is_dir=False, path='requirements.txt'>]

In [None]:
# Ds updates the run sepc for the best run in best_model_artifacts/.env.best_run.
# This file is versioned. 
write_best_run_spec(
    experiment_id=experiment_id,
    run_id=best_run_id,
    artifact_path=ARTIFACT_PATH,
    mode="best_run",
)