In [1]:
import dataiku
import pandas as pd
import mlflow
import warnings
import os
import time

from datetime import datetime
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import cross_validate, StratifiedKFold
from dataikuapi.dss.ml import DSSPredictionMLTaskSettings
warnings.filterwarnings('ignore')

In [2]:
# Replace these constants by your own values
# -- Dataiku objects this notebook reads from / deploys to
DATASET_TRAINING = "train"
SAVED_MODEL_NAME = "custom-model"
MLFLOW_CODE_ENV_NAME = "mlflow"

# -- Where MLflow runs are stored and which experiment they belong to
EXPERIMENT_TRACKING_FOLDER_NAME = "tracking"
EXPERIMENT_TRACKING_FOLDER_CONNECTION = "dataiku-managed-storage"
EXPERIMENT_NAME = "custom-modeling_v000"

In [3]:
# Some utils
def now_str() -> str:
    """Return the current local time as a compact ``YYYYMMDDHHMMSS`` string.

    Used to build run names that sort chronologically.
    """
    current = datetime.now()
    return f"{current:%Y%m%d%H%M%S}"

# Experiment tracking (scikit-learn)

This notebook contains a simple example to showcase the new Experiment Tracking capabilities of Dataiku. It explains how to perform several runs with different parameters, select the best run and promote it as a Saved Model version in a Dataiku Flow. It leverages:
* the scikit-learn package

## Loading the training data

Our training data lives in the `train` Dataset (the value of `DATASET_TRAINING` above). Let's load it into a pandas DataFrame and see what it looks like:

In [4]:
# Clear the MLflow TLS-related environment variables (the run cell further down sets
# MLFLOW_TRACKING_INSECURE_TLS), so re-running the notebook from here starts clean.
# Each variable is popped once; previously SERVER_CERT_PATH was popped twice and
# CLIENT_CERT_PATH was never cleared.
for _tls_env_var in ("MLFLOW_TRACKING_SERVER_CERT_PATH",
                     "MLFLOW_TRACKING_CLIENT_CERT_PATH",
                     "MLFLOW_TRACKING_INSECURE_TLS"):
    os.environ.pop(_tls_env_var, None)


# Handles on the DSS instance and on the current project
client = dataiku.api_client()
project_key = dataiku.default_project_key()
project = client.get_project(project_key)

# Load the training Dataset into pandas and preview it
training_dataset = dataiku.Dataset(DATASET_TRAINING)
df = training_dataset.get_dataframe()
df.head()

Unnamed: 0,customer_id,age,price_first_item_purchased,gender,ip,ip_geopoint,ip_country_code,pages_visited,campaign,high_value
0,0008dd99a0,72,10.0,F,193.148.113.242,POINT(-3.684 40.4172),ES,5,1,0.0
1,00105d1128,34,28.0,M,150.197.87.28,POINT(126.9741 37.5112),KR,6,0,0.0
2,001097c31c,38,22.0,F,110.203.10.55,POINT(113.722 34.7732),CN,7,0,0.0
3,001261e788,34,57.0,M,81.116.60.67,POINT(12.1097 43.1479),IT,5,0,1.0
4,0013c8d438,44,15.5,M,101.225.27.121,POINT(113.722 34.7732),CN,14,1,0.0


We are working on a *binary classification* problem here: predicting whether or not a given customer is high value. This outcome is stored in the `high_value` column, which takes either the value 0.0 or 1.0.

In [5]:
# Separate the label column (`target`) from the feature columns (`data`)
target_name = "high_value"
target, data = df[target_name], df.drop(columns=[target_name])

In [6]:
# Get-or-create the managed folder that stores the experiment tracking artifacts:
# look it up by name among the project's managed folders, and create it on the
# configured connection only if no folder with that name exists yet.
folder = None
for mf in project.list_managed_folders():
    if mf["name"] == EXPERIMENT_TRACKING_FOLDER_NAME:
        folder_id = mf["id"]
        print(f"Found experiment tracking folder {EXPERIMENT_TRACKING_FOLDER_NAME} with id {folder_id}")
        folder = project.get_managed_folder(odb_id=folder_id)
        break

# Reached both when the project has no folders at all and when none matched the name.
if folder is None:
    print("Experiment tracking folder not found. Creating it...")
    folder = project.create_managed_folder(EXPERIMENT_TRACKING_FOLDER_NAME,
                                           connection_name=EXPERIMENT_TRACKING_FOLDER_CONNECTION)

Found experiment tracking folder tracking with id 9CmdjkWw


## Preparing the experiment

To prepare the ground for our experiments, we need to create a few handles and define the MLflow experiment into which we'll collect our runs:

In [7]:
import logging
import requests

