First we fetch the data:

In [None]:
import shap
from sklearn.model_selection import train_test_split


X, y = shap.datasets.adult()
print("Data fetched")
target_feature = "income"
# Map truthy labels to 1 and falsy labels to 0
y = [1 if y_i else 0 for y_i in y]

# Recombine features and labels into one frame so each split keeps its labels
full_data = X.copy()
full_data[target_feature] = y

# Hold out 1000 rows for testing, stratified on the label so both splits
# keep the same class balance; the fixed seed makes the split reproducible
data_train, data_test = train_test_split(
    full_data, test_size=1000, random_state=96132, stratify=full_data[target_feature]
)

# Don't write out the row indices to the Parquet files
print("Saving to files")
data_train.to_parquet("adult_train.parquet", index=False)
data_test.to_parquet("adult_test.parquet", index=False)

Now create an MLClient:

In [None]:
# Placeholders: replace these with your Azure subscription, resource group and workspace
subscription_id = "SUB_ID"
resource_group = "RG_NAME"
workspace_name = "WS_NAME"

In [None]:
from azure.ml import MLClient
from azure.identity import DefaultAzureCredential
# Connect to the workspace; the shared token cache credential is skipped
ml_client = MLClient(
    credential=DefaultAzureCredential(exclude_shared_token_cache_credential=True),
    subscription_id=subscription_id,
    resource_group_name=resource_group,
    workspace_name=workspace_name,
    logging_enable=True,
)

Upload the datasets:

In [None]:
from azure.ml.entities import Dataset

# Upload the local training split as the dataset "Adult_Train_from_Notebook"
train_dataset = Dataset(
    local_path="adult_train.parquet",
    name="Adult_Train_from_Notebook",
)
ml_client.datasets.create_or_update(train_dataset)

In [None]:
# Upload the local test split as the dataset "Adult_Test_from_Notebook"
test_dataset = Dataset(
    local_path="adult_test.parquet",
    name="Adult_Test_from_Notebook",
)
ml_client.datasets.create_or_update(test_dataset)

# Creating the Model

To simplify the model creation process, we're going to use a pipeline.

Before we do anything else, we need to specify the version of the RAI components:

In [None]:
version_string = "1639421365"  # version tag shared by the RAI environment and RAI components below

Now we can create the training script:

In [None]:
%%writefile training_script.py

import argparse
import os
import shutil
import tempfile


from azureml.core import Run

import mlflow
import mlflow.sklearn

import pandas as pd
from sklearn.linear_model import LogisticRegression

def parse_args():
    """Parse the command-line options of the training script.

    Returns:
        An ``argparse.Namespace`` with ``training_data``,
        ``target_column_name`` and ``model_output`` attributes (strings, or
        ``None`` when the option is not given).
    """
    option_specs = (
        ("--training_data", "Path to training data"),
        ("--target_column_name", "Name of target column"),
        ("--model_output", "Path of output model"),
    )
    arg_parser = argparse.ArgumentParser()
    for flag, description in option_specs:
        arg_parser.add_argument(flag, type=str, help=description)
    return arg_parser.parse_args()


def main(args):
    """Train a LogisticRegression model and write it out as an MLflow model.

    Args:
        args: Parsed options with ``training_data`` (path readable by
            ``pd.read_parquet``), ``target_column_name`` (label column) and
            ``model_output`` (folder the MLflow model files are copied into).
    """
    current_experiment = Run.get_context().experiment
    tracking_uri = current_experiment.workspace.get_mlflow_tracking_uri()
    print("tracking_uri: {0}".format(tracking_uri))
    mlflow.set_tracking_uri(tracking_uri)
    mlflow.set_experiment(current_experiment.name)

    # Read in data
    print("Reading data")
    all_data = pd.read_parquet(args.training_data)

    print("Extracting X_train, y_train")
    print("all_data cols: {0}".format(all_data.columns))
    y_train = all_data[args.target_column_name]
    X_train = all_data.drop(labels=args.target_column_name, axis="columns")
    print("X_train cols: {0}".format(X_train.columns))

    print("Training model")
    # The estimator can be changed to suit
    model = LogisticRegression(solver="liblinear")
    model.fit(X_train, y_train)

    # Saving model with mlflow - leave this section unchanged
    with tempfile.TemporaryDirectory() as td:
        print("Saving model with MLFlow to temporary directory")
        tmp_output_dir = os.path.join(td, "my_model_dir")
        mlflow.sklearn.save_model(sk_model=model, path=tmp_output_dir)

        print("Copying MLFlow model to output path")
        # Make sure the destination exists (no-op if it already does)
        os.makedirs(args.model_output, exist_ok=True)
        # Copy entry by entry rather than copytree(..., dirs_exist_ok=True),
        # which only exists from Python 3.8 onwards
        for file_name in os.listdir(tmp_output_dir):
            print("  Copying: ", file_name)
            src_path = os.path.join(tmp_output_dir, file_name)
            dst_path = os.path.join(args.model_output, file_name)
            # copy2 cannot copy directories, so recurse into any subfolder
            if os.path.isdir(src_path):
                shutil.copytree(src_path, dst_path)
            else:
                shutil.copy2(src=src_path, dst=dst_path)


