## Auteur : DIANI Mamoudou S

In [None]:
import os
from google.cloud import storage
from google.cloud import aiplatform

In [2]:
project = !gcloud config get-value project
PROJECT_ID = project[0]
PROJECT_ID

'projet1-415505'

In [None]:
SERVICE_ACCOUNT = !gcloud config list --format='value(core.account)' 
SERVICE_ACCOUNT = SERVICE_ACCOUNT[0]
SERVICE_ACCOUNT

In [19]:
FRAMEWORK = 'tf'
TASK = 'prediction'
MODEL_TYPE = 'rnn'
SCRIPT_PATH = './code/train.py'
GCS_BUCKET=project[0]
EXPERIMENT = '02a'
SERIES = '02'
GCS_BUCKET = PROJECT_ID
DIR = f"temp/{EXPERIMENT}"
URI = f"gs://{GCS_BUCKET}/{SERIES}/{EXPERIMENT}"
REGION = 'northamerica-northeast1'
EXPERIMENT_NAME = f'experiment-{SERIES}-{EXPERIMENT}-{FRAMEWORK}-{TASK}-{MODEL_TYPE}'
BQ_PROJECT = project[0]
BQ_DATASET = 'EURUSD'
BQ_TABLE = 'EURUSD'
BQ_TABLE_TRAIN = 'EURUSD_Train_prepped'
BQ_TABLE_TEST = 'EURUSD_Test_prepped'
VAR_TARGET = 'val_y_train'
VAR_TARGET1 = 'val_y_test'
TRAIN_COMPUTE = 'n1-standard-4'
DEPLOY_COMPUTE = 'n1-standard-4'
TRAIN_IMAGE = 'us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-12.py310:latest'
DEPLOY_IMAGE ='us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-12:latest' 
EPOCHS = 5
BATCH_SIZE =10

In [20]:
if not os.path.exists(DIR):
    os.makedirs(DIR)

In [21]:
gcs = storage.Client(project = PROJECT_ID)

In [22]:
aiplatform.init(project = project[0], location = 'northamerica-northeast1')

**************************

# Pipeline : Entraînement et mise en service continus
Créer un travail de pipeline sur Vertex AI Pipelines qui réalise le flux de travail complet ci-dessus :

1. Configuration initiale
2. Entraînement du modèle
3. Enregistrement du modèle
4. Classement du modèle
5. Validation du modèle (comparaison avec le modèle actuellement déployé)
6. Mise à jour du point de terminaison
7. Test du point de terminaison.


**************************

kfp ==> Kubeflow Pipelines (KFP) : Kubeflow est une plateforme open-source conçue pour faciliter le déploiement, la gestion et l'orchestration de workflows de machine learning (ML) sur Kubernetes

In [23]:
import kfp
from google_cloud_pipeline_components.types import artifact_types
from typing import NamedTuple

In [24]:
kfp.__version__

'2.7.0'

## Stockage du code d'entraînement dans GCS

Le fichier Python contenant le script d'entraînement doit être disponible localement lors de l'appel de la méthode from_local_script(). Toutefois, dans le cadre d'un job sur Vertex AI Pipelines, le fichier Python pourrait ne pas être accessible localement car le job s'exécute sur des ressources distantes. Pour résoudre ce problème, le fichier est stocké dans un emplacement spécifique sur Google Cloud Storage (GCS). Lors de l'exécution du job de pipeline, un système de fichiers FUSE (Filesystem in Userspace) est automatiquement mis en place pour monter le bucket GCS directement dans le conteneur du job à un emplacement déterminé.


In [25]:
bucket = gcs.lookup_bucket(GCS_BUCKET)

In [26]:
SCRIPT_PATH

'./code/train.py'

In [27]:
SCRIPT_NAME = SCRIPT_PATH.split('/')[-1]
SCRIPT_NAME

'train.py'

In [28]:
blob = bucket.blob(f'{SERIES}/{EXPERIMENT}/training/{SCRIPT_NAME}')

In [29]:
blob.upload_from_filename(SCRIPT_PATH)

***************************

## Component: experiment_setup
Définition d'un composant de pipeline Kubernetes pour la configuration d'expériences sur Vertex AI :

1. Initialise un environnement avec une image de base Python et installe les packages nécessaires pour interagir avec les services Google Cloud.
2. Définit une fonction pour configurer une expérience, qui prend plusieurs paramètres de projet, région, données BigQuery, etc.
3. Utilise le client BigQuery pour interroger des échantillons de données de test.
4. Configure un Tensorboard pour le suivi des expériences si disponible, sinon en crée un nouveau.
5. Initialise une expérience dans Vertex AI avec le Tensorboard configuré.
6. Prépare des arguments de commande pour l'exécution de modèles incluant des détails de l'expérience et des tables BigQuery.
7. Reformate et prépare un échantillon de données de test pour l'entrée de modèle.
8. Configure les URI des sources de données BigQuery pour un accès ultérieur.
9. Crée des URIs pour l'accès direct aux expériences et aux exécutions d'expériences sur l'interface web de Vertex AI.
10. Retourne les configurations, le nom de l'exécution, les détails du Tensorboard, et un échantillon de données pour l'utilisation ultérieure dans le pipeline.


