<a href="https://colab.research.google.com/github/deshm084/Distributed-ML-Pipeline-with-Ray-Tune-MLflow/blob/main/Distributed_ML_Pipeline_with_Ray_Tune_MLflow.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [24]:
!pip install ray[tune] mlflow



In [25]:
import os
import time
import mlflow
import mlflow.sklearn
from sklearn.datasets import load_diabetes
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from ray import tune
from ray.tune.search.optuna import OptunaSearch  # Smart search algorithm
import numpy as np # Import numpy for sqrt

# --- 1. The Training Function (The "Minion" Logic) ---
# This function runs on a separate process (Worker)
def train_model(config):
    # Ensure MLflow experiment is set for each worker process
    mlflow.set_experiment("Diabetes_Distributed_tuning")

    # 1. Load Data (Each worker loads its own copy or reads from shared storage)
    data = load_diabetes()
    X_train, X_test, y_train, y_test = train_test_split(data.data, data.target, test_size=0.2)

    # 2. Build Model using hyperparams passed in 'config'
    model = RandomForestRegressor(
        n_estimators=config["n_estimators"],
        max_depth=config["max_depth"],
        min_samples_split=config["min_samples_split"]
    )

    # 3. Train
    model.fit(X_train, y_train)
    predictions = model.predict(X_test)
    # Fix: Calculate RMSE by taking the square root of MSE
    mse = mean_squared_error(y_test, predictions)
    rmse = np.sqrt(mse)

    # 4. Log to MLflow (The "Registry")
    # We create a new run for every trial
    with mlflow.start_run(nested=True):
        mlflow.log_params(config)
        mlflow.log_metric("rmse", rmse)
        # We tag this model so we can find it later
        mlflow.set_tag("mode", "distributed_trial")

    # 5. Report back to Ray (The "Boss")
    tune.report({"rmse": rmse})

# --- 2. The Orchestrator (The "Boss") ---
def run_distributed_pipeline():
    print("üöÄ Initializing Ray Cluster...")
    # Define the search space (The Grid)
    search_space = {
        "n_estimators": tune.randint(50, 500),
        "max_depth": tune.randint(2, 20),
        "min_samples_split": tune.randint(2, 10)
    }

    # The mlflow.set_experiment call has been moved to train_model function
    # mlflow.set_experiment("Diabetes_Distributed_tuning")

    print("‚ö° Starting Distributed Tuning (Running parallel trials)...")

    # This executes the training function in parallel across available cores
    analysis = tune.run(
        train_model,
        config=search_space,
        metric="rmse",
        mode="min",            # We want to minimize Error
        num_samples=10,        # Run 10 different experiments
        resources_per_trial={"cpu": 1}, # 1 CPU per worker
        verbose=1
    )

    print("\n‚úÖ Tuning Complete.")
    best_trial = analysis.get_best_trial("rmse", "min", "last")
    print(f"üèÜ Best Hyperparameters found: {best_trial.config}")
    print(f"üìâ Lowest RMSE: {best_trial.last_result['rmse']:.4f}")

    return best_trial.config
# --- 3. Production Deployment Simulation (FIXED) ---
def register_and_serve(best_config):
    print("\nüì¶ Promoting Best Model to Production Registry...")

    # Retrain final model on all data
    data = load_diabetes()
    model = RandomForestRegressor(
        n_estimators=best_config["n_estimators"],
        max_depth=best_config["max_depth"],
        min_samples_split=best_config["min_samples_split"]
    )
    model.fit(data.data, data.target)

    # Log the final "Production" model
    # We capture the 'run' object to get the ID safely
    with mlflow.start_run(run_name="PRODUCTION_MODEL") as run:
        mlflow.log_params(best_config)
        mlflow.sklearn.log_model(model, "random_forest_model")

        # --- THE FIX ---
        # Instead of guessing the file path, we construct the official URI
        # Format: runs:/<run_id>/<artifact_path>
        run_id = run.info.run_id
        model_uri = f"runs:/{run_id}/random_forest_model"

        print(f"üîí Model Versioned at: {model_uri}")

        # SIMULATE SERVING
        print("\nü§ñ Mock Inference Server Online...")

        # This instructs MLflow to look up the run internally
        loaded_model = mlflow.sklearn.load_model(model_uri)

        sample_data = data.data[0:1]
        prediction = loaded_model.predict(sample_data)
        print(f"   Input: Patient Data [0.03, 0.05, ...]")
        print(f"   Prediction: Disease Progression = {prediction[0]:.2f}")

