# Créer un pipeline

Vous pouvez accomplir les différentes étapes requises pour ingérer des données, effectuer l’apprentissage d’un modèle et inscrire le modèle individuellement à l’aide du Kit de développement logiciel (SDK) Azure ML SDK afin d’exécuter des expérimentations basées sur des scripts. Toutefois, dans un environnement d’entreprise, il est courant d’encapsuler la séquence des étapes requises pour générer une solution d’apprentissage automatique dans un *pipeline* exécutable sur une ou plusieurs cibles de calcul, soit à la demande d’un utilisateur, soit en vertu d’un processus de génération automatisé ou d’une planification.

Dans ce notebook, vous allez réunir tous ces éléments pour créer un pipeline simple qui prétraite les données, puis effectue l’apprentissage d’un modèle et inscrit celui-ci.

## Vous connecter à votre espace de travail

Pour commencer, connectez-vous à votre espace de travail.

> **Remarque** : si vous n’avez pas encore établi de session authentifiée avec votre abonnement Azure, vous serez invité à vous authentifier en cliquant sur un lien, en saisissant un code d’authentification et en vous connectant à Azure.

In [None]:
import azureml.core
from azureml.core import Workspace

# Load the workspace from the saved config file
ws = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))

## Préparer les données

Dans votre pipeline, vous allez utiliser un jeu de données contenant des détails sur des patients atteints de diabète. Exécutez la cellule ci-dessous pour créer ce jeu de données (si vous l’avez créé précédemment, le code trouvera la version existante).

In [None]:
from azureml.core import Dataset
from azureml.data.datapath import DataPath

default_ds = ws.get_default_datastore()

if 'diabetes dataset' not in ws.datasets:
    Dataset.File.upload_directory(src_dir='data',
                              target=DataPath(default_ds, 'diabetes-data/')
                              )

    #Create a tabular dataset from the path on the datastore (this may take a short while)
    tab_data_set = Dataset.Tabular.from_delimited_files(path=(default_ds, 'diabetes-data/*.csv'))

    # Register the tabular dataset
    try:
        tab_data_set = tab_data_set.register(workspace=ws, 
                                name='diabetes dataset',
                                description='diabetes data',
                                tags = {'format':'CSV'},
                                create_new_version=True)
        print('Dataset registered.')
    except Exception as ex:
        print(ex)
else:
    print('Dataset already registered.')

## Créer des scripts pour des étapes de pipeline

Les pipelines se composent d’une ou plusieurs *étapes*, qui peuvent être des scripts Python, ou des étapes spécialisées comme une étape de transfert de données qui copie des données d’un emplacement vers un autre. Chaque étape peut s’exécuter dans son propre contexte de calcul. Dans cet exercice, vous allez créer un pipeline simple contenant deux étapes de script Python : l’une pour prétraiter certaines données d’apprentissage, et l’autre pour utiliser les données prétraitées afin d’effectuer l’apprentissage d’un modèle et d’inscrire celui-ci.

Commençons par créer un dossier pour les fichiers de script que nous allons utiliser dans les étapes du pipeline.

In [None]:
import os
# Create a folder for the pipeline step files
experiment_folder = 'diabetes_pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

Créons maintenant le premier script qui lira les données du jeu de données sur le diabète, puis appliquera un prétraitement simple pour supprimer toutes les lignes dans lesquelles des données sont manquantes et normaliser les fonctionnalités numériques afin que celles-ci soient à une échelle similaire.

Le script inclut un argument nommé **--prepped-data**, qui fait référence au dossier dans lequel les données obtenues devraient être enregistrées.

In [None]:
%%writefile $experiment_folder/prep_diabetes.py
# Import libraries
import os
import argparse
import pandas as pd
from azureml.core import Run
from sklearn.preprocessing import MinMaxScaler

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument("--input-data", type=str, dest='raw_dataset_id', help='raw dataset')
parser.add_argument('--prepped-data', type=str, dest='prepped_data', default='prepped_data', help='Folder for results')
args = parser.parse_args()
save_folder = args.prepped_data

# Get the experiment run context
run = Run.get_context()

# load the data (passed as an input dataset)
print("Loading Data...")
diabetes = run.input_datasets['raw_data'].to_pandas_dataframe()

# Log raw row count
row_count = (len(diabetes))
run.log('raw_rows', row_count)

# remove nulls
diabetes = diabetes.dropna()

# Normalize the numeric columns
scaler = MinMaxScaler()
num_cols = ['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree']
diabetes[num_cols] = scaler.fit_transform(diabetes[num_cols])