@kfp.dsl.component(
    base_image = "python:3.9", 
    packages_to_install = ["google-cloud-aiplatform", "google-cloud-pipeline-components", "google-cloud-bigquery", "pandas", "db-dtypes"]
)

cette section de code définit une composante Kubeflow qui utilise une image de base Python 3.9 et installe certains packages Python spécifiques dans le conteneur pour permettre l'exécution de tâches spécifiques dans le pipeline Kubeflow.

In [30]:
@kfp.dsl.component(
    base_image = "python:3.9", 
    packages_to_install = ["google-cloud-aiplatform", "google-cloud-pipeline-components", "google-cloud-bigquery", "pandas", "db-dtypes"]
)
def experiment_setup(
    project: str,
    region: str,
    series: str,
    experiment: str,
    experiment_name: str,
    cmdargs: list,
    bq_project: str,
    bq_dataset: str,
    bq_table_train: str,
    bq_table_test: str,
    var_target: str,
    var_target1: str,
    bq_source: kfp.dsl.Output[artifact_types.BQTable]
) -> NamedTuple('outputs', [
    ('cmdargs', list), 
    ('run_name', str), 
    ('tensorboard', str), 
    ('timestamp', str),
    ('experiment_uri', str),
    ('experiment_run_uri', str),
    ('sample', list)]):
    
   
    from collections import namedtuple
    result = namedtuple('outputs', ['cmdargs', 'run_name', 'tensorboard', 'timestamp', 'experiment_uri', 'experiment_run_uri', 'sample'])
    
    
    from google.cloud import aiplatform
    aiplatform.init(project = project, location = region)
    
    import numpy as np
    
    
    from google.cloud import bigquery
    bq = bigquery.Client(project = project)
    
    
    tb = aiplatform.Tensorboard.list(filter=f"labels.series={series}")
    if tb:
        tb = tb[0]
    else: 
        tb = aiplatform.Tensorboard.create(display_name = series, labels = {'series' : f'{series}'})
    
    
    aiplatform.init(experiment = experiment_name, experiment_tensorboard = tb.resource_name)
    
    
    from datetime import datetime
    TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
    RUN_NAME = f'run-{TIMESTAMP}'
    CMDARGS = cmdargs + [
        "--project_id=" + project,
        "--region=" + region,
        "--series=" + series,
        "--experiment=" + experiment,
        "--experiment_name=" + experiment_name,
        "--run_name=" + RUN_NAME,
        "--bq_project=" + bq_project,
        "--bq_dataset=" + bq_dataset,
        "--bq_table_train=" + bq_table_train,
        "--bq_table_test=" + bq_table_test,
        "--var_target=" + var_target,
        "--var_target1=" + var_target1,
    ]
    
    
    samples = bq.query(query = f"""
        SELECT * 
        FROM `{bq_project}.{bq_dataset}.{bq_table_test}`
        LIMIT 1
    """).to_dataframe()
    samples = samples.iloc[:, :-1]
    samples = np.array(samples)
    samples = np.reshape(samples, (samples.shape[0], samples.shape[1], 1))
    samples = samples.tolist()
    
    
    bq_source.uri1 = f'https://www.googleapis.com/bigquery/v2/projects/{bq_project}/datasets/{bq_dataset}/tables/{bq_table_train}'
    bq_source.uri2 = f'https://www.googleapis.com/bigquery/v2/projects/{bq_project}/datasets/{bq_dataset}/tables/{bq_table_test}'
    bq_source.metadata['projectId'] = bq_project
    bq_source.metadata['datasetId'] = bq_dataset
    bq_source.metadata['tableTrainId'] = bq_table_train
    bq_source.metadata['tableTestId'] = bq_table_test
    
    
    exp_uri = f'https://console.cloud.google.com/vertex-ai/locations/{region}/experiments/{experiment_name}?project={project}'
    exp_run_uri = f'https://console.cloud.google.com/vertex-ai/locations/{region}/experiments/{experiment_name}/runs/{experiment_name}-{RUN_NAME}?project={project}'

    
    return result(CMDARGS, RUN_NAME, tb.resource_name, TIMESTAMP, exp_uri, exp_run_uri, samples[0])

## Component: train_from_local_script

Définition d'un composant de pipeline Kubernetes pour l'entraînement de modèles à partir de scripts locaux dans Vertex AI :

