# スクリプトをパイプラインジョブとして実行する

パイプラインを使用すると、複数のステップを1つのワークフローにまとめることができます。コンポーネントでパイプラインを構築できます。それぞれのコンポーネントは実行するPythonスクリプトを反映します。コンポーネントはYAMLファイルで定義され、スクリプトとその実行方法を指定します。

## 始める前に

このノートブックのコードを実行するには、最新バージョンの **azure-ai-ml** パッケージが必要です。以下のセルを実行して、インストールされていることを確認してください。

> 注**：
> もし **azure-ai-ml** パッケージがインストールされていない場合は、`pip install azure-ai-ml` を実行してインストールしてください。

In [1]:
pip show azure-ai-ml

Name: azure-ai-ml
Version: 1.21.1
Summary: Microsoft Azure Machine Learning Client Library for Python
Home-page: https://github.com/Azure/azure-sdk-for-python
Author: Microsoft Corporation
Author-email: azuresdkengsysadmins@microsoft.com
License: MIT License
Location: /anaconda/envs/azureml_py38/lib/python3.10/site-packages
Requires: azure-common, azure-core, azure-mgmt-core, azure-storage-blob, azure-storage-file-datalake, azure-storage-file-share, colorama, isodate, jsonschema, marshmallow, msrest, opencensus-ext-azure, opencensus-ext-logging, pydash, pyjwt, pyyaml, strictyaml, tqdm, typing-extensions
Required-by: 
Note: you may need to restart the kernel to use updated packages.


## ワークスペースに接続する
必要な SDK パッケージがインストールされ、ワークスペースに接続する準備が整いました。

ワークスペースに接続するには、サブスクリプションID、リソースグループ名、ワークスペース名といった識別子のパラメータが必要だ。Azure Machine Learningが管理するコンピュートインスタンスで作業しているので、デフォルト値を使用してワークスペースに接続できる。

In [2]:
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient

try:
    credential = DefaultAzureCredential()
    # Check if given credential can get token successfully.
    credential.get_token("https://management.azure.com/.default")
except Exception as ex:
    # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
    credential = InteractiveBrowserCredential()

In [3]:
# Get a handle to workspace
ml_client = MLClient.from_config(credential=credential)

Found the config file in: /config.json


## スクリプトの作成
つのステップでパイプラインを構築する：

1. **データを準備する**： 欠損データを修正し、データを正規化する。
2. **モデルを訓練する**： ロジスティック回帰モデルを訓練する。

次のセルを実行して、**src** フォルダーと2つのスクリプトを作成する。

In [4]:
import os

# create a folder for the script files
script_folder = 'src'
os.makedirs(script_folder, exist_ok=True)
print(script_folder, 'folder created')

src folder created


データの前処理のpyファイル

In [5]:
%%writefile $script_folder/prep-data.py
# import libraries
import argparse
import pandas as pd
import numpy as np
from pathlib import Path
from sklearn.preprocessing import MinMaxScaler

def main(args):
    # read data
    df = get_data(args.input_data)

    cleaned_data = clean_data(df)

    normalized_data = normalize_data(cleaned_data)

    output_df = normalized_data.to_csv((Path(args.output_data) / "diabetes.csv"), index = False)

# function that reads the data
def get_data(path):
    df = pd.read_csv(path,engine='python')

    # Count the rows and print the result
    row_count = (len(df))
    print('Preparing {} rows of data'.format(row_count))
    
    return df

# function that removes missing values
def clean_data(df):
    df = df.dropna()
    
    return df

# function that normalizes the data
def normalize_data(df):
    scaler = MinMaxScaler()
    num_cols = ['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree']
    df[num_cols] = scaler.fit_transform(df[num_cols])

    return df

def parse_args():
    # setup arg parser
    parser = argparse.ArgumentParser()

    # add arguments
    parser.add_argument("--input_data", dest='input_data',
                        type=str)
    parser.add_argument("--output_data", dest='output_data',
                        type=str)

    # parse args
    args = parser.parse_args()

    # return args
    return args

# run script
if __name__ == "__main__":
    # add space in logs
    print("\n\n")
    print("*" * 60)

    # parse args
    args = parse_args()

    # run main function
    main(args)

    # add space in logs
    print("*" * 60)
    print("\n\n")

Overwriting src/prep-data.py


モデルの学習のpyファイル

In [6]:
%%writefile $script_folder/train-model.py
# import libraries
import mlflow
import glob
import argparse
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
import matplotlib.pyplot as plt

