In [17]:
import os

# Move into the notebooks directory so that the relative paths used further
# down ("../data/...", "../src/...") are resolved from there.
# NOTE(review): the path has no leading "/", so it is resolved relative to the
# current working directory — confirm that this is intended.
try:
    os.chdir('Users/giosue.cotugno/mlops_titanic/notebooks/')
except OSError:
    # os.chdir raises an OSError subclass when the directory cannot be reached
    # from the current location (presumably because the cell already ran once).
    # Only that failure is tolerated; anything else should surface.
    print('already moved')

already moved


# Imports

In [18]:

from azureml.data.data_reference import DataReference
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import PythonScriptStep
import os
from azureml.core import Workspace,Experiment, Environment
from azureml.core.compute import AmlCompute
from azureml.core.compute import ComputeTarget
from azureml.core.compute_target import ComputeTargetException

# Defining utility functions

In [19]:
def getOrCreateCompute(ws:Workspace, compute_name="testcot", vm_size="STANDARD_D2_V2",
                       min_nodes=0, max_nodes=4, timeout_in_minutes=20):
    """Return the compute target called ``compute_name``, creating it if needed.

    The target is first looked up with ``AmlCompute(ws, compute_name)``. If
    that lookup raises ``ComputeTargetException``, a new target is provisioned
    with the given size settings and the call blocks until provisioning
    completes or ``timeout_in_minutes`` elapses.

    Args:
        ws: Workspace in which the compute target is looked up or created.
        compute_name: Name of the compute target.
        vm_size: VM size passed to the provisioning configuration.
        min_nodes: Minimum node count passed to the provisioning configuration.
        max_nodes: Maximum node count passed to the provisioning configuration.
        timeout_in_minutes: Timeout passed to ``wait_for_completion``.

    Returns:
        The existing or newly created compute target.

    Raises:
        Any exception raised while creating or waiting for the new compute
        target is propagated to the caller.
    """
    try:
        aml_compute = AmlCompute(ws, compute_name)
        print("found existing compute target.")
    except ComputeTargetException:
        print("creating new compute target")

        provisioning_config = AmlCompute.provisioning_configuration(vm_size=vm_size,
                                                                    min_nodes=min_nodes,
                                                                    max_nodes=max_nodes)
        aml_compute = ComputeTarget.create(ws, compute_name, provisioning_config)
        aml_compute.wait_for_completion(show_output=True, min_node_count=None,
                                        timeout_in_minutes=timeout_in_minutes)

    # Returned after the try block rather than from a `finally` clause: a
    # `return` inside `finally` would discard any exception raised above.
    return aml_compute

In [20]:
def createRunConfig(ws, env_name='TITANIC', env_version='1'):
    """Build a run configuration that uses a registered workspace environment.

    Args:
        ws: Workspace from which the environment is fetched.
        env_name: Name of the registered environment.
        env_version: Version of the registered environment.

    Returns:
        A ``RunConfiguration`` whose ``environment`` attribute is set to the
        environment returned by ``Environment.get``.
    """
    # Only the names actually used are imported here, so the function does not
    # fail on packages it never needs.
    from azureml.core.runconfig import RunConfiguration
    from azureml.core import Environment

    run_config = RunConfiguration()
    run_config.environment = Environment.get(workspace=ws, name=env_name, version=env_version)
    return run_config

# Build dev pipeline

In [21]:
# Load the workspace from the local configuration and show which one is used.
# NOTE(review): this prints the subscription id into the cell output —
# consider clearing outputs before sharing the notebook.
ws = Workspace.from_config()
print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\n')


# Default datastore (Azure blob storage)
def_blob_store = ws.get_default_datastore()
print("Blobstore's name: {}".format(def_blob_store.name))

# Local CSV holding the data set; it is uploaded to the datastore in the
# next cell, so fail early here if it is missing.
file_name = "../data/rawdata/train.csv"

if not os.path.exists(file_name):
    # FileNotFoundError is still an Exception, but tells the reader exactly
    # what went wrong.
    raise FileNotFoundError(
        'Could not find CSV dataset at "%s". '
        % file_name
    )

mlops-aml-ws
mlops-rg
westeurope
f90533aa-280d-40b9-9949-a7ba0ee9511f
Blobstore's name: workspaceblobstore


# Uploading data to blob storage

In [22]:
# Upload the local CSV to the default datastore under `target_path`.
target_path = "training-data/"
def_blob_store.upload_files(
    files=[file_name],
    target_path=target_path,
    overwrite=True,
    show_progress=False,
)

# Reference to the uploaded file. The datastore path is derived from
# `target_path` and the local file name so it cannot drift from what was
# actually uploaded above (evaluates to "training-data/train.csv").
blob_input_data = DataReference(
    datastore=def_blob_store,
    data_reference_name="test_data",
    path_on_datastore=target_path + os.path.basename(file_name))