1. Initialise un environnement avec une image de base Python et installe des packages nécessaires pour interagir avec Vertex AI et ses composants de pipeline.
2. Définit une fonction pour configurer et exécuter un job d'entraînement personnalisé, qui utilise un script local pour l'entraînement.
3. Utilise la fonction from_local_script de l'API Vertex AI pour créer un job personnalisé en spécifiant des paramètres comme le chemin du script, l'image du conteneur, les arguments du script, et la configuration du matériel.
4. Configure des détails additionnels tels que les répertoires de sortie et les étiquettes pour l'organisation et le suivi.
5. Exécute le job d'entraînement en spécifiant le compte de service et le Tensorboard associé pour le suivi des métriques.
6. Génère des artefacts de sortie qui contiennent des informations sur l'état du job et les ressources GCP utilisées, accessibles via des URI dédiés pour la surveillance dans la console Google Cloud.
7. Retourne l'état du job et une représentation JSON des ressources GCP utilisées pour une intégration facile avec d'autres services et outils Google Cloud.


*********

* replica_count: C'est le nombre de réplicas (instances) à utiliser pour le job d'entraînement.
* accelerator_count: C'est le nombre d'accélérateurs (par exemple, GPU) à attacher à chaque instance de la réplica pour accélérer le processus d'entraînement.
* base_output_dir: C'est le répertoire de sortie de base où les artefacts de formation (modèles, checkpoints, etc.) seront sauvegardés dans Google Cloud Storage (GCS).
* staging_bucket: C'est le bucket GCS utilisé pour le stockage temporaire des artefacts de formation pendant l'exécution du job d'entraînement.
* labels: Ce sont des étiquettes (labels) facultatives qui peuvent être attachées au job d'entraînement pour organiser et catégoriser les exécutions. 

****************

* Lors de l'exécution d'un composant, il est courant de voir non seulement le lien vers la tâche de composant lancée, mais également le lien vers les ressources cloud sous-jacentes, telles que les tâches de prédiction par lot Vertex ou les tâches Dataflow. Le proto gcp_resource est un paramètre spécial que vous pouvez utiliser dans votre composant pour permettre à Google Cloud Console d'offrir une vue personnalisée des journaux et de l'état de la ressource dans la console Vertex AI Pipelines.

In [31]:
@kfp.dsl.component(
    base_image = "python:3.9",
    packages_to_install = ["google-cloud-aiplatform", "google-cloud-pipeline-components"]
)

def train_from_local_script(
    project: str,
    region: str,
    series: str,
    experiment: str,
    experiment_name: str,
    run_name: str,
    cmdargs: list,
    script: str,
    train_image: str,
    requirements: list,
    train_compute: str,
    timestamp: str,
    bucket: str,
    service_account: str,
    tensorboard: str,
    job_resources: kfp.dsl.Output[kfp.dsl.Artifact]
) -> NamedTuple('outputs', [('state', str), ('gcp_resources', str)]):
    
    from collections import namedtuple
    result = namedtuple('outputs', ['state', 'gcp_resources'])
    
    from google.cloud import aiplatform
    aiplatform.init(project = project, location = region)
    
    customJob = aiplatform.CustomJob.from_local_script(
        display_name = f'{series}_{experiment}_{timestamp}',
        script_path = f'/gcs/{bucket}/{series}/{experiment}/training/{script}',
        container_uri = train_image,
        args = cmdargs,
        requirements = requirements,
        replica_count = 1,
        machine_type = train_compute,
        accelerator_count = 0,
        base_output_dir = f"gs://{bucket}/{series}/{experiment}/models/{timestamp}",
        staging_bucket = f"gs://{bucket}/{series}/{experiment}/models/{timestamp}",
        labels = {'series' : f'{series}', 'experiment' : f'{experiment}', 'experiment_name' : f'{experiment_name}', 'run_name' : f'{run_name}'}
    )
    
    customJob.run(
        service_account = service_account,
        tensorboard = tensorboard
    )
    
    job_resources.uri = f"https://console.cloud.google.com/vertex-ai/locations/{region}/training/{customJob.resource_name.split('/')[-1]}/cpu"
    job_resources.metadata = dict(state = customJob.state.name, resource_name = customJob.resource_name, model = f"gs://{bucket}/{series}/{experiment}/models/{timestamp}/model")
    
    from google_cloud_pipeline_components.proto.gcp_resources_pb2 import GcpResources
    from google.protobuf.json_format import MessageToJson
    customJob_resources = GcpResources()
    cr = customJob_resources.resources.add()
    cr.resource_type = 'CustomJob'
    cr.resource_uri = f'https://{region}-aiplatform.googleapis.com/v1/{customJob.resource_name}'
    gcp_resources = MessageToJson(customJob_resources)
    
    return result(customJob.state.name, gcp_resources)    

## Component: register_model

Définition d'un composant de pipeline Kubernetes pour enregistrer un modèle d'apprentissage machine dans Vertex AI :

