In [3]:
import os
import mlflow
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from itertools import product

# Sklearn modules
from sklearn.model_selection import train_test_split, ParameterGrid, cross_validate
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score, make_scorer
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import ElasticNet
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.pipeline import Pipeline


# Keep the git-refresh check quiet when git is not installed in the docker container.
os.environ.update({'GIT_PYTHON_REFRESH': 'quiet'})

# Suppress all Python warnings for the rest of the session.
warnings.filterwarnings(action='ignore')


In [4]:
# Plotting helper for cross-validation metrics

def plot_cv_metrics(cv_metrics: dict) -> plt.Figure:
    """
    Plot every cross-validation metric as a line over the folds.

    One subplot is drawn per key of ``cv_metrics`` on a grid two columns wide.
    Each subplot is titled with the key and shows the values in their stored
    order against their position (0, 1, 2, ...).

    Parameters:
        cv_metrics (dict): Mapping of metric name to a sequence of values,
            i.e. the structure returned by ``cross_validate``.

    Returns:
        fig (plt.Figure): The generated figure. It has already been closed in
            pyplot, but the object can still be saved or logged.

    Raises:
        ValueError: If ``cv_metrics`` is empty.
    """
    if not cv_metrics:
        raise ValueError("cv_metrics must contain at least one metric")

    n_cols = 2

    with plt.style.context(style='fivethirtyeight'):
        rows_needed = int(np.ceil(len(cv_metrics) / n_cols))

        # squeeze=False keeps `axes` two-dimensional even when only one row is
        # needed; otherwise a single-row grid comes back as a 1-D array.
        fig, axes = plt.subplots(
            rows_needed, n_cols, figsize=(15, rows_needed * 3), squeeze=False
        )
        flat_axes = axes.ravel()  # row-major: position i is row i // 2, column i % 2

        for ax, (metric, y_values) in zip(flat_axes, cv_metrics.items()):
            ax.plot(np.arange(len(y_values)), y_values)
            ax.set_title(metric)

        # With an odd number of metrics the last panel stays unused; hide it.
        for ax in flat_axes[len(cv_metrics):]:
            ax.set_visible(False)

        # Lay out this specific figure rather than pyplot's implicit current one.
        fig.tight_layout()

        # Drop the figure from pyplot's registry so figures do not pile up
        # when this helper is called once per run.
        plt.close(fig)

    return fig


In [None]:
def parse_search_space(search_space: dict) -> list:
    """
    Expand a search-space description into concrete pipeline step sequences.

    Parameters:
        search_space (dict): Maps a step name to a list of entries. Each entry
            is a dict with an 'object' (a callable/class, or a falsy
            placeholder such as None) and optionally a 'params' grid.

    Returns:
        list: One tuple per combination of candidates across all steps; each
            tuple holds ``(step_name, candidate)`` pairs in the order of the
            steps in ``search_space``.
    """
    candidates_by_step = {}

    for step_name, entries in search_space.items():
        candidates = []
        for entry in entries:
            factory = entry.get('object')
            grid = entry.get('params')

            if not factory:
                # Falsy placeholder (e.g. None) is passed through untouched.
                candidates.append(factory)
                continue

            if not grid:
                # No parameter grid: a single default-constructed instance.
                candidates.append(factory())
                continue

            # One instance per point of the parameter grid.
            candidates.extend(factory(**kwargs) for kwargs in ParameterGrid(grid))

        candidates_by_step[step_name] = candidates

    step_names = list(candidates_by_step)
    combinations = []
    for chosen in product(*candidates_by_step.values()):
        combinations.append(tuple(zip(step_names, chosen)))

    return combinations

In [None]:
def root_mean_squared_error(y_true: pd.Series, y_pred: pd.Series) -> float:
    """Return the square root of ``mean_squared_error(y_true, y_pred)``."""
    mse = mean_squared_error(y_true, y_pred)
    return np.sqrt(mse)

In [None]:
def compute_metrics(y_true: pd.Series, y_pred: pd.Series, metrics: list, decimals: int = 3, prefix: str = '') -> dict:
    """
    Evaluate every metric on the given targets and predictions.

    Parameters:
        y_true: Ground-truth values, passed as first argument to each metric.
        y_pred: Predicted values, passed as second argument to each metric.
        metrics (list): Dicts with a 'name' and a 'function' taking
            ``(y_true, y_pred)``.
        decimals (int): Number of decimals each result is rounded to.
        prefix (str): Text prepended to every metric name in the result.

    Returns:
        dict: ``prefix + name`` mapped to the rounded metric value.
    """
    results = {}
    for metric in metrics:
        key = f"{prefix}{metric['name']}"
        results[key] = round(metric['function'](y_true, y_pred), decimals)
    return results

