In [None]:
import os, subprocess, sys

os.environ["JAVA_HOME"] = os.path.expanduser("~/.sdkman/candidates/java/17.0.17-tem")
os.environ["PATH"] = os.path.join(os.environ["JAVA_HOME"], "bin") + os.pathsep + os.environ["PATH"]
subprocess.run(["java","-version"], check=False)

In [None]:
import pandas as pd
from sklearn.datasets import load_breast_cancer

breast_cancer = load_breast_cancer(as_frame=True)
data = breast_cancer.frame

# Make column names ML friendly
data.rename(columns=lambda x: x.replace(' ', '_'), inplace=True)

In [None]:
data.head()

In [None]:
target_names = dict(enumerate(breast_cancer.target_names))
class_labels = data['target'].map(target_names)
class_labels.value_counts()

In [None]:
feature_cols = [col for col in data.columns if col != 'target']
feature_cols[:5]

In [None]:
data[feature_cols].describe().T.head()

In [None]:
import seaborn as sns
sns.countplot(x=class_labels)

The breast cancer dataset has more benign cases than malignant ones.

We will treat `target` as the binary label where 0 = malignant and 1 = benign.


In [None]:
# The target column is already binary, so no additional label engineering is required.
sorted(data.target.unique())

In [None]:
import matplotlib.pyplot as plt

selected_cols = feature_cols[:12]
dims = (3, 4)

fig, axes = plt.subplots(dims[0], dims[1], figsize=(25, 15))
for i, col in enumerate(selected_cols):
  r, c = divmod(i, dims[1])
  sns.boxplot(x=class_labels, y=data[col], ax=axes[r, c])
  axes[r, c].set_title(col.replace('_', ' '))

# Hide any unused subplots
for j in range(len(selected_cols), dims[0] * dims[1]):
  r, c = divmod(j, dims[1])
  axes[r, c].axis('off')

fig.tight_layout()

Several of the mean-based features (for example mean radius and mean perimeter) show clear separation between malignant and benign tumors, indicating they should be useful predictors.


In [None]:
data.isna().any()

In [None]:
from sklearn.model_selection import train_test_split
 
X = data[feature_cols]
y = data.target
 
# Split out the training data
X_train, X_rem, y_train, y_rem = train_test_split(X, y, train_size=0.6, random_state=123)
 
# Split the remaining data equally into validation and test
X_val, X_test, y_val, y_test = train_test_split(X_rem, y_rem, test_size=0.5, random_state=123)


### Build a baseline model

This task seems well suited to a random forest classifier, since the output is binary and there may be interactions between multiple variables.

The following code builds a simple classifier using scikit-learn. It uses MLflow to keep track of the model accuracy, and to save the model for later use.

In [None]:
import mlflow
import mlflow.pyfunc
import mlflow.sklearn
import numpy as np
import sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score
from mlflow.models.signature import infer_signature
from mlflow.utils.environment import _mlflow_conda_env
import cloudpickle
import time
 
# The predict method of sklearn's RandomForestClassifier returns a binary classification (0 or 1). 
# The following code creates a wrapper function, SklearnModelWrapper, that uses 
# the predict_proba method to return the probability that the observation belongs to each class. 
 
class SklearnModelWrapper(mlflow.pyfunc.PythonModel):
  def __init__(self, model):
    self.model = model
    
  def predict(self, context, model_input):
    return self.model.predict_proba(model_input)[:,1]


# mlflow.start_run creates a new MLflow run to track the performance of this model. 
# Within the context, you call mlflow.log_param to keep track of the parameters used, and
# mlflow.log_metric to record metrics like accuracy.
with mlflow.start_run(run_name='untuned_random_forest'):
    n_estimators = 10
    model = RandomForestClassifier(n_estimators=n_estimators, random_state=np.random.RandomState(123))
    model.fit(X_train, y_train)

    # predict_proba returns [prob_negative, prob_positive], so slice the output with [:, 1]
    predictions_test = model.predict_proba(X_test)[:,1]
    auc_score = roc_auc_score(y_test, predictions_test)
    mlflow.log_param('n_estimators', n_estimators)
    # Use the area under the ROC curve as a metric.
    mlflow.log_metric('auc', auc_score)
    wrappedModel = SklearnModelWrapper(model)
    # Log the model with a signature that defines the schema of the model's inputs and outputs. 
    # When the model is deployed, this signature will be used to validate inputs.
    signature = infer_signature(X_train, wrappedModel.predict(None, X_train))
    
    # MLflow contains utilities to create a conda environment used to serve models.
    # The necessary dependencies are added to a conda.yaml file which is logged along with the model.
    conda_env =  _mlflow_conda_env(
            additional_conda_deps=None,
            additional_pip_deps=["cloudpickle=={}".format(cloudpickle.__version__), "scikit-learn=={}".format(sklearn.__version__)],
            additional_conda_channels=None,
        )
    mlflow.pyfunc.log_model("random_forest_model",
                            python_model=wrappedModel,
                            conda_env=conda_env,
                            signature=signature)