1. Initialise un environnement avec une image de base Python et installe les packages nécessaires pour interagir avec Vertex AI.
2. Définit une fonction pour enregistrer un modèle dans le registre de modèles de Vertex AI, utilisant divers paramètres de configuration comme le projet, la région, et le nom de l'expérience.
3. Vérifie si le modèle spécifié existe déjà et si la version courante est déjà chargée ; si non, le modèle est chargé comme nouvelle version par défaut ou en tant que nouveau modèle.
4. Utilise les métadonnées du job précédent pour configurer l'URI du modèle et les détails du conteneur de déploiement.
5. Crée un lien vers le Tensorboard associé pour le suivi des métriques d'entraînement.
6. Log les paramètres et les métadonnées du modèle dans l'exécution de l'expérience sur Vertex AI pour faciliter le suivi et l'audit.
7. Met à jour l'état de l'exécution de l'expérience pour marquer la complétude de l'enregistrement du modèle.
8. Configure et retourne un artefact représentant le modèle enregistré, incluant son URI et les métadonnées associées.
9. Retourne l'ID de la version du modèle pour une utilisation ultérieure dans le pipeline ou d'autres opérations.


In [32]:
@kfp.dsl.component(
    base_image = "python:3.9",
    packages_to_install = ["google-cloud-aiplatform", "google-cloud-pipeline-components"]
)

def register_model(
    project: str,
    region: str,
    series: str,
    experiment: str,
    experiment_name: str,
    run_name: str,
    deploy_image: str,
    job_resources: kfp.dsl.Input[kfp.dsl.Artifact],
    tensorboard: str,
    vertex_model: kfp.dsl.Output[artifact_types.VertexModel]
) -> NamedTuple('outputs', [('version_id', str)]):
    
    from collections import namedtuple
    result = namedtuple('outputs', ['version_id'])
    
    from google.cloud import aiplatform
    aiplatform.init(project = project, location = region)
    
    modelmatch = aiplatform.Model.list(filter = f'display_name={series}_{experiment} AND labels.series={series} AND labels.experiment={experiment}')
    
    upload_model = True
    if modelmatch:
        print("Model Already in Registry:")
        if run_name in modelmatch[0].version_aliases:
            print("This version already loaded, no action taken.")
            upload_model = False
            model = aiplatform.Model(model_name = modelmatch[0].resource_name)
        else:
            print('Loading model as new default version.')
            parent_model = modelmatch[0].resource_name
    else:
        print('This is a new model, creating in model registry')
        parent_model = ''

    if upload_model:
        model = aiplatform.Model.upload(
            display_name = f'{series}_{experiment}',
            model_id = f'model_{series}_{experiment}',
            parent_model =  parent_model,
            serving_container_image_uri = deploy_image,
            artifact_uri = job_resources.metadata['model'], 
            is_default_version = True,
            version_aliases = [run_name],
            version_description = run_name,
            labels = {'series' : f'{series}', 'experiment' : f'{experiment}', 'experiment_name' : f'{experiment_name}', 'run_name' : f'{run_name}'}        
        )
    
    customJob = aiplatform.CustomJob.get(resource_name = job_resources.metadata['resource_name'])
    board_link = f"https://{region}.tensorboard.googleusercontent.com/experiment/{tensorboard.replace('/', '+')}+experiments+{customJob.resource_name.split('/')[-1]}"
    
    expRun = aiplatform.ExperimentRun(run_name = run_name, experiment = experiment_name)
    expRun.log_params({
        'model.uri': model.uri,
        'model.display_name': model.display_name,
        'model.name': model.name,
        'model.resource_name': model.resource_name,
        'model.version_id': model.version_id,
        'model.versioned_resource_name': model.versioned_resource_name,
        'customJobs.display_name': customJob.display_name,
        'customJobs.resource_name': customJob.resource_name,
        'customJobs.link': job_resources.uri,
        'customJobs.tensorboard': board_link
    })
    
    expRun.update_state(state = aiplatform.gapic.Execution.State.COMPLETE)
    
    vertex_model.uri = f'https://{region}-aiplatform.googleapis.com/v1/{model.versioned_resource_name}'
    vertex_model.metadata['model_resource_name'] = model.versioned_resource_name
    
    return result(model.version_id)

## Component: endpoint_model

Composant Kubernetes pour gérer les endpoints de modèles dans Vertex AI :

1. Initialise un client Vertex AI spécifique au projet et à la région donnés.
2. Liste et vérifie l'existence d'un endpoint avec un nom d'affichage spécifique et des labels associés.
3. Si l'endpoint existe déjà, il est récupéré ; sinon, un nouvel endpoint est créé avec le nom d'affichage et les labels appropriés.
4. Enregistre l'URI et les métadonnées de l'endpoint existant ou nouvellement créé dans un artefact de sortie pour un accès ultérieur.
5. Parcourt les modèles associés à l'endpoint et, pour chaque modèle, vérifie si celui-ci reçoit 100% du trafic (c'est-à-dire, le modèle actuellement déployé).
6. Enregistre l'URI et les métadonnées du modèle actuellement déployé dans un autre artefact de sortie pour référence et utilisation futures.


In [47]:
@kfp.dsl.component(
    base_image = "python:3.9",
    packages_to_install = ["google-cloud-aiplatform", "google-cloud-pipeline-components", 'pandas']
)

def endpoint_model(
    project: str,
    region: str,
    series: str,
    vertex_endpoint: kfp.dsl.Output[artifact_types.VertexEndpoint],
    vertex_model: kfp.dsl.Output[artifact_types.VertexModel]
):
    
    
    from google.cloud import aiplatform
    aiplatform.init(project = project, location = region)
    
    endpoints = aiplatform.Endpoint.list(filter = f"labels.series={series} AND display_name={series}")
    if endpoints:
        endpoint = endpoints[0]
        print(f"Endpoint Exists: {endpoints[0].resource_name}")
    else:
        endpoint = aiplatform.Endpoint.create(
            display_name = f"{series}",
            labels = {'series' : f"{series}"}    
        )
        print(f"Endpoint Created: {endpoint.resource_name}")

    vertex_endpoint.uri = f'https://{region}-aiplatform.googleapis.com/v1/{endpoint.resource_name}'
    vertex_endpoint.metadata['endpoint_resource_name'] = endpoint.resource_name
    

    for model in endpoint.list_models():
        if endpoint.traffic_split[model.id] == 100:
            vertex_model.uri = f'https://{region}-aiplatform.googleapis.com/v1/{model.model}@{model.model_version_id}'
            vertex_model.metadata['model_resource_name'] = model.model+'@'+model.model_version_id
 

## Component: rank_model

Composant Kubernetes pour classer les modèles d'apprentissage machine dans Vertex AI :

1. Initialise un client Vertex AI spécifique au projet et à la région donnés.
2. Liste toutes les expériences associées à une série spécifique et récupère leurs données dans un DataFrame.
3. Définit une fonction pour classer les modèles en fonction de leur performance (perte de test) à la fois dans l'ensemble de la série et dans chaque expérience.
4. Ajoute une étiquette 'Status' pour identifier le nouveau modèle et le modèle actuellement déployé.
5. Si le modèle actuellement déployé n'est pas trouvé, ajoute manuellement ses données pour éviter des erreurs dans les classements.
6. Réorganise les colonnes pour mettre 'Status' en premier, facilitant la visualisation.
7. Calcule les rangs pour le nouveau modèle et le modèle actuellement déployé pour fournir des métriques comparatives.
8. Exporte le tableau complet des rangs au format Markdown dans un fichier spécifié.
9. Retourne les rangs expérimentaux et de série pour le nouveau modèle ainsi que le rang de série pour le modèle actuellement déployé.


In [101]:
@kfp.dsl.component(
    base_image = "python:3.9",
    packages_to_install = ["google-cloud-aiplatform", "google-cloud-pipeline-components", 'pandas']
)
def rank_model(
    project: str,
    region: str,
    series: str,
    current_model: kfp.dsl.Input[artifact_types.VertexModel], 
    new_model: kfp.dsl.Input[artifact_types.VertexModel],
    ranks_table: kfp.dsl.Output[kfp.dsl.Markdown]
) -> NamedTuple('outputs', [('new_model_experiment_rank', float), ('new_model_series_rank', float), ('current_model_series_rank', float)]):
    
    from collections import namedtuple
    result = namedtuple('outputs', ['new_model_experiment_rank', 'new_model_series_rank', 'current_model_series_rank'])
    
    from google.cloud import aiplatform
    aiplatform.init(project = project, location = region)

    import pandas as pd
    
    experiments = aiplatform.Experiment.list()
    experiments = [e for e in experiments if e.name.split('-')[0:2] == ['experiment', series]]
    results = []
    for experiment in experiments:
            results.append(experiment.get_data_frame())
            print(experiment.name)
    results = pd.concat(results)
    
    def ranker(metric = 'metric.test_loss'):
        ranks = results[['experiment_name', 'run_name', 'param.model.display_name', 'param.model.version_id', metric]].copy().reset_index(drop = True)
        ranks = ranks[ranks['param.model.display_name'].notnull()]
        ranks['series_rank'] = ranks[metric].rank(method = 'dense', ascending = True)
        ranks['experiment_rank'] = ranks.groupby('experiment_name')[metric].rank(method = 'dense', ascending = True)
        return ranks.sort_values(by = ['series_rank', 'experiment_rank'])
    ranks = ranker('metric.test_loss') 
    
    new_model = aiplatform.Model(model_name = new_model.metadata['model_resource_name'])
    ranks.loc[(ranks['param.model.display_name'] == new_model.display_name) & (ranks['param.model.version_id'] == new_model.version_id), 'Status'] = 'New Model'
    
    try :
        current_model = aiplatform.Model(model_name = current_model.metadata['model_resource_name'])
        ranks.loc[(ranks['param.model.display_name'] == current_model.display_name) & (ranks['param.model.version_id'] == current_model.version_id), 'Status'] = 'On Endpoint'
        
    except KeyError:
        data = {
    'experiment_name': [None],
    'run_name': [None],
    'param.model.display_name': ['current_model'],
    'param.model.version_id': ['1'],
    'metric.test_loss': [1000],
    'series_rank': [1000],
    'experiment_rank': [1000],
    'Status': 'On Endpoint'
        }
        df = pd.DataFrame(data)
        ranks= pd.concat([ranks, df], ignore_index=True)
        
    col = ranks.pop('Status')
    ranks.insert(0, col.name, col)
    
    new_rank = ranks.loc[(ranks['param.model.display_name'] == new_model.display_name) & (ranks['param.model.version_id'] == new_model.version_id)]
    current_rank = ranks.loc[(ranks['param.model.display_name'] == "current_model") & (ranks['param.model.version_id'] == '1')]
    
    with open(ranks_table.path, 'w') as f:
        f.write(ranks.to_markdown(index = False))
    
    return result(new_rank['experiment_rank'].iloc[0], new_rank['series_rank'].iloc[0], current_rank['series_rank'].iloc[0])

## Component: deploy_model

Composant Kubernetes pour déployer un modèle de machine learning sur un endpoint dans Vertex AI 
1. Initialise un client Vertex AI pour le projet et la région spécifiés.
2. Récupère les ressources existantes de l'endpoint et du modèle à partir de leurs métadonnées.
3. Déploie le modèle sur l'endpoint spécifié avec une allocation de trafic de 100%, et configure le type de machine et les compteurs de réplicas.
4. Vérifie les modèles déployés sur l'endpoint pour ajuster la répartition du trafic ou retirer les anciens modèles.
5. Détermine si le modèle spécifié a été correctement déployé et récupère le pourcentage de trafic qui lui est attribué.
6. Retourne le nombre total de modèles déployés, un booléen indiquant si le nouveau modèle a été déployé, et le pourcentage de trafic alloué au nouveau modèle.


In [102]:
@kfp.dsl.component(
    base_image = "python:3.9",
    packages_to_install = ["google-cloud-aiplatform", "google-cloud-pipeline-components"]
)
def deploy_model(
    project: str,
    region: str,
    deploy_compute: str,
    vertex_endpoint: kfp.dsl.Input[artifact_types.VertexEndpoint], 
    vertex_model: kfp.dsl.Input[artifact_types.VertexModel]
) -> NamedTuple('outputs', [('count_of_deployed', int), ('new_model_deployed', bool), ('traffic_new_model', int)]):
    
    from collections import namedtuple
    result = namedtuple('outputs', ['count_of_deployed', 'new_model_deployed', 'traffic_new_model'])
    
    from google.cloud import aiplatform
    aiplatform.init(project = project, location = region)
    
    endpoint = aiplatform.Endpoint(endpoint_name = vertex_endpoint.metadata['endpoint_resource_name'])
    model = aiplatform.Model(model_name = vertex_model.metadata['model_resource_name'])

    endpoint.deploy(
        model = model,
        deployed_model_display_name = model.display_name,
        traffic_percentage = 100,
        machine_type = deploy_compute,
        min_replica_count = 1,
        max_replica_count = 1
    )

    deployed = False
    traffic = 0
    for deployed_model in endpoint.list_models():
        if deployed_model.id not in endpoint.traffic_split:
            endpoint.undeploy(deployed_model_id = deployed_model.id)
        elif f'{deployed_model.model}@{deployed_model.model_version_id}' == vertex_model.metadata['model_resource_name']:
            deployed = True
            traffic = endpoint.traffic_split[deployed_model.id]

    info = endpoint.to_dict()
    
    return result(
        len(info['deployedModels']),
        deployed,
        traffic
    )

## Component: prediction_test

Composant Kubernetes pour effectuer des tests de prédiction sur un modèle déployé dans Vertex AI :

1. Initialise un client Vertex AI spécifié par le projet et la région.
2. Récupère l'endpoint de Vertex AI à partir de ses métadonnées pour accéder au modèle déployé.
3. Effectue une prédiction en utilisant un échantillon de données fourni comme entrée.
4. Retourne les résultats de la prédiction ainsi que le nom de ressource du modèle utilisé pour la prédiction, incluant l'identifiant de version.


In [103]:
@kfp.dsl.component(
    base_image = "python:3.9",
    packages_to_install = ["google-cloud-aiplatform", "google-cloud-pipeline-components"]
)
def prediction_test(
    project: str,
    region: str,
    sample: list,
    vertex_endpoint: kfp.dsl.Input[artifact_types.VertexEndpoint]
) -> NamedTuple('outputs', [('predictions', list), ('model_resource_name', str)]):

    from collections import namedtuple
    result = namedtuple('outputs', ['predictions', 'model_resource_name'])

    from google.cloud import aiplatform
    aiplatform.init(project = project, location = region) 
    
    endpoint = aiplatform.Endpoint(endpoint_name = vertex_endpoint.metadata['endpoint_resource_name'])
    
    prediction = endpoint.predict(instances = [sample])
    
    return result(
        predictions = prediction.predictions[0],
        model_resource_name = prediction.model_resource_name + '@' + prediction.model_version_id
    )

