In [4]:
from azure.ai.ml import load_component
from azure.ai.ml.dsl import pipeline
from azure.ai.ml import MLClient, Input
from azure.identity import DefaultAzureCredential, EnvironmentCredential
from azure.ai.ml.entities import AmlCompute

import pandas as pd

Definimos el cluster donde correrá el pipeline

In [12]:
def get_comput_target(
    ml_client,
    name="new-cpu-cluster",
    family='Standard_E4s_v3',
    min_instances=0,
    max_instances=4,
    idle_time_before_scale_down=180,
    tier="Dedicated",
):
    """Return the compute cluster called ``name``, creating it when the lookup fails.

    Args:
        ml_client: Workspace client exposing ``compute.get`` and
            ``compute.begin_create_or_update``.
        name: Name of the compute cluster to look up or create.
        family: VM size passed to ``AmlCompute`` as ``size``.
        min_instances: Passed through to ``AmlCompute``.
        max_instances: Passed through to ``AmlCompute``.
        idle_time_before_scale_down: Passed through to ``AmlCompute``.
        tier: Passed through to ``AmlCompute``.

    Returns:
        The object returned by ``ml_client.compute.get`` when the lookup
        succeeds, otherwise the result of the create-or-update operation.
    """
    try:
        # Reuse the cluster when the workspace already has one with this name.
        return ml_client.compute.get(name)
    except Exception:
        # NOTE(review): every lookup failure (not only "not found") falls through
        # to creation; narrow this to the SDK's not-found error once confirmed.
        cluster_spec = AmlCompute(
            name=name,
            type="amlcompute",
            size=family,
            min_instances=min_instances,
            max_instances=max_instances,
            idle_time_before_scale_down=idle_time_before_scale_down,
            tier=tier,
        )

    # Block until provisioning finishes so the caller receives a usable cluster.
    return ml_client.compute.begin_create_or_update(cluster_spec).result()

In [24]:
# Authenticate with DefaultAzureCredential and build the workspace client from
# a discovered config file (the SDK reports which file it found).
credential = DefaultAzureCredential()
ml_client = MLClient.from_config(credential=credential)

Found the config file in: /config.json


In [25]:
compute_target = get_comput_target(ml_client)

In [26]:
# Load the four pipeline components from their YAML specs; paths are relative
# to the notebook's working directory.
preprocess_component = load_component(source="./preprocess-ds-component/preprocess.yml")
split_component = load_component(source="./split-component/split.yml")
train_logistic_component = load_component(source="./train-logistic-component/train_logistic.yml")
eval_component = load_component(source="./eval-model-component/eval.yml")

In [27]:
@pipeline(
    # NOTE(review): must name an existing compute cluster; it matches the default
    # `name` of get_comput_target but is not derived from it.
    default_compute='new-cpu-cluster',
)
def water_potability_pipeline(pipeline_input_data):
    """Wire the preprocess -> split -> train -> evaluate components together.

    Args:
        pipeline_input_data: Input forwarded to the preprocess component's
            ``dataset`` port.

    Returns:
        dict mapping pipeline output names to node outputs:
        ``pair_plot_output`` (preprocess pair plot), ``logistic_model``
        (trained model) and ``report`` (evaluation report).
    """
    preprocess_node = preprocess_component(
        dataset=pipeline_input_data,
        plot_style='dark'
    )

    split_node = split_component(
        dataset=preprocess_node.outputs.dataset_cleaned,
        test_size=0.2
    )

    train_node = train_logistic_component(
        X_train=split_node.outputs.X_train,
        y_train=split_node.outputs.y_train
    )

    eval_node = eval_component(
        model=train_node.outputs.logistic_model,
        X_test=split_node.outputs.X_test,
        y_test=split_node.outputs.y_test
    )

    # Every output needs its own key: a repeated key in a dict literal keeps only
    # the last value, which previously dropped the pair plot from the outputs.
    return {
        "pair_plot_output": preprocess_node.outputs.pair_plot,
        "logistic_model": train_node.outputs.logistic_model,
        "report": eval_node.outputs.report,
    }


# Point the pipeline at the CSV stored in the workspace blob datastore, then
# instantiate the pipeline job with that single input.
# NOTE(review): the URI embeds a specific subscription/resource group/workspace;
# update it when running against a different workspace.
water_potability_ds = Input(
    path="azureml://subscriptions/d4e39a00-586b-4eea-9d7a-5c200a16ba64/resourcegroups/pipeline/workspaces/project-2-pipeline/datastores/workspaceblobstore/paths/UI/2023-11-08_204801_UTC/water_potability_ds.csv",
    type="uri_file",
)

pipeline_job = water_potability_pipeline(
    pipeline_input_data=water_potability_ds,
)


In [28]:
# Submit the pipeline under the "pipeline_water_pota" experiment. `pipeline_job`
# is rebound to the value returned by create_or_update; later cells read its
# `.name`, so re-running this cell re-submits the already-submitted job object.
pipeline_job = ml_client.jobs.create_or_update(
    pipeline_job, experiment_name="pipeline_water_pota"
)
# Bare last expression so the notebook renders the job summary.
pipeline_job

Experiment,Name,Type,Status,Details Page
pipeline_water_pota,polite_deer_9l15svhmxq,pipeline,Preparing,Link to Azure Machine Learning studio


In [29]:
# Stream the job's logs and wait until the job completes. A failed child job
# surfaces here as a JobException (see the recorded output of this cell).
ml_client.jobs.stream(pipeline_job.name)


RunId: polite_deer_9l15svhmxq
Web View: https://ml.azure.com/runs/polite_deer_9l15svhmxq?wsid=/subscriptions/d4e39a00-586b-4eea-9d7a-5c200a16ba64/resourcegroups/pipeline/workspaces/project-2-pipeline

Streaming logs/azureml/executionlogs.txt

[2023-11-08 21:32:47Z] Submitting 1 runs, first five are: 57f283db:02af4bed-9aed-4082-a7dc-fe01c20f52e6
[2023-11-08 21:33:53Z] Execution of experiment failed, update experiment status and cancel running nodes.

Execution Summary
RunId: polite_deer_9l15svhmxq
Web View: https://ml.azure.com/runs/polite_deer_9l15svhmxq?wsid=/subscriptions/d4e39a00-586b-4eea-9d7a-5c200a16ba64/resourcegroups/pipeline/workspaces/project-2-pipeline


JobException: Exception : 
 {
    "error": {
        "code": "UserError",
        "message": "Pipeline has failed child jobs. Failed nodes: /preprocess_node. For more details and logs, please go to the job detail page and check the child jobs.",
        "message_format": "Pipeline has failed child jobs. {0}",
        "message_parameters": {},
        "reference_code": "PipelineHasStepJobFailed",
        "details": []
    },
    "environment": "eastus",
    "location": "eastus",
    "time": "2023-11-08T21:33:53.788111Z",
    "component_name": ""
} 

In [None]:
# Download all the outputs of the job into ./pipeline_output.
# NOTE(review): `output` is never used afterwards and is presumably None (the
# call looks to be made for its side effect) -- confirm against the SDK docs.
output = ml_client.jobs.download(name=pipeline_job.name, download_path='./pipeline_output', all=True)