In [None]:
def compute_aggregated_metrics(cv_metrics: dict, decimals: int = 3) -> dict:
    """
    Summarise each metric's values with mean, standard deviation and median.

    Parameters:
        cv_metrics (dict): Metric name mapped to a sequence of values.
        decimals (int): Number of decimals each statistic is rounded to.

    Returns:
        dict: For every metric, the keys ``<name>_mean``, ``<name>_std``
            (``np.std`` with its default arguments) and ``<name>_median``.
    """
    summary = {}
    for metric_name in cv_metrics:
        values = cv_metrics[metric_name]
        summary.update({
            f'{metric_name}_mean': round(np.mean(values), decimals),
            f'{metric_name}_std': round(np.std(values), decimals),
            f'{metric_name}_median': round(np.median(values), decimals),
        })

    return summary

In [None]:
def make_scorers_dict(metrics: list) -> dict:
    """Map each metric's 'name' to its 'scorer' entry."""
    scorers = {}
    for metric in metrics:
        name = metric['name']
        scorers[name] = metric['scorer']
    return scorers

In [None]:
# Candidates for every pipeline step, in the format parse_search_space expects:
# each entry names a class under 'object' (None is passed through as-is) and
# may carry a 'params' grid whose combinations are all instantiated.
search_space = {
    'scaler': [{'object': None}, {'object': StandardScaler}, {'object': MinMaxScaler}],
    'model': [
        {
            'object': RandomForestRegressor,
            'params': {'n_estimators': [10, 100, 1000], 'max_depth': [10, None]},
        },
        {
            'object': ElasticNet,
            # NOTE(review): both grids start at 0, so alpha=0 and l1_ratio=0
            # are among the candidates; confirm that is intended.
            'params': {'alpha': np.arange(0, 1, 0.2), 'l1_ratio': np.arange(0, 1, 0.2)},
        },
    ],
}

In [None]:
# Evaluation metrics. Each entry carries the display name, the plain function
# (used on the holdout set) and a scorer wrapping that function (used for
# cross-validation). Error metrics are flagged greater_is_better=False.
metrics = [
    {
        'name': name,
        'function': function,
        'scorer': make_scorer(function, greater_is_better=greater_is_better),
    }
    for name, function, greater_is_better in (
        ('mean_absolute_error', mean_absolute_error, False),
        ('mean_squared_error', mean_squared_error, False),
        ('root_mean_squared_error', root_mean_squared_error, False),
        ('r2_score', r2_score, True),
    )
]

In [None]:
# Address of the MLflow tracking server.
TRACKING_URI = "http://tracking_server:5000"

# Experiment the runs are logged under, and the registry name of the model.
EXPERIMENT_NAME = "diabetes"
MODEL_NAME = "diabetes_model"

# Artifact sub-path the fitted pipeline is stored at within each run.
MODEL_ARTIFACT_PATH = "model"

In [None]:
# Point MLflow at the tracking server and select the experiment to log to.
mlflow.set_tracking_uri(TRACKING_URI)
mlflow.set_experiment(EXPERIMENT_NAME)

In [None]:
# Column to predict, and the CSV file the data is read from.
target_variable = "target"
filename = "./data/diabetes.csv"

In [None]:
# Load the full dataset (features plus target column) from the CSV file.
df = pd.read_csv(filename)

In [None]:
# Set 10% of the rows aside as a holdout set; the seed is fixed at 42.
df_train, df_holdout = train_test_split(df, test_size=0.1, random_state=42)

# Separate the feature columns (X) from the target column (y) in both parts.
X_train, y_train = df_train.drop(columns=target_variable), df_train[target_variable]
X_holdout, y_holdout = df_holdout.drop(columns=target_variable), df_holdout[target_variable]

In [None]:
# Wrap both partitions as MLflow datasets, recording the source file and the
# target column, so they can be attached to runs via mlflow.log_input.
train_dataset = mlflow.data.from_pandas(df_train, source=filename, targets=target_variable)
holdout_dataset = mlflow.data.from_pandas(df_holdout, source=filename, targets=target_variable)