# Log processed rows
row_count = (len(diabetes))
run.log('processed_rows', row_count)

# Save the prepped data
print("Saving Data...")
os.makedirs(save_folder, exist_ok=True)
save_path = os.path.join(save_folder,'data.csv')
diabetes.to_csv(save_path, index=False, header=True)

# End the run
run.complete()

Vous pouvez maintenant créer le script pour la deuxième étape, ce qui aura pour effet d’effectuer l’apprentissage d’un modèle. Le script inclut un argument nommé **--training-data**, qui fait référence à l’emplacement où les données préparées ont été enregistrées à l’étape précédente.

In [None]:
%%writefile $experiment_folder/train_diabetes.py
# Import libraries
from azureml.core import Run, Model
import argparse
import pandas as pd
import numpy as np
import joblib
import os
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
import matplotlib.pyplot as plt

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument("--training-data", type=str, dest='training_data', help='training data')
args = parser.parse_args()
training_data = args.training_data

# Get the experiment run context
run = Run.get_context()

# load the prepared data file in the training folder
print("Loading Data...")
file_path = os.path.join(training_data,'data.csv')
diabetes = pd.read_csv(file_path)

# Separate features and labels
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# Split data into training set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

# Train adecision tree model
print('Training a decision tree model...')
model = DecisionTreeClassifier().fit(X_train, y_train)

# calculate accuracy
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', np.float(acc))

# calculate AUC
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test,y_scores[:,1])
print('AUC: ' + str(auc))
run.log('AUC', np.float(auc))

# plot ROC curve
fpr, tpr, thresholds = roc_curve(y_test, y_scores[:,1])
fig = plt.figure(figsize=(6, 4))
# Plot the diagonal 50% line
plt.plot([0, 1], [0, 1], 'k--')
# Plot the FPR and TPR achieved by our model
plt.plot(fpr, tpr)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
run.log_image(name = "ROC", plot = fig)
plt.show()

# Save the trained model in the outputs folder
print("Saving model...")
os.makedirs('outputs', exist_ok=True)
model_file = os.path.join('outputs', 'diabetes_model.pkl')
joblib.dump(value=model, filename=model_file)

# Register the model
print('Registering model...')
Model.register(workspace=run.experiment.workspace,
               model_path = model_file,
               model_name = 'diabetes_model',
               tags={'Training context':'Pipeline'},
               properties={'AUC': np.float(auc), 'Accuracy': np.float(acc)})


run.complete()

## Préparer un environnement de calcul pour les étapes du pipeline

Dans cet exercice, vous allez utiliser le même calcul pour les deux étapes, mais il est important de comprendre que chaque étape est exécutée indépendamment. Vous pourriez donc spécifier des contextes de calcul différents pour chaque étape si nécessaire.

Commencez par récupérer la cible de calcul que vous avez créée dans un labo précédent (si elle n’existe pas, elle sera créée).

> **Important** : remplacez *your-compute-cluster* par le nom de votre cluster de calcul dans le code ci-dessous avant de l’exécuter. Les noms de cluster doivent être des noms globalement uniques d’une longueur comprise entre 2 et 16 caractères. Les caractères valides sont les lettres, les chiffres et le caractère « - ».

In [None]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "your-compute-cluster"

try:
    # Check for existing compute target
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', max_nodes=2)
        pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        pipeline_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)
    

> **Remarque** : les clusters et instances de calcul sont basés sur des images de machines virtuelles Azure standard. Pour cet exercice, l’image *Standard_DS11_v2* est recommandée pour obtenir l’équilibre optimal entre coûts et performances. Si votre abonnement s’accompagne d’un quota qui ne couvre pas cette image, choisissez-en une autre. Gardez cependant à l’esprit qu’une image plus grande peut entraîner des coûts plus élevés, tandis qu’une plus petite risque de ne pas suffire pour effectuer les tâches. Vous pouvez également demander à votre administrateur Azure d’étendre votre quota.

Le calcul nécessitera un environnement Python avec les dépendances de package nécessaires installées.

In [None]:
%%writefile $experiment_folder/experiment_env.yml
name: experiment_env
dependencies:
- python=3.6.2
- scikit-learn
- ipykernel
- matplotlib
- pandas
- pip
- pip:
  - azureml-defaults
  - pyarrow

Maintenant que vous disposez d’un fichier de configuration Conda, vous pouvez créer un environnement et l’utiliser dans la configuration d’exécution pour le pipeline.

In [None]:
from azureml.core import Environment
from azureml.core.runconfig import RunConfiguration

# Create a Python environment for the experiment (from a .yml file)
experiment_env = Environment.from_conda_specification("experiment_env", experiment_folder + "/experiment_env.yml")

# Register the environment 
experiment_env.register(workspace=ws)
registered_env = Environment.get(ws, 'experiment_env')

# Create a new runconfig object for the pipeline
pipeline_run_config = RunConfiguration()

# Use the compute you created above. 
pipeline_run_config.target = pipeline_cluster

# Assign the environment to the run configuration
pipeline_run_config.environment = registered_env

print ("Run configuration created.")

## Créer et exécuter un pipeline

Vous êtes maintenant prêt à créer et exécuter un pipeline.

Vous devez commencer par définir les étapes du pipeline, ainsi que toutes les références de données qui doivent être transmises entre elles. Dans ce cas, la première étape doit écrire les données préparées dans un dossier qui peut être lu à partir de la deuxième étape. Étant donné que les étapes seront exécutées sur une instance de calcul distante (et pourraient être exécutées sur une instance de calcul différente), le chemin d’accès au dossier doit être transmis en tant que référence de données à un emplacement dans un magasin de données au sein de l’espace de travail. L’objet **OutputFileDatasetConfig** est un type spécial de référence de données utilisé pour des emplacements de stockage intermédiaires qui peuvent être transmis entre étapes du pipeline. Vous allez donc en créer un et l’utiliser comme sortie pour la première étape et entrée pour la deuxième. Notez que vous devez le passer en tant qu’argument de script afin que votre code puisse accéder à l’emplacement de magasin de données référencé par la référence de données.

In [None]:
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep

# Get the training dataset
diabetes_ds = ws.datasets.get("diabetes dataset")

# Create an OutputFileDatasetConfig (temporary Data Reference) for data passed from step 1 to step 2
prepped_data = OutputFileDatasetConfig("prepped_data")