# Only let the urllib3 logger (reached through requests' vendored alias) emit
# records at ERROR level or above.
_urllib3_logger_name = requests.packages.urllib3.__package__
logging.getLogger(_urllib3_logger_name).setLevel(logging.ERROR)



def get_or_create_experiment(creation_wait_seconds: float = 30):
    """Configure MLflow for this project's tracking folder and activate ``EXPERIMENT_NAME``.

    Calls ``project.setup_mlflow`` on the global ``project`` / ``folder``, looks up
    ``EXPERIMENT_NAME`` with ``mlflow.get_experiment_by_name`` and then makes it the
    active experiment with ``mlflow.set_experiment``.

    Args:
        creation_wait_seconds: seconds to sleep when the lookup returned nothing
            (or failed), i.e. when ``set_experiment`` is expected to create the
            experiment. Defaults to the 30 s that was previously hard-coded.

    Returns:
        The experiment returned by ``mlflow.set_experiment``.
    """
    existing = None
    try:
        # Keep the handle referenced for the duration of the call.
        mlflow_handle = project.setup_mlflow(managed_folder=folder)
        existing = mlflow.get_experiment_by_name(EXPERIMENT_NAME)
        print(existing)
    except Exception as e:
        # Best effort: report the failure and fall through to the creation path.
        print("An exception occurred:", str(e))
        # Redo the setup in case the failure happened before it completed.
        mlflow_handle = project.setup_mlflow(managed_folder=folder)

    if existing is None:
        print("Starting set experiment")
        experiment = mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)
        print(f"Sleeping for {creation_wait_seconds}s to ensure experiment is fully created")
        time.sleep(creation_wait_seconds)
        print("Sleep done")
    else:
        experiment = mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)
    return experiment

experiment = get_or_create_experiment()

# Set remote configuration: point the dataiku package at the DSS instance described
# by the project variables SPACE_URL / API_KEY (read once).
# SECURITY NOTE(review): no_check_certificate=True disables TLS certificate
# verification for the API client, and the API key lives in project variables;
# prefer a trusted CA bundle and a secret store where possible.
custom_variables = dataiku.get_custom_variables()
space_url = custom_variables["SPACE_URL"]
api_key = custom_variables["API_KEY"]
dataiku.set_remote_dss(space_url, api_key, no_check_certificate=True)
dataiku.set_default_project_key(project_key)
client = dataiku.api_client()
project = client.get_project(project_key)
print("Done getting remote client")


2023/06/21 12:05:24 INFO mlflow.tracking.fluent: Experiment with name 'custom-modeling_v000' does not exist. Creating a new experiment.


None
Starting set experiment
Sleeping for 30s to ensure experiment is fully created
Sleep done
Done getting remote client


## Experimenting

The goal of experiment tracking is to *instrument the iterative process of ML model training* by collecting all parameters and results of each trial. To be more specific, within an **experiment**, you perform multiple **runs**, each run differing from the others in the **parameters** you use for it. You also need to specify which **metrics** to track; they reflect the performance of the model for a given set of parameters.

In this notebook example, to produce an experiment run:
* edit the parameters in the cell under *Defining the parameters of our run* and run it;
* run the cell under *Performing the run and logging parameters, metrics and the model* to actually perform the run 🙂

### Defining the parameters of our run

In [9]:
# Collect everything that characterizes this run; `run_params` and `run_metrics`
# are what gets logged to MLflow by the next cell.

# -- Which columns to retain ?
categorical_cols = ["gender", "ip_country_code"]
numerical_cols = ["age", "price_first_item_purchased", "pages_visited", "campaign"]

# -- Which algorithm to use? Which hyperparameters for this algo to try?
# ---Example: Gradient Boosting
hparams = dict(
    n_estimators=300,
    loss="exponential",
    learning_rate=0.1,
    max_depth=3,
    random_state=42,
)
clf = GradientBoostingClassifier(**hparams)
model_algo = type(clf).__name__

# -- Which cross-validation settings to use?
n_cv_folds = 5
cv = StratifiedKFold(n_splits=n_cv_folds)
metrics = ["f1_macro", "roc_auc"]

# Same key order as the values were defined above (columns, algo, hparams, CV).
run_params = {
    "categorical_cols": categorical_cols,
    "numerical_cols": numerical_cols,
    "model_algo": model_algo,
    **hparams,
    "n_cv_folds": n_cv_folds,
}
run_metrics = {}

# -- Recap of what will be logged
print(f"Parameters to log:\n {run_params}")
print(100*'-')
print(f"Metrics to log:\n {metrics}")