In [None]:
# Train, evaluate and log one MLflow run per candidate pipeline.
# NOTE(review): the [:2] slice limits the loop to the first two candidates of
# the search space — presumably a leftover to keep runs short; confirm before
# running the full search.
for pipeline_steps in parse_search_space(search_space)[:2]:
    pipeline = Pipeline(pipeline_steps)
    
    # Tag the run with the estimator in the 'model' step so runs can be told apart.
    tags = {
        'estimator_name': type(pipeline['model']).__name__,
        'estimator_class': str(type(pipeline['model']))
    }
    
    with mlflow.start_run(tags=tags) as run:
        # Fit on the full training partition; this fitted pipeline is the one
        # that gets logged and evaluated on the holdout set below.
        pipeline.fit(X_train, y_train)
        
        # Log the fitted pipeline together with a signature inferred from the
        # training inputs and the pipeline's predictions on them.
        mlflow.sklearn.log_model(
            sk_model = pipeline, 
            artifact_path = MODEL_ARTIFACT_PATH, 
            signature = mlflow.models.infer_signature(
                model_input = X_train, 
                model_output = pipeline.predict(X_train)
            )
        )

        # Log every parameter reported by the pipeline (steps and nested estimator params).
        pipeline_params = pipeline.get_params()
        mlflow.log_params(pipeline_params)

        # 5-fold cross-validation on the training partition, scoring both the
        # train and the validation folds with every configured scorer.
        # NOTE(review): the error scorers in `metrics` are created with
        # greater_is_better=False, which per scikit-learn's make_scorer docs
        # negates the score — so the cross-validated error metrics logged here
        # are presumably negative while the holdout ones below are positive;
        # confirm this is intended.
        cv_metrics = cross_validate(
            estimator = pipeline, 
            X = X_train, 
            y = y_train, 
            cv = 5,
            return_train_score = True, 
            scoring = make_scorers_dict(metrics)
        )
        # Mean / std / median of each entry of the cross-validation result.
        cv_metrics_aggregated = compute_aggregated_metrics(cv_metrics)
        mlflow.log_metrics(cv_metrics_aggregated)

        # One line plot per cross-validation entry, stored as a run artifact.
        cv_fig = plot_cv_metrics(cv_metrics)
        mlflow.log_figure(cv_fig, "graphs/cross_validation_metrics.png")

        # Metrics of the fitted pipeline on the holdout partition, logged with
        # the 'holdout_' prefix.
        holdout_metrics = compute_metrics(
            y_true = y_holdout, 
            y_pred = pipeline.predict(X_holdout), 
            metrics = metrics, 
            prefix = 'holdout_'
        )
        mlflow.log_metrics(holdout_metrics)

        # SHAP explanation of pipeline.predict on the holdout features.
        mlflow.shap.log_explanation(pipeline.predict, X_holdout)

        # Attach both datasets to the run, labelled by their role.
        mlflow.log_input(train_dataset, context="training")
        mlflow.log_input(holdout_dataset, context="holdout")

In [None]:
from mlflow import MlflowClient

# Client used for the model-registry calls in the following cells.
# NOTE(review): created without arguments — presumably it picks up the
# tracking URI configured via mlflow.set_tracking_uri; confirm.
client = MlflowClient()

In [None]:
# Create the registry entry for the model. Re-running this cell is expected,
# so an "already exists" response is reported and tolerated; any other failure
# (e.g. an unreachable server) is re-raised instead of being silently printed.
try:
    client.create_registered_model(
        name = MODEL_NAME,
        tags = {
            'experiment': EXPERIMENT_NAME,
        },
        description = 'Model for diabetes prediction'
    )
except mlflow.exceptions.MlflowException as e:
    # MlflowException exposes the error code as its string name.
    if e.error_code != 'RESOURCE_ALREADY_EXISTS':
        raise
    print(e)

In [None]:
# Look up the experiment by name; its id is used to query the logged runs.
# NOTE(review): presumably None when no experiment with this name exists — confirm.
experiment = client.get_experiment_by_name(name=EXPERIMENT_NAME)

In [None]:
# Fetch all runs of the experiment as a DataFrame (one row per run).
# `experiment_ids` is documented as a list of ids, so the single id is wrapped
# in a list rather than relying on a bare string being accepted.
df_runs = mlflow.search_runs(experiment_ids=[experiment.experiment_id])

In [None]:
# Keep only the run with the lowest holdout mean absolute error, re-indexed
# so that it sits at label 0.
df_best_run = (
    df_runs
    .sort_values(by='metrics.holdout_mean_absolute_error')
    .head(1)
    .reset_index(drop=True)
)

In [None]:
# Identifier and artifact root of the best run (the only row, at label 0).
run_id = df_best_run.at[0, 'run_id']
artifact_uri = df_best_run.at[0, 'artifact_uri']

# Location of the logged pipeline inside that run's artifacts.
model_source = f"{artifact_uri}/{MODEL_ARTIFACT_PATH}"

In [None]:
# Display the resolved model location for a quick visual check.
model_source

In [None]:
# Register the best run's logged pipeline as a new version of the registered model.
mv = client.create_model_version(
    name = MODEL_NAME, 
    source = model_source, 
    run_id = run_id
)

In [None]:
# Point the "staging" alias at the model version that was just created.
client.set_registered_model_alias(
    name = MODEL_NAME, 
    alias = "staging", 
    version = mv.version
)

In [None]:
# Load the model through its alias URI (models:/<name>@staging) as a pyfunc model.
model_staging = mlflow.pyfunc.load_model(f"models:/{MODEL_NAME}@staging")

In [None]:
# Smoke test: predict on the holdout features with the model loaded from the registry.
model_staging.predict(X_holdout)

In [None]:
# Show the loaded model's metadata as a dictionary.
model_staging.metadata.to_dict()