## Pipeline

Définition d'un pipeline complet de développement dans Vertex AI pour la série et l'expérience spécifiées :
Ce pipeline intègre toutes les étapes nécessaires depuis la configuration initiale jusqu'au déploiement du modèle, y compris l'entraînement, l'enregistrement et les tests de prédiction.

Étapes du pipeline :
1. Configuration de l'expérience : Initialise les paramètres de l'expérience, configure les ressources de BigQuery et prépare les arguments de commande.
2. Entraînement du modèle : Exécute un script local pour l'entraînement du modèle sur les données spécifiées, en utilisant les ressources de calcul définies.
3. Vérification de la complétion de l'entraînement : Contrôle si l'entraînement a réussi avant de procéder.
4. Modèle de point d'entrée avant la mise à jour : Capture l'état actuel du point d'entrée pour la comparaison de modèles.
5. Enregistrement du modèle : Enregistre le modèle entraîné dans le registre de modèles Vertex AI, prêt pour le déploiement.
6. Classement du modèle : Compare les performances du nouveau modèle avec celles du modèle actuellement déployé pour décider de la mise à jour.
7. Déploiement du modèle : Si le nouveau modèle est jugé supérieur, il est déployé au point d'entrée avec le trafic redirigé vers lui.
8. Modèle de point d'entrée après la mise à jour : Capture le nouvel état du point d'entrée après le déploiement du modèle.
9. Test de prédiction : Effectue un test de prédiction pour vérifier les performances du modèle déployé.

Chaque étape du pipeline est configurée pour utiliser des limites de CPU spécifiques et des options de mise en cache pour optimiser les performances et les coûts.


In [104]:
@kfp.dsl.pipeline(
    name = f'series-{SERIES}-{EXPERIMENT}-pipeline',
    description = 'Full development pipeline.'
)


def retrain_pipeline(
    project: str,
    region: str,
    series: str,
    experiment: str,
    experiment_name: str,
    bq_project: str,
    bq_dataset: str,
    bq_table_train: str,
    bq_table_test: str,
    var_target: str,
    var_target1: str,
    cmdargs: list,
    script: str,
    train_image: str,
    requirements: list,
    train_compute: str,
    deploy_image: str,
    deploy_compute: str,
    bucket: str,
    service_account: str
):
    
    from google_cloud_pipeline_components.types import artifact_types
    
    setup = experiment_setup(
        project = project,
        region = region,
        series = series,
        experiment = experiment,
        experiment_name = experiment_name,
        cmdargs = cmdargs,
        bq_project = bq_project,
        bq_dataset = bq_dataset,
        bq_table_train = bq_table_train,
        bq_table_test = bq_table_test,
        var_target = var_target,
        var_target1 = var_target1).set_display_name('Setup').set_cpu_limit('2').set_caching_options(True)
    
    trainer = train_from_local_script(
        project = project,
        region = region,
        series = series,
        experiment = experiment,
        experiment_name = experiment_name,
        run_name = setup.outputs['run_name'],
        cmdargs = setup.outputs['cmdargs'],
        script = script,
        train_image = train_image,
        requirements = requirements,
        train_compute = train_compute,
        timestamp = setup.outputs['timestamp'],
        bucket = bucket,
        service_account = service_account,
        tensorboard = setup.outputs['tensorboard']
    ).set_display_name('Train With Script').set_cpu_limit('2').set_caching_options(True)
    

    with kfp.dsl.If(
        trainer.outputs['state'] == 'JOB_STATE_SUCCEEDED',
        name = 'Check Training Completion'
    ):
    
        endpoint_before = endpoint_model(
            project = project,
            region = region,
            series = series
        ).set_display_name('Endpoint: Before Update').set_cpu_limit('2').set_caching_options(False)
        
        model = register_model(
            project = project,
            region = region,
            series = series,
            experiment = experiment,
            experiment_name = experiment_name,
            run_name = setup.outputs['run_name'],
            deploy_image = deploy_image,
            job_resources = trainer.outputs['job_resources'],
            tensorboard = setup.outputs['tensorboard']
        ).set_display_name('Register Model').set_cpu_limit('2').set_caching_options(False)
    
        rank = rank_model(
            project = project,
            region = region,
            series = series,
            new_model = model.outputs['vertex_model'],
            current_model = endpoint_before.outputs['vertex_model']
        ).set_display_name('Rank Model').set_cpu_limit('2').set_caching_options(False)
        
        with kfp.dsl.If(
            rank.outputs['new_model_series_rank'] < rank.outputs['current_model_series_rank'],
            name = 'Bless New Model'
        ):
            
            deploy = deploy_model(
                project = project,
                region = region,
                deploy_compute = deploy_compute,
                vertex_endpoint = endpoint_before.outputs['vertex_endpoint'],
                vertex_model = model.outputs['vertex_model']
            ).set_display_name('Deploy Model').set_cpu_limit('2').set_caching_options(False)
            
            endpoint_after = endpoint_model(
                project = project,
                region = region,
                series = series
            ).set_display_name('Endpoint: After Update').set_cpu_limit('2').set_caching_options(False).after(deploy)
            
            
            with kfp.dsl.If(
                deploy.outputs['new_model_deployed'] == True,
                name = 'Check The Model Deployment'
            ):
    
                prediction = prediction_test(
                    project = project,
                    region = region,
                    sample = setup.outputs['sample'],
                    vertex_endpoint = endpoint_after.outputs['vertex_endpoint'],
                ).set_display_name('Prediction').set_cpu_limit('2').set_caching_options(False)
    