# Shared resources used by every pipeline step.
aml_compute = getOrCreateCompute(ws)
run_config = createRunConfig(ws)
# Intermediate pipeline data, stored on the default datastore.
models_data = PipelineData("models_data",datastore=def_blob_store)

found existing compute target.


## Preprocess step

In [23]:
# Preprocessing stage: runs preprocess.py from ../src/preprocess/ on the shared
# compute target. The uploaded CSV reference is declared as an input and also
# handed to the script through its --data argument.
source_directory="../src/preprocess/"
preprocess_step = PythonScriptStep(
    script_name="preprocess.py",
    name="Preprocessing Step",
    source_directory=source_directory,
    inputs=[blob_input_data],
    arguments=["--data", blob_input_data],
    runconfig=run_config,
    compute_target=aml_compute,
    allow_reuse=True,
)

In [24]:
# Build a pipeline from the preprocessing step alone. The object is not
# assigned to a name; the cell only displays its repr.
# `Pipeline` is already imported in the imports cell at the top.
Pipeline(ws, [preprocess_step])

<azureml.pipeline.core.pipeline.Pipeline at 0x7f473b6f6370>

## Training step

In [25]:
# Training stage: runs train.py from ../src/train/ on the shared compute
# target. `models_data` is declared as this step's output and passed to the
# script through its --model argument.
source_directory="../src/train/"
train_step = PythonScriptStep(
    script_name="train.py",
    name="Train Model Step",
    source_directory=source_directory,
    arguments=["--model", models_data],
    outputs=[models_data],
    runconfig=run_config,
    compute_target=aml_compute,
    allow_reuse=True,
)

## Evaluation step

In [26]:
# Evaluation stage: runs eval.py from ../src/evaluation/ on the shared compute
# target. `models_data` is declared as an input and passed to the script
# through its --model_path argument. Reuse of previous runs is disabled.
source_directory="../src/evaluation/"
evaluate_step = PythonScriptStep(
    script_name="eval.py",
    name="Evaluate Model Step",
    source_directory=source_directory,
    arguments=["--model_path", models_data],
    inputs=[models_data],
    runconfig=run_config,
    compute_target=aml_compute,
    allow_reuse=False,
)

## Registration step

In [27]:
# Registration stage: runs register.py from ../src/register/ on the shared
# compute target. `models_data` is declared as an input and passed to the
# script through its --model_path argument. Reuse of previous runs is disabled.
source_directory="../src/register/"
register_step = PythonScriptStep(
    script_name="register.py",
    name="Registration Model Step",
    source_directory=source_directory,
    arguments=["--model_path", models_data],
    inputs=[models_data],
    runconfig=run_config,
    compute_target=aml_compute,
    allow_reuse=False,
)

# Pipeline creation

In [28]:
# Force a strictly sequential execution order:
# preprocess -> train -> evaluate -> register.
# `Pipeline` is already imported in the imports cell at the top.
train_step.run_after(preprocess_step)
evaluate_step.run_after(train_step)
register_step.run_after(evaluate_step)
steps = [preprocess_step,train_step, evaluate_step,register_step]

train_pipeline = Pipeline(workspace=ws, steps=steps)


In [29]:
# Validate first and stop if any errors are reported, so that a broken
# pipeline is never published. An empty (or None) result means no errors.
validation_errors = train_pipeline.validate()
if validation_errors:
    raise RuntimeError(f"Pipeline validation failed: {validation_errors}")

published_pipeline = train_pipeline.publish(
    name="preproc-train-register pipeline",
    description="Model training/retraining pipeline"
)
print(f"Published pipeline: {published_pipeline.name}")

Created step Preprocessing Step [e44e04d5][a499580b-0362-4649-9672-de20520987dc], (This step is eligible to reuse a previous run's output)
Created step Train Model Step [69e488a3][1f1baf0a-c474-4c27-a7b0-295d70cfb640], (This step is eligible to reuse a previous run's output)
Created step Evaluate Model  [e42cb0e4][64f34c78-b057-4ce3-924d-245e92720fc7], (This step will run and generate new outputs)
Created step Registration Model  [0e59775d][547e96e9-2d53-42ce-beb0-98f653f8c2ed], (This step will run and generate new outputs)
Using data reference test_data for StepId [7aaed69e][ee17b8e7-aad7-4670-a604-b0b5a122ca4f], (Consumers of this data are eligible to reuse prior runs.)
Published pipeline: preproc-train-register pipeline


In [30]:
#pipeline_run1 = Experiment(ws, 'titanic-pipeline').submit(train_pipeline)

In [31]:
#pipeline_run1.wait_for_completion(show_output=True)