# Create a Pipeline

## Connect to Workspace

In [1]:
import azureml.core
from azureml.core import Workspace

# Load the workspace from the saved config file
ws = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))

Ready to use Azure ML 1.44.0 to work with ml-tabular_synthesis


# Create Compute or connect to it

In [2]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

vm_size="STANDARD_DS11_V2"
vm_size="STANDARD_NC6_PROMO"
compute_name = "cpu-ds11-cluster"
compute_name = "gpu-nc6-promo"

try:
    # Check for existing compute target
    compute_target = ComputeTarget(workspace=ws, name=compute_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size=vm_size, max_nodes=2)
        compute_target = ComputeTarget.create(ws, compute_name, compute_config)
        compute_target.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)

Found existing cluster, use it.


# alternative: select current compute instance

In [2]:
from azureml.core.compute_target import ComputeTargetException
from azureml.core.compute import ComputeTarget

compute_name = "gpu-nc6-promo"
try:
    # Check for existing compute target
    compute_target = ComputeTarget(workspace=ws, name=compute_name)
    print('Found existing Target, use it.')
except ComputeTargetException as e:
    print("Compute not found: ", e)

Found existing Target, use it.


# Environment and config

In [3]:
import os
# Create a folder for the pipeline step files
experiment_folder = '../../git_repos/Tabular-Data-Synthesis/src'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

../../git_repos/Tabular-Data-Synthesis/src


In [4]:
from azureml.core import  Environment

# Create a Python environment for the experiment (from a .yml file)
env = Environment.from_conda_specification("tabsyn", "environment.yml")

# Register the environment 
env.register(workspace=ws)
registered_env = Environment.get(ws, 'tabsyn')

In [5]:
from azureml.core.runconfig import RunConfiguration
pipeline_run_config = RunConfiguration()

# Use the compute you created above. 
pipeline_run_config.target = compute_target

# Assign the environment to the run configuration
pipeline_run_config.environment = registered_env

print ("Run configuration created.")

Run configuration created.


# Define Pipeline

input_args = [  "--config_path",
                "tabular_synthesis/data/config/adult.json",
                "--learn_sigma",
                "True",
                "--iterations",
                "5000",
                "--anneal_lr",
                "True",
                "--batch_size",
                "64",
                "--lr",
                "3e-4",
                "--save_interval",
                "10000",
                "--weight_decay",
                "0.05",
                "--classifier_attention_resolutions",
                "32,16,8",
                "--classifier_depth",
                "2",
                "--classifier_width",
                "64",
                "--classifier_pool",
                "attention",
                "--classifier_resblock_updown",
                "True",
                "--classifier_use_scale_shift_norm",
                "True",
                "--log_interval",
                "25",
                "--eval_interval",
                "50"
            ]

In [6]:
input_args = [  "--config_path",
                "tabular_synthesis/data/config/adult.json",
                "--learn_sigma",
                "True",
                "--iterations",
                "5000",
                "--anneal_lr",
                "True",
                "--batch_size",
                "64",
                "--lr",
                "2e-3",
                "--save_interval",
                "10000",
                "--weight_decay",
                "0.01",
                "--classifier_attention_resolutions",
                "32,16,8",
                "--classifier_depth",
                "4",
                "--classifier_width",
                "64",
                "--classifier_pool",
                "attention",
                "--classifier_resblock_updown",
                "True",
                "--classifier_use_scale_shift_norm",
                "True",
                "--log_interval",
                "10",
                "--eval_interval",
                "25",
                "--noised",
                "True",
                "--classifier_use_fp16",
                "True"
            ]

# Create Pipeline for classifier train

In [12]:
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep
import tempfile
import os

# Get the training dataset
adult_ds = ws.datasets.get("adult_train")


# Create an OutputFileDatasetConfig (temporary Data Reference) for data passed from step 1 to step 2
output = OutputFileDatasetConfig("output")

# Step 1, Run the data prep script
classifier_train = PythonScriptStep(name = "classifier_train",
                                source_directory = experiment_folder,
                                script_name = "tabular_synthesis/classifier_train_azure.py",
                                arguments = ["--dataset_path", adult_ds.as_download(),
                                             '--output_path', output] + input_args,
                                compute_target = compute_target,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

print("Pipeline steps defined")

Pipeline steps defined


In [13]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# Construct the pipeline
pipeline_steps = [classifier_train]
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)
print("Pipeline is built.")

# Create an experiment and run the pipeline
experiment = Experiment(workspace=ws, name = 'classifier_train_test0')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")
RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion(show_output=True)

Pipeline is built.
Created step classifier_train [e759f249][7f360fad-0690-4073-9138-d823fdafb89c], (This step will run and generate new outputs)
Submitted PipelineRun 8ab5a55e-f1c7-4dbb-b7ab-1260c06d93a0
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/8ab5a55e-f1c7-4dbb-b7ab-1260c06d93a0?wsid=/subscriptions/49641ae7-6237-4363-b149-e721ac81137a/resourcegroups/rg-tabular_synthesis/workspaces/ml-tabular_synthesis&tid=84c31ca0-ac3b-4eae-ad11-519d80233e6f
Pipeline submitted for execution.


_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', …

PipelineRunId: 8ab5a55e-f1c7-4dbb-b7ab-1260c06d93a0
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/8ab5a55e-f1c7-4dbb-b7ab-1260c06d93a0?wsid=/subscriptions/49641ae7-6237-4363-b149-e721ac81137a/resourcegroups/rg-tabular_synthesis/workspaces/ml-tabular_synthesis&tid=84c31ca0-ac3b-4eae-ad11-519d80233e6f
PipelineRun Status: Running


