# 建立管線

您可以用 Azure ML SDK 執行指令碼型的實驗，以便執行為了內嵌資料、訓練模型和個別註冊模型所需的各種步驟步驟。不過，在企業環境中，經常會將打造機器學習所需的一系列特定步驟封裝成 *管線*；管線可在一或多個計算目標上，視使用者需要、作為自動建置流程的一部分，或按照排程來執行。

在此筆記本中，您將整合上述所有元素，建立可預先處理資料，然後訓練並註冊模型的簡單管線。

## 連線到您的工作區

若要開始進行，請連線到您的工作區。

> **注意**：如果您尚未使用 Azure 訂用帳戶建立已驗證的工作階段，系統會提示您透過按一下連結、輸入驗證碼並登入 Azure 來進行驗證。

In [None]:
import azureml.core
from azureml.core import Workspace

# Load the workspace from the saved config file
ws = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))

## 準備資料

在您的管線中，您將使用包含糖尿病病患詳細資料的資料集。執行下方儲存格來建立此資料集 (如果您先前已建立了資料集，程式碼會尋找現有的版本)

In [None]:
from azureml.core import Dataset
from azureml.data.datapath import DataPath

default_ds = ws.get_default_datastore()

if 'diabetes dataset' not in ws.datasets:
    Dataset.File.upload_directory(src_dir='data',
                              target=DataPath(default_ds, 'diabetes-data/')
                              )

    #Create a tabular dataset from the path on the datastore (this may take a short while)
    tab_data_set = Dataset.Tabular.from_delimited_files(path=(default_ds, 'diabetes-data/*.csv'))

    # Register the tabular dataset
    try:
        tab_data_set = tab_data_set.register(workspace=ws, 
                                name='diabetes dataset',
                                description='diabetes data',
                                tags = {'format':'CSV'},
                                create_new_version=True)
        print('Dataset registered.')
    except Exception as ex:
        print(ex)
else:
    print('Dataset already registered.')

## 建立管線步驟的指令碼

管線由一或多個 *步驟* 組成；這可以是 Python 指令碼，也可以是將資料從一個位置複製到另一個位置的資料傳輸步驟等特定步驟。每個步驟可以在各自的計算內容下執行。在此練習中，您會打造包含兩個 Python 指令碼步驟的簡單管線：一個會預先處理部分的訓練資料，另一個則會使用預先處理過的資料，來訓練並註冊模型。

首先，讓我們建立資料夾，以存放要用在管線步驟中的指令碼檔案。

In [None]:
import os
# Create a folder for the pipeline step files
experiment_folder = 'diabetes_pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

接著建立第一個指令碼，以便從糖尿病資料集讀取資料，並套用部分簡單的預先處理，以移除缺少資料的資料列，並將各項數值特徵標準化，使其規模相近。

這個指令碼包含名為 **--prepped-data** 的引數，會參考存放結果資料的資料夾。

In [None]:
%%writefile $experiment_folder/prep_diabetes.py
# Import libraries
import os
import argparse
import pandas as pd
from azureml.core import Run
from sklearn.preprocessing import MinMaxScaler

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument("--input-data", type=str, dest='raw_dataset_id', help='raw dataset')
parser.add_argument('--prepped-data', type=str, dest='prepped_data', default='prepped_data', help='Folder for results')
args = parser.parse_args()
save_folder = args.prepped_data

# Get the experiment run context
run = Run.get_context()

# load the data (passed as an input dataset)
print("Loading Data...")
diabetes = run.input_datasets['raw_data'].to_pandas_dataframe()

# Log raw row count
row_count = (len(diabetes))
run.log('raw_rows', row_count)

# remove nulls
diabetes = diabetes.dropna()

# Normalize the numeric columns
scaler = MinMaxScaler()
num_cols = ['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree']
diabetes[num_cols] = scaler.fit_transform(diabetes[num_cols])

