# Data and Model Drift Detection for Tabular Data

The world in which machine learning models operate is constantly changing. Deployed models are confronted with data they have never seen before and can become outdated over time. A proactive drift management approach is required to ensure that production AI services deliver consistent business value in the long term. Check out our background article [Getting traction on Data and Model Drift with Azure Machine Learning](https://medium.com/p/ebd240176b8b/edit) for an in-depth discussion of the key concepts.

This notebook provides the following mechanisms to detect and mitigate data and model drift:

- Retrieve sample datasets from CSV files and managed AML datasets
- Statistical tests and expressive visualizations to detect and analyze drift in features and model predictions
- Predictive approach to identify the impact of data and concept drift on the model
- Automated Azure Machine Learning pipelines that check for data drift regularly as part of an MLOps solution
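
To make the statistical-test idea concrete, here is a minimal sketch that compares each column shared by a reference and a current DataFrame. Numeric columns get a two-sample Kolmogorov-Smirnov test (`scipy.stats.ks_2samp`); other columns get a chi-square test on a 2 x k table of category counts (`scipy.stats.chi2_contingency`). The helper name `detect_feature_drift`, the treatment of `alpha=0.01` as a p-value cut-off and the synthetic data are assumptions of this example, not the implementation inside `data_drift_pipeline.py`.

```python
import numpy as np
import pandas as pd
from scipy.stats import ks_2samp, chi2_contingency


def detect_feature_drift(reference: pd.DataFrame, current: pd.DataFrame,
                         alpha: float = 0.01) -> pd.DataFrame:
    """Hypothetical helper: run one two-sample test per shared column."""
    rows = []
    for col in reference.columns.intersection(current.columns):
        ref, cur = reference[col].dropna(), current[col].dropna()
        if pd.api.types.is_numeric_dtype(ref):
            # KS test: largest distance between the two empirical CDFs
            stat, p_value = ks_2samp(ref, cur)
            test = "ks"
        else:
            # Chi-square test on category counts: one row for reference, one for current
            counts = pd.concat([ref.value_counts(), cur.value_counts()], axis=1).fillna(0)
            stat, p_value, _, _ = chi2_contingency(counts.T.values)
            test = "chi2"
        rows.append({"feature": col, "test": test, "statistic": stat,
                     "p_value": p_value, "drift": p_value < alpha})
    return pd.DataFrame(rows)


# Synthetic example data (not the predictive maintenance files used later)
rng = np.random.default_rng(42)
reference = pd.DataFrame({
    "air_temperature": rng.normal(300.0, 2.0, 1_000),
    "type": rng.choice(["L", "M", "H"], 1_000, p=[0.6, 0.3, 0.1]),
})
current = pd.DataFrame({
    "air_temperature": rng.normal(302.0, 2.0, 1_000),  # shifted mean simulates drift
    "type": rng.choice(["L", "M", "H"], 1_000, p=[0.6, 0.3, 0.1]),
})
print(detect_feature_drift(reference, current))
```

Running one test per feature keeps the result easy to read, but with many features some false alarms are expected at any fixed `alpha`, so a multiple-testing correction may be worth considering.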

The notebook was developed and tested using the ``Python 3.8-AzureML`` kernel on an Azure ML Compute Instance.

# Setup

In [1]:
from matplotlib import pyplot as plt
%config InlineBackend.figure_format='retina'

import pandas as pd
import numpy as np
import os

from scipy.stats import ks_2samp, chisquare, chi2_contingency, gaussian_kde

#!pip install scikit-learn --upgrade
import sklearn
from sklearn.metrics import classification_report
from sklearn.compose import ColumnTransformer, make_column_selector as selector
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, OrdinalEncoder
print('The scikit-learn version is {}.'.format(sklearn.__version__))

from lightgbm import LGBMClassifier

from azureml.core import Workspace, Dataset

The scikit-learn version is 1.1.1.




# Data drift identification as part of an AzureML MLOps Pipeline
We now focus on embedding drift detection and mitigation in an MLOps architecture with Azure Machine Learning. This section builds on Azure ML Datasets, Models and Pipelines, and the code below provides an example of an Azure ML pipeline that generates the data drift detection plots.

An Azure account with an active subscription is needed to run the following cells. [Create an account for free](https://azure.microsoft.com/en-us/free/).

## Create a new Azure ML Workspace
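`Workspace.create(..., exist_ok=True)` reuses the Workspace if it already exists. Alternatively, connect to an existing Workspace with `Workspace.from_config()`.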

In [2]:
from azureml.core import Workspace

# Option 1: create the Workspace (reused if it already exists, because exist_ok=True)
# ws = Workspace.create(name='myworkspace',
#                       subscription_id='<enter your Azure subscription ID>',
#                       resource_group='myresourcegroup',
#                       create_resource_group=True,
#                       location='westeurope',
#                       exist_ok=True)

# Option 2: load an existing Workspace from a config.json file
ws = Workspace.from_config()
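
`Workspace.from_config()` reads the subscription, resource group and workspace name from a `config.json` file; by default the SDK starts looking in the current directory and walks up through its parents. The file can be downloaded from the Azure portal, or written by an existing `Workspace` object via `ws.write_config()`. The sketch below builds the same file with the standard library only; the helper name `write_workspace_config` is hypothetical and the values you pass are placeholders for your own workspace details.

```python
import json
from pathlib import Path


def write_workspace_config(folder: str, subscription_id: str,
                           resource_group: str, workspace_name: str) -> Path:
    """Hypothetical helper: write a config.json that Workspace.from_config() can read."""
    config = {
        "subscription_id": subscription_id,
        "resource_group": resource_group,
        "workspace_name": workspace_name,
    }
    path = Path(folder) / "config.json"
    path.parent.mkdir(parents=True, exist_ok=True)  # create the target folder if needed
    path.write_text(json.dumps(config, indent=2))
    return path
```

Calling it with your own values, e.g. `write_workspace_config('.', '<subscription-id>', 'myresourcegroup', 'myworkspace')`, overwrites any existing `config.json` in that folder, so point it at a fresh location if you already have one.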

## Provision a new Azure ML Compute Cluster

In [6]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

# Choose a name for your CPU cluster
cpu_cluster_name = "cpu-cluster"

# Reuse the cluster if it already exists; otherwise provision a new one
try:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2',
                                                           max_nodes=4,
                                                           location='westeurope')
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)

cpu_cluster.wait_for_completion(show_output=True)

Found existing cluster, use it.
Succeeded
AmlCompute wait for completion finished

Minimum number of nodes requested have been provisioned


## Upload predictive maintenance sample data to the datastore

In [3]:
os.chdir("../")
print(os.getcwd()) 

# local data to be uploaded to the datastore
data_folder = './data/data_raw/predictive_maintenance/'

filename_A = 'reference_pred_maintenance.csv'
filename_B = 'current_pred_maintenance.csv'

# target in datastore
datastore_path = 'amldatasets/pred_maintenance/'

# dataset infos
name_A = 'reference_pred_maintenance'
name_B = 'current_pred_maintenance'

description = 'Data drift testing dataset'
tags = {'Project' : 'pred_maintenance',
        'Format' : 'CSV'}

# get default datastore
datastore = ws.get_default_datastore()

files = [os.path.join(data_folder, filename_A), os.path.join(data_folder, filename_B)]

# upload the data files to the AML datastore
datastore.upload_files(files = files, 
                       target_path = datastore_path,
                       overwrite = True,
                       show_progress = True)

# register the reference data as an AML tabular dataset
dataset_A = Dataset.Tabular.from_delimited_files(path=(datastore, datastore_path + filename_A))

dataset_A.register(ws,
                   name=name_A,
                   description=description,
                   tags=tags,
                   create_new_version=True)


# register the current data as an AML tabular dataset
dataset_B = Dataset.Tabular.from_delimited_files(path=(datastore, datastore_path + filename_B))

dataset_B.register(ws,
                   name=name_B,
                   description=description,
                   tags=tags,
                   create_new_version=True)

/mnt/batch/tasks/shared/LS_root/mounts/clusters/natashasavic2/code/Users/natashasavic/data-model-drift/tabular-data
Uploading an estimated of 2 files
Uploading ./data/data_raw/predictive_maintenance/current_pred_maintenance.csv
Uploaded ./data/data_raw/predictive_maintenance/current_pred_maintenance.csv, 1 files out of an estimated total of 2
Uploading ./data/data_raw/predictive_maintenance/reference_pred_maintenance.csv
Uploaded ./data/data_raw/predictive_maintenance/reference_pred_maintenance.csv, 2 files out of an estimated total of 2
Uploaded 2 files


"datastore.upload_files" is deprecated after version 1.0.69. Please use "FileDatasetFactory.upload_directory" instead. See Dataset API change notice at https://aka.ms/dataset-deprecation.


{
  "source": [
    "('workspaceblobstore', 'amldatasets/pred_maintenance/current_pred_maintenance.csv')"
  ],
  "definition": [
    "GetDatastoreFiles",
    "ParseDelimited",
    "DropColumns",
    "SetColumnTypes"
  ],
  "registration": {
    "id": "67362965-0fd7-46ce-8511-7625a70de917",
    "name": "current_pred_maintenance",
    "version": 1,
    "description": "Data drift testing dataset",
    "tags": {
      "Project": "pred_maintenance",
      "Format": "CSV"
    },
    "workspace": "Workspace.create(name='Natasha_ANZ', subscription_id='3a0dc8b1-97e1-4225-b97e-ab8bacd270f6', resource_group='NetworkWatcherRG')"
  }
}
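
Drift tests compare reference and current data column by column, so both files should share the same schema. A small local check such as the one below can catch mismatches before the upload; the helper name `compare_schemas` and the choice to compare column names and inferred pandas dtypes are assumptions of this sketch, not part of the original notebook.

```python
import pandas as pd


def compare_schemas(reference: pd.DataFrame, current: pd.DataFrame) -> dict:
    """Hypothetical helper: report column and dtype differences between two tables."""
    ref_cols, cur_cols = set(reference.columns), set(current.columns)
    shared = sorted(ref_cols & cur_cols)
    return {
        "only_in_reference": sorted(ref_cols - cur_cols),
        "only_in_current": sorted(cur_cols - ref_cols),
        # Shared columns whose inferred dtype differs, e.g. int64 vs. float64
        "dtype_mismatch": [c for c in shared if reference[c].dtype != current[c].dtype],
    }


# Usage with the local files from the upload cell (data_folder, filename_A, filename_B):
# report = compare_schemas(pd.read_csv(os.path.join(data_folder, filename_A)),
#                          pd.read_csv(os.path.join(data_folder, filename_B)))
```

An empty report (three empty lists) means the two files line up; a dtype mismatch often points to missing values or mixed formats in one of the files.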

## Run an AML Experiment to test the code on the remote compute cluster

In [8]:
from azureml.core import Experiment, ScriptRunConfig, Environment
from azureml.core.runconfig import RunConfiguration
from azureml.pipeline.core import Pipeline, PipelineParameter, PublishedPipeline, Schedule, ScheduleRecurrence
from azureml.pipeline.steps import PythonScriptStep
from azureml.widgets import RunDetails


# Set up the experiment
exp_folder = './SDK-V1/experiment'
compute_name = cpu_cluster_name

# workspace = Workspace.from_config()   

# Create a Python environment for the experiment (from a .yml file)
env = Environment.from_conda_specification("experiment_env", f"{exp_folder}/environment_pipeline.yml")

script_params = [
    '--reference_dataset', "reference_pred_maintenance",
    '--current_dataset', "current_pred_maintenance",
    '--threshold', 0.01,
    '--shortlist', "None"]

# Create a script config
script_config = ScriptRunConfig(source_directory=exp_folder,
                                script='data_drift_pipeline.py',
                                environment=env,
                                compute_target = compute_name, 
                                arguments = script_params)

# submit the experiment
experiment = Experiment(ws, name = 'data_drift_experiment_v1')
run = experiment.submit(config=script_config)
RunDetails(run).show()
run.wait_for_completion(show_output=False)


{'runId': 'Data_Drift_Experiment_1659586539_950d9984',
 'target': 'cpu-cluster',
 'status': 'Completed',
 'startTimeUtc': '2022-08-04T04:29:27.034024Z',
 'endTimeUtc': '2022-08-04T04:31:10.46476Z',
 'services': {},
 'properties': {'_azureml.ComputeTargetType': 'amlctrain',
  'ContentSnapshotId': 'c2ea9b4d-e7b1-43f9-95f7-1a22563479fd',
  'azureml.git.repository_uri': 'git@github.com:natasha-savic-msft/data-model-drift.git',
  'mlflow.source.git.repoURL': 'git@github.com:natasha-savic-msft/data-model-drift.git',
  'azureml.git.branch': 'sdk_upgrade',
  'mlflow.source.git.branch': 'sdk_upgrade',
  'azureml.git.commit': '204d32f9bb24eb207e4b5fdd8dc8ed699e741d3c',
  'mlflow.source.git.commit': '204d32f9bb24eb207e4b5fdd8dc8ed699e741d3c',
  'azureml.git.dirty': 'False',
  'ProcessInfoFile': 'azureml-logs/process_info.json',
  'ProcessStatusFile': 'azureml-logs/process_status.json'},
 'inputDatasets': [{'dataset': {'id': '01d33eb3-4873-4795-b7ed-7566e3b103a8'}, 'consumptionDetails': {'type': '
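
The notebook does not show `data_drift_pipeline.py` itself. Purely as an illustration of how a script could consume the arguments passed above, here is a hypothetical `argparse` setup. Reading `--threshold` as a significance level, and treating the string `"None"` for `--shortlist` as "test all columns" and any other value as comma-separated column names, are assumptions, not the script's documented behavior.

```python
import argparse
from typing import List, Optional


def parse_shortlist(value: str) -> Optional[List[str]]:
    """Assumed convention: "None" means no shortlist, otherwise comma-separated column names."""
    if value.strip().lower() == "none":
        return None
    return [name.strip() for name in value.split(",") if name.strip()]


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Hypothetical data drift script arguments")
    parser.add_argument("--reference_dataset", type=str, required=True,
                        help="Name of the registered reference dataset")
    parser.add_argument("--current_dataset", type=str, required=True,
                        help="Name of the registered current dataset")
    # Command-line arguments arrive as strings, so convert the threshold explicitly
    parser.add_argument("--threshold", type=float, default=0.01,
                        help="Significance level for the drift tests (assumed meaning)")
    parser.add_argument("--shortlist", type=parse_shortlist, default=None,
                        help='Columns to test, or "None" for all columns')
    return parser


# Same values as script_params above, passed explicitly instead of via sys.argv
args = build_parser().parse_args([
    "--reference_dataset", "reference_pred_maintenance",
    "--current_dataset", "current_pred_maintenance",
    "--threshold", "0.01",
    "--shortlist", "None",
])
print(args)
```

Converting types in the parser keeps the rest of the script free of string handling, which matters because both `ScriptRunConfig` arguments and `PipelineParameter` values reach the script through the command line.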

## Define an Azure ML Pipeline

In [9]:
from azureml.core import Experiment, ScriptRunConfig, Environment
from azureml.core.runconfig import RunConfiguration
from azureml.pipeline.core import Pipeline, PipelineParameter, PublishedPipeline, Schedule, ScheduleRecurrence
from azureml.pipeline.steps import PythonScriptStep
from azureml.widgets import RunDetails

# Set up the pipeline (use the same folder name, including case, as in the experiment cell above)
exp_folder = './SDK-V1/experiment'

# Create a Python environment for the experiment (from a .yml file)
env = Environment.from_conda_specification("experiment_env", f"{exp_folder}/environment_pipeline.yml")

aml_run_config = RunConfiguration()
aml_run_config.environment = env

pipeline_parameters = {
     'reference_dataset': "reference_pred_maintenance",
     'current_dataset': "current_pred_maintenance",
     'threshold': 0.01,
     'shortlist': "None"}


# Step to run a Python script
step1 = PythonScriptStep(name='detect_data_drift',
                         source_directory=exp_folder,
                         script_name='data_drift_pipeline.py',
                         arguments=[
                             "--reference_dataset", PipelineParameter(name="reference_dataset", default_value=pipeline_parameters["reference_dataset"]),
                             "--current_dataset", PipelineParameter(name="current_dataset", default_value=pipeline_parameters["current_dataset"]),
                             "--threshold", PipelineParameter(name="threshold", default_value=pipeline_parameters["threshold"]),
                             "--shortlist", PipelineParameter(name="shortlist", default_value=pipeline_parameters["shortlist"]),
                         ],
                         runconfig=aml_run_config,
                         compute_target=compute_name,
                         allow_reuse=False)

# Construct the pipeline
data_drift_pipeline = Pipeline(workspace = ws, steps = [step1])

# Create an experiment and run the pipeline
experiment = Experiment(workspace = ws, name = 'data_drift_analysis_pipeline')

pipeline_run = experiment.submit(data_drift_pipeline, pipeline_parameters=pipeline_parameters)

Created step detect_data_drift [5b56778a][60ecdf05-3c44-40f0-83b3-e34daa84a76d], (This step will run and generate new outputs)
Submitted PipelineRun 47c48471-8088-434b-8ace-d0e84a1ab5f4
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/47c48471-8088-434b-8ace-d0e84a1ab5f4?wsid=/subscriptions/3a0dc8b1-97e1-4225-b97e-ab8bacd270f6/resourcegroups/NetworkWatcherRG/workspaces/Natasha_ANZ&tid=72f988bf-86f1-41af-91ab-2d7cd011db47


## Publish the Azure ML Pipeline

In [None]:
# Publish the pipeline
published_pipeline = data_drift_pipeline.publish(name='detect_data_drift_pipeline',
                                                  description='Generic data drift pipeline',
                                                  version='1.0')

rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)
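
Once published, the pipeline can be triggered through its REST endpoint, for example from another service. The sketch below only builds the JSON request body; the `ExperimentName` and `ParameterAssignments` fields follow the SDK v1 documentation for published pipelines, while the helper name `build_pipeline_request` and the override values are assumptions. The commented lines show how the request could be sent with an Azure AD header from `InteractiveLoginAuthentication`.

```python
import json
from typing import Any, Dict, Optional


def build_pipeline_request(experiment_name: str,
                           parameter_overrides: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Hypothetical helper: body for a POST to a published pipeline endpoint."""
    body: Dict[str, Any] = {"ExperimentName": experiment_name}
    if parameter_overrides:
        # Keys must match the PipelineParameter names defined in the pipeline step
        body["ParameterAssignments"] = dict(parameter_overrides)
    return body


body = build_pipeline_request("detect_data_drift",
                              {"current_dataset": "current_pred_maintenance", "threshold": 0.05})
print(json.dumps(body, indent=2))

# Sending the request (requires azureml-core and requests; not executed here):
# import requests
# from azureml.core.authentication import InteractiveLoginAuthentication
# auth_header = InteractiveLoginAuthentication().get_authentication_header()
# response = requests.post(rest_endpoint, headers=auth_header, json=body)
# print(response.json().get("Id"))
```

Parameters that are left out of `ParameterAssignments` fall back to the `default_value` given to each `PipelineParameter`.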

## Schedule the Azure ML Pipeline for regular execution

In [20]:
# Schedule pipeline for periodic intervals

monthly = ScheduleRecurrence(frequency='Month', interval=1)
pipeline_schedule = Schedule.create(ws, name='Monthly drift detection',
                                    description='Monthly data drift detection',
                                    pipeline_id=published_pipeline.id,
                                    experiment_name='detect_data_drift',
                                    recurrence=monthly)

In [None]:
# List the active schedules in the Workspace
schedules = Schedule.list(ws)
print(schedules)
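
A schedule keeps triggering pipeline runs until it is disabled, so you may want to disable the drift detection schedule before cleaning up. The sketch below filters schedules by name and calls `disable(wait_for_provisioning=...)` on each match; `Schedule.disable` is part of the SDK v1 API, while the helper name `disable_schedules` is hypothetical. It only relies on `name` and `disable`, so it can be tried without a Workspace.

```python
from typing import Iterable, List


def disable_schedules(schedules: Iterable, name: str, wait: bool = True) -> List:
    """Hypothetical helper: disable every schedule whose name matches `name`."""
    disabled = []
    for sched in schedules:
        if sched.name == name:
            # Disabling stops future triggers; the schedule itself is kept and can be re-enabled
            sched.disable(wait_for_provisioning=wait)
            disabled.append(sched)
    return disabled


# Usage in the notebook (requires the Workspace from the cells above):
# disable_schedules(Schedule.list(ws), 'Monthly drift detection')
```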

## Clean up resources
<span style='color:Red'> Warning: the following cell deletes the Azure ML Workspace and its dependent resources!  </span>

Note that the container registry and resource group are not deleted.

In [1]:
# Uncomment the following line to delete the Workspace and all related resources
# ws.delete(delete_dependent_resources=True, no_wait=False)