# Step 1, Run the data prep script
prep_step = PythonScriptStep(name = "Prepare Data",
                                source_directory = experiment_folder,
                                script_name = "prep_diabetes.py",
                                arguments = ['--input-data', diabetes_ds.as_named_input('raw_data'),
                                             '--prepped-data', prepped_data],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

# Step 2, run the training script
train_step = PythonScriptStep(name = "Train and Register Model",
                                source_directory = experiment_folder,
                                script_name = "train_diabetes.py",
                                arguments = ['--training-data', prepped_data.as_input()],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

print("Pipeline steps defined")

OK, vous êtes prêt à créer le pipeline à partir des étapes que vous avez définies, et à l’exécuter à titre d’essai.

In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# Construct the pipeline
pipeline_steps = [prep_step, train_step]
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)
print("Pipeline is built.")

# Create an experiment and run the pipeline
experiment = Experiment(workspace=ws, name = 'mslearn-diabetes-pipeline')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")
RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion(show_output=True)

Une représentation graphique de l’essai du pipeline s’affiche dans le widget lors de son exécution. Gardez un œil sur l’indicateur de noyau en haut à droite de la page. Quand il passe de **&#9899;** à **&#9711;**, l’exécution du code est terminée. Vous pouvez également surveiller les exécutions du pipeline dans la page **Essais** dans [Azure Machine Learning Studio](https://ml.azure.com).

Une fois le pipeline terminé, vous pouvez examiner les métriques enregistrées par ses exécutions enfants.

In [None]:
for run in pipeline_run.get_children():
    print(run.name, ':')
    metrics = run.get_metrics()
    for metric_name in metrics:
        print('\t',metric_name, ":", metrics[metric_name])

En supposant que le pipeline a réussi, un nouveau modèle devrait être inscrit avec l’étiquette *Contexte d’apprentissage* indiquant que son apprentissage a été effectué dans un pipeline. Exécutez le code suivant pour vérifier cela.

In [None]:
from azureml.core import Model

for model in Model.list(ws):
    print(model.name, 'version:', model.version)
    for tag_name in model.tags:
        tag = model.tags[tag_name]
        print ('\t',tag_name, ':', tag)
    for prop_name in model.properties:
        prop = model.properties[prop_name]
        print ('\t',prop_name, ':', prop)
    print('\n')

## Publier le pipeline

Une fois que vous avez créé et testé un pipeline, vous pouvez le publier en tant que service REST.

In [None]:
# Publish the pipeline from the run
published_pipeline = pipeline_run.publish_pipeline(
    name="diabetes-training-pipeline", description="Trains diabetes model", version="1.0")

published_pipeline

Notez que le pipeline publié possède un point de terminaison que vous pouvez voir dans la page **Points de terminaison** (sous l’onglet **Points de terminaison du pipeline**) dans [Azure Machine Learning studio](https://ml.azure.com). Vous pouvez également trouver son URI en tant que propriété de l’objet pipeline publié :

In [None]:
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

## Appeler le point de terminaison du pipeline

Pour utiliser le point de terminaison, les applications clientes doivent effectuer un appel REST sur HTTP. Cette demande devant être authentifiée, un en-tête d’autorisation est requis. Une application réelle nécessiterait un principal de service avec lequel s’authentifier mais, pour tester cela, nous allons utiliser l’en-tête d’autorisation de votre connexion actuelle à votre espace de travail Azure, que vous pouvez obtenir à l’aide du code suivant :

In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
print("Authentication header ready.")

Nous sommes maintenant prêts à appeler l’interface REST. Le pipeline s’exécutant de façon asynchrone, nous obtenons un identificateur en retour, que nous pouvons utiliser pour suivre l’essai de pipeline à mesure pendant son exécution :

In [None]:
import requests

experiment_name = 'mslearn-diabetes-pipeline'

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": experiment_name})
run_id = response.json()["Id"]
run_id

Étant donné que vous disposez de l’ID d’exécution, vous pouvez l’utiliser pour attendre la fin de l’exécution.

> **Remarque** : le pipeline devrait se terminer rapidement, car chaque étape a été configurée pour permettre la réutilisation de la sortie. Cela a été fait principalement pour des raisons pratiques et pour gagner du temps dans ce cours. En réalité, vous souhaiterez probablement que la première étape s’exécute chaque fois que les données changent, et ne déclenche les étapes suivantes que si la sortie de la première étape change.

In [None]:
from azureml.pipeline.core.run import PipelineRun

published_pipeline_run = PipelineRun(ws.experiments[experiment_name], run_id)
published_pipeline_run.wait_for_completion(show_output=True)

## Planifier le pipeline

Supposons que la clinique pour patients atteints d’un diabète collecte de nouvelles données chaque semaine et les ajoute au jeu de données. Vous pouvez exécuter le pipeline chaque semaine pour ré-effectuer l’apprentissage du modèle avec les nouvelles données.

In [None]:
from azureml.pipeline.core import ScheduleRecurrence, Schedule

# Submit the Pipeline every Monday at 00:00 UTC
recurrence = ScheduleRecurrence(frequency="Week", interval=1, week_days=["Monday"], time_of_day="00:00")
weekly_schedule = Schedule.create(ws, name="weekly-diabetes-training", 
                                  description="Based on time",
                                  pipeline_id=published_pipeline.id, 
                                  experiment_name='mslearn-diabetes-pipeline', 
                                  recurrence=recurrence)
print('Pipeline scheduled.')

Vous pouvez récupérer les planifications définies dans l’espace de travail comme suit :

In [None]:
schedules = Schedule.list(ws)
schedules

Vous pouvez vérifier la dernière exécution comme suit :

In [None]:
pipeline_experiment = ws.experiments.get('mslearn-diabetes-pipeline')
latest_run = list(pipeline_experiment.get_runs())[0]

latest_run.get_details()

Il s’agit d’un exemple simple, conçu pour illustrer le principe. En réalité, vous pouvez créer une logique plus sophistiquée dans les étapes du pipeline, par exemple, en évaluant le modèle par rapport à des données de test afin de calculer une métrique de performance comme l’AUC ou la précision, en comparant la métrique à celle de versions précédemment inscrites du modèle et en inscrivant le nouveau modèle uniquement s’il fonctionne mieux.

Vous pouvez utiliser l’[extension Azure Machine Learning pour Azure DevOps](https://marketplace.visualstudio.com/items?itemName=ms-air-aiagility.vss-services-azureml) pour combiner des pipelines Azure ML avec des pipelines Azure DevOps (le fait qu’ils portent le même nom porte à confusion), et intégrer le ré-apprentissage du modèle dans un *processus d’intégration continue et livraison continue (CI/CD)* Par exemple, vous pourriez utiliser un pipeline *génération* Azure DevOps pour déclencher un pipeline Azure ML qui effectue l’apprentissage d’un modèle et l’inscrit. Ensuite, une fois le modèle inscrit, il pourrait déclencher un pipeline *mise en production* Azure Devops qui déploie le modèle en tant que service web, ainsi que l’application ou le service qui utilisent le modèle.