StepRunId: f7c71f64-29fa-4561-b926-9b4ff2af9a45
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/f7c71f64-29fa-4561-b926-9b4ff2af9a45?wsid=/subscriptions/49641ae7-6237-4363-b149-e721ac81137a/resourcegroups/rg-tabular_synthesis/workspaces/ml-tabular_synthesis&tid=84c31ca0-ac3b-4eae-ad11-519d80233e6f
StepRun( classifier_train ) Status: Running


# Pipeline for diffusion train

In [14]:
model_args =[
    "--learn_sigma",
    "True",
    "--class_cond",
    "True",
    "--num_channels",
    "128",
    "--num_res_blocks",
    "3",
    ]

training_args = [
    "--iterations",
    "10",
    "--save_interval",
    "200000",
    "--log_interval",
    "5",
    "--diffusion_steps",
    "2000",
    "--noise_schedule",
    "linear",
    "--lr",
    "1e-4",
    "--weight_decay",
    "0.01",
    "--batch_size",
    "32",
    "--use_fp16",
    "True"
]


location_args = [  
    "--config_path",
    "tabular_synthesis/data/config/adult.json",
]

input_args = location_args + model_args + training_args

In [25]:
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep
import tempfile
import os

# Get the training dataset
adult_ds = ws.datasets.get("adult_train")


# Create an OutputFileDatasetConfig (temporary Data Reference) for data passed from step 1 to step 2
output = OutputFileDatasetConfig("output")



train_args =  ["--dataset_path", adult_ds.as_download(), '--output_path', output] + input_args

# Step 1, Run the data prep script
image_train = PythonScriptStep(name = "image_training",
                                source_directory = experiment_folder,
                                script_name = "tabular_synthesis/image_train_azure.py",
                                arguments = train_args,
                                compute_target = compute_target,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

output2=OutputFileDatasetConfig("output2")
sample_process_args = [
                "--diffusion_steps",
                "50",
                "--noise_schedule",
                "linear",
                "--num_samples",
                "-1",
                "--batch_size",
                "32",
]
sample_args = ["--dataset_path", adult_ds.as_download(), "--model_path", output.as_input(), "--output_path", output2] + model_args + sample_process_args + location_args

image_sample = PythonScriptStep(name = "image_sampling",
                                source_directory = experiment_folder,
                                script_name = "tabular_synthesis/image_sample_azure.py",
                                arguments = sample_args,
                                compute_target = compute_target,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

final_output=OutputFileDatasetConfig("final_output")

eval_args = ["--real_dataset_path", adult_ds.as_download(),"--synthetic_dataset", output2.as_input(), "--output_path", final_output] + location_args


quick_eval = PythonScriptStep(name = "dataset_evaluation",
                                source_directory = experiment_folder,
                                script_name = "tabular_synthesis/quick_evaluation_azure.py",
                                arguments = eval_args,
                                compute_target = compute_target,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

In [26]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# Construct the pipeline
pipeline_steps = [image_train, image_sample, quick_eval]
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)
print(pipeline_steps)
print("Pipeline is built.")

# Create an experiment and run the pipeline
experiment = Experiment(workspace=ws, name = 'image_train_sample_eval')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")
RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion(show_output=True)

[<azureml.pipeline.steps.python_script_step.PythonScriptStep object at 0x7fb0100d62e0>, <azureml.pipeline.steps.python_script_step.PythonScriptStep object at 0x7fb01007e340>, <azureml.pipeline.steps.python_script_step.PythonScriptStep object at 0x7fb010081520>]
Pipeline is built.
Created step image_training [207da231][7f849c4e-8e9c-4199-9aae-429997a78369], (This step will run and generate new outputs)Created step image_sampling [8bbf37ae][3222f0ad-c891-4fcc-a785-83201ec8cdb9], (This step will run and generate new outputs)

Created step dataset_evaluation [3b5ad73e][1e09094f-6d38-4f88-a213-f0be530ad2f8], (This step will run and generate new outputs)
Submitted PipelineRun ece6daa9-614c-4687-9c0d-e36081bdbf02
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/ece6daa9-614c-4687-9c0d-e36081bdbf02?wsid=/subscriptions/49641ae7-6237-4363-b149-e721ac81137a/resourcegroups/rg-tabular_synthesis/workspaces/ml-tabular_synthesis&tid=84c31ca0-ac3b-4eae-ad11-519d80233e6f
Pipeline submitt

_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', …

PipelineRunId: ece6daa9-614c-4687-9c0d-e36081bdbf02
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/ece6daa9-614c-4687-9c0d-e36081bdbf02?wsid=/subscriptions/49641ae7-6237-4363-b149-e721ac81137a/resourcegroups/rg-tabular_synthesis/workspaces/ml-tabular_synthesis&tid=84c31ca0-ac3b-4eae-ad11-519d80233e6f
PipelineRun Status: NotStarted
PipelineRun Status: Running


StepRunId: 22a30df6-fa75-4b27-b275-c3b46a2cac27
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/22a30df6-fa75-4b27-b275-c3b46a2cac27?wsid=/subscriptions/49641ae7-6237-4363-b149-e721ac81137a/resourcegroups/rg-tabular_synthesis/workspaces/ml-tabular_synthesis&tid=84c31ca0-ac3b-4eae-ad11-519d80233e6f
StepRun( image_training ) Status: Queued
StepRun( image_training ) Status: Running

StepRun(image_training) Execution Summary
StepRun( image_training ) Status: Finished
{'runId': '22a30df6-fa75-4b27-b275-c3b46a2cac27', 'target': 'gpu-nc6-promo', 'status': 'Completed', 'startTimeUtc': '2022-10-27T11:04: