<a href="https://colab.research.google.com/github/ayoub-kplr/AI-Architecture-Cloud/blob/main/Azure/python%20Notebooks/08_Create_a_Pipeline.ipynb" target="_blank"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Créer un pipeline

Vous pouvez effectuer les différentes étapes requises pour ingérer des données, former un modèle et inscrire le modèle individuellement en utilisant le SDK Azure ML pour exécuter des expériences basées sur des scripts. Cependant, dans un environnement d'entreprise, il est courant d'encapsuler la séquence d'étapes discrètes nécessaires pour créer une solution d'apprentissage automatique dans un * pipeline * qui peut être exécuté sur une ou plusieurs cibles de calcul ; soit à la demande d'un utilisateur, à partir d'un processus de construction automatisé, soit selon un calendrier.

Dans ce notebook, vous allez rassembler tous ces éléments pour créer un pipeline simple qui prétraite les données, puis forme et enregistre un modèle.


## Connectez-vous à votre espace de travail

Pour commencer, connectez-vous à votre espace de travail.

> **Remarque** : Si vous n'avez pas encore établi de session authentifiée avec votre abonnement Azure, vous serez invité à vous authentifier en cliquant sur un lien, en saisissant un code d'authentification et en vous connectant à Azure.


In [None]:
import azureml.core
from azureml.core import Workspace

# Load the workspace from the saved config file
ws = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))

## Préparer les données

Dans votre pipeline, vous utiliserez un ensemble de données contenant des détails sur les patients diabétiques. Exécutez la cellule ci-dessous pour créer ce jeu de données (si vous l'avez créé précédemment, le code trouvera la version existante)


In [None]:
from azureml.core import Dataset
from azureml.data.datapath import DataPath

default_ds = ws.get_default_datastore()

if 'diabetes dataset' not in ws.datasets:
    Dataset.File.upload_directory(src_dir='data',
                              target=DataPath(default_ds, 'diabetes-data/')
                              )

    #Create a tabular dataset from the path on the datastore (this may take a short while)
    tab_data_set = Dataset.Tabular.from_delimited_files(path=(default_ds, 'diabetes-data/*.csv'))

    # Register the tabular dataset
    try:
        tab_data_set = tab_data_set.register(workspace=ws, 
                                name='diabetes dataset',
                                description='diabetes data',
                                tags = {'format':'CSV'},
                                create_new_version=True)
        print('Dataset registered.')
    except Exception as ex:
        print(ex)
else:
    print('Dataset already registered.')

## Créer des scripts pour les étapes du pipeline

Les pipelines consistent en une ou plusieurs *étapes*, qui peuvent être des scripts Python, ou des étapes spécialisées comme une étape de transfert de données qui copie les données d'un emplacement à un autre. Chaque étape peut s'exécuter dans son propre contexte de calcul. Dans cet exercice, vous allez créer un pipeline simple contenant deux étapes de script Python : une pour prétraiter certaines données d'entraînement et une autre pour utiliser les données prétraitées pour entraîner et enregistrer un modèle.

Commençons par créer un dossier pour les fichiers de script que nous utiliserons dans les étapes du pipeline.


In [None]:
import os
# Create a folder for the pipeline step files
experiment_folder = 'diabetes_pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

Créons maintenant le premier script, qui lira les données de l'ensemble de données sur le diabète et appliquera un prétraitement simple pour supprimer toutes les lignes avec des données manquantes et normaliser les caractéristiques numériques afin qu'elles soient sur une échelle similaire.

Le script inclut un argument nommé **--prepped-data**, qui fait référence au dossier dans lequel les données résultantes doivent être enregistrées.


In [None]:
%%writefile $experiment_folder/prep_diabetes.py
# Import libraries
import os
import argparse
import pandas as pd
from azureml.core import Run
from sklearn.preprocessing import MinMaxScaler

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument("--input-data", type=str, dest='raw_dataset_id', help='raw dataset')
parser.add_argument('--prepped-data', type=str, dest='prepped_data', default='prepped_data', help='Folder for results')
args = parser.parse_args()
save_folder = args.prepped_data

# Get the experiment run context
run = Run.get_context()

# load the data (passed as an input dataset)
print("Loading Data...")
diabetes = run.input_datasets['raw_data'].to_pandas_dataframe()

# Log raw row count
row_count = (len(diabetes))
run.log('raw_rows', row_count)

# remove nulls
diabetes = diabetes.dropna()

# Normalize the numeric columns
scaler = MinMaxScaler()
num_cols = ['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree']
diabetes[num_cols] = scaler.fit_transform(diabetes[num_cols])

# Log processed rows
row_count = (len(diabetes))
run.log('processed_rows', row_count)

# Save the prepped data
print("Saving Data...")
os.makedirs(save_folder, exist_ok=True)
save_path = os.path.join(save_folder,'data.csv')
diabetes.to_csv(save_path, index=False, header=True)

# End the run
run.complete()

Vous pouvez maintenant créer le script pour la deuxième étape, qui entraînera un modèle. Le script inclut un argument nommé **--training-data**, qui fait référence à l'emplacement où les données préparées ont été enregistrées à l'étape précédente.


In [None]:
%%writefile $experiment_folder/train_diabetes.py
# Import libraries
from azureml.core import Run, Model
import argparse
import pandas as pd
import numpy as np
import joblib
import os
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
import matplotlib.pyplot as plt

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument("--training-data", type=str, dest='training_data', help='training data')
args = parser.parse_args()
training_data = args.training_data

# Get the experiment run context
run = Run.get_context()

# load the prepared data file in the training folder
print("Loading Data...")
file_path = os.path.join(training_data,'data.csv')
diabetes = pd.read_csv(file_path)

# Separate features and labels
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# Split data into training set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

# Train adecision tree model
print('Training a decision tree model...')
model = DecisionTreeClassifier().fit(X_train, y_train)

# calculate accuracy
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', np.float(acc))

# calculate AUC
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test,y_scores[:,1])
print('AUC: ' + str(auc))
run.log('AUC', np.float(auc))

# plot ROC curve
fpr, tpr, thresholds = roc_curve(y_test, y_scores[:,1])
fig = plt.figure(figsize=(6, 4))
# Plot the diagonal 50% line
plt.plot([0, 1], [0, 1], 'k--')
# Plot the FPR and TPR achieved by our model
plt.plot(fpr, tpr)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
run.log_image(name = "ROC", plot = fig)
plt.show()

# Save the trained model in the outputs folder
print("Saving model...")
os.makedirs('outputs', exist_ok=True)
model_file = os.path.join('outputs', 'diabetes_model.pkl')
joblib.dump(value=model, filename=model_file)

# Register the model
print('Registering model...')
Model.register(workspace=run.experiment.workspace,
               model_path = model_file,
               model_name = 'diabetes_model',
               tags={'Training context':'Pipeline'},
               properties={'AUC': np.float(auc), 'Accuracy': np.float(acc)})


run.complete()

## Préparer un environnement de calcul pour les étapes du pipeline

Dans cet exercice, vous utiliserez le même calcul pour les deux étapes, mais il est important de réaliser que chaque étape est exécutée indépendamment ; vous pouvez donc spécifier différents contextes de calcul pour chaque étape, le cas échéant.

Commencez par obtenir la cible de calcul que vous avez créée dans un atelier précédent (si elle n'existe pas, elle sera créée).

> **Important** : Remplacez *your-compute-cluster* par le nom de votre cluster de calcul dans le code ci-dessous avant de l'exécuter ! Les noms de cluster doivent être des noms globalement uniques de 2 à 16 caractères. Les caractères valides sont les lettres, les chiffres et le caractère -.


In [None]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "your-compute-cluster"

try:
    # Check for existing compute target
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', max_nodes=2)
        pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        pipeline_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)
    

> **Remarque** : Les instances de calcul et les clusters sont basés sur des images de machines virtuelles Azure standard. Pour cet exercice, l'image *Standard_DS11_v2* est recommandée pour atteindre l'équilibre optimal entre coût et performances. Si votre abonnement a un quota qui n'inclut pas cette image, choisissez une autre image ; mais gardez à l'esprit qu'une image plus grande peut entraîner un coût plus élevé et qu'une image plus petite peut ne pas être suffisante pour accomplir les tâches. Vous pouvez également demander à votre administrateur Azure d'étendre votre quota.

Le calcul nécessitera un environnement Python avec les dépendances de package nécessaires installées.


In [None]:
%%writefile $experiment_folder/experiment_env.yml
name: experiment_env
dependencies:
- python=3.6.2
- scikit-learn
- ipykernel
- matplotlib
- pandas
- pip
- pip:
  - azureml-defaults
  - pyarrow

Maintenant que vous disposez d'un fichier de configuration Conda, vous pouvez créer un environnement et l'utiliser dans la configuration d'exécution du pipeline.