def main(args):
    # enable autologging
    mlflow.autolog()

    # read data
    df = get_data(args.training_data)

    # split data
    X_train, X_test, y_train, y_test = split_data(df)

    # train model
    model = train_model(args.reg_rate, X_train, X_test, y_train, y_test)

    eval_model(model, X_test, y_test)

# function that reads the data
def get_data(data_path):

    all_files = glob.glob(data_path + "/*.csv")
    df = pd.concat((pd.read_csv(f) for f in all_files), sort=False)
    
    return df

# function that splits the data
def split_data(df):
    print("Splitting data...")
    X, y = df[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness',
    'SerumInsulin','BMI','DiabetesPedigree','Age']].values, df['Diabetic'].values

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

    return X_train, X_test, y_train, y_test

# function that trains the model
def train_model(reg_rate, X_train, X_test, y_train, y_test):
    mlflow.log_param("Regularization rate", reg_rate)
    print("Training model...")
    model = LogisticRegression(C=1/reg_rate, solver="liblinear").fit(X_train, y_train)

    mlflow.sklearn.save_model(model, args.model_output)

    return model

# function that evaluates the model
def eval_model(model, X_test, y_test):
    # calculate accuracy
    y_hat = model.predict(X_test)
    acc = np.average(y_hat == y_test)
    print('Accuracy:', acc)

    # calculate AUC
    y_scores = model.predict_proba(X_test)
    auc = roc_auc_score(y_test,y_scores[:,1])
    print('AUC: ' + str(auc))

    # plot ROC curve
    fpr, tpr, thresholds = roc_curve(y_test, y_scores[:,1])
    fig = plt.figure(figsize=(6, 4))
    # Plot the diagonal 50% line
    plt.plot([0, 1], [0, 1], 'k--')
    # Plot the FPR and TPR achieved by our model
    plt.plot(fpr, tpr)
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve')
    plt.savefig("ROC-Curve.png") 

def parse_args():
    # setup arg parser
    parser = argparse.ArgumentParser()

    # add arguments
    parser.add_argument("--training_data", dest='training_data',
                        type=str)
    parser.add_argument("--reg_rate", dest='reg_rate',
                        type=float, default=0.01)
    parser.add_argument("--model_output", dest='model_output',
                        type=str)

    # parse args
    args = parser.parse_args()

    # return args
    return args

# run script
if __name__ == "__main__":
    # add space in logs
    print("\n\n")
    print("*" * 60)

    # parse args
    args = parse_args()

    # run main function
    main(args)

    # add space in logs
    print("*" * 60)
    print("\n\n")


Overwriting src/train-model.py


## コンポーネントの定義

コンポーネントを定義するには、次のように指定する必要がある：

- **メタデータ**： *name*、*display name*、*version*、*description*、*type*など。メタデータはコンポーネントの説明と管理に役立ちます。
- **インターフェース**： **input**と**output**。例えば、モデル学習コンポーネントは、学習データと正則化率を入力として受け取り、学習済みモデルファイルを出力として生成します。
- **コマンド、コード、環境**：コンポーネントを実行するための*コマンド*、*コード*、*環境*。コマンドはコンポーネントを実行するシェルコマンドです。Code は通常ソースコードのディレクトリを指します。環境は、AzureML環境（キュレーションまたはカスタム作成）、dockerイメージ、またはconda環境です。

以下のセルを実行して、パイプラインステップとして実行したい各コンポーネントの YAML を作成します。

In [7]:
%%writefile prep-data.yml
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: prep_data
display_name: Prepare training data
version: 1
type: command
inputs:
  input_data: 
    type: uri_file
outputs:
  output_data:
    type: uri_folder
code: ./src
environment: azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu@latest
command: >-
  python prep-data.py 
  --input_data ${{inputs.input_data}}
  --output_data ${{outputs.output_data}}

Overwriting prep-data.yml


In [8]:
%%writefile train-model.yml
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: train_model
display_name: Train a logistic regression model
version: 1
type: command
inputs:
  training_data: 
    type: uri_folder
  reg_rate:
    type: number
    default: 0.01
outputs:
  model_output:
    type: mlflow_model
code: ./src
environment: azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu@latest
command: >-
  python train-model.py 
  --training_data ${{inputs.training_data}} 
  --reg_rate ${{inputs.reg_rate}} 
  --model_output ${{outputs.model_output}} 

Overwriting train-model.yml


## Load the components

それぞれのコンポーネントを定義したので、YAMLファイルを参照してコンポーネントをロードできます。 