Parameters to log:
 {'categorical_cols': ['gender', 'ip_country_code'], 'numerical_cols': ['age', 'price_first_item_purchased', 'pages_visited', 'campaign'], 'model_algo': 'GradientBoostingClassifier', 'n_estimators': 300, 'loss': 'exponential', 'learning_rate': 0.1, 'max_depth': 3, 'random_state': 42, 'n_cv_folds': 5}
----------------------------------------------------------------------------------------------------
Metrics to log:
 ['f1_macro', 'roc_auc']


### Performing the run and logging parameters, metrics and the model

In [10]:
import urllib3

# Re-point the dataiku package at the remote DSS instance (project variables
# SPACE_URL / API_KEY, read once) and silence urllib3's InsecureRequestWarning.
# SECURITY NOTE(review): TLS certificate verification is disabled here.
custom_variables = dataiku.get_custom_variables()
dataiku.set_remote_dss(custom_variables["SPACE_URL"], custom_variables["API_KEY"], no_check_certificate=True)
dataiku.set_default_project_key(project_key)
client = dataiku.api_client()
client._session.verify = False
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# NOTE(review): `project`, used by setup_mlflow / get_or_create_experiment below, is
# not rebuilt from this new client -- confirm that is intended.

# Tell MLflow to skip server-certificate verification, and drop any certificate
# paths that may still be in the environment.
os.environ.pop('MLFLOW_TRACKING_SERVER_CERT_PATH', None)
os.environ.pop('MLFLOW_TRACKING_CLIENT_CERT_PATH', None)
os.environ["MLFLOW_TRACKING_INSECURE_TLS"] = "true"

run_ts = now_str()
run_name = f"run-{run_ts}"
with mlflow.start_run(run_name=run_name) as run:
    run_id = run.info.run_id
    print(f"Starting run {run_name} (id: {run_id})...")
    # --Preprocessing: one-hot encode categoricals (categories unseen during fit are
    #   ignored), pass numerical columns through unchanged
    categorical_preprocessor = OneHotEncoder(handle_unknown="ignore")
    preprocessor = ColumnTransformer([
        ('categorical', categorical_preprocessor, categorical_cols),
        ('numerical', 'passthrough', numerical_cols)])

    # --Pipeline definition (preprocessing + model)
    pipeline = make_pipeline(preprocessor, clf)

    # --Cross-validation: keep the mean and std of every requested test score
    print("Running cross-validation...")
    scores = cross_validate(pipeline, data, target, cv=cv, scoring=metrics)
    for m in [f"test_{mname}" for mname in metrics]:
        run_metrics[f"mean_{m}"] = scores[m].mean()
        run_metrics[f"std_{m}"] = scores[m].std()

    # --Final fit on the full training data
    print("pipeline fit")
    pipeline.fit(X=data, y=target)
    # --Log the order of the class labels (as exposed by pipeline.classes_)
    run_params["class_labels"] = [str(c) for c in pipeline.classes_.tolist()]

    # --Log parameters, metrics and model
    print("log parameters, metrics and model")
    mlflow.log_params(params=run_params)
    print("log params")
    mlflow.log_metrics(metrics=run_metrics)
    print("log metrics")
    artifact_path = f"{model_algo}-{run_id}"
    print("artifact path complete")
    # Set up MLflow again with the current `project` before logging the model
    mlflow_extension = project.get_mlflow_extension()
    mlflow_handle = project.setup_mlflow(managed_folder=folder)
    # get_or_create_experiment() already ends with mlflow.set_experiment(EXPERIMENT_NAME),
    # so no separate set_experiment call is needed afterwards.
    print("Starting set experiment with remote client")
    experiment = get_or_create_experiment()
    print("Done set experiment")
    mlflow.sklearn.log_model(sk_model=pipeline, artifact_path=artifact_path)
    # --Set useful information to facilitate run promotion
    print("facilitate run promotion")
    mlflow_extension.set_run_inference_info(run_id=run_id,
                                            prediction_type="BINARY_CLASSIFICATION",
                                            classes=run_params["class_labels"],
                                            code_env_name=MLFLOW_CODE_ENV_NAME,
                                            target=target_name)
    print(f"DONE! Your artifacts are available at {run.info.artifact_uri}")

Starting run run-20230621120603 (id: run_20230621120603)...
Running cross-validation...
pipeline fit
log parameters, metrics and model
log params
log metrics
artifact path complete
<Experiment: artifact_location='dss-managed-folder://9CmdjkWw/custom_modeling_v00', creation_time=1687349124971, experiment_id='custom_modeling_v00', last_update_time=1687349124971, lifecycle_stage='active', name='custom-modeling_v000', tags={}>
Starting set experiment with remote client
Done set experiment
facilitate run promotion
DONE! Your artifacts are available at dss-managed-folder://9CmdjkWw/custom_modeling_v00/run_20230621120603/artifacts
