# Run scripts as a pipeline job

A pipeline lets you group multiple steps into one workflow. You build a pipeline from components, where each component represents a Python script to run. A component is defined in a YAML file that specifies the script and how to run it.

## Before you start

You'll need the latest version of the **azure-ai-ml** package to run the code in this notebook. Run the cell below to verify that it is installed.

> **Note**:
> If the **azure-ai-ml** package is not installed, run `pip install azure-ai-ml` to install it.

In [1]:
pip show azure-ai-ml

Name: azure-ai-ml
Version: 1.16.1
Summary: Microsoft Azure Machine Learning Client Library for Python
Home-page: https://github.com/Azure/azure-sdk-for-python
Author: Microsoft Corporation
Author-email: azuresdkengsysadmins@microsoft.com
License: MIT License
Location: /anaconda/envs/azureml_py38/lib/python3.8/site-packages
Requires: azure-storage-blob, azure-core, isodate, tqdm, colorama, msrest, azure-storage-file-datalake, opencensus-ext-logging, pydash, strictyaml, azure-storage-file-share, azure-mgmt-core, pyyaml, pyjwt, typing-extensions, opencensus-ext-azure, azure-common, marshmallow, jsonschema
Required-by: 
Note: you may need to restart the kernel to use updated packages.
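If you'd rather check the installed version from Python than with `pip show`, the standard-library `importlib.metadata` module (Python 3.8+) reads the same package metadata. This is a minimal sketch; `installed_version` is a hypothetical helper name, not part of the Azure SDK.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of `package`, or None if it isn't installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


version = installed_version("azure-ai-ml")
if version is None:
    print("azure-ai-ml is not installed; run `pip install azure-ai-ml`.")
else:
    print(f"azure-ai-ml {version} is installed.")
```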


## Connect to your workspace

With the required SDK packages installed, now you're ready to connect to your workspace.

To connect to a workspace, you need identifier parameters: a subscription ID, a resource group name, and a workspace name. Because you're working on a compute instance managed by Azure Machine Learning, you can use the default values to connect to the workspace.

In [2]:
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient

try:
    credential = DefaultAzureCredential()
    # Check if given credential can get token successfully.
    credential.get_token("https://management.azure.com/.default")
except Exception as ex:
    # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential doesn't work
    credential = InteractiveBrowserCredential()

In [3]:
# Get a handle to workspace
ml_client = MLClient.from_config(credential=credential)

Found the config file in: /config.json
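`MLClient.from_config` takes the three identifiers from a `config.json` file, as the output above shows. The sketch below approximates that lookup with the standard library so you can see what the file must contain. The search (current folder, then each parent folder) is a simplification; the SDK's own search may check other locations, and `find_config` and `parse_workspace_config` are hypothetical helpers.

```python
import json
from pathlib import Path
from typing import Dict, Optional

REQUIRED_KEYS = ("subscription_id", "resource_group", "workspace_name")


def find_config(start: str = ".") -> Optional[Path]:
    """Look for config.json in `start` and then in each of its parent directories."""
    current = Path(start).resolve()
    for directory in [current, *current.parents]:
        candidate = directory / "config.json"
        if candidate.is_file():
            return candidate
    return None


def parse_workspace_config(text: str) -> Dict[str, str]:
    """Parse config.json content and keep only the three workspace identifiers."""
    config = json.loads(text)
    missing = [key for key in REQUIRED_KEYS if key not in config]
    if missing:
        raise KeyError(f"config.json is missing: {', '.join(missing)}")
    return {key: config[key] for key in REQUIRED_KEYS}
```

For example, `parse_workspace_config(find_config().read_text())` returns the subscription ID, resource group, and workspace name when a `config.json` is found.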


## Create the scripts

You'll build a pipeline with two steps:

1. **Prepare the data**: Remove rows with missing values, scale the numeric features, and one-hot encode the categorical features.
1. **Train the model**: Train a random forest classification model.
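To make "fix missing data and normalize" concrete for a single numeric column, here is a plain-Python sketch of median imputation followed by min-max scaling to [0, 1], using x' = (x - min) / (max - min). The script itself uses scikit-learn's `SimpleImputer(strategy='median')` and `MinMaxScaler`; the helper names below are hypothetical. In the script, `clean_data` drops incomplete rows first, so its imputers act only as a safeguard.

```python
from statistics import median
from typing import List, Optional


def impute_median(values: List[Optional[float]]) -> List[float]:
    """Replace None with the median of the observed values."""
    observed = [v for v in values if v is not None]
    fill = median(observed)
    return [fill if v is None else v for v in values]


def min_max_scale(values: List[float]) -> List[float]:
    """Rescale values to [0, 1] with (x - min) / (max - min)."""
    low, high = min(values), max(values)
    if high == low:
        # A constant column has no range; map every value to 0.0
        return [0.0 for _ in values]
    return [(v - low) / (high - low) for v in values]


ages = [21.0, None, 23.0, 27.0]
print(min_max_scale(impute_median(ages)))
```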

Run the following cells to create the **src** folder and the two scripts.

In [4]:
import os

# create a folder for the script files
script_folder = 'src'
os.makedirs(script_folder, exist_ok=True)
print(script_folder, 'folder created')

src folder created


##### prepare data

In [5]:
%%writefile $script_folder/prep-data.py
# import libraries
import argparse
import pandas as pd
from pathlib import Path
from sklearn.preprocessing import MinMaxScaler, OneHotEncoder
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer

def main(args):
    # read data
    df = get_data(args.input_data)

    cleaned_data = clean_data(df)

    prepared_data = preprocess_data(cleaned_data)

    # write the prepared data to the output folder
    prepared_data.to_csv(Path(args.output_data) / "obesity.csv", index=False)

# function that reads the data
def get_data(path):
    df = pd.read_csv(path)

    # count the rows and print the result
    row_count = len(df)
    print('Preparing {} rows of data'.format(row_count))

    return df

# function that removes rows with missing values
def clean_data(df):
    df = df.dropna()
    return df

# function that preprocesses the data
def preprocess_data(df):
    # define the numeric and categorical columns
    numeric_features = ['Age', 'Height', 'Weight', 'FCVC', 'NCP', 'CH2O', 'FAF', 'TUE']
    categorical_features = ['Gender', 'family_history_with_overweight', 'FAVC', 'CAEC', 'SMOKE', 'SCC', 'CALC', 'MTRANS']

    # define the pipeline steps for the numeric features
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', MinMaxScaler())])

    # define the pipeline steps for the categorical features
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))])

    # combine the pipeline steps; sparse_threshold=0 always returns a dense array,
    # which pd.DataFrame can wrap directly
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)],
        sparse_threshold=0)

    # apply the preprocessing to the DataFrame
    X_prepared = preprocessor.fit_transform(df)

    # get the fitted one-hot encoder for the categorical columns
    cat_encoder = preprocessor.named_transformers_['cat']['onehot']

    # build the transformed categorical feature names manually as "<column>_<category>";
    # categories_ holds one array per column, in the order of categorical_features
    cat_feature_names = []
    for feature, categories in zip(categorical_features, cat_encoder.categories_):
        cat_feature_names.extend([f"{feature}_{value}" for value in categories])

    # combine the numeric and categorical feature names
    all_feature_names = numeric_features + cat_feature_names

    # convert the transformed data to a DataFrame with the proper column names
    X_prepared_df = pd.DataFrame(X_prepared, columns=all_feature_names)

    # add the target column 'NObeyesdad' to the preprocessed DataFrame
    X_prepared_df['NObeyesdad'] = df['NObeyesdad'].values

    return X_prepared_df


def parse_args():
    # setup arg parser
    parser = argparse.ArgumentParser()

    # add arguments
    parser.add_argument("--input_data", dest='input_data', type=str)
    parser.add_argument("--output_data", dest='output_data', type=str)

    # parse args
    args = parser.parse_args()

    # return args
    return args

# run script
if __name__ == "__main__":
    # add space in logs
    print("\n\n")
    print("*" * 60)

    # parse args
    args = parse_args()

    # run main function
    main(args)

    # add space in logs
    print("*" * 60)
    print("\n\n")

Overwriting src/prep-data.py
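`OneHotEncoder.categories_` holds one array of categories per input column, in the same order as `categorical_features`, so each generated column name has to pair the original column name with one of its categories. This stdlib sketch shows that naming scheme on its own; `one_hot_feature_names` is a hypothetical helper, and the `<column>_<category>` format is this notebook's choice rather than a scikit-learn convention.

```python
from typing import List, Sequence


def one_hot_feature_names(
    features: Sequence[str], categories: Sequence[Sequence[str]]
) -> List[str]:
    """Build '<feature>_<category>' names, one per one-hot column, in encoder order."""
    if len(features) != len(categories):
        raise ValueError("Need exactly one category list per feature")
    names = []
    for feature, values in zip(features, categories):
        names.extend(f"{feature}_{value}" for value in values)
    return names


print(one_hot_feature_names(["Gender", "SMOKE"], [["Female", "Male"], ["no", "yes"]]))
# ['Gender_Female', 'Gender_Male', 'SMOKE_no', 'SMOKE_yes']
```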


##### train model

In [6]:
%%writefile $script_folder/train-model.py
# import libraries
import argparse
import glob
import os

import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, roc_auc_score, f1_score, precision_score, recall_score
from sklearn.model_selection import train_test_split

def main(args):
    # enable autologging
    mlflow.autolog()

    # read data
    df = get_data(args.training_data)

    # split data
    X_train, X_test, y_train, y_test = split_data(df)

    # train model and save it to the component output
    model = train_model(X_train, y_train, args.model_output)

    # evaluate model
    eval_model(model, X_test, y_test)


# function that reads every CSV file in the input folder
def get_data(data_path):
    all_files = glob.glob(os.path.join(data_path, "*.csv"))
    df = pd.concat((pd.read_csv(f) for f in all_files), sort=False)
    return df

# function that splits the data
def split_data(df):
    print("Splitting data...")
    X = df.drop(columns=["NObeyesdad"])
    y = df["NObeyesdad"]

    # encode any remaining categorical variables (a no-op when prep-data.py already encoded them)
    X = pd.get_dummies(X)

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    return X_train, X_test, y_train, y_test

# function that trains the model
def train_model(X_train, y_train, model_output):
    print("Training model...")
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    # save the model in MLflow format to the specified path
    mlflow.sklearn.save_model(model, model_output)

    return model

# function that evaluates the model
def eval_model(model, X_test, y_test):
    print("Evaluating model...")
    # calculate accuracy
    y_pred = model.predict(X_test)
    acc = accuracy_score(y_test, y_pred)
    print('Accuracy:', acc)

    # calculate AUC-ROC (one-vs-rest for multiclass); labels aligns the
    # probability columns with model.classes_
    y_probs = model.predict_proba(X_test)
    auc = roc_auc_score(y_test, y_probs, multi_class='ovr', labels=model.classes_)
    print('AUC-ROC:', auc)

    # calculate F1-score
    f1 = f1_score(y_test, y_pred, average='macro')  # or 'micro' if preferred
    print('F1-score:', f1)

    # calculate precision and recall
    precision = precision_score(y_test, y_pred, average='macro')  # or 'micro' if preferred
    recall = recall_score(y_test, y_pred, average='macro')  # or 'micro' if preferred
    print('Precision:', precision)
    print('Recall:', recall)
  

def parse_args():
    # setup arg parser
    parser = argparse.ArgumentParser()

    # add arguments
    parser.add_argument("--training_data", dest='training_data',
                        type=str)
    parser.add_argument("--model_output", dest='model_output',
                        type=str)
    # parse args
    args = parser.parse_args()

    # return args
    return args

# run script
if __name__ == "__main__":
    # add space in logs
    print("\n\n")
    print("*" * 60)

    # parse args
    args = parse_args()

    # run main function
    main(args)

    # add space in logs
    print("*" * 60)
    print("\n\n")


Overwriting src/train-model.py
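The `average` argument controls how per-class scores are combined for this multiclass problem. With `average='macro'`, each class is scored separately and the scores are averaged with equal weight. With `average='micro'`, true positives, false positives, and false negatives are pooled first, which for single-label multiclass data makes micro precision, recall, and F1 all equal to accuracy. Here is a plain-Python sketch of the macro version; `macro_scores` is a hypothetical helper, and a class with no predictions or no true samples scores 0, which is also what scikit-learn reports (with a warning) by default.

```python
from typing import Dict, Sequence


def macro_scores(y_true: Sequence[str], y_pred: Sequence[str]) -> Dict[str, float]:
    """Unweighted mean of per-class precision, recall, and F1."""
    labels = sorted(set(y_true) | set(y_pred))
    precisions, recalls, f1s = [], [], []
    for label in labels:
        pairs = list(zip(y_true, y_pred))
        tp = sum(t == label and p == label for t, p in pairs)
        fp = sum(t != label and p == label for t, p in pairs)
        fn = sum(t == label and p != label for t, p in pairs)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        precisions.append(precision)
        recalls.append(recall)
        f1s.append(f1)
    n = len(labels)
    return {
        "precision": sum(precisions) / n,
        "recall": sum(recalls) / n,
        "f1": sum(f1s) / n,
    }


print(macro_scores(["a", "a", "a", "b"], ["a", "a", "b", "b"]))
```

For these four labels the macro F1 is about 0.733, while accuracy (and therefore micro F1) is 0.75: the minority class `b` pulls the macro average down.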


## Define the components

To define a component, you need to specify:

- **Metadata**: *name*, *display name*, *version*, *description*, *type*, etc. The metadata helps to describe and manage the component.
- **Interface**: *inputs* and *outputs*. For example, a model training component will take training data and the regularization rate as input, and generate a trained model file as output.
- **Command, code & environment**: the *command*, *code*, and *environment* to run the component. The command is the shell command that executes the component. The code usually refers to a source code directory. The environment can be an Azure Machine Learning environment (curated or custom), a Docker image, or a conda environment.

Run the following cells to create a YAML for each component you want to run as a pipeline step.
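In the `command` field, `${{inputs.<name>}}` and `${{outputs.<name>}}` are placeholders that Azure Machine Learning replaces with actual paths when the job runs. The sketch below only imitates that substitution with a regular expression, to show how each input and output in the interface maps to a script argument. `render_command` and the example paths are hypothetical and not part of the SDK.

```python
import re
from typing import Dict

# Matches ${{inputs.<name>}} and ${{outputs.<name>}}
PLACEHOLDER = re.compile(r"\$\{\{(inputs|outputs)\.(\w+)\}\}")


def render_command(command: str, inputs: Dict[str, str], outputs: Dict[str, str]) -> str:
    """Replace each input/output placeholder with its bound value."""
    bindings = {"inputs": inputs, "outputs": outputs}

    def substitute(match: "re.Match") -> str:
        kind, name = match.group(1), match.group(2)
        if name not in bindings[kind]:
            raise KeyError(f"No value bound for {kind}.{name}")
        return bindings[kind][name]

    return PLACEHOLDER.sub(substitute, command)


print(render_command(
    "python prep-data.py --input_data ${{inputs.input_data}} --output_data ${{outputs.output_data}}",
    inputs={"input_data": "/tmp/example/obesity.csv"},
    outputs={"output_data": "/tmp/example/prepared"},
))
```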

##### prep-data.yml

In [7]:
%%writefile prep-data.yml
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: prep_data
display_name: Prepare training data
version: 1
type: command
inputs:
  input_data: 
    type: uri_file
outputs:
  output_data:
    type: uri_folder
code: ./src
environment: azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu@latest
command: >-
  python prep-data.py 
  --input_data ${{inputs.input_data}}
  --output_data ${{outputs.output_data}}

Overwriting prep-data.yml


##### train-model.yml

In [8]:
%%writefile train-model.yml
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: train_model
display_name: Train a random forest classifier model
version: 1
type: command
inputs:
  training_data: 
    type: uri_folder
outputs:
  model_output:
    type: mlflow_model
code: ./src
environment: azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu@latest
command: >-
  python train-model.py 
  --training_data ${{inputs.training_data}} 
  --model_output ${{outputs.model_output}} 

Overwriting train-model.yml
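Both YAML files use `command: >-`, a folded block scalar. YAML joins the indented lines with single spaces, and the `-` drops the final line break, so the component receives one command line. Trailing spaces on a line are kept as content, which is why the printed pipeline job shows `python prep-data.py  --input_data` with two spaces. The helper below mimics only this simple case (no blank or more-indented lines); it is not a YAML parser, and `fold_block` is a hypothetical name.

```python
import textwrap


def fold_block(block: str) -> str:
    """Approximate simple YAML '>-' folding: dedent, join lines with one space, drop the last newline."""
    lines = textwrap.dedent(block).strip("\n").split("\n")
    return " ".join(lines)


# The command block from prep-data.yml, including the trailing space after prep-data.py
command_block = (
    "  python prep-data.py \n"
    "  --input_data ${{inputs.input_data}}\n"
    "  --output_data ${{outputs.output_data}}\n"
)
print(repr(fold_block(command_block)))
```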


## Load the components

Now that you have defined each component, you can load the components by referring to the YAML files. 

In [9]:
from azure.ai.ml import load_component
parent_dir = ""

prep_data = load_component(source=parent_dir + "./prep-data.yml")
train_decision_tree = load_component(source=parent_dir + "./train-model.yml")

## Build the pipeline

After creating and loading the components, you can build the pipeline. You'll compose the two components into a pipeline. First, you'll want the `prep_data` component to run. The output of the first component should be the input of the second component `train_decision_tree`, which will train the model.

The `obesity_classification` function represents the complete pipeline. The function expects one input variable: `pipeline_job_input`. A data asset was created during setup; you'll use the registered data asset (`obesity-data`, version 1) as the pipeline input.

In [10]:
from azure.ai.ml import Input
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.dsl import pipeline

@pipeline()
def obesity_classification(pipeline_job_input):
    clean_data = prep_data(input_data=pipeline_job_input)
    train_model = train_decision_tree(training_data=clean_data.outputs.output_data)

    return {
        "pipeline_job_transformed_data": clean_data.outputs.output_data,
        "pipeline_job_trained_model": train_model.outputs.model_output,
    }

pipeline_job = obesity_classification(Input(type=AssetTypes.URI_FILE, path="azureml:obesity-data:1"))
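Conceptually, the run order of the steps follows from their data dependencies: because `train_model` consumes `clean_data.outputs.output_data`, it can only start after `clean_data` finishes. The same idea can be sketched locally with Kahn's topological sort, written with the standard library so it also runs on the Python 3.8 kernel shown earlier. The dependency dictionary is hand-written to mirror the pipeline above; it is not something the SDK produces, and `run_order` is a hypothetical helper that assumes every dependency is itself a step in the dictionary.

```python
from collections import deque
from typing import Dict, List, Set


def run_order(graph: Dict[str, Set[str]]) -> List[str]:
    """Kahn's algorithm: order steps so each one comes after the steps it depends on."""
    remaining = {step: set(deps) for step, deps in graph.items()}
    ready = deque(sorted(step for step, deps in remaining.items() if not deps))
    order = []
    while ready:
        step = ready.popleft()
        order.append(step)
        # Release every step that was only waiting on this one
        for other, deps in remaining.items():
            if step in deps:
                deps.remove(step)
                if not deps:
                    ready.append(other)
    if len(order) != len(graph):
        raise ValueError("The step dependencies contain a cycle or an unknown step")
    return order


# Each step maps to the steps whose outputs it consumes (illustrative)
dependencies = {
    "clean_data": set(),            # reads the registered data asset only
    "train_model": {"clean_data"},  # reads clean_data.outputs.output_data
}
print(run_order(dependencies))  # ['clean_data', 'train_model']
```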

You can retrieve the configuration of the pipeline job by printing the `pipeline_job` object:

In [11]:
print(pipeline_job)

Class AutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class AutoDeleteConditionSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseAutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class IntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class ProtectionLevelSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseIntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


display_name: obesity_classification
type: pipeline
inputs:
  pipeline_job_input:
    type: uri_file
    path: azureml:obesity-data:1
outputs:
  pipeline_job_transformed_data:
    type: uri_folder
  pipeline_job_trained_model:
    type: mlflow_model
jobs:
  clean_data:
    type: command
    inputs:
      input_data:
        path: ${{parent.inputs.pipeline_job_input}}
    outputs:
      output_data: ${{parent.outputs.pipeline_job_transformed_data}}
    component:
      $schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
      name: prep_data
      version: '1'
      display_name: Prepare training data
      type: command
      inputs:
        input_data:
          type: uri_file
      outputs:
        output_data:
          type: uri_folder
      command: python prep-data.py  --input_data ${{inputs.input_data}} --output_data
        ${{outputs.output_data}}
      environment: azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu@latest
      code: /mnt/batch/ta

You can change any parameter of the pipeline job configuration by referring to the parameter and specifying the new value:

In [12]:
# change the output mode
pipeline_job.outputs.pipeline_job_transformed_data.mode = "upload"
pipeline_job.outputs.pipeline_job_trained_model.mode = "upload"
# set pipeline level compute
pipeline_job.settings.default_compute = "aml-cluster"
# set pipeline level datastore
pipeline_job.settings.default_datastore = "workspaceblobstore"

# print the pipeline job again to review the changes
print(pipeline_job)

display_name: obesity_classification
type: pipeline
inputs:
  pipeline_job_input:
    type: uri_file
    path: azureml:obesity-data:1
outputs:
  pipeline_job_transformed_data:
    mode: upload
    type: uri_folder
  pipeline_job_trained_model:
    mode: upload
    type: mlflow_model
jobs:
  clean_data:
    type: command
    inputs:
      input_data:
        path: ${{parent.inputs.pipeline_job_input}}
    outputs:
      output_data: ${{parent.outputs.pipeline_job_transformed_data}}
    component:
      $schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
      name: prep_data
      version: '1'
      display_name: Prepare training data
      type: command
      inputs:
        input_data:
          type: uri_file
      outputs:
        output_data:
          type: uri_folder
      command: python prep-data.py  --input_data ${{inputs.input_data}} --output_data
        ${{outputs.output_data}}
      environment: azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cp

## Submit the pipeline job

Finally, when you've built the pipeline and configured the pipeline job to run as required, you can submit the pipeline job:

In [13]:
# submit job to workspace
pipeline_job = ml_client.jobs.create_or_update(
    pipeline_job, experiment_name="pipeline_obesity"
)
pipeline_job

Uploading src (0.01 MBs): 100%|██████████| 9304/9304 [00:00<00:00, 201511.99it/s]

pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored
pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.MLFlowModelJobOutput'> and will be ignored


| Experiment | Name | Type | Status | Details Page |
|---|---|---|---|---|
| pipeline_obesity | witty_bulb_16b39s08zv | pipeline | NotStarted | Link to Azure Machine Learning studio |
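The job is submitted asynchronously, so its status right after submission is `NotStarted`. To block until it finishes, you can call `ml_client.jobs.stream(pipeline_job.name)`, which streams the logs until the job ends. If you'd rather poll, the sketch below waits on any status callable. `wait_for_terminal_status` is a hypothetical helper, and the set of terminal statuses is an assumption based on common Azure Machine Learning job states; check the values your SDK version reports.

```python
import time
from typing import Callable

# Assumed terminal job states
TERMINAL_STATUSES = {"Completed", "Failed", "Canceled"}


def wait_for_terminal_status(
    get_status: Callable[[], str],
    poll_seconds: float = 30.0,
    timeout_seconds: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() until it returns a terminal status or the timeout elapses."""
    waited = 0.0
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"Job still '{status}' after {waited:.0f} seconds")
        sleep(poll_seconds)
        waited += poll_seconds
```

For example, `wait_for_terminal_status(lambda: ml_client.jobs.get(pipeline_job.name).status)` returns the final status string of the submitted pipeline job. Injecting `sleep` keeps the helper easy to test without real delays.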
