# 일괄 처리 유추 서비스 만들기

병원에서 하루 종일 환자의 상태를 측정하여 각 환자의 세부 정보를 개별 파일에 저장한다고 가정해 보겠습니다. 그리고 야간에는 당뇨병 예측 모델을 사용해 하루 동안 수집한 환자 데이터를 일괄 처리하여 예측을 생성할 수 있습니다. 그러면 당뇨 의심 대상으로 예측된 환자에 대해 다음날 후속 조치를 취할 수 있습니다. Azure Machine Learning을 사용하면 *일괄 처리 유추 파이프라인*을 만들어 이러한 과정을 진행할 수 있습니다. 이 연습에서는 일괄 처리 유추 파이프라인을 구현합니다.

## 작업 영역에 연결

이 Notebook의 작업을 시작하려면 먼저 작업 영역에 연결합니다.

> **참고**: Azure 구독에 인증된 세션을 아직 설정하지 않은 경우에는 링크를 클릭하고 인증 코드를 입력한 다음 Azure에 로그인하여 인증하라는 메시지가 표시됩니다.

In [None]:
import azureml.core
from azureml.core import Workspace

# 저장된 구성 파일에서 작업 영역 로드
ws = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))

## 모델 학습 및 등록

이제 일괄 처리 유추 파이프라인에서 배포할 모델의 학습과 등록을 진행하겠습니다.

In [None]:
from azureml.core import Experiment
from azureml.core import Model
import pandas as pd
import numpy as np
import joblib
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve

# 작업 영역에서 Azure ML 실험 만들기
experiment = Experiment(workspace=ws, name='mslearn-train-diabetes')
run = experiment.start_logging()
print("Starting experiment:", experiment.name)

# 당뇨병 데이터 세트 로드
print("Loading Data...")
diabetes = pd.read_csv('data/diabetes.csv')

# 기능 및 레이블 분리
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# 데이터를 학습 세트와 테스트 세트로 분할
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

# 의사 결정 트리 모델 학습 진행
print('Training a decision tree model')
model = DecisionTreeClassifier().fit(X_train, y_train)

# 정확도 계산
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', np.float(acc))

# AUC 계산
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test,y_scores[:,1])
print('AUC: ' + str(auc))
run.log('AUC', np.float(auc))

# 학습된 모델 저장
model_file = 'diabetes_model.pkl'
joblib.dump(value=model, filename=model_file)
run.upload_file(name = 'outputs/' + model_file, path_or_stream = './' + model_file)

# 실행 완료
run.complete()

# 모델 등록
run.register_model(model_path='outputs/diabetes_model.pkl', model_name='diabetes_model',
                   tags={'Training context':'Inline Training'},
                   properties={'AUC': run.get_metrics()['AUC'], 'Accuracy': run.get_metrics()['Accuracy']})

print('Model trained and registered.')

## 일괄 처리 데이터 생성 및 업로드

이 연습에 사용할 새 데이터를 가져올 수 있는 환자들을 진료하는 정식 병원이 실제로는 없으므로, 여기서는 당뇨병 CSV 파일에서 무작위 샘플을 생성하여 Azure Machine Learning 작업 영역의 데이터 저장소에 해당 데이터를 업로드한 다음 이 데이터용 데이터 세트를 등록합니다.

In [None]:
from azureml.core import Datastore, Dataset
import pandas as pd
import os

# 기본 데이터 저장소 설정
ws.set_default_datastore('workspaceblobstore')
default_ds = ws.get_default_datastore()

# 모든 데이터 저장소를 열거하고 기본 데이터 저장소 표시
for ds_name in ws.datastores:
    print(ds_name, "- Default =", ds_name == default_ds.name)

# 당뇨병 데이터 로드
diabetes = pd.read_csv('data/diabetes2.csv')
# 항목 100개가 포함된 기능 열 샘플 가져오기(당뇨병 레이블은 제외)
sample = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].sample(n=100).values

# 폴더 만들기
batch_folder = './batch-data'
os.makedirs(batch_folder, exist_ok=True)
print("Folder created!")

# 각 샘플을 개별 파일로 저장
print("Saving files...")
for i in range(100):
    fname = str(i+1) + '.csv'
    sample[i].tofile(os.path.join(batch_folder, fname), sep=",")
print("files saved!")

# 기본 데이터 저장소에 파일 업로드
print("Uploading files to datastore...")
default_ds = ws.get_default_datastore()
default_ds.upload(src_dir="batch-data", target_path="batch-data", overwrite=True, show_progress=True)

# 입력 데이터용 데이터 세트 등록
batch_data_set = Dataset.File.from_files(path=(default_ds, 'batch-data/'), validate=False)
try:
    batch_data_set = batch_data_set.register(workspace=ws, 
                                             name='batch-data',
                                             description='batch data',
                                             create_new_version=True)