In [None]:
feature_importances = pd.DataFrame(model.feature_importances_, index=X_train.columns.tolist(), columns=['importance'])
feature_importances.sort_values('importance', ascending=False)

The exploratory plots above highlight that the malignant class typically has larger values for metrics like mean radius while the benign class tends to have lower values.

During model training, MLflow logs the Area Under the ROC Curve (AUC). Open the Experiment Runs sidebar to inspect the logged metrics for each run.

Register the model in MLflow Model Registry
By registering this model in Model Registry, you can easily reference the model from anywhere within Databricks.

The following section shows how to do this programmatically, but you can also register a model using the UI. See "Create or register a model using the UI" (AWS|Azure|GCP).


In [None]:
run_id = mlflow.search_runs(filter_string='tags.mlflow.runName = "untuned_random_forest"').iloc[0].run_id

In [None]:
run_id

In [None]:
# If you see the error "PERMISSION_DENIED: User does not have any permission level assigned to the registered model", 
# the cause may be that a model already exists with the name "breast_cancer_classifier". Try using a different name.
model_name = "breast_cancer_classifier"
model_version = mlflow.register_model(f"runs:/{run_id}/random_forest_model", model_name)
 
# Registering the model takes a few seconds, so add a small delay
time.sleep(15)


In [None]:
"""
The MlflowClient class allows you to interact with the MLflow Tracking Server programmatically. 
You can use it to perform various tasks, such as creating and managing experiments, starting 
and managing runs, logging metrics and parameters, and querying information about experiments and runs.

"""
from mlflow.tracking import MlflowClient
client = MlflowClient()

client.transition_model_version_stage(
  name=model_name,
  version=model_version.version,
  stage="Production",
)

The Models page now shows the model version in stage "Production".

You can now refer to the model using the path "models:/breast_cancer_classifier/production".


In [None]:
model = mlflow.pyfunc.load_model(f"models:/{model_name}/production")
 
# Sanity-check: This should match the AUC logged by MLflow
print(f'AUC: {roc_auc_score(y_test, model.predict(X_test))}')
AUC: 0.8540300975814177

In [None]:
# !mlflow ui --port=5001

## Experiment with a new model

The random forest model performed well even without hyperparameter tuning.

The following code uses the xgboost library to train a more accurate model. It runs a parallel hyperparameter sweep to train multiple models in parallel, using Hyperopt and SparkTrials. As before, the code tracks the performance of each parameter configuration with MLflow.

In [None]:
# hyperopt library imports for defining and executing hyperparameter optimization
from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK
from hyperopt.pyll import scope  
from math import exp  
import mlflow.xgboost
import numpy as np
import xgboost as xgb

# search_space dictionary defines the range and distribution of hyperparameters for the model
search_space = {
  'max_depth': scope.int(hp.quniform('max_depth', 4, 100, 1)),  # Integer range for tree depth
  'learning_rate': hp.loguniform('learning_rate', -3, 0),  # Log-uniform distribution for learning rate
  'reg_alpha': hp.loguniform('reg_alpha', -5, -1),  # Log-uniform for L1 regularization term
  'reg_lambda': hp.loguniform('reg_lambda', -6, -1),  # Log-uniform for L2 regularization term
  'min_child_weight': hp.loguniform('min_child_weight', -1, 3),  # Log-uniform for minimum sum of instance weight(hessian) needed in a child
  'objective': 'binary:logistic',  # Objective function for binary classification
  'seed': 123,  # Set a seed for deterministic training
}



Nested Runs: When you use mlflow.start_run(nested=True) within the main run, you create nested runs. These runs are associated with the main run and inherit some of its parameters and context. Nested runs are typically used to explore variations or sub-experiments within the main experiment.

python
Copy code
import mlflow