In [None]:
from azureml.core import Environment
from azureml.core.runconfig import RunConfiguration

# Create a Python environment for the experiment (from a .yml file)
experiment_env = Environment.from_conda_specification("experiment_env", experiment_folder + "/experiment_env.yml")

# Register the environment 
experiment_env.register(workspace=ws)
registered_env = Environment.get(ws, 'experiment_env')

# Create a new runconfig object for the pipeline
pipeline_run_config = RunConfiguration()

# Use the compute you created above. 
pipeline_run_config.target = pipeline_cluster

# Assign the environment to the run configuration
pipeline_run_config.environment = registered_env

print ("Run configuration created.")

## Créer et exécuter un pipeline

Vous êtes maintenant prêt à créer et à exécuter un pipeline.

Vous devez d'abord définir les étapes du pipeline et toutes les références de données qui doivent être transmises entre elles. Dans ce cas, la première étape doit écrire les données préparées dans un dossier qui peut être lu par la deuxième étape. Étant donné que les étapes seront exécutées sur un calcul distant (et en fait, chacune pourrait être exécutée sur un calcul différent), le chemin du dossier doit être transmis en tant que référence de données à un emplacement dans un magasin de données dans l'espace de travail. L'objet **OutputFileDatasetConfig** est un type spécial de référence de données qui est utilisé pour les emplacements de stockage provisoires qui peuvent être transmis entre les étapes du pipeline. Vous allez donc en créer un et l'utiliser comme sortie pour la première étape et comme entrée pour le deuxième étape. Notez que vous devez le transmettre en tant qu'argument de script afin que votre code puisse accéder à l'emplacement du magasin de données référencé par la référence de données.


In [None]:
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep

# Get the training dataset
diabetes_ds = ws.datasets.get("diabetes dataset")

# Create an OutputFileDatasetConfig (temporary Data Reference) for data passed from step 1 to step 2
prepped_data = OutputFileDatasetConfig("prepped_data")

# Step 1, Run the data prep script
prep_step = PythonScriptStep(name = "Prepare Data",
                                source_directory = experiment_folder,
                                script_name = "prep_diabetes.py",
                                arguments = ['--input-data', diabetes_ds.as_named_input('raw_data'),
                                             '--prepped-data', prepped_data],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

# Step 2, run the training script
train_step = PythonScriptStep(name = "Train and Register Model",
                                source_directory = experiment_folder,
                                script_name = "train_diabetes.py",
                                arguments = ['--training-data', prepped_data.as_input()],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

print("Pipeline steps defined")

OK, vous êtes prêt à créer le pipeline à partir des étapes que vous avez définies et à l'exécuter en tant qu'expérience.


In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# Construct the pipeline
pipeline_steps = [prep_step, train_step]
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)
print("Pipeline is built.")

# Create an experiment and run the pipeline
experiment = Experiment(workspace=ws, name = 'mslearn-diabetes-pipeline')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")
RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion(show_output=True)

Une représentation graphique de l'expérience de pipeline s'affichera dans le widget lors de son exécution. Gardez un œil sur l'indicateur du noyau en haut à droite de la page, lorsqu'il passe de **⚫** à **◯**, le code a fini de s'exécuter. Vous pouvez également surveiller les exécutions de pipeline sur la page **Experiments** dans [Azure Machine Learning studio](https://ml.azure.com).

Lorsque le pipeline est terminé, vous pouvez examiner les métriques enregistrées par ses exécutions enfants.


In [None]:
for run in pipeline_run.get_children():
    print(run.name, ':')
    metrics = run.get_metrics()
    for metric_name in metrics:
        print('\t',metric_name, ":", metrics[metric_name])

En supposant que le pipeline a réussi, un nouveau modèle doit être enregistré avec une balise *Training context* indiquant qu'il a été formé dans un pipeline. Exécutez le code suivant pour vérifier cela.


In [None]:
from azureml.core import Model

for model in Model.list(ws):
    print(model.name, 'version:', model.version)
    for tag_name in model.tags:
        tag = model.tags[tag_name]
        print ('\t',tag_name, ':', tag)
    for prop_name in model.properties:
        prop = model.properties[prop_name]
        print ('\t',prop_name, ':', prop)
    print('\n')

## Publier le pipeline

Après avoir créé et testé un pipeline, vous pouvez le publier en tant que service REST.


In [None]:
# Publish the pipeline from the run
published_pipeline = pipeline_run.publish_pipeline(
    name="diabetes-training-pipeline", description="Trains diabetes model", version="1.0")

published_pipeline

Notez que le pipeline publié a un point de terminaison, que vous pouvez voir sur la page **Points de terminaison** (sur l'onglet **Points de terminaison du pipeline**) dans [Azure Machine Learning studio](https://ml.azure.com) . Vous pouvez également trouver son URI en tant que propriété de l'objet de pipeline publié :


In [None]:
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

## Appeler le point de terminaison du pipeline

Pour utiliser le point de terminaison, les applications clientes doivent effectuer un appel REST via HTTP. Cette demande doit être authentifiée, donc un en-tête d'autorisation est requis. Une application réelle nécessiterait un principal de service avec lequel s'authentifier, mais pour le tester, nous utiliserons l'en-tête d'autorisation de votre connexion actuelle à votre espace de travail Azure, que vous pouvez obtenir à l'aide du code suivant :


In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
print("Authentication header ready.")

Nous sommes maintenant prêts à appeler l'interface REST. Le pipeline s'exécute de manière asynchrone, nous récupérons donc un identifiant, que nous pouvons utiliser pour suivre l'expérience du pipeline pendant son exécution :


In [None]:
import requests

experiment_name = 'mslearn-diabetes-pipeline'

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": experiment_name})
run_id = response.json()["Id"]
run_id

Puisque vous avez l'ID d'exécution, vous pouvez l'utiliser pour attendre la fin de l'exécution.

> **Remarque** : Le pipeline devrait se terminer rapidement, car chaque étape a été configurée pour permettre la réutilisation de la sortie. Cela a été fait principalement pour des raisons de commodité et pour gagner du temps dans ce cours. En réalité, vous voudriez probablement que la première étape s'exécute à chaque fois au cas où les données auraient changé, et déclencher les étapes suivantes uniquement si la sortie de la première étape change.


In [None]:
from azureml.pipeline.core.run import PipelineRun

published_pipeline_run = PipelineRun(ws.experiments[experiment_name], run_id)
published_pipeline_run.wait_for_completion(show_output=True)

## Planifier le pipeline

Supposons que la clinique pour les patients diabétiques collecte de nouvelles données chaque semaine et les ajoute à l'ensemble de données. Vous pouvez exécuter le pipeline chaque semaine pour recycler le modèle avec les nouvelles données.


In [None]:
from azureml.pipeline.core import ScheduleRecurrence, Schedule

# Submit the Pipeline every Monday at 00:00 UTC
recurrence = ScheduleRecurrence(frequency="Week", interval=1, week_days=["Monday"], time_of_day="00:00")
weekly_schedule = Schedule.create(ws, name="weekly-diabetes-training", 
                                  description="Based on time",
                                  pipeline_id=published_pipeline.id, 
                                  experiment_name='mslearn-diabetes-pipeline', 
                                  recurrence=recurrence)
print('Pipeline scheduled.')

Vous pouvez récupérer les plannings définis dans l'espace de travail comme ceci :


In [None]:
schedules = Schedule.list(ws)
schedules

Vous pouvez vérifier la dernière exécution comme ceci :


In [None]:
pipeline_experiment = ws.experiments.get('mslearn-diabetes-pipeline')
latest_run = list(pipeline_experiment.get_runs())[0]

latest_run.get_details()

Ceci est un exemple simple, conçu pour démontrer le principe. En réalité, vous pouvez créer une logique plus sophistiquée dans les étapes du pipeline - par exemple, évaluer le modèle par rapport à certaines données de test pour calculer une métrique de performance comme l'AUC ou la précision, comparer la métrique à celle de toutes les versions précédemment enregistrées du modèle, et seulement enregistrer le nouveau modèle s'il fonctionne mieux.

Vous pouvez utiliser [l'extension Azure Machine Learning pour Azure DevOps](https://marketplace.visualstudio.com/items?itemName=ms-air-aiagility.vss-services-azureml) pour combiner les pipelines Azure ML avec les pipelines Azure DevOps ( oui, c'est * déroutant * qu'ils portent le même nom !) et intégrez le recyclage du modèle dans un processus d' * intégration continue/déploiement continu (CI/CD) *. Par exemple, vous pouvez utiliser un pipeline Azure DevOps *build* pour déclencher un pipeline Azure ML qui entraîne et enregistre un modèle, et lorsque le modèle est enregistré, il peut déclencher un pipeline Azure Devops *release* qui déploie le modèle en tant que service Web, ainsi que l'application ou le service qui utilise le modèle.