# Entry point when executed as a script (e.g. by the command component)
if __name__ == "__main__":
    # Banner lines that visually separate this script's output in the logs
    for log_spacer in ("*" * 60, "\n\n"):
        print(log_spacer)

    args = parse_args()
    main(args)

    for log_spacer in ("*" * 60, "\n\n"):
        print(log_spacer)

Now, we want to place this into a component:

In [None]:
from azure.ml.entities import Code, CommandComponent

# Code asset holding the training script written out above
training_code = Code(
    local_path='training_script.py'
)

# Component interface: a dataset path and a target column in, a model folder out
training_inputs = {
    'training_data': {'type': 'path'},
    'target_column_name': {'type': 'string'},
}

training_outputs = {
    'model_output': {'type': 'path'},
}

training_component = CommandComponent(
    name="MyTrainingComponent",
    version="2",  # the training job below refers to "MyTrainingComponent:2"
    display_name="Simple training component",
    code=training_code,
    environment=f"AML-RAI-Environment:{version_string}",
    inputs=training_inputs,
    outputs=training_outputs,
    # Adjacent string literals inside parentheses are joined implicitly,
    # so no backslash line continuations are needed
    command=(
        "python training_script.py "
        "--training_data ${{inputs.training_data}} "
        "--target_column_name ${{inputs.target_column_name}} "
        "--model_output ${{outputs.model_output}}"
    ),
)

ml_client.components.create_or_update(training_component)

# Running a training pipeline
Now that we have a script which can train a model, we need to run it:

In [None]:
import time

from azure.ml.entities import JobInput, ComponentJob, PipelineJob

model_name = "my_trained_nb_model"
# Seconds since the epoch, appended to the name so each run registers a new model
model_name_suffix = int(time.time())

This is going to be a two-component pipeline. The first component is the one we created above, which trains our model. The second registers the trained model in AzureML:

In [None]:
# The overall inputs for the pipeline. The test dataset is unused here, but
# this same dict is reused later as the inputs of the RAI insights pipeline.
pipeline_inputs = {
    'target_column_name': 'income',
    'my_training_data': JobInput(dataset="Adult_Train_from_Notebook:1"),
    'my_test_data': JobInput(dataset="Adult_Test_from_Notebook:1"),
}

# Specify the training job: bind the component's inputs to the pipeline inputs
train_job_inputs = {
    'target_column_name': '${{inputs.target_column_name}}',
    'training_data': '${{inputs.my_training_data}}',
}
train_job_outputs = {
    'model_output': None,
}
train_job = ComponentJob(
    component="MyTrainingComponent:2",
    inputs=train_job_inputs,
    outputs=train_job_outputs,
)

# The model registration job consumes the training job's model output.
# 'train-model-job' must match the key used for train_job in the PipelineJob.
register_job_inputs = {
    'model_input_path': '${{jobs.train-model-job.outputs.model_output}}',
    'model_base_name': model_name,
    'model_name_suffix': model_name_suffix,
}
register_job_outputs = {
    'model_info_output_path': None,
}
register_job = ComponentJob(
    component=f"RegisterModel:{version_string}",
    inputs=register_job_inputs,
    outputs=register_job_outputs,
)

With our jobs specified, assemble them into a pipeline:

In [None]:
# Two-step pipeline: train the model, then register it.
# NOTE(review): assumes a compute target named "cpucluster" exists in the workspace.
model_registration_pipeline_job = PipelineJob(
    experiment_name="Register_Model_From_Notebook_01",
    description="Create and register a model from a notebook",
    jobs={
        # These keys are the job names used in '${{jobs.<name>...}}' bindings
        'train-model-job': train_job,
        'register-model-job': register_job,
    },
    inputs=pipeline_inputs,
    outputs=register_job_outputs,
    compute="cpucluster",
)

And submit it:

In [None]:
from azure.ml.entities import PipelineJob

def submit_and_wait(ml_client, pipeline_job, poll_interval=30) -> "PipelineJob":
    """Submit a pipeline job and block until it reaches a terminal state.

    Args:
        ml_client: Client whose ``jobs`` operations submit and poll the job.
        pipeline_job: The job to submit.
        poll_interval: Seconds to sleep between status checks (default 30).

    Returns:
        The final job object, once its status is 'Completed'.

    Raises:
        RuntimeError: If submission returns no job, or the job ends in any
            terminal state other than 'Completed'. An explicit raise is used
            instead of ``assert`` so the check survives ``python -O``.
    """
    terminal_states = ('Completed', 'Failed', 'Canceled', 'NotResponding')

    created_job = ml_client.jobs.create_or_update(pipeline_job)
    if created_job is None:
        raise RuntimeError("Job submission returned no job object")

    while created_job.status not in terminal_states:
        time.sleep(poll_interval)
        created_job = ml_client.jobs.get(created_job.name)
        print("Latest status : {0}".format(created_job.status))

    if created_job.status != 'Completed':
        raise RuntimeError(
            "Job {0} finished with status {1!r}".format(created_job.name, created_job.status)
        )
    return created_job

In [None]:
# Submit the train-and-register pipeline and block until it completes
training_job = submit_and_wait(
    ml_client,
    model_registration_pipeline_job,
)

# Creating the RAI Insights
We have a registered model, and can now run a pipeline to create the RAI insights. First off, compute the name of the model we registered (this is not straightforward since the Register Model component is used in testing):

In [None]:
expected_model_id = "{0}_{1}:1".format(model_name, model_name_suffix)  # '<base>_<suffix>', version 1

Now, we create the RAI pipeline itself. There are three 'component stages' in this pipeline:

1. Fetch the model
1. Construct an empty RAI dashboard
1. Run the RAI tool components

The job to fetch the registered model is:

In [None]:
# Fetch the registered model so that later jobs can use its model info.
# This won't be necessary once models are types within the pipeline graph.
fetch_job_inputs = dict(model_id=expected_model_id)
fetch_job_outputs = dict(model_info_output_path=None)
fetch_job = ComponentJob(
    component=f"FetchRegisteredModel:{version_string}",
    inputs=fetch_job_inputs,
    outputs=fetch_job_outputs,
)

With this registered model (and our datasets), we can create an empty RAI dashboard:

In [None]:
# Top-level RAI Insights constructor component.
# The '${{inputs.…}}' bindings resolve against pipeline_inputs, which the
# insights pipeline reuses from the model-registration pipeline.
create_rai_inputs = dict(
    title='Run built from a Notebook',
    task_type='classification',
    model_info_path='${{jobs.fetch-model-job.outputs.model_info_output_path}}',
    train_dataset='${{inputs.my_training_data}}',
    test_dataset='${{inputs.my_test_data}}',
    target_column_name='${{inputs.target_column_name}}',
    categorical_column_names='["Race", "Sex", "Workclass", "Marital Status", "Country", "Occupation"]',
)
create_rai_outputs = dict(
    rai_insights_dashboard=None,  # Could theoretically redirect the datastore here
)
create_rai_job = ComponentJob(
    component=f"RAIInsightsConstructor:{version_string}",
    inputs=create_rai_inputs,
    outputs=create_rai_outputs,
)

Now, create an instance of each of our RAI tools:

In [None]:
# Set up the explanation.
# No explanations for now, pending a bugfix for the save/load/save scenario.
# explain_inputs = {
#    'comment': 'Insert text here',
#    'rai_insights_dashboard': '${{jobs.create-rai-job.outputs.rai_insights_dashboard}}'
#}
#explain_outputs = {
#    'explanation': None
#}
#explain_job = ComponentJob(
#    component=f"RAIInsightsExplanation:{version_string}",
#    inputs=explain_inputs,
#    outputs=explain_outputs
#)

# Each tool below reads the dashboard produced by create-rai-job.
# List-valued settings are passed as JSON-formatted strings.

# Set up causal analysis
causal_inputs = dict(
    rai_insights_dashboard='${{jobs.create-rai-job.outputs.rai_insights_dashboard}}',
    treatment_features='["Age", "Sex"]',
    heterogeneity_features='["Marital Status"]',
)
causal_outputs = dict(causal=None)
causal_job = ComponentJob(
    component=f"RAIInsightsCausal:{version_string}",
    inputs=causal_inputs,
    outputs=causal_outputs,
)