except Exception as ex:
    print(ex)

print("Done!")

## 컴퓨팅 컨텍스트 만들기

이 연습의 작업을 진행하려면 파이프라인용 컴퓨팅 컨텍스트가 필요합니다. 여기서는 다음 코드를 사용하여 Azure Machine Learning 컴퓨팅 클러스터를 지정합니다. 이 클러스터는 아직 없으면 자동으로 생성됩니다.

> **중요**: 컴퓨팅 클러스터를 실행하기 전에 아래 코드에서 *your-compute-cluster*를 컴퓨팅 클러스터의 이름으로 변경하세요! 클러스터 이름은 2~16자 사이의 전역으로 고유한 이름이어야 합니다. 유효한 문자는 영문자, 숫자 및 문자입니다.

In [None]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "your-compute-cluster"

try:
    # Check for existing compute target
    inference_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', max_nodes=2)
        inference_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        inference_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)
    

> **참고**: 컴퓨팅 인스턴스와 클러스터는 표준 Azure 가상 머신 이미지를 기반으로 합니다. 이 연습에서는 비용과 성능 간 최적의 균형을 달성하기 위해 *Standard_DS11_v2* 이미지를 사용하는 것이 좋습니다. 구독의 할당량이 적어 이 이미지를 포함할 수 없는 경우 대체 이미지를 선택할 수 있습니다. 그러나 큰 이미지는 높은 비용을 야기할 수 있고 작은 이미지는 작업을 완료하는 데 충분하지 않을 수 있으므로 신중히 선택하는 것이 좋습니다. Azure 관리자에게 요청하여 할당량을 늘릴 수도 있습니다.

## 일괄 처리 유추용 파이프라인 만들기

이제 일괄 처리 유추에 사용할 파이프라인을 정의할 수 있습니다. 이 파이프라인에는 일괄 처리 유추를 수행할 Python 코드가 필요합니다. 그러므로 파이프라인에 사용되는 모든 파일을 저장할 수 있는 폴더를 만들겠습니다.

In [None]:
import os
# 실험 파일용 폴더 만들기
experiment_folder = 'batch_pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

이제 실제 작업을 수행하는 Python 스크립트를 만들어 파이프라인 폴더에 저장합니다.

In [None]:
%%writefile $experiment_folder/batch_diabetes.py
import os
import numpy as np
from azureml.core import Model
import joblib


def init():
    # Runs when the pipeline step is initialized
    global model

    # load the model
    model_path = Model.get_model_path('diabetes_model')
    model = joblib.load(model_path)


def run(mini_batch):
    # This runs for each batch
    resultList = []

    # process each file in the batch
    for f in mini_batch:
        # Read the comma-delimited data into an array
        data = np.genfromtxt(f, delimiter=',')
        # Reshape into a 2-dimensional array for prediction (model expects multiple items)
        prediction = model.predict(data.reshape(1, -1))
        # Append prediction to results
        resultList.append("{}: {}".format(os.path.basename(f), prediction[0]))
    return resultList

파이프라인에는 파이프라인을 실행할 환경이 필요하므로 코드에서 사용하는 패키지를 포함하는 Conda 사양 파일을 만들겠습니다.

In [None]:
%%writefile $experiment_folder/batch_environment.yml
name: batch_environment
dependencies:
- python=3.6.2
- scikit-learn
- pip
- pip:
  - azureml-defaults

다음으로는 Conda 환경을 포함하는 실행 컨텍스트를 정의하겠습니다.

In [None]:
from azureml.core import Environment
from azureml.core.runconfig import DEFAULT_CPU_IMAGE

# 실험용 환경 만들기
batch_env = Environment.from_conda_specification("experiment_env", experiment_folder + "/batch_environment.yml")
batch_env.docker.base_image = DEFAULT_CPU_IMAGE
print('Configuration ready.')

파이프라인을 사용하여 일괄 처리 예측 스크립트를 실행하고, 입력 데이터에서 예측을 생성한 다음 결과를 출력 폴더에 텍스트 파일로 저장하겠습니다. 이러한 과정을 진행하려면 **ParallelRunStep**을 사용할 수 있습니다. 그러면 일괄 처리 데이터를 병렬로 처리할 수 있으며, 결과가 *parallel_run_step.txt* 출력 파일 하나에 정렬됩니다.

In [None]:
from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep
from azureml.data import OutputFileDatasetConfig
from azureml.core.runconfig import DockerConfiguration

output_dir = OutputFileDatasetConfig(name='inferences')

parallel_run_config = ParallelRunConfig(
    source_directory=experiment_folder,
    entry_script="batch_diabetes.py",
    mini_batch_size="5",
    error_threshold=10,
    output_action="append_row",
    environment=batch_env,
    compute_target=inference_cluster,
    node_count=2)