if __name__ == "__main__":
    # If best_config is already in memory from the previous cell, you can just run:
    # register_and_serve(best_config)

    # Otherwise, re-run the full pipeline:
    best_config = run_distributed_pipeline()
    register_and_serve(best_config)

üöÄ Initializing Ray Cluster...
‚ö° Starting Distributed Tuning (Running parallel trials)...
+--------------------------------------------------------------------+
| Configuration for experiment     train_model_2026-01-11_21-47-02   |
+--------------------------------------------------------------------+
| Search algorithm                 BasicVariantGenerator             |
| Scheduler                        FIFOScheduler                     |
| Number of trials                 10                                |
+--------------------------------------------------------------------+

View detailed results here: /root/ray_results/train_model_2026-01-11_21-47-02
To visualize your results with TensorBoard, run: `tensorboard --logdir /tmp/ray/session_2026-01-11_20-08-27_551419_858/artifacts/2026-01-11_21-47-02/train_model_2026-01-11_21-47-02/driver_artifacts`

Trial status: 10 PENDING
Current time: 2026-01-11 21:47:03. Total running time: 0s
Logical resource usage: 0/2 CPUs, 0/0 GPUs
+---

[36m(train_model pid=62148)[0m 2026/01/11 21:47:30 INFO mlflow.store.db.utils: Creating initial MLflow database tables...
[36m(train_model pid=62148)[0m 2026/01/11 21:47:30 INFO mlflow.store.db.utils: Updating database tables
[36m(train_model pid=62148)[0m 2026/01/11 21:47:30 INFO alembic.runtime.migration: Context impl SQLiteImpl.
[36m(train_model pid=62148)[0m 2026/01/11 21:47:30 INFO alembic.runtime.migration: Will assume non-transactional DDL.



Trial train_model_108ae_00001 started with configuration:
+----------------------------------------------+
| Trial train_model_108ae_00001 config         |
+----------------------------------------------+
| max_depth                                 19 |
| min_samples_split                          7 |
| n_estimators                             139 |
+----------------------------------------------+


[36m(train_model pid=62148)[0m 2026/01/11 21:47:30 INFO alembic.runtime.migration: Running upgrade  -> 451aebb31d03, add metric step
[36m(train_model pid=62148)[0m 2026/01/11 21:47:30 INFO alembic.runtime.migration: Running upgrade 451aebb31d03 -> 90e64c465722, migrate user column to tags
[36m(train_model pid=62148)[0m 2026/01/11 21:47:30 INFO alembic.runtime.migration: Running upgrade 90e64c465722 -> 181f10493468, allow nulls for metric values
[36m(train_model pid=62148)[0m 2026/01/11 21:47:30 INFO alembic.runtime.migration: Running upgrade 181f10493468 -> df50e92ffc5e, Add Experiment Tags Table
[36m(train_model pid=62148)[0m 2026/01/11 21:47:30 INFO alembic.runtime.migration: Running upgrade df50e92ffc5e -> 7ac759974ad8, Update run tags with larger limit
[36m(train_model pid=62148)[0m 2026/01/11 21:47:30 INFO alembic.runtime.migration: Running upgrade 7ac759974ad8 -> 89d4b8295536, create latest metrics table
[36m(train_model pid=62148)[0m 2026/01/11 21:47:30 INFO alembi


Trial status: 2 RUNNING | 8 PENDING
Current time: 2026-01-11 21:47:33. Total running time: 30s
Logical resource usage: 2.0/2 CPUs, 0/0 GPUs
+-----------------------------------------------------------------------------------------+
| Trial name                status       n_estimators     max_depth     min_samples_split |
+-----------------------------------------------------------------------------------------+
| train_model_108ae_00000   RUNNING               161            19                     8 |
| train_model_108ae_00001   RUNNING               139            19                     7 |
| train_model_108ae_00002   PENDING               120             2                     9 |
| train_model_108ae_00003   PENDING               438            18                     6 |
| train_model_108ae_00004   PENDING               113            13                     8 |
| train_model_108ae_00005   PENDING               159            11                     8 |
| train_model_108ae_00006   PEN

[36m(train_model pid=62148)[0m 2026/01/11 21:47:33 INFO mlflow.tracking.fluent: Experiment with name 'Diabetes_Distributed_tuning' does not exist. Creating a new experiment.



Trial train_model_108ae_00000 completed after 1 iterations at 2026-01-11 21:47:34. Total running time: 31s
+--------------------------------------------------+
| Trial train_model_108ae_00000 result             |
+--------------------------------------------------+
| checkpoint_dir_name                              |
| time_this_iter_s                          6.1198 |
| time_total_s                              6.1198 |
| training_iteration                             1 |
| rmse                                     59.0551 |
+--------------------------------------------------+

Trial train_model_108ae_00001 completed after 1 iterations at 2026-01-11 21:47:36. Total running time: 33s
+--------------------------------------------------+
| Trial train_model_108ae_00001 result             |
+--------------------------------------------------+
| checkpoint_dir_name                              |
| time_this_iter_s                         6.40005 |
| time_total_s                            

[36m(train_model pid=62429)[0m 2026/01/11 21:47:58 INFO mlflow.store.db.utils: Creating initial MLflow database tables...[32m [repeated 2x across cluster][0m
[36m(train_model pid=62192)[0m 2026/01/11 21:47:31 INFO mlflow.store.db.utils: Updating database tables
[36m(train_model pid=62192)[0m 2026/01/11 21:47:34 INFO alembic.runtime.migration: Context impl SQLiteImpl.[32m [repeated 3x across cluster][0m
[36m(train_model pid=62192)[0m 2026/01/11 21:47:34 INFO alembic.runtime.migration: Will assume non-transactional DDL.[32m [repeated 3x across cluster][0m
[36m(train_model pid=62192)[0m 2026/01/11 21:47:31 INFO alembic.runtime.migration: Running upgrade  -> 451aebb31d03, add metric step
[36m(train_model pid=62192)[0m 2026/01/11 21:47:31 INFO alembic.runtime.migration: Running upgrade 451aebb31d03 -> 90e64c465722, migrate user column to tags
[36m(train_model pid=62192)[0m 2026/01/11 21:47:31 INFO alembic.runtime.migration: Running upgrade 90e64c465722 -> 181f10493468, a


Trial status: 2 TERMINATED | 2 RUNNING | 6 PENDING
Current time: 2026-01-11 21:48:03. Total running time: 1min 0s
Logical resource usage: 2.0/2 CPUs, 0/0 GPUs
Current best trial: 108ae_00002 with rmse=56.214296467306326 and params={'n_estimators': 120, 'max_depth': 2, 'min_samples_split': 9}
+---------------------------------------------------------------------------------------------------------------------------------+
| Trial name                status         n_estimators     max_depth     min_samples_split     iter     total time (s)      rmse |
+---------------------------------------------------------------------------------------------------------------------------------+
| train_model_108ae_00002   RUNNING                 120             2                     9        1            6.86208   56.2143 |
| train_model_108ae_00003   RUNNING                 438            18                     6                                       |
| train_model_108ae_00000   TERMINATED        

[36m(train_model pid=62468)[0m 2026/01/11 21:48:00 INFO mlflow.store.db.utils: Creating initial MLflow database tables...
[36m(train_model pid=62468)[0m 2026/01/11 21:48:03 INFO alembic.runtime.migration: Context impl SQLiteImpl.[32m [repeated 4x across cluster][0m
[36m(train_model pid=62468)[0m 2026/01/11 21:48:03 INFO alembic.runtime.migration: Will assume non-transactional DDL.[32m [repeated 4x across cluster][0m



Trial train_model_108ae_00003 completed after 1 iterations at 2026-01-11 21:48:08. Total running time: 1min 5s
+--------------------------------------------------+
| Trial train_model_108ae_00003 result             |
+--------------------------------------------------+
| checkpoint_dir_name                              |
| time_this_iter_s                         9.74288 |
| time_total_s                             9.74288 |
| training_iteration                             1 |
| rmse                                     48.2815 |
+--------------------------------------------------+

Trial train_model_108ae_00004 started with configuration:
+----------------------------------------------+
| Trial train_model_108ae_00004 config         |
+----------------------------------------------+
| max_depth                                 13 |
| min_samples_split                          8 |
| n_estimators                             113 |
+----------------------------------------------+

Trial tr

[36m(train_model pid=62687)[0m 2026/01/11 21:48:25 INFO mlflow.store.db.utils: Creating initial MLflow database tables...
[36m(train_model pid=62468)[0m 2026/01/11 21:48:00 INFO mlflow.store.db.utils: Updating database tables
[36m(train_model pid=62468)[0m 2026/01/11 21:48:00 INFO alembic.runtime.migration: Running upgrade  -> 451aebb31d03, add metric step
[36m(train_model pid=62468)[0m 2026/01/11 21:48:00 INFO alembic.runtime.migration: Running upgrade 451aebb31d03 -> 90e64c465722, migrate user column to tags
[36m(train_model pid=62468)[0m 2026/01/11 21:48:00 INFO alembic.runtime.migration: Running upgrade 90e64c465722 -> 181f10493468, allow nulls for metric values
[36m(train_model pid=62468)[0m 2026/01/11 21:48:00 INFO alembic.runtime.migration: Running upgrade 181f10493468 -> df50e92ffc5e, Add Experiment Tags Table
[36m(train_model pid=62468)[0m 2026/01/11 21:48:00 INFO alembic.runtime.migration: Running upgrade df50e92ffc5e -> 7ac759974ad8, Update run tags with larger


Trial train_model_108ae_00004 completed after 1 iterations at 2026-01-11 21:48:32. Total running time: 1min 29s
+--------------------------------------------------+
| Trial train_model_108ae_00004 result             |
+--------------------------------------------------+
| checkpoint_dir_name                              |
| time_this_iter_s                         8.36073 |
| time_total_s                             8.36073 |
| training_iteration                             1 |
| rmse                                     60.0188 |
+--------------------------------------------------+

Trial train_model_108ae_00005 completed after 1 iterations at 2026-01-11 21:48:32. Total running time: 1min 29s
+--------------------------------------------------+
| Trial train_model_108ae_00005 result             |
+--------------------------------------------------+
| checkpoint_dir_name                              |
| time_this_iter_s                         8.02774 |
| time_total_s                  

[36m(train_model pid=62956)[0m 2026/01/11 21:48:55 INFO mlflow.store.db.utils: Creating initial MLflow database tables...
[36m(train_model pid=62696)[0m 2026/01/11 21:48:27 INFO alembic.runtime.migration: Running upgrade 2b4d017a5e9b -> cfd24bdc0731, Update run status constraint with killed
[36m(train_model pid=62696)[0m 2026/01/11 21:48:27 INFO alembic.runtime.migration: Running upgrade cfd24bdc0731 -> 0a8213491aaa, drop_duplicate_killed_constraint
[36m(train_model pid=62696)[0m 2026/01/11 21:48:27 INFO alembic.runtime.migration: Running upgrade 0a8213491aaa -> 728d730b5ebd, add registered model tags table
[36m(train_model pid=62696)[0m 2026/01/11 21:48:27 INFO alembic.runtime.migration: Running upgrade 728d730b5ebd -> 27a6a02d2cf1, add model version tags table
[36m(train_model pid=62696)[0m 2026/01/11 21:48:27 INFO alembic.runtime.migration: Running upgrade 27a6a02d2cf1 -> 84291f40a231, add run_link to model_version
[36m(train_model pid=62696)[0m 2026/01/11 21:48:27 INF


Trial train_model_108ae_00007 completed after 1 iterations at 2026-01-11 21:49:03. Total running time: 2min 0s
+--------------------------------------------------+
| Trial train_model_108ae_00007 result             |
+--------------------------------------------------+
| checkpoint_dir_name                              |
| time_this_iter_s                         9.39268 |
| time_total_s                             9.39268 |
| training_iteration                             1 |
| rmse                                     57.7876 |
+--------------------------------------------------+

Trial status: 7 TERMINATED | 1 RUNNING | 2 PENDING
Current time: 2026-01-11 21:49:03. Total running time: 2min 0s
Logical resource usage: 1.0/2 CPUs, 0/0 GPUs
Current best trial: 108ae_00003 with rmse=48.2814553526582 and params={'n_estimators': 438, 'max_depth': 18, 'min_samples_split': 6}
+---------------------------------------------------------------------------------------------------------------------

[36m(train_model pid=63234)[0m 2026/01/11 21:49:27 INFO mlflow.store.db.utils: Creating initial MLflow database tables...
[36m(train_model pid=62959)[0m 2026/01/11 21:48:56 INFO mlflow.store.db.utils: Updating database tables
[36m(train_model pid=62959)[0m 2026/01/11 21:49:00 INFO alembic.runtime.migration: Context impl SQLiteImpl.[32m [repeated 3x across cluster][0m
[36m(train_model pid=62959)[0m 2026/01/11 21:49:00 INFO alembic.runtime.migration: Will assume non-transactional DDL.[32m [repeated 3x across cluster][0m
[36m(train_model pid=62959)[0m 2026/01/11 21:48:56 INFO alembic.runtime.migration: Running upgrade  -> 451aebb31d03, add metric step
[36m(train_model pid=62959)[0m 2026/01/11 21:48:56 INFO alembic.runtime.migration: Running upgrade 451aebb31d03 -> 90e64c465722, migrate user column to tags
[36m(train_model pid=62959)[0m 2026/01/11 21:48:56 INFO alembic.runtime.migration: Running upgrade 90e64c465722 -> 181f10493468, allow nulls for metric values
[36m(tra


Trial train_model_108ae_00008 completed after 1 iterations at 2026-01-11 21:49:32. Total running time: 2min 29s
+--------------------------------------------------+
| Trial train_model_108ae_00008 result             |
+--------------------------------------------------+
| checkpoint_dir_name                              |
| time_this_iter_s                         7.17649 |
| time_total_s                             7.17649 |
| training_iteration                             1 |
| rmse                                     63.0379 |
+--------------------------------------------------+


2026-01-11 21:49:32,591	INFO tune.py:1009 -- Wrote the latest version of all result files and experiment state to '/root/ray_results/train_model_2026-01-11_21-47-02' in 0.0128s.



Trial train_model_108ae_00009 completed after 1 iterations at 2026-01-11 21:49:32. Total running time: 2min 29s
+--------------------------------------------------+
| Trial train_model_108ae_00009 result             |
+--------------------------------------------------+
| checkpoint_dir_name                              |
| time_this_iter_s                         7.32038 |
| time_total_s                             7.32038 |
| training_iteration                             1 |
| rmse                                     59.3184 |
+--------------------------------------------------+

Trial status: 10 TERMINATED
Current time: 2026-01-11 21:49:32. Total running time: 2min 29s
Logical resource usage: 1.0/2 CPUs, 0/0 GPUs
Current best trial: 108ae_00003 with rmse=48.2814553526582 and params={'n_estimators': 438, 'max_depth': 18, 'min_samples_split': 6}
+---------------------------------------------------------------------------------------------------------------------------------+
| Trial



üîí Model Versioned at: runs:/a8d3661f24074317ba88419f561873b8/random_forest_model

ü§ñ Mock Inference Server Online...


Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]

Downloading artifacts:   0%|          | 0/5 [00:00<?, ?it/s]

   Input: Patient Data [0.03, 0.05, ...]
   Prediction: Disease Progression = 184.37