# Set up counterfactuals
counterfactual_inputs = dict(
    rai_insights_dashboard='${{jobs.create-rai-job.outputs.rai_insights_dashboard}}',
    total_CFs='10',
    desired_class='opposite',
)
counterfactual_outputs = dict(counterfactual=None)
counterfactual_job = ComponentJob(
    component=f"RAIInsightsCounterfactual:{version_string}",
    inputs=counterfactual_inputs,
    outputs=counterfactual_outputs,
)

# Set up error analysis
error_analysis_inputs = dict(
    rai_insights_dashboard='${{jobs.create-rai-job.outputs.rai_insights_dashboard}}',
    filter_features='["Race", "Sex", "Workclass", "Marital Status", "Country", "Occupation"]',
)
error_analysis_outputs = dict(error_analysis=None)
error_analysis_job = ComponentJob(
    component=f"RAIInsightsErrorAnalysis:{version_string}",
    inputs=error_analysis_inputs,
    outputs=error_analysis_outputs,
)

Now the 'gather' component which assembles everything into an `RAIInsights` object, and computes the JSON for the UX:

In [None]:
# Configure the gather component, which combines the constructor's dashboard
# with the output of each RAI tool job.
gather_inputs = {
    'constructor': '${{jobs.create-rai-job.outputs.rai_insights_dashboard}}',
    # Explanations are currently disabled. When re-enabling, the job name here
    # must match the key used for explain_job in the pipeline ('explain-job').
    # 'insight_1': '${{jobs.explain-job.outputs.explanation}}',
    'insight_2': '${{jobs.causal-job.outputs.causal}}',
    'insight_3': '${{jobs.counterfactual-job.outputs.counterfactual}}',
    'insight_4': '${{jobs.error-analysis-job.outputs.error_analysis}}',
}
gather_outputs = {
    'dashboard': None,
    'ux_json': None,
}
gather_job = ComponentJob(
    component=f"RAIInsightsGather:{version_string}",
    inputs=gather_inputs,
    outputs=gather_outputs,
)

Finally, the pipeline itself:

In [None]:
# Pipeline to construct the RAI Insights. Keys in `jobs` are the job names
# used by the '${{jobs.<name>...}}' bindings; the explanation job is disabled.
insights_pipeline_job = PipelineJob(
    experiment_name=f"Compute_Insights_from_Notebook_{version_string}",
    description="Python submitted Adult insights using fetched model",
    compute="cpucluster",
    inputs=pipeline_inputs,  # same inputs as the model-registration pipeline
    outputs=None,
    jobs={
        'fetch-model-job': fetch_job,
        'create-rai-job': create_rai_job,
        'causal-job': causal_job,
        'counterfactual-job': counterfactual_job,
        'error-analysis-job': error_analysis_job,
        # 'explain-job': explain_job,
        'gather-job': gather_job,
    },
)

And submit it:

In [None]:
insights_job = submit_and_wait(
    ml_client,
    insights_pipeline_job,
)  # blocks until the RAI insights pipeline finishes

# Download and display the insights
Now we can download the insights we have computed. To start, we need to obtain the run ID of the 'gather-job' which ran as part of the previous pipeline. We have a helper for this, but it requires the name of the experiment:

In [None]:
from azure_ml_rai._list_rai_runs import list_rai_insights

In [None]:
# Look up the RAI insight runs recorded under the insights pipeline's experiment
run_list = list_rai_insights(
    ml_client,
    insights_pipeline_job.experiment_name,
)

print(insights_pipeline_job.experiment_name)
display(run_list)

Before we can download the generated insights, we need to go to the Azure portal and locate the blob storage account associated with the workspace. In that storage account's 'Access Control' pane, we then assign ourselves the 'Storage Blob Data Contributor' role.

With this done (and a small pause to ensure permissions have propagated), we can use the mini SDK to download to a local directory:

In [None]:
from azure_ml_rai._download_rai_insights import download_rai_insights

# Local folder for the download; the next cell loads from it
download_dir = "my_downloaded_insight"

# Download the first entry returned by list_rai_insights
download_rai_insights(ml_client, rai_insight_id=run_list[0], path=download_dir)

And with everything downloaded, we can load the RAIInsights object and instantiate the dashboard:

In [None]:
from responsibleai import RAIInsights
from raiwidgets import ResponsibleAIDashboard

rai_i = RAIInsights.load(download_dir)  # rebuild the RAIInsights object from the downloaded files

# Kept as the cell's final expression so the notebook renders the dashboard
ResponsibleAIDashboard(rai_i)