In [9]:
from azure.ai.ml import load_component
parent_dir = ""

prep_data = load_component(source=parent_dir + "./prep-data.yml")
train_logistic_regression = load_component(source=parent_dir + "./train-model.yml")
#それぞれのymlファイルをロードする

## Build the pipeline

コンポーネントを作成してロードしたら、パイプラインを構築します。つのコンポーネントをパイプラインにまとめる。まず、`prep_data`コンポーネントを実行します。1つ目のコンポーネントの出力は2つ目のコンポーネント `train_logistic_regression` の入力になるはずです。

diabetes_classification`関数は、完全なパイプラインを表します。この関数は1つの入力変数を必要とする： pipeline_job_input`である。データアセットはセットアップ時に作成されます。登録されたデータアセットをパイプラインの入力として使用します。

In [10]:
from azure.ai.ml import Input
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.dsl import pipeline
#パイプラインのまとめをおこなう
@pipeline()
def diabetes_classification(pipeline_job_input):
    clean_data = prep_data(input_data=pipeline_job_input)
    train_model = train_logistic_regression(training_data=clean_data.outputs.output_data)

    return {
        "pipeline_job_transformed_data": clean_data.outputs.output_data,
        "pipeline_job_trained_model": train_model.outputs.model_output,
    }

pipeline_job = diabetes_classification(Input(type=AssetTypes.URI_FILE, path="azureml:diabetes-data:1"))#pathはデータのURIに切り替える

You can retrieve the configuration of the pipeline job by printing the `pipeline_job` object:

In [11]:
print(pipeline_job)

display_name: diabetes_classification
type: pipeline
inputs:
  pipeline_job_input:
    type: uri_file
    path: azureml:diabetes-data:1
outputs:
  pipeline_job_transformed_data:
    type: uri_folder
  pipeline_job_trained_model:
    type: mlflow_model
jobs:
  clean_data:
    type: command
    inputs:
      input_data:
        path: ${{parent.inputs.pipeline_job_input}}
    outputs:
      output_data: ${{parent.outputs.pipeline_job_transformed_data}}
    component:
      $schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
      name: prep_data
      version: '1'
      display_name: Prepare training data
      type: command
      inputs:
        input_data:
          type: uri_file
      outputs:
        output_data:
          type: uri_folder
      command: python prep-data.py  --input_data ${{inputs.input_data}} --output_data
        ${{outputs.output_data}}
      environment: azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu@latest
      code: /mnt/batch/

You can change any parameter of the pipeline job configuration by referring to the parameter and specifying the new value:

In [12]:
# change the output mode
pipeline_job.outputs.pipeline_job_transformed_data.mode = "upload"
pipeline_job.outputs.pipeline_job_trained_model.mode = "upload"
# set pipeline level compute
pipeline_job.settings.default_compute = "aml-cluster"
# set pipeline level datastore
pipeline_job.settings.default_datastore = "workspaceblobstore"

# print the pipeline job again to review the changes
print(pipeline_job)

display_name: diabetes_classification
type: pipeline
inputs:
  pipeline_job_input:
    type: uri_file
    path: azureml:diabetes-data:1
outputs:
  pipeline_job_transformed_data:
    mode: upload
    type: uri_folder
  pipeline_job_trained_model:
    mode: upload
    type: mlflow_model
jobs:
  clean_data:
    type: command
    inputs:
      input_data:
        path: ${{parent.inputs.pipeline_job_input}}
    outputs:
      output_data: ${{parent.outputs.pipeline_job_transformed_data}}
    component:
      $schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
      name: prep_data
      version: '1'
      display_name: Prepare training data
      type: command
      inputs:
        input_data:
          type: uri_file
      outputs:
        output_data:
          type: uri_folder
      command: python prep-data.py  --input_data ${{inputs.input_data}} --output_data
        ${{outputs.output_data}}
      environment: azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-

## Submit the pipeline job

Finally, when you've built the pipeline and configured the pipeline job to run as required, you can submit the pipeline job:

In [13]:
# submit job to workspace
pipeline_job = ml_client.jobs.create_or_update(
    pipeline_job, experiment_name="pipeline_diabetes"
)
pipeline_job

[32mUploading src (0.01 MBs): 100%|██████████| 7921/7921 [00:00<00:00, 253586.15it/s]
[39m

pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored
pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.MLFlowModelJobOutput'> and will be ignored


Experiment,Name,Type,Status,Details Page
pipeline_diabetes,bright_egg_zygkqsdw60,pipeline,NotStarted,Link to Azure Machine Learning studio