## Compile Pipeline

Compiler le pipeline en utilisant kfp.compiler.Compiler().

In [105]:
kfp.compiler.Compiler().compile(
    pipeline_func = retrain_pipeline,
    package_path = f"{DIR}/{EXPERIMENT}.json"
)

## Definie Pipeline Job

- Initialiser les parametres d'entree 
- creer un pipeline job avec aiplatform.PipelineJob().

In [106]:
REQUIREMENTS = ['tensorflow_io', f'google-cloud-aiplatform==1.34.0', 'db-dtypes', f"protobuf>=3.19.5"]

In [107]:
CMDARGS = [
    "--epochs=" + str(EPOCHS),
    "--batch_size=" + str(BATCH_SIZE)
]

In [108]:
parameter_values = {
    "project": PROJECT_ID,
    "region": REGION,
    "series": SERIES,
    "experiment": EXPERIMENT,
    "experiment_name": EXPERIMENT_NAME,
    "bq_project": BQ_PROJECT,
    "bq_dataset": BQ_DATASET,
    "bq_table_train": BQ_TABLE_TRAIN,
    "bq_table_test": BQ_TABLE_TEST,
    "var_target": VAR_TARGET,
    "var_target1": VAR_TARGET1,
    "cmdargs": CMDARGS,
    "script": SCRIPT_NAME,
    "train_image": TRAIN_IMAGE,
    "requirements": REQUIREMENTS,
    "train_compute": TRAIN_COMPUTE,
    "deploy_image": DEPLOY_IMAGE,
    "deploy_compute": DEPLOY_COMPUTE,
    "bucket": GCS_BUCKET,
    "service_account": SERVICE_ACCOUNT
}

In [109]:
pipeline_job = aiplatform.PipelineJob(
    display_name = f"{SERIES}_{EXPERIMENT}",
    template_path = f"{DIR}/{EXPERIMENT}.json",
    parameter_values = parameter_values,
    pipeline_root = f"{URI}/pipeline_root",
    enable_caching = None, # True (enabled), False (disable), None (defer to component level caching) 
    labels = {'series': SERIES, 'experiment': EXPERIMENT}
)

## Soumettre le travail de pipeline

Soumettre le travail de pipeline pour exécution avec aiplatform.PipelineJob.submit().

In [110]:
response = pipeline_job.submit(
    service_account = SERVICE_ACCOUNT
)

Creating PipelineJob
PipelineJob created. Resource name: projects/3515132901/locations/northamerica-northeast1/pipelineJobs/series-02-02a-pipeline-20240413062807
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/3515132901/locations/northamerica-northeast1/pipelineJobs/series-02-02a-pipeline-20240413062807')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/northamerica-northeast1/pipelines/runs/series-02-02a-pipeline-20240413062807?project=3515132901


In [111]:
print(f'The Dashboard can be viewed here:\n{pipeline_job._dashboard_uri()}')

The Dashboard can be viewed here:
https://console.cloud.google.com/vertex-ai/locations/northamerica-northeast1/pipelines/runs/series-02-02a-pipeline-20240413062807?project=3515132901


## Attendre le Job Pipeline

mettre en attente l'exécution jusqu'à ce que le travail du pipeline soit terminé avec aiplatform.PipelineJob.wait().

In [112]:
pipeline_job.wait()

PipelineJob projects/3515132901/locations/northamerica-northeast1/pipelineJobs/series-02-02a-pipeline-20240413062807 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/3515132901/locations/northamerica-northeast1/pipelineJobs/series-02-02a-pipeline-20240413062807 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/3515132901/locations/northamerica-northeast1/pipelineJobs/series-02-02a-pipeline-20240413062807 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/3515132901/locations/northamerica-northeast1/pipelineJobs/series-02-02a-pipeline-20240413062807 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/3515132901/locations/northamerica-northeast1/pipelineJobs/series-02-02a-pipeline-20240413062807 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/3515132901/locations/northamerica-northeast1/pipelineJobs/series-02-02a-pipeline-20240413062807 current state:
PipelineState.PIPELINE_