# Analysis of Synthetic Data

This notebook works through a hypothetical scenario: deciding whether a programmer should be given access to a GPT2 model for inferencing, based on information such as their favorite programming language, preference for tabs vs spaces, OS, location and so forth. Each programmer is given a score in [0,10]; a score in [7,10] means access is granted, and a score in [0,7) means access is denied. The data were synthetically generated with the [PyPI package, Fibber.io](https://pypi.org/project/fibber/).
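
The decision rule can be written down directly. The following is a minimal sketch; the function name `access_decision` is introduced here for illustration only and is not part of the notebook or of any library:

```python
def access_decision(score: float) -> str:
    """Map a score in [0, 10] to the access decision described above."""
    if not 0 <= score <= 10:
        raise ValueError(f"score must lie in [0, 10], got {score}")
    # [7, 10] grants access; [0, 7) denies it.
    return "granted" if score >= 7 else "denied"


for example_score in (0.0, 6.9, 7.0, 10.0):
    print(example_score, access_decision(example_score))
```

Note that the boundary value 7 falls on the "granted" side, matching the half-open interval [0,7) for denial.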

First, we need to specify the version of the RAI components which are available in the workspace. This was specified when the components were uploaded, and will have defaulted to '1':

In [1]:
version_string = '1'

We also need to give the name of the compute cluster we want to use in AzureML. Later in this notebook, we will create it if it does not already exist:

In [2]:
compute_name = "rai-cluster"

Finally, we need to specify a version for the data and components we will create while running this notebook. This should be unique for the workspace, but the specific value doesn't matter:

In [3]:
rai_programmer_example_version_string = '5'

## Accessing the Data

We supply the synthetic data as a pair of parquet files and accompanying `MLTable` file. We can read them in and take a brief look:

In [4]:
import os
import pandas as pd

Now define the paths to the data:

In [5]:
train_data_path = 'data-programmer-regression/train/'

In [6]:
test_data_path = 'data-programmer-regression/test/'

Load some data for a quick view:

In [7]:
import mltable

tbl = mltable.load(train_data_path)
train_df: pd.DataFrame = tbl.to_pandas_dataframe()

display(train_df)

| Unnamed: 0 | score | style | YOE | IDE | Programming language | location | Number of github repos contributed to | Employer | OS | job title | age |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 4.0 | tabs | 6.0 | XCode | Java | Europe | 0.0 | K | Windows | SWE 2 | 39.1 |
| 1 | 0.0 | tabs | 5.0 | Visual Studio | Java | Europe | 0.0 | F | Windows | SWE 2 | 36.1 |
| 2 | 9.0 | spaces | 18.0 | Visual Studio | Java | Europe | 0.0 | A | Windows | SWE 2 | 37.1 |
| 3 | 8.0 | tabs | 9.0 | Intellij | Python | Europe | 0.0 | F | Windows | SWE 1 | 34.1 |
| 4 | 3.0 | tabs | 8.0 | pyCharm | Python | Europe | 3.0 | G | Windows | SWE 1 | 37.9 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 1995 | 9.0 | spaces | 15.0 | XCode | Java | Europe | 0.0 | J | Windows | SWE 2 | 30.3 |
| 1996 | 2.0 | spaces | 18.0 | Visual Studio | PHP | Europe | 0.0 | F | Linux | Distinguished Engineer | 30.1 |
| 1997 | 8.0 | spaces | 14.0 | Visual Studio | C# | Europe | 0.0 | F | Windows | SWE 2 | 33.1 |
| 1998 | 2.0 | tabs | 6.0 | VSCode | Swift | North America | 0.0 | J | Linux | Distinguished Engineer | 28.8 |


The (synthetic) data are about a collection of programmers, with a 'score' column which we wish to predict:

In [8]:
target_column_name = "score"

First, we need to upload the datasets to our workspace. We start by creating an `MLClient` for interactions with AzureML:

In [13]:
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential
ml_client = MLClient.from_config(credential=DefaultAzureCredential(exclude_shared_token_cache_credential=True),
                     logging_enable=True)

Found the config file in: /home/workspace/work/RAI-vNext-Preview/config.json


We can now upload the data to AzureML:

In [19]:
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes

input_train_data = "Programmers_Train_MLTable"
input_test_data = "Programmers_Test_MLTable"

train_data = Data(
    path=train_data_path,
    type=AssetTypes.MLTABLE,
    description="RAI programmers training data",
    name=input_train_data,
    version=rai_programmer_example_version_string,
)
ml_client.data.create_or_update(train_data)

test_data = Data(
    path=test_data_path,
    type=AssetTypes.MLTABLE,
    description="RAI programmers test data",
    name=input_test_data,
    version=rai_programmer_example_version_string,
)
ml_client.data.create_or_update(test_data)

Uploading train (0.02 MBs): 100%|██████████| 19510/19510 [00:00<00:00, 39405.02it/s]

Uploading test (0.01 MBs): 100%|██████████| 13302/13302 [00:00<00:00, 37347.49it/s]



Data({'skip_validation': False, 'mltable_schema_url': None, 'referenced_uris': ['./programmers-test.parquet'], 'type': 'mltable', 'is_anonymous': False, 'auto_increment_version': False, 'name': 'Programmers_Test_MLTable', 'description': 'RAI programmers test data', 'tags': {}, 'properties': {}, 'id': '/subscriptions/fac34303-435d-4486-8c3f-7094d82a0b60/resourceGroups/RAIPM/providers/Microsoft.MachineLearningServices/workspaces/RAIPM2/data/Programmers_Test_MLTable/versions/6', 'Resource__source_path': None, 'base_path': '/home/workspace/work/RAI-vNext-Preview/examples/notebooks', 'creation_context': <azure.ai.ml._restclient.v2022_05_01.models._models_py3.SystemData object at 0x7f89801a76d0>, 'serialize': <msrest.serialization.Serializer object at 0x7f898018ee20>, 'version': '6', 'latest_version': None, 'path': 'azureml://subscriptions/fac34303-435d-4486-8c3f-7094d82a0b60/resourcegroups/RAIPM/workspaces/RAIPM2/datastores/workspaceblobstore/paths/LocalUpload/d41b357e3e7193f0832161f679eba6

## Creating the Model

To simplify the model creation process, we're going to use a pipeline.

We create a directory for the training script:

In [20]:
import os

# exist_ok=True lets this cell be re-run without raising FileExistsError
os.makedirs('programmer_component_src', exist_ok=True)

Next, we write out our training script:

In [21]:
%%writefile programmer_component_src/training_script_reg.py

import argparse
import os
import shutil
import tempfile


from azureml.core import Run

import mlflow
import mlflow.sklearn

import mltable

import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import train_test_split

def parse_args():
    # setup arg parser
    parser = argparse.ArgumentParser()

    # add arguments
    parser.add_argument("--training_data", type=str, help="Path to training data")
    parser.add_argument("--target_column_name", type=str, help="Name of target column")
    parser.add_argument("--model_output", type=str, help="Path of output model")

    # parse args
    args = parser.parse_args()

    # return args
    return args

def create_regression_pipeline(X, y):
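    pipe_cfg = {
        # Select every numeric dtype: matching only 'int64' would skip float
        # columns such as YOE and age, which ColumnTransformer then drops.
        'num_cols': X.select_dtypes(include='number').columns.tolist(),
        'cat_cols': X.dtypes[X.dtypes == 'object'].index.values.tolist(),
    }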
    num_pipe = Pipeline([
        ('num_imputer', SimpleImputer(strategy='median')),
        ('num_scaler', StandardScaler())
    ])
    cat_pipe = Pipeline([
        ('cat_imputer', SimpleImputer(strategy='constant', fill_value='?')),
        ('cat_encoder', OneHotEncoder(handle_unknown='ignore', sparse=False))
    ])
    feat_pipe = ColumnTransformer([
        ('num_pipe', num_pipe, pipe_cfg['num_cols']),
        ('cat_pipe', cat_pipe, pipe_cfg['cat_cols'])
    ])

    # Append the regressor to the preprocessing pipeline.
    # Now we have a full prediction pipeline.
    pipeline = Pipeline(steps=[('preprocessor', feat_pipe),
                               ('model', LinearRegression())])
    return pipeline.fit(X, y)

def main(args):
    current_experiment = Run.get_context().experiment
    tracking_uri = current_experiment.workspace.get_mlflow_tracking_uri()
    print("tracking_uri: {0}".format(tracking_uri))
    mlflow.set_tracking_uri(tracking_uri)
    mlflow.set_experiment(current_experiment.name)
    
    # Read in data
    print("Reading data")
    tbl = mltable.load(args.training_data)
    all_data = tbl.to_pandas_dataframe()

    print("Extracting X_train, y_train")
    print("all_data cols: {0}".format(all_data.columns))
    y_train = all_data[args.target_column_name]
    X_train = all_data.drop(labels=args.target_column_name, axis="columns")
    print("X_train cols: {0}".format(X_train.columns))

    print("Training model")
    # The estimator can be changed to suit
    model = create_regression_pipeline(X_train, y_train)

    # Saving model with mlflow - leave this section unchanged
    with tempfile.TemporaryDirectory() as td:
        print("Saving model with MLFlow to temporary directory")
        tmp_output_dir = os.path.join(td, "my_model_dir")
        mlflow.sklearn.save_model(sk_model=model, path=tmp_output_dir)

        print("Copying MLFlow model to output path")
        for file_name in os.listdir(tmp_output_dir):
            print("  Copying: ", file_name)
            # As of Python 3.8, copytree will acquire dirs_exist_ok as
            # an option, removing the need for listdir
            shutil.copy2(src=os.path.join(tmp_output_dir, file_name), dst=os.path.join(args.model_output, file_name))


# run script
if __name__ == "__main__":
    # add space in logs
    print("*" * 60)
    print("\n\n")

    # parse args
    args = parse_args()

    # run main function
    main(args)

    # add space in logs
    print("*" * 60)
    print("\n\n")

Writing programmer_component_src/training_script_reg.py
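
To make the categorical branch of the pipeline more concrete, here is a pure-Python sketch of what one-hot encoding with `handle_unknown='ignore'` amounts to: each category seen during fitting gets its own indicator column, and a category not seen during fitting is encoded as all zeros. The helper names `fit_categories` and `one_hot` are hypothetical and only illustrate the idea; the training script itself relies on scikit-learn's `OneHotEncoder`.

```python
from typing import List, Sequence


def fit_categories(values: Sequence[str]) -> List[str]:
    """Return the sorted distinct categories seen in the training values."""
    return sorted(set(values))


def one_hot(value: str, categories: Sequence[str]) -> List[float]:
    """Encode one value; an unseen category yields an all-zero row."""
    return [1.0 if value == category else 0.0 for category in categories]


categories = fit_categories(["tabs", "spaces", "tabs"])
print(categories)
print(one_hot("tabs", categories))
print(one_hot("mixed", categories))  # "mixed" was never seen during fitting
```

Encoding unknown values as zeros means a test row with a new employer or IDE still produces a prediction instead of an error.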


Now, we can build this into an AzureML component:

In [22]:
from azure.ai.ml import load_component

yaml_contents = f"""
$schema: http://azureml/sdk-2-0/CommandComponent.json
name: rai_programmers_training_component
display_name: Programmers training component for RAI example
version: {rai_programmer_example_version_string}
type: command
inputs:
  training_data:
    type: path
  target_column_name:
    type: string
outputs:
  model_output:
    type: path
code: ./programmer_component_src/
environment: azureml:AML-RAI-Environment:{version_string}
command: >-
  python training_script_reg.py
  --training_data ${{{{inputs.training_data}}}}
  --target_column_name ${{{{inputs.target_column_name}}}}
  --model_output ${{{{outputs.model_output}}}}
"""

yaml_filename = "ProgrammersRegTrainingComp.yaml"

with open(yaml_filename, 'w') as f:
    f.write(yaml_contents)
    
train_component_definition = load_component(
    path=yaml_filename
)

ml_client.components.create_or_update(train_component_definition)

Uploading programmer_component_src (0.0 MBs): 100%|██████████| 3601/3601 [00:00<00:00, 36437.01it/s]



CommandComponent({'auto_increment_version': False, 'source': 'REMOTE.WORKSPACE.COMPONENT', 'is_anonymous': False, 'name': 'rai_programmers_training_component', 'description': None, 'tags': {}, 'properties': {}, 'id': '/subscriptions/fac34303-435d-4486-8c3f-7094d82a0b60/resourceGroups/RAIPM/providers/Microsoft.MachineLearningServices/workspaces/RAIPM2/components/rai_programmers_training_component/versions/6', 'Resource__source_path': None, 'base_path': './', 'creation_context': <azure.ai.ml._restclient.v2022_05_01.models._models_py3.SystemData object at 0x7f89801893a0>, 'serialize': <msrest.serialization.Serializer object at 0x7f896b62ef40>, 'command': 'python training_script_reg.py --training_data ${{inputs.training_data}} --target_column_name ${{inputs.target_column_name}} --model_output ${{outputs.model_output}}', 'code': '/subscriptions/fac34303-435d-4486-8c3f-7094d82a0b60/resourceGroups/RAIPM/providers/Microsoft.MachineLearningServices/workspaces/RAIPM2/codes/cba5141d-76eb-48e9-81f
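
The quadruple braces in the YAML template are a consequence of writing it as a Python f-string: each `{{` collapses to a single `{`, so `${{{{inputs.training_data}}}}` is written to the file as `${{inputs.training_data}}`, which matches the `command` shown in the output above. A small sketch of the escaping, using a hypothetical `example_version` value:

```python
example_version = "5"  # hypothetical value, for illustration only

# Single braces are substituted by the f-string; doubled braces are emitted
# literally, so four braces in the source become two in the rendered text.
template = f"""version: {example_version}
command: >-
  python training_script_reg.py
  --training_data ${{{{inputs.training_data}}}}
"""

print(template)
```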

We need a compute target on which to run our jobs. The following checks whether the compute specified above is present; if not, then the compute target is created.

In [23]:
from azure.ai.ml.entities import AmlCompute

all_compute_names = [x.name for x in ml_client.compute.list()]

if compute_name in all_compute_names:
    print(f"Found existing compute: {compute_name}")
else:
    my_compute = AmlCompute(
        name=compute_name,
        size="Standard_DS2_v2",
        min_instances=0,
        max_instances=4,
        idle_time_before_scale_down=3600
    )
    ml_client.compute.begin_create_or_update(my_compute)
    print("Initiated compute creation")

Found existing compute: rai-cluster


## Running a training pipeline

Now that we have our training component, we can run it. We begin by generating a unique name for the model:

In [24]:
import time

model_name_suffix = int(time.time())
model_name = 'rai_programmer_example_reg'

Next, we define our training pipeline. This has two components. The first is the training component which we defined above. The second is a component to register the model in AzureML:

In [25]:
from azure.ai.ml import dsl, Input

register_component = ml_client.components.get(
    name="register_model", version=version_string
)
train_model_component = ml_client.components.get(
    name="rai_programmers_training_component", version=rai_programmer_example_version_string
)
programmers_train_mltable = Input(
    type="mltable", path=f"{input_train_data}:{rai_programmer_example_version_string}", mode="download"
)
programmers_test_mltable = Input(
    type="mltable", path=f"{input_test_data}:{rai_programmer_example_version_string}", mode="download"
)

@dsl.pipeline(
    compute=compute_name,
    description="Register Model for RAI Programmers example",
    experiment_name=f"RAI_Programmers_Example_Model_Training_{model_name_suffix}",
)
def my_training_pipeline(target_column_name, training_data):
    trained_model = train_component_definition(
        target_column_name=target_column_name,
        training_data=training_data
    )
    trained_model.set_limits(timeout=120)

    _ = register_component(
        model_input_path=trained_model.outputs.model_output,
        model_base_name=model_name,
        model_name_suffix=model_name_suffix,
    )

    return {}

model_registration_pipeline_job = my_training_pipeline(target_column_name, programmers_train_mltable)

With the training pipeline defined, we can submit it for execution in AzureML. We define a helper function to wait for the job to complete:

In [26]:
from azure.ai.ml.entities import PipelineJob

def submit_and_wait(ml_client, pipeline_job) -> PipelineJob:
    created_job = ml_client.jobs.create_or_update(pipeline_job)
    assert created_job is not None

    while created_job.status not in ['Completed', 'Failed', 'Canceled', 'NotResponding']:
        time.sleep(30)
        created_job = ml_client.jobs.get(created_job.name)
        print("Latest status : {0}".format(created_job.status))
    assert created_job.status == 'Completed'
    return created_job

# This is the actual submission
training_job = submit_and_wait(ml_client, model_registration_pipeline_job)

Latest status : Running
Latest status : Running
Latest status : Running
Latest status : Running
Latest status : Running
Latest status : Running
Latest status : Running
Latest status : Completed
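
The loop in `submit_and_wait` polls until a terminal status is reached, with no upper bound on the wait. One possible variation, sketched below, adds an overall deadline. The function `wait_for_terminal_status` and its parameters are hypothetical; `get_status` stands in for a call such as `ml_client.jobs.get(name).status`, and the set of terminal states is copied from the cell above.

```python
import time
from typing import Callable

TERMINAL_STATES = {"Completed", "Failed", "Canceled", "NotResponding"}


def wait_for_terminal_status(
    get_status: Callable[[], str],
    poll_seconds: float = 30.0,
    max_wait_seconds: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() until it returns a terminal state or time runs out."""
    waited = 0.0
    status = get_status()
    while status not in TERMINAL_STATES:
        if waited >= max_wait_seconds:
            raise TimeoutError(f"Still '{status}' after {waited:.0f} seconds")
        sleep(poll_seconds)
        waited += poll_seconds
        status = get_status()
    return status


# Usage with a fake status source, so the sketch runs without AzureML.
fake_statuses = iter(["Running", "Running", "Completed"])
final_status = wait_for_terminal_status(
    lambda: next(fake_statuses), poll_seconds=0.0
)
print(final_status)
```

Passing `sleep` as a parameter keeps the function testable without real delays.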


## Creating the RAI Insights

Now that we have our model, we can generate RAI insights for it. We will need the `id` of the registered model, which will be as follows:

In [27]:
expected_model_id = f'{model_name}_{model_name_suffix}:1'
azureml_model_id = f'azureml:{expected_model_id}'
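
The identifier follows the pattern `<base name>_<suffix>:<version>`, with an `azureml:` prefix when it is used as an asset reference. The sketch below spells this out with made-up values; the helper `build_model_ids` is hypothetical, and the version `1` assumes, as the cell above does, that the freshly suffixed name has not been registered before.

```python
from typing import Tuple


def build_model_ids(base_name: str, suffix: int, version: int = 1) -> Tuple[str, str]:
    """Return (model_id, azureml_reference) for a registered model."""
    model_id = f"{base_name}_{suffix}:{version}"
    return model_id, f"azureml:{model_id}"


# The suffix here is an arbitrary example timestamp.
model_id, reference = build_model_ids("rai_programmer_example_reg", 1700000000)
print(model_id)
print(reference)
```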

Next, we load the RAI components, so that we can construct a pipeline:

In [28]:
rai_constructor_component = ml_client.components.get(
    name="rai_insights_constructor", version=version_string
)

rai_explanation_component = ml_client.components.get(
    name="rai_insights_explanation", version=version_string
)

rai_causal_component = ml_client.components.get(
    name="rai_insights_causal", version=version_string
)

rai_counterfactual_component = ml_client.components.get(
    name="rai_insights_counterfactual", version=version_string
)

rai_erroranalysis_component = ml_client.components.get(
    name="rai_insights_erroranalysis", version=version_string
)

rai_gather_component = ml_client.components.get(
    name="rai_insights_gather", version=version_string
)

rai_scorecard_component = ml_client.components.get(
    name="rai_score_card", version=version_string
)

## Score card generation config
For score card generation, we need some additional configuration in a separate JSON file. Here we configure the following model performance metrics for reporting:
- mean absolute error
- mean squared error

In [37]:
import json

score_card_config_dict = {
  "Model": {
    "ModelName": "GPT2 Access",
    "ModelType": "Regression",
    "ModelSummary": "This is a regression model to analyze how likely a programmer is to be given access to GPT2"
  },
  "Metrics": {
    "mean_absolute_error": {
      "threshold": "<=20"
    },
    "mean_squared_error": {}
  },
  "FeatureImportance": {
    "top_n": 6
  },
  "DataExplorer": {
    "features": [
      "YOE",
      "age"
    ]
  },
  "Fairness": {
    "metric": ["mean_squared_error", "mean_absolute_error"],
    "sensitive_features": ["IDE", "style"],
    "fairness_evaluation_kind": "difference"
  }
}

score_card_config_filename = "rai_programmer_regression_score_card_config.json"

with open(score_card_config_filename, 'w') as f:
    json.dump(score_card_config_dict, f)
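
For reference, the two metrics named in the configuration can be computed by hand. The pure-Python sketch below uses made-up scores, and its final comparison against 20 only mirrors the `<=20` threshold in the configuration; it does not reproduce how the score card component evaluates that threshold.

```python
from typing import Sequence


def mean_absolute_error(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """Average of the absolute differences between truth and prediction."""
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)


def mean_squared_error(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """Average of the squared differences between truth and prediction."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


# Made-up scores, for illustration only.
y_true = [4.0, 0.0, 9.0, 8.0]
y_pred = [5.0, 2.0, 9.0, 6.0]

mae = mean_absolute_error(y_true, y_pred)
mse = mean_squared_error(y_true, y_pred)
print(mae, mse)
print("MAE within threshold:", mae <= 20)
```

Since scores lie in [0,10], the absolute error of any prediction within that range cannot exceed 10, so a threshold of 20 is very permissive for this dataset.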

We can now specify our pipeline. Complex objects (such as lists of column names) have to be converted to JSON strings before being passed to the components. Note that the timeout for the counterfactual job is noticeably longer, since generating counterfactual points is a comparatively slow process:

In [38]:
import json
from azure.ai.ml import Input
from azure.ai.ml.constants import AssetTypes

score_card_config_path = Input(
    type="uri_file",
    path=score_card_config_filename,
    mode="download"
)

categorical_columns = json.dumps(["location", "style", "job title", "OS", "Employer", "IDE", "Programming language"])
treatment_features = json.dumps(["Number of github repos contributed to", "YOE"])
desired_range = json.dumps([5, 10])
filter_columns = json.dumps(["style", "Employer"])

@dsl.pipeline(
        compute=compute_name,
        description="Example RAI computation on programmers data",
        experiment_name=f"RAI_Programmers_Example_RAIInsights_Computation_{model_name_suffix}",
    )
def rai_programmer_regression_pipeline(
        target_column_name,
        train_data,
        test_data,
        score_card_config_path,
    ):
        # Initiate the RAIInsights
        create_rai_job = rai_constructor_component(
            title="RAI Dashboard Example",
            task_type="regression",
            model_info=expected_model_id,
            model_input=Input(type=AssetTypes.MLFLOW_MODEL, path=azureml_model_id),
            train_dataset=train_data,
            test_dataset=test_data,
            target_column_name=target_column_name,
            categorical_column_names=categorical_columns
        )
        create_rai_job.set_limits(timeout=120)
        
        # Add an explanation
        explain_job = rai_explanation_component(
            comment="Explanation for the programmers dataset",
            rai_insights_dashboard=create_rai_job.outputs.rai_insights_dashboard,
        )
        explain_job.set_limits(timeout=120)
        
        # Add causal analysis
        causal_job = rai_causal_component(
            rai_insights_dashboard=create_rai_job.outputs.rai_insights_dashboard,
            treatment_features=treatment_features,
        )
        causal_job.set_limits(timeout=180)
        
        # Add counterfactual analysis
        counterfactual_job = rai_counterfactual_component(
            rai_insights_dashboard=create_rai_job.outputs.rai_insights_dashboard,
            total_cfs=10,
            desired_range=desired_range
        )
        counterfactual_job.set_limits(timeout=600)
        
        # Add error analysis
        erroranalysis_job = rai_erroranalysis_component(
            rai_insights_dashboard=create_rai_job.outputs.rai_insights_dashboard,
            filter_features=filter_columns
        )
        erroranalysis_job.set_limits(timeout=120)
        
        # Combine everything
        rai_gather_job = rai_gather_component(
            constructor=create_rai_job.outputs.rai_insights_dashboard,
            insight_1=explain_job.outputs.explanation,
            insight_2=causal_job.outputs.causal,
            insight_3=counterfactual_job.outputs.counterfactual,
            insight_4=erroranalysis_job.outputs.error_analysis,
        )
        rai_gather_job.set_limits(timeout=120)

        rai_gather_job.outputs.dashboard.mode = "upload"
        rai_gather_job.outputs.ux_json.mode = "upload"

        # Generate score card in pdf format for a summary report on model performance,
        # and observe distribution of error between prediction vs ground truth.
        rai_scorecard_job = rai_scorecard_component(
            dashboard=rai_gather_job.outputs.dashboard,
            pdf_generation_config=score_card_config_path
        )

        return {
            "dashboard": rai_gather_job.outputs.dashboard,
            "ux_json": rai_gather_job.outputs.ux_json,
            "scorecard": rai_scorecard_job.outputs.scorecard
        }
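
As a small illustration of the JSON conversion used for `categorical_columns`, `treatment_features`, `desired_range` and `filter_columns`: `json.dumps` turns a list into a single string that can be passed as a string input, and `json.loads` recovers the list. Whether the RAI components decode their inputs in exactly this way is an assumption; the sketch only shows the round trip.

```python
import json

columns = ["style", "Employer"]

encoded = json.dumps(columns)  # a plain string, suitable for a string input
decoded = json.loads(encoded)  # the original list again

print(type(encoded).__name__, encoded)
print(decoded == columns)
```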

Next, we define the pipeline object itself, and ensure that the outputs will be available for download:

In [39]:
import uuid
from azure.ai.ml import Output

insights_pipeline_job = rai_programmer_regression_pipeline(
    target_column_name=target_column_name,
    train_data=programmers_train_mltable,
    test_data=programmers_test_mltable,
    score_card_config_path=score_card_config_path,
)

rand_path = str(uuid.uuid4())
insights_pipeline_job.outputs.dashboard = Output(
    path=f"azureml://datastores/workspaceblobstore/paths/{rand_path}/dashboard/",
    mode="upload",
    type="uri_folder",
)
insights_pipeline_job.outputs.ux_json = Output(
    path=f"azureml://datastores/workspaceblobstore/paths/{rand_path}/ux_json/",
    mode="upload",
    type="uri_folder",
)
insights_pipeline_job.outputs.scorecard = Output(
    path=f"azureml://datastores/workspaceblobstore/paths/{rand_path}/scorecard/",
    mode="upload",
    type="uri_folder",
)
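
The three output paths share one random prefix, so that all artifacts of a run land under the same folder. A compact sketch of the same pattern follows; the helper `build_output_paths` is hypothetical, and the default datastore name is taken from the cell above.

```python
import uuid
from typing import Dict, Iterable


def build_output_paths(
    output_names: Iterable[str], datastore: str = "workspaceblobstore"
) -> Dict[str, str]:
    """Give every named output its own folder under one random prefix."""
    prefix = str(uuid.uuid4())
    return {
        name: f"azureml://datastores/{datastore}/paths/{prefix}/{name}/"
        for name in output_names
    }


paths = build_output_paths(["dashboard", "ux_json", "scorecard"])
for name, path in paths.items():
    print(name, "->", path)
```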

And submit the pipeline to AzureML for execution:

In [40]:
insights_job = submit_and_wait(ml_client, insights_pipeline_job)

Uploading rai_programmer_regression_score_card_config.json (< 1 MB): 100%|██████████| 492/492 [00:00<00:00, 4.72kB/s]



Latest status : Running
Latest status : Running
Latest status : Running
Latest status : Completed


The dashboard should appear in the AzureML portal in the registered model view. The following cell computes the expected URI:

In [None]:
sub_id = ml_client._operation_scope.subscription_id
rg_name = ml_client._operation_scope.resource_group_name
ws_name = ml_client.workspace_name

expected_uri = f"https://ml.azure.com/model/{expected_model_id}/model_analysis?wsid=/subscriptions/{sub_id}/resourcegroups/{rg_name}/workspaces/{ws_name}"

print(f"Please visit {expected_uri} to see your analysis")

## Downloading the Scorecard PDF

We can download the scorecard PDF from our pipeline as follows:

In [None]:
target_directory = "."

ml_client.jobs.download(
    insights_job.name, download_path=target_directory, output_name="scorecard"
)

We can also download the dashboard, and view it in this notebook. Note that this is fragile with respect to the Python version and conda environment:

In [None]:
import tempfile
import pathlib
from responsibleai import RAIInsights
from raiwidgets import ResponsibleAIDashboard
with tempfile.TemporaryDirectory() as dashboard_path:
        ml_client.jobs.download(
            insights_job.name, download_path=dashboard_path, output_name="dashboard"
        )
        expected_path = pathlib.Path(dashboard_path) / 'named-outputs' / 'dashboard'
        # This load is very fragile with respect to Python version and conda environment
        rai_i = RAIInsights.load(expected_path)
        ResponsibleAIDashboard(rai_i)

## Constructing the pipeline in YAML

It is also possible to specify the pipeline as a YAML file, and submit that using the command line. We will now create a YAML specification of the above pipeline and submit that:

In [None]:
yaml_contents = f"""
$schema: https://azuremlschemas.azureedge.net/latest/pipelineJob.schema.json
experiment_name: RAI_Programmer_Example_YAML_{rai_programmer_example_version_string}
type: pipeline

inputs:
  target_column_name: {target_column_name}
  my_training_data:
    type: mltable
    path: azureml:{input_train_data}:{rai_programmer_example_version_string}
    mode: download
  my_test_data:
    type: mltable
    path: azureml:{input_test_data}:{rai_programmer_example_version_string}
    mode: download

settings:
  default_datastore: azureml:workspaceblobstore
  default_compute: azureml:{compute_name}
  continue_on_step_failure: false

jobs:
  create_rai_job:
    type: command
    component: azureml:rai_insights_constructor:{version_string}
    inputs:
      title: RAI Programmer Analysis from YAML
      task_type: regression
      model_info: {expected_model_id}
      model_input:
        type: mlflow_model
        path: {azureml_model_id}
      train_dataset: ${{{{parent.inputs.my_training_data}}}}
      test_dataset: ${{{{parent.inputs.my_test_data}}}}
      target_column_name: ${{{{parent.inputs.target_column_name}}}}
      categorical_column_names: '["location", "job title", "OS", "Employer", "IDE", "Programming language", "style"]'
      
  explain_01:
    type: command
    component: azureml:rai_insights_explanation:{version_string}
    inputs:
      comment: Explanation from YAML for RAI Programmer example
      rai_insights_dashboard: ${{{{parent.jobs.create_rai_job.outputs.rai_insights_dashboard}}}}

  causal_01:
    type: command
    component: azureml:rai_insights_causal:{version_string}
    inputs:
      rai_insights_dashboard: ${{{{parent.jobs.create_rai_job.outputs.rai_insights_dashboard}}}}
      treatment_features: '["Number of github repos contributed to", "YOE"]'

  counterfactual_01:
    type: command
    component: azureml:rai_insights_counterfactual:{version_string}
    inputs:
      rai_insights_dashboard: ${{{{parent.jobs.create_rai_job.outputs.rai_insights_dashboard}}}}
      total_CFs: 10
      desired_range: '[5, 10]'

  error_analysis_01:
    type: command
    component: azureml:rai_insights_erroranalysis:{version_string}
    inputs:
      rai_insights_dashboard: ${{{{parent.jobs.create_rai_job.outputs.rai_insights_dashboard}}}}
      filter_features: '["style", "Employer"]'

  gather_01:
    type: command
    component: azureml:rai_insights_gather:{version_string}
    inputs:
      constructor: ${{{{parent.jobs.create_rai_job.outputs.rai_insights_dashboard}}}}
      insight_1: ${{{{parent.jobs.causal_01.outputs.causal}}}}
      insight_2: ${{{{parent.jobs.counterfactual_01.outputs.counterfactual}}}}
      insight_3: ${{{{parent.jobs.error_analysis_01.outputs.error_analysis}}}}
      insight_4: ${{{{parent.jobs.explain_01.outputs.explanation}}}}
"""

yaml_pipeline_filename = "rai_programmer_example.yaml"

with open(yaml_pipeline_filename, 'w') as f:
    f.write(yaml_contents)

The created file can then be submitted using the Azure CLI:

In [None]:
import os
import subprocess

cmd_line = ['az', 'ml', 'job', 'create',
            '--resource-group', rg_name,
            '--workspace', ws_name,
            '--file', yaml_pipeline_filename]

try:
    # With an argument list, shell=True on POSIX would run only 'az' and drop
    # the remaining arguments; on Windows the shell is needed to resolve az.cmd.
    cmd = subprocess.run(cmd_line, check=True, shell=(os.name == 'nt'), capture_output=True)
except subprocess.CalledProcessError as cpe:
    print(f"Error invoking: {cpe.cmd}")
    print(cpe.stdout)
    print(cpe.stderr)
    raise
else:
    print("Azure CLI submission completed")
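
For comparison, here is a self-contained sketch of running a command given as an argument list and capturing its output as text. It invokes the current Python interpreter as a stand-in for `az`, so it runs without the Azure CLI installed; the command and its output string are made up for illustration.

```python
import subprocess
import sys

# Use the running interpreter as a stand-in for the `az` executable.
cmd_line = [sys.executable, "-c", "print('submitted')"]

try:
    # text=True decodes stdout/stderr to str instead of returning bytes.
    completed = subprocess.run(cmd_line, check=True, capture_output=True, text=True)
except subprocess.CalledProcessError as cpe:
    # check=True raises when the exit code is non-zero.
    print(f"Command failed with exit code {cpe.returncode}")
    print(cpe.stderr)
    raise
else:
    print(completed.stdout.strip())
```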