# Log processed rows
row_count = (len(diabetes))
run.log('processed_rows', row_count)

# Save the prepped data
print("Saving Data...")
os.makedirs(save_folder, exist_ok=True)
save_path = os.path.join(save_folder,'data.csv')
diabetes.to_csv(save_path, index=False, header=True)

# End the run
run.complete()

現在您可建立第二步要用的指令碼，用以訓練模型。這個指令碼包含名為 **--training-data** 的引數，會參考上一步中備妥的資料所存放的位置。

In [None]:
%%writefile $experiment_folder/train_diabetes.py
# Import libraries
from azureml.core import Run, Model
import argparse
import pandas as pd
import numpy as np
import joblib
import os
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
import matplotlib.pyplot as plt

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument("--training-data", type=str, dest='training_data', help='training data')
args = parser.parse_args()
training_data = args.training_data

# Get the experiment run context
run = Run.get_context()

# load the prepared data file in the training folder
print("Loading Data...")
file_path = os.path.join(training_data,'data.csv')
diabetes = pd.read_csv(file_path)

# Separate features and labels
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# Split data into training set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

# Train adecision tree model
print('Training a decision tree model...')
model = DecisionTreeClassifier().fit(X_train, y_train)

# calculate accuracy
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', np.float(acc))

# calculate AUC
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test,y_scores[:,1])
print('AUC: ' + str(auc))
run.log('AUC', np.float(auc))

# plot ROC curve
fpr, tpr, thresholds = roc_curve(y_test, y_scores[:,1])
fig = plt.figure(figsize=(6, 4))
# Plot the diagonal 50% line
plt.plot([0, 1], [0, 1], 'k--')
# Plot the FPR and TPR achieved by our model
plt.plot(fpr, tpr)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
run.log_image(name = "ROC", plot = fig)
plt.show()

# Save the trained model in the outputs folder
print("Saving model...")
os.makedirs('outputs', exist_ok=True)
model_file = os.path.join('outputs', 'diabetes_model.pkl')
joblib.dump(value=model, filename=model_file)

# Register the model
print('Registering model...')
Model.register(workspace=run.experiment.workspace,
               model_path = model_file,
               model_name = 'diabetes_model',
               tags={'Training context':'Pipeline'},
               properties={'AUC': np.float(auc), 'Accuracy': np.float(acc)})


run.complete()

# # 準備管線步驟的計算環境

在此練習中，您將針對這兩個步驟使用相同的計算方式，但一定要了解每個步驟都是獨立執行的，因此必要時可針對各步驟指定不同的計算內容。

首先，取得上一個實驗中所建立的計算目標 (若不存在，系統將會自動建立)。

> **重要**：在執行前，請先將 *your-compute-cluster* 的名稱，變更為程式碼中計算叢集的名稱！叢集名稱必須是長度介於 2 到 16 個字元，全域唯一的名稱。有效字元包括字母、數字及 - 字元。

In [None]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "your-compute-cluster"

try:
    # Check for existing compute target
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', max_nodes=2)
        pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        pipeline_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)
    

> **注意**：計算執行個體和叢集是以標準 Azure 虛擬機器映像為基礎。針對此練習，建議使用 *Standard_DS11_v2* 映像，以達到最佳的成本與效能平衡。如果您的訂用帳戶具有不包含此映像的配額，請選擇替代映像；但請記得，較大的映像可能會產生較高的成本，而較小的映像可能不足以完成工作。或者，請要求您的 Azure 系統管理員擴大您的配額。

計算會需要 Python 環境，且須安裝好必要的套件相依性。

In [None]:
%%writefile $experiment_folder/experiment_env.yml
name: experiment_env
dependencies:
- python=3.6.2
- scikit-learn
- ipykernel
- matplotlib
- pandas
- pip
- pip:
  - azureml-defaults
  - pyarrow

現在有了 Conda 設定檔，您就可以建立環境，並用於管線的回合組態中。

