# Running pipelines

In [1]:
pip show azure-ai-ml

Name: azure-ai-ml
Version: 1.27.0
Summary: Microsoft Azure Machine Learning Client Library for Python
Home-page: https://github.com/Azure/azure-sdk-for-python
Author: Microsoft Corporation
Author-email: azuresdkengsysadmins@microsoft.com
License: MIT License
Location: /anaconda/envs/azureml_py38/lib/python3.10/site-packages
Requires: azure-common, azure-core, azure-mgmt-core, azure-monitor-opentelemetry, azure-storage-blob, azure-storage-file-datalake, azure-storage-file-share, colorama, isodate, jsonschema, marshmallow, msrest, pydash, pyjwt, pyyaml, six, strictyaml, tqdm, typing-extensions
Required-by: 
Note: you may need to restart the kernel to use updated packages.
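The same version check can be done from Python, which is handy when a notebook should stop early on an outdated SDK. The sketch below uses only the standard library; the helper names (`parse_version`, `installed_version`, `meets_minimum`) and the minimum version passed at the end are illustrative assumptions, not part of the SDK.

```python
import re
from importlib import metadata


def parse_version(text):
    """Turn '1.27.0' into (1, 27, 0), keeping only the leading digits of each part."""
    parts = []
    for piece in text.split("."):
        match = re.match(r"\d+", piece)
        if match is None:
            break  # stop at the first part that does not start with a number
        parts.append(int(match.group()))
    return tuple(parts)


def installed_version(package):
    """Return the installed version string, or None if the package is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def meets_minimum(package, minimum):
    """True when `package` is installed at version `minimum` or newer."""
    found = installed_version(package)
    return found is not None and parse_version(found) >= parse_version(minimum)


# Hypothetical threshold: the version reported by `pip show` above.
print(meets_minimum("azure-ai-ml", "1.27.0"))
```

Comparing tuples of integers avoids the string-comparison trap where "1.9.0" would sort after "1.27.0".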


## Connect to the workspace

In [49]:
# connect to the workspace
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

ml_client = MLClient.from_config(credential=DefaultAzureCredential())

print(f"Connected to workspace: {ml_client.workspace_name}")

Found the config file in: ./config.json
Overriding of current TracerProvider is not allowed
Overriding of current LoggerProvider is not allowed
Overriding of current MeterProvider is not allowed
Attempting to instrument while already instrumented

Connected to workspace: naturgyml
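`MLClient.from_config` reads the workspace coordinates from the `config.json` reported in the output. As a minimal sketch, assuming the usual three keys (`subscription_id`, `resource_group`, `workspace_name`), the file can be validated up front so a missing key produces a clear message. The helper `load_workspace_config` is hypothetical.

```python
import json
from pathlib import Path

# Keys a workspace config.json is assumed to carry.
REQUIRED_KEYS = ("subscription_id", "resource_group", "workspace_name")


def load_workspace_config(path="config.json"):
    """Read a workspace config file and fail early if a required key is missing."""
    config = json.loads(Path(path).read_text(encoding="utf-8"))
    missing = [key for key in REQUIRED_KEYS if not config.get(key)]
    if missing:
        raise KeyError(f"{path} is missing required keys: {', '.join(missing)}")
    # Return only the keys needed to locate the workspace.
    return {key: config[key] for key in REQUIRED_KEYS}
```

The returned values map onto the explicit constructor, `MLClient(credential, subscription_id=..., resource_group_name=..., workspace_name=...)`, which is an alternative when no config file is available.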


## Load the components

In [64]:
# Fetch the registered components from the workspace by name and version
ing_prep_data = ml_client.components.get(name="ing_prep_data_etapa_5_ces", version="5")
train_model_etapa_5 = ml_client.components.get(name="train_rf_model_etapa_5_ces", version="10")
eval_model_etapa_5 = ml_client.components.get(name="eval_model_metrics_etapa_5_ces", version="2")
# register_model_predict_failure = ml_client.components.get(name="register_conditional_model", version="5")
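When the list of components grows, the name and version pairs can be kept in one table and fetched in a loop. This is a sketch, not the notebook's method: the aliases and the `load_components` helper are illustrative, and `client` is expected to be an object exposing `components.get(name=..., version=...)`, such as the `ml_client` created earlier.

```python
# Alias -> (registered component name, version). The aliases are illustrative.
COMPONENT_SPECS = {
    "ing_prep_data": ("ing_prep_data_etapa_5_ces", "5"),
    "train_model": ("train_rf_model_etapa_5_ces", "10"),
    "eval_model": ("eval_model_metrics_etapa_5_ces", "2"),
}


def load_components(client, specs=COMPONENT_SPECS):
    """Fetch every component listed in `specs` through `client.components.get`."""
    loaded = {}
    for alias, (name, version) in specs.items():
        loaded[alias] = client.components.get(name=name, version=version)
    return loaded
```

Calling `load_components(ml_client)` would return a dictionary keyed by alias, so bumping a version means editing a single line of the table.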

## Build the pipeline

In [65]:
from azure.ai.ml import Input
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.dsl import pipeline


@pipeline(name="Predict_failure_pipeline", description="Pipeline to train a failure predict model")
def predict_failure(input_data, equipo_input, horizon, n_estimators):
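    # Step 1: ingest and preprocess the input data
    clean_data = ing_prep_data(input_data=input_data, equipo_input=equipo_input)
    # Step 2: train the random forest model on the preprocessed data
    train_model = train_model_etapa_5(
        input_data=clean_data.outputs.output_data, horizon=horizon, n_estimators=n_estimators
    )
    # Step 3: evaluate the trained model on the data emitted by the training step
    eval_model = eval_model_etapa_5(
        input_data=train_model.outputs.output_data, model_input=train_model.outputs.model_output
    )
    # Step 4 (disabled): register the model conditionally, based on its metrics
    # register_model = register_model_predict_failure(model_input=train_model.outputs.model_output, metrics_input=eval_model.outputs.metrics_output, model_name="predict_failure_rf_model")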

    return {
        "pipeline_job_transformed_data": clean_data.outputs.output_data,
        "pipeline_job_trained_model": train_model.outputs.model_output,
        "pipeline_job_output_data": train_model.outputs.output_data,
        "pipeline_job_metrics_data": eval_model.outputs.metrics_output,
    }
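The pipeline function never states an execution order. The order follows from which step consumes which output. To make that dependency graph explicit, the sketch below mirrors the three steps as a plain dictionary and sorts it with the standard library's `graphlib`. It only illustrates the wiring and is not how the service schedules the steps internally.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it consumes,
# mirroring the bindings inside predict_failure.
STEP_DEPENDENCIES = {
    "clean_data": set(),
    "train_model": {"clean_data"},
    "eval_model": {"train_model"},
}


def execution_order(dependencies):
    """Return one valid run order in which every step follows its inputs."""
    return list(TopologicalSorter(dependencies).static_order())


print(execution_order(STEP_DEPENDENCIES))
```

Re-enabling the commented-out registration step would add one more entry that depends on both `train_model` and `eval_model`.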

In [66]:
# define the pipeline job
pipeline_job = predict_failure(
    input_data=Input(type=AssetTypes.URI_FILE, path="azureml:etapa_5_data_asset_ces:1"),
    equipo_input=3,
    horizon=100,
    n_estimators=100,
)

# change the output mode
pipeline_job.outputs.pipeline_job_transformed_data.mode = "upload"
pipeline_job.outputs.pipeline_job_trained_model.mode = "upload"

# set pipeline level compute
pipeline_job.settings.default_compute = "Carlos-Esteve"

# set pipeline level datastore
pipeline_job.settings.default_datastore = "etapa_5_data_assets_ces"
# allow reuse of cached step results from earlier runs instead of forcing a full rerun
pipeline_job.settings.force_rerun = False

In [67]:
print(pipeline_job)

display_name: Predict_failure_pipeline
description: Pipeline to train a failure predict model
type: pipeline
inputs:
  input_data:
    type: uri_file
    path: azureml:etapa_5_data_asset_ces:1
  equipo_input: 3
  horizon: 100
  n_estimators: 100
outputs:
  pipeline_job_transformed_data:
    mode: upload
    type: uri_folder
  pipeline_job_trained_model:
    mode: upload
    type: mlflow_model
  pipeline_job_output_data:
    type: uri_folder
  pipeline_job_metrics_data:
    type: uri_file
jobs:
  clean_data:
    type: command
    inputs:
      input_data:
        path: ${{parent.inputs.input_data}}
      equipo_input:
        path: ${{parent.inputs.equipo_input}}
    outputs:
      output_data: ${{parent.outputs.pipeline_job_transformed_data}}
    resources:
      instance_count: 1
    component:
      $schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
      name: ing_prep_data_etapa_5_ces
      version: '5'
      display_name: Ingesta y Preprocesamiento d

## Submit the pipeline job

In [68]:
# submit job to workspace
pipeline_job = ml_client.jobs.create_or_update(
    pipeline_job,
    experiment_name="pipeline_predict_failure",
    display_name="pipeline_predict_failure_ces",
)

pipeline_job

pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored
pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.MLFlowModelJobOutput'> and will be ignored
pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored
pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFileJobOutput'> and will be ignored


| Experiment | Name | Type | Status | Details Page |
|---|---|---|---|---|
| pipeline_predict_failure | helpful_mango_jpxdvl5gff | pipeline | NotStarted | Link to Azure Machine Learning studio |
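The job is returned with status `NotStarted`, so anything that depends on its outputs has to wait. `ml_client.jobs.stream(pipeline_job.name)` blocks and prints logs until the run ends. Alternatively, a small polling loop keeps control over the timeout. The sketch below assumes the terminal statuses are `Completed`, `Failed`, and `Canceled`; `wait_for_job` and its default intervals are illustrative choices.

```python
import time

# Statuses assumed to mean the job will not change state again.
TERMINAL_STATUSES = {"Completed", "Failed", "Canceled"}


def wait_for_job(client, job_name, poll_seconds=30.0, timeout_seconds=3600.0):
    """Poll `client.jobs.get(job_name).status` until the job reaches a terminal status."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = client.jobs.get(job_name).status
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job {job_name} still '{status}' after {timeout_seconds} s")
        time.sleep(poll_seconds)
```

A call such as `wait_for_job(ml_client, pipeline_job.name)` returns the final status string, which the caller can check before reading any outputs.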


## Schedule the job

In [69]:
# from azure.ai.ml.entities import JobSchedule, RecurrencePattern, RecurrenceTrigger
# from datetime import datetime


# schedule_name = "run_ev_five_min_predict_failure"

# # Run once a week, on Mondays at 08:00.
# # The pattern is passed as a RecurrencePattern entity rather than a plain dict.
# recurrence_trigger = RecurrenceTrigger(
#     frequency="week",
#     interval=1,
#     schedule=RecurrencePattern(week_days=["Monday"], hours=[8], minutes=[0]),
#     start_time=datetime.utcnow(),
# )

# job_schedule = JobSchedule(
#     name=schedule_name,
#     trigger=recurrence_trigger,
#     create_job=pipeline_job
# )

# job_schedule = ml_client.schedules.begin_create_or_update(job_schedule).result()
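To sanity-check what a weekly "Monday at 08:00" recurrence means before enabling the schedule, the next firing time can be computed locally. This is a standard-library sketch of the calendar arithmetic only. It does not model time zones or how the service treats `start_time`, and `next_weekly_run` is a hypothetical helper.

```python
from datetime import datetime, timedelta


def next_weekly_run(now, weekday=0, hour=8, minute=0):
    """Return the first weekday/hour/minute slot strictly after `now` (Monday is 0)."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    # Move forward to the requested weekday within the current 7-day window.
    candidate += timedelta(days=(weekday - now.weekday()) % 7)
    if candidate <= now:
        # This week's slot has already passed, so use next week's.
        candidate += timedelta(days=7)
    return candidate


print(next_weekly_run(datetime(2024, 1, 3, 9, 0)))
```

The example input is a Wednesday, so the result falls on the following Monday.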

## Publish the model

In [70]:
# from azure.ai.ml.entities import Model
# from azure.ai.ml.constants import AssetTypes

# # Assuming 'pipeline_job' is the result of the pipeline run
# job_name = pipeline_job.name

# run_model = Model(
#     path=f"azureml://jobs/{job_name}/outputs/pipeline_job_trained_model/",
#     name="predict_failure_rf_model",
#     description="Random Forest model for predicting the maintenance type.",
#     type=AssetTypes.MLFLOW_MODEL,
# )

# ml_client.models.create_or_update(run_model)
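The model path in the cell above points at an output of the pipeline run, which exists only once the run has finished. As a minimal sketch, the two helpers below build that path in the same shape the cell uses and guard registration on the job status. Both helper names are hypothetical, and treating `Completed` as the only registrable status is an assumption.

```python
def job_output_uri(job_name, output_name):
    """Build the azureml:// path of a named job output, in the form used above."""
    if not job_name or not output_name:
        raise ValueError("job_name and output_name must be non-empty")
    return f"azureml://jobs/{job_name}/outputs/{output_name}/"


def can_register(status):
    """Only a job that finished successfully is assumed to have a usable model output."""
    return status == "Completed"


# Values taken from the run shown earlier in this notebook.
print(job_output_uri("helpful_mango_jpxdvl5gff", "pipeline_job_trained_model"))
```

With the submitted job still at `NotStarted`, `can_register(pipeline_job.status)` would be false, which is a reasonable cue to wait before calling `ml_client.models.create_or_update`.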