with mlflow.start_run():
    # Your main experiment code goes here

    with mlflow.start_run(nested=True):
        # Nested experiment code goes here
Parameters and metrics logged in the nested run are associated with that specific run and can be accessed separately from the main run.
You can create multiple nested runs within a main run to represent different variations or configurations of your experiment.
python
Copy code
import mlflow

with mlflow.start_run():
    # Your main experiment code goes here

    with mlflow.start_run(nested=True):
        # Nested experiment code 1 goes here

    with mlflow.start_run(nested=True):
        # Nested experiment code 2 goes here

In [None]:
def train_model(params):
      # With MLflow autologging, hyperparameters and the trained model are automatically logged to MLflow.
    mlflow.xgboost.autolog()
    with mlflow.start_run(nested=True):
        train = xgb.DMatrix(data=X_train, label=y_train)
        validation = xgb.DMatrix(data=X_val, label=y_val)
        # Pass in the validation set so xgb can track an evaluation metric. XGBoost terminates training when the evaluation metric
        # is no longer improving.
        booster = xgb.train(params=params, dtrain=train, num_boost_round=1000,\
                            evals=[(validation, "validation")], early_stopping_rounds=50)
        validation_predictions = booster.predict(validation)
        auc_score = roc_auc_score(y_val, validation_predictions)
        mlflow.log_metric('auc', auc_score)

        signature = infer_signature(X_train, booster.predict(train))
        mlflow.xgboost.log_model(booster, "model", signature=signature)

        # Set the loss to -1*auc_score so fmin maximizes the auc_score
        return {'status': STATUS_OK, 'loss': -1*auc_score, 'booster': booster.attributes()}


In [None]:
# Importing pyspark, the Python API for Spark which lets you write Spark applications using Python
from pyspark import SparkContext, SparkConf  

# Setting up the configuration for Spark:
conf_spark = SparkConf().set("spark.driver.host", "127.0.0.1")  # Configures the Spark driver host address to localhost

# Creating a SparkContext using the above configuration, essential for connecting to a Spark cluster:
sc = SparkContext(conf=conf_spark)  # Initializes the main entry point for Spark functionality

TPE stands for Tree-structured Parzen Estimator. It's a Bayesian optimization technique that models the probability distribution of the hyperparameters given the observed metrics. It is particularly effective for high-dimensional spaces and has become a popular choice in machine learning for hyperparameter tuning.


In [None]:
# Greater parallelism will lead to speedups, but a less optimal hyperparameter sweep. 
# A reasonable value for parallelism is the square root of max_evals.
spark_trials = SparkTrials(parallelism=10)

# Run fmin within an MLflow run context so that each hyperparameter configuration is logged as a child run of a parent
# run called "xgboost_models" .
with mlflow.start_run(run_name='xgboost_models'):
  best_params = fmin(
    fn=train_model, 
    space=search_space, 
    algo=tpe.suggest,
    max_evals=96,
    trials=spark_trials,
  )

In [None]:
best_run = mlflow.search_runs(order_by=['metrics.auc DESC']).iloc[0]
print(f'AUC of Best Run: {best_run["metrics.auc"]}')

## Update the production breast_cancer_classifier model in MLflow Model Registry

Earlier, you saved the baseline model to Model Registry with the name `breast_cancer_classifier`. Now that you have created a more accurate model, update `breast_cancer_classifier`.



In [None]:
new_model_version = mlflow.register_model(f"runs:/{best_run.run_id}/model", model_name)

# Registering the model takes a few seconds, so add a small delay
time.sleep(10)

Click Models in the left sidebar to see that the `breast_cancer_classifier` model now has two versions.

The following code promotes the new version to production.


In [None]:
client.transition_model_version_stage(
    name=model_name,
    version=model_version.version,
    stage='Archived'
)

client.transition_model_version_stage(
    name=model_name,
    version=new_model_version.version,
    stage='Production'
)

Clients that call load_model now receive the new model.



In [None]:
model = mlflow.pyfunc.load_model(f"models:/{model_name}/production")
print(f"AUC: {roc_auc_score(y_test, model.predict(X_test))}")

## Batch inference

There are many scenarios where you might want to evaluate a model on a corpus of new data. For example, you may have a fresh batch of data, or may need to compare the performance of two models on the same corpus of data.

The following code evaluates the model on data stored in a Delta table, using Spark to run the computation in parallel.