parallelrun_step = ParallelRunStep(
    name='batch-score-diabetes',
    parallel_run_config=parallel_run_config,
    inputs=[batch_data_set.as_named_input('diabetes_batch')],
    output=output_dir,
    arguments=[],
    allow_reuse=True
)

print('Steps defined')

이제 단계를 파이프라인에 포함한 후 파이프라인을 실행합니다.

> **참고**: 파이프라인을 실행하려면 시간이 다소 걸릴 수 있습니다.

In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
pipeline_run = Experiment(ws, 'mslearn-diabetes-batch').submit(pipeline)
pipeline_run.wait_for_completion(show_output=True)

파이프라인 실행이 완료되면 파이프라인의 첫 단계이자 유일한 단계와 연결된 실험의 출력에 결과로 생성된 예측이 저장됩니다. 저장된 예측은 다음과 같이 검색할 수 있습니다.

In [None]:
import pandas as pd
import shutil

# Remove the local results folder if left over from a previous run
shutil.rmtree('diabetes-results', ignore_errors=True)

# Get the run for the first step and download its output
prediction_run = next(pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('inferences')
prediction_output.download(local_path='diabetes-results')

# Traverse the folder hierarchy and find the results file
for root, dirs, files in os.walk('diabetes-results'):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root,file)

# 출력 형식 정리
df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["File", "Prediction"]

# 처음 20개 결과 표시
df.head(20)

## 파이프라인 게시 및 해당 REST 인터페이스 사용

정상 작동하는 일괄 처리 유추용 파이프라인이 작성되었으므로 해당 파이프라인을 게시한 후 REST 엔드포인트를 사용하여 애플리케이션에서 파이프라인을 실행할 수 있습니다.

In [None]:
published_pipeline = pipeline_run.publish_pipeline(
    name='diabetes-batch-pipeline', description='Batch scoring of diabetes data', version='1.0')

published_pipeline

게시된 파이프라인에는 엔드포인트가 있습니다. Azure Portal에서 이 엔드포인트를 확인할 수 있습니다. 게시된 파이프라인 개체의 속성으로 엔드포인트를 확인할 수도 있습니다.

In [None]:
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

엔드포인트를 사용하려면 클라이언트 애플리케이션이 HTTP를 통해 REST 호출을 수행해야 합니다. 이 요청은 인증해야 하므로 인증 헤더가 필요합니다. 여기서는 이 인증 과정을 테스트하기 위해 현재 Azure 작업 영역에 설정되어 있는 연결의 인증 헤더를 사용합니다. 다음 코드를 사용하면 해당 인증 헤더를 가져올 수 있습니다.

> **참고**: 실제 애플리케이션에는 인증에 사용할 서비스 주체가 필요합니다.

In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
print('Authentication header ready.')

이제 REST 인터페이스를 호출할 수 있습니다. 파이프라인은 비동기식으로 실행되므로 식별자를 다시 가져와야 합니다. 이 식별자는 실행 중인 파이프라인 실험을 추적하는 데 사용할 수 있습니다.

In [None]:
import requests

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": "mslearn-diabetes-batch"})
run_id = response.json()["Id"]
run_id

실행 ID를 가져온 후에는 **RunDetails** 위젯을 사용하여 실행 중인 실험을 확인할 수 있습니다.

In [None]:
from azureml.pipeline.core.run import PipelineRun
from azureml.widgets import RunDetails

published_pipeline_run = PipelineRun(ws.experiments['mslearn-diabetes-batch'], run_id)

# Block until the run completes
published_pipeline_run.wait_for_completion(show_output=True)

파이프라인 실행이 완료될 때까지 기다린 후 다음 셀을 실행하여 결과를 확인합니다.

앞에서와 마찬가지로 결과는 첫 번째 파이프라인 단계의 출력에 포함됩니다.

In [None]:
import pandas as pd
import shutil

# Remove the local results folder if left over from a previous run
shutil.rmtree('diabetes-results', ignore_errors=True)

# Get the run for the first step and download its output
prediction_run = next(pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('inferences')
prediction_output.download(local_path='diabetes-results')

# Traverse the folder hierarchy and find the results file
for root, dirs, files in os.walk('diabetes-results'):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root,file)

# 출력 형식 정리
df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["File", "Prediction"]

# 처음 20개 결과 표시
df.head(20)

이제 일별 환자 데이터를 일괄 처리하는 데 사용할 수 있는 파이프라인이 완성되었습니다.

**추가 정보**: 일괄 처리 유추에 파이프라인을 사용하는 방법에 대한 자세한 내용은 Azure Machine Learning 설명서에서 [일괄 처리 예측을 실행하는 방법](https://docs.microsoft.com/azure/machine-learning/how-to-run-batch-predictions)을 참조하세요.