In [None]:
from azureml.core import Environment
from azureml.core.runconfig import RunConfiguration

# Create a Python environment for the experiment (from a .yml file)
experiment_env = Environment.from_conda_specification("experiment_env", experiment_folder + "/experiment_env.yml")

# Register the environment 
experiment_env.register(workspace=ws)
registered_env = Environment.get(ws, 'experiment_env')

# Create a new runconfig object for the pipeline
pipeline_run_config = RunConfiguration()

# Use the compute you created above. 
pipeline_run_config.target = pipeline_cluster

# Assign the environment to the run configuration
pipeline_run_config.environment = registered_env

print ("Run configuration created.")

## 建立並執行管線

現在您可以開始建立並執行管線。

首先，您必須定義管線的步驟，以及任何需要在各步驟間傳遞的資料參考。在此案例中，第一步必須將備妥的資料寫入可在第二步讀取的資料夾。由於這些步驟將會在遠端計算上執行 (事實上，各步驟都可在不同的計算上執行)，因此，必須將資料夾路徑當作資料參考，傳遞到工作區內資料存放區中的位置。**OutputFileDatasetConfig** 物件是一種特殊的資料參考，可當作暫時儲存位置在管線步驟之間傳遞，因此您要建立一個這種物件，當作第一步的輸出，以及第二步的輸入。請注意，您必須將此物件當作指令碼的引數傳遞，讓您的程式碼可以存取資料參考所參考的資料存放區位置。

In [None]:
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep

# Get the training dataset
diabetes_ds = ws.datasets.get("diabetes dataset")

# Create an OutputFileDatasetConfig (temporary Data Reference) for data passed from step 1 to step 2
prepped_data = OutputFileDatasetConfig("prepped_data")