In [None]:
mlflow.pyfunc.spark_udf

<function mlflow.pyfunc.spark_udf(spark, model_uri, result_type=None, env_manager=None, params: dict[str, typing.Any] | None = None, extra_env: dict[str, str] | None = None, prebuilt_env_uri: str | None = None, model_config: str | pathlib.Path | dict[str, typing.Any] | None = None)>

In [None]:
import mlflow.pyfunc

from pyspark.sql import SparkSession

# Create or retrieve a Spark session
spark = SparkSession.builder \
    .appName("MLflow Integration") \
    .config("spark.some.config.option", "config-value") \
    .getOrCreate()

apply_model_udf = mlflow.pyfunc.spark_udf(spark, f"models:/{model_name}/production")

Downloading artifacts: 100%|██████████| 6/6 [00:00<00:00, 2401.09it/s] 
Downloading artifacts: 100%|██████████| 6/6 [00:00<00:00, 3121.92it/s] 
2025/11/03 21:31:46 INFO mlflow.models.flavor_backend_registry: Selected backend for flavor 'python_function'


In [59]:
# new_data = spark.read.format("csv").load(table_path) # table_path is path to the delta table

In [60]:
new_model_version

<ModelVersion: aliases=[], creation_timestamp=1762222824419, current_stage='None', deployment_job_state=None, description=None, last_updated_timestamp=1762222824419, metrics=[<Metric: dataset_digest=None, dataset_name=None, key='validation-logloss', model_id='m-d4913e98fefa467ea505dd56a5e0cfe8', run_id='3614da3b817d4cddbbd0e5c8a15c9f3b', step=0, timestamp=1762222783851, value=0.3957874968386533>,
 <Metric: dataset_digest='bf39842e', dataset_name='dataset', key='validation-logloss', model_id='m-d4913e98fefa467ea505dd56a5e0cfe8', run_id='3614da3b817d4cddbbd0e5c8a15c9f3b', step=0, timestamp=1762222788677, value=0.05244956117096815>,
 <Metric: dataset_digest='bf39842e', dataset_name='dataset', key='stopped_iteration', model_id='m-d4913e98fefa467ea505dd56a5e0cfe8', run_id='3614da3b817d4cddbbd0e5c8a15c9f3b', step=0, timestamp=1762222788677, value=78.0>,
 <Metric: dataset_digest=None, dataset_name=None, key='auc', model_id='m-d4913e98fefa467ea505dd56a5e0cfe8', run_id='3614da3b817d4cddbbd0e5c8

In [61]:
new_model_version.run_id

'3614da3b817d4cddbbd0e5c8a15c9f3b'

In [None]:
# Serve the model using the MLflow Model Serving

# Run the following in terminal outiside jupyter and after activating the virtual environment
# mlflow models serve --env-manager=local -m models:/breast_cancer_classifier/production -h 0.0.0.0 -p 5001

- Here **model_name** is `breast_cancer_classifier`


In [None]:
import requests
import json

url = 'http://localhost:5002/invocations'

datads_dict = {"dataframe_split": X_test.to_dict(orient='split')}

response = requests.post(url, json=datads_dict)
predictions = response.json()

print(predictions)

{'predictions': [0.977245032787323, 0.0018812912749126554, 0.0004756989947054535, 0.08485926687717438, 0.9392794370651245, 0.9962750673294067, 0.002048053778707981, 0.03223269805312157, 0.9994556307792664, 0.0013605729909613729, 0.005775751080363989, 0.9937363862991333, 0.9992714524269104, 0.0009298113873228431, 0.9968335032463074, 0.9934667944908142, 0.9963095784187317, 0.9948203563690186, 0.9991143345832825, 0.01945744827389717, 0.0036211004480719566, 0.9987717270851135, 0.004347221460193396, 0.002124169608578086, 0.9989701509475708, 0.2190239578485489, 0.9990696310997009, 0.9985594153404236, 0.001434369245544076, 0.9997492432594299, 0.999270498752594, 0.995508074760437, 0.996904194355011, 0.9988119602203369, 0.9850181937217712, 0.0034078070893883705, 0.9961591958999634, 0.33427298069000244, 0.016329968348145485, 0.0011782258516177535, 0.01714150235056877, 0.0007764332112856209, 0.9968259334564209, 0.00026044456171803176, 0.9909340143203735, 0.9950013756752014, 0.9859433174133301, 0.

In [None]:
!mlflow ui