# Step 1, Run the data prep script
prep_step = PythonScriptStep(name = "Prepare Data",
                                source_directory = experiment_folder,
                                script_name = "prep_diabetes.py",
                                arguments = ['--input-data', diabetes_ds.as_named_input('raw_data'),
                                             '--prepped-data', prepped_data],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

# Step 2, run the training script
train_step = PythonScriptStep(name = "Train and Register Model",
                                source_directory = experiment_folder,
                                script_name = "train_diabetes.py",
                                arguments = ['--training-data', prepped_data.as_input()],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

print("Pipeline steps defined")

這樣，您就可以開始從定義好的步驟建立管線，並以實驗的形式執行管線。

In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# Construct the pipeline
pipeline_steps = [prep_step, train_step]
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)
print("Pipeline is built.")

# Create an experiment and run the pipeline
experiment = Experiment(workspace=ws, name = 'mslearn-diabetes-pipeline')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")
RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion(show_output=True)

在執行管線時，管線實驗的圖形表示會顯示在 widget 中。請留意頁面右上方的核心指標，當指標從 **&#9899;** 變成 **&#9711;** 時，就代表程式碼已完成執行。您也可以在 [Azure Machine Learning 工作室](https://ml.azure.com) 的 **實驗** 頁面中監視管線執行。

當管線完成時，您可以檢查管線子執行所記錄的計量。

In [None]:
for run in pipeline_run.get_children():
    print(run.name, ':')
    metrics = run.get_metrics()
    for metric_name in metrics:
        print('\t',metric_name, ":", metrics[metric_name])

假設管線成功，則應使用 *訓練內容* 標籤來註冊新的模型，表示模型已在管線中經過訓練。請執行下列程式碼以驗證這點。

In [None]:
from azureml.core import Model

for model in Model.list(ws):
    print(model.name, 'version:', model.version)
    for tag_name in model.tags:
        tag = model.tags[tag_name]
        print ('\t',tag_name, ':', tag)
    for prop_name in model.properties:
        prop = model.properties[prop_name]
        print ('\t',prop_name, ':', prop)
    print('\n')

## 發佈管線

建立並測試過管線後，您就可以將管線當作 REST 服務來發佈。

In [None]:
# Publish the pipeline from the run
published_pipeline = pipeline_run.publish_pipeline(
    name="diabetes-training-pipeline", description="Trains diabetes model", version="1.0")

published_pipeline

請注意，發佈的管線具有端點，可在 [Azure Machine Learning 工作室](https://ml.azure.com) 的 **端點** 頁面中看見 (位於 **管線端點** 索引標籤上)。您也可以在已發佈管線的屬性中，找到物件的 URI：

In [None]:
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

## 呼叫管線端點

若要使用此端點，用戶端應用程式會需要透過 HTTP 進行 REST 呼叫。這項要求必須經過驗證，所以需要有授權標頭。實際的應用程式會需要服務主體 (需要驗證服務主體)，但為了測試這點，我們會使用您 Azure 工作區目前連線的授權標頭，可利用下列程式碼來取得：

In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
print("Authentication header ready.")

現在我們可以開始呼叫 REST 介面。管線會以非同步方式執行，所以我們會取得傳回的識別碼，並用來追蹤執行中的管線實驗：

In [None]:
import requests

experiment_name = 'mslearn-diabetes-pipeline'

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": experiment_name})
run_id = response.json()["Id"]
run_id

由於會有執行 ID，可用來確認執行已完成。

> **注意**：管線應該很快會完成，因為每個步驟都設為允許重複使用輸出。這主要是為了方便起見，以及在此課程中節省時間。實際上，您可能會希望每次都執行第一個步驟，以免資料發生變更，而只有在步驟一的輸出變更時，才觸發後續的步驟。

In [None]:
from azureml.pipeline.core.run import PipelineRun

published_pipeline_run = PipelineRun(ws.experiments[experiment_name], run_id)
published_pipeline_run.wait_for_completion(show_output=True)

## 排程管線

假設治療糖尿病病患的診所會每週收集新資料，並新增到資料集內。您可以每週執行管線，以新的資料重新訓練模型。

In [None]:
from azureml.pipeline.core import ScheduleRecurrence, Schedule

# Submit the Pipeline every Monday at 00:00 UTC
recurrence = ScheduleRecurrence(frequency="Week", interval=1, week_days=["Monday"], time_of_day="00:00")
weekly_schedule = Schedule.create(ws, name="weekly-diabetes-training", 
                                  description="Based on time",
                                  pipeline_id=published_pipeline.id, 
                                  experiment_name='mslearn-diabetes-pipeline', 
                                  recurrence=recurrence)
print('Pipeline scheduled.')

您可以擷取在工作區中所定義的排程，如下所示：

In [None]:
schedules = Schedule.list(ws)
schedules

您可以檢查最新的執行，如下所示：

In [None]:
pipeline_experiment = ws.experiments.get('mslearn-diabetes-pipeline')
latest_run = list(pipeline_experiment.get_runs())[0]

latest_run.get_details()

這是個簡單的範例，用來示範原則。實際上，您可以將更複雜的邏輯應用在管線步驟中，例如用部分測試資料來評估模型，以計算效能計量 (如 AUC 或正確性)，接著比較現有模型與模型先前註冊的版本，只有當新模型的計量顯示效能更佳時，才註冊新模型。

您可以使用 [適用於 Azure DevOps 的 Azure Machine Learning 延伸模組](https://marketplace.visualstudio.com/items?itemName=ms-air-aiagility.vss-services-azureml)，將 Azure ML 管線與 Azure DevOps 管線合併 (沒錯，兩者的名稱相同，*的確* 容易令人混淆！)，並將模型訓練整合到 *持續整合/持續部署 (CI/CD)* 程序中。例如，您可以使用 Azure DevOps *建立* 管線，以觸發可訓練和註冊模型的 Azure ML 管線，而當模型註冊時，可觸發 Azure Devops *發行* 管線，將模型部署為 Web 服務，以及會使用模型的應用程式或服務。