In [214]:
from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact,
                        Dataset,
                        Input,
                        Output,
                        Model,
                        Metrics,
                        Markdown,
                        HTML,
                        component, 
                        OutputPath, 
                        InputPath)

from kfp.v2 import compiler
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
import google.auth

from datetime import datetime
import pandas as pd
from typing import NamedTuple

In [215]:
# Point Application Default Credentials at a service-account key file.
# NOTE(review): this is a machine-specific absolute path (and the key file name is
# echoed in the cell output); on another machine set GOOGLE_APPLICATION_CREDENTIALS
# outside the notebook instead of editing this line.
%env GOOGLE_APPLICATION_CREDENTIALS=E:\\_UNIVER\UCU\2 sem\MLOps\bird-project\data\bird-project-mlops-vertex-ab4a1e84f536.json

env: GOOGLE_APPLICATION_CREDENTIALS=E:\\_UNIVER\UCU\2 sem\MLOps\bird-project\data\bird-project-mlops-vertex-ab4a1e84f536.json


In [216]:
# --- Google Cloud project ---------------------------------------------------
PROJECT_ID = "bird-project-mlops-vertex"
REGION = "us-central1"

# --- Cloud Storage ----------------------------------------------------------
# DATA_BUCKET is a bare bucket name; BUCKET_NAME is a full gs:// URI.
# Alternative value left commented out by the author:
#   BUCKET_NAME = "gs://bird-project-mlops-vertex-bucket"
DATA_BUCKET = "bird-project-mlops-vertex-data"
BUCKET_NAME = "gs://bird-project-mlops-vertex-pipebucket"

# Root location handed to the pipeline decorator for run artifacts.
PIPELINE_ROOT = BUCKET_NAME + "/training_pipe/"

# --- Container images -------------------------------------------------------
IMAGE_NAME = "training"
SERVING_IMAGE_NAME = "serving"
BASE_IMAGE = "{}-docker.pkg.dev/{}/bird-containers/{}".format(REGION, PROJECT_ID, IMAGE_NAME)
SERVING_IMAGE = "{}-docker.pkg.dev/{}/bird-containers/{}".format(REGION, PROJECT_ID, SERVING_IMAGE_NAME)

In [217]:
@component(
    base_image=BASE_IMAGE,
    packages_to_install=['google-cloud-storage==2.17.0']
)
def obtain_data(
    dataset_bucket: str,
    samples_download: int,
    dataset_full: Output[Dataset],
    # new_samples_count: Output[Metrics],
    min_samples_grow: int = 1,
) -> NamedTuple("Outputs", [("new_samples_count", int)]):
    """Download the dataset bucket, count not-yet-trained samples and flag them.

    Up to `samples_download` blobs of `dataset_bucket` are downloaded into
    `dataset_full.path`. `dataset.csv` is then read from that directory and the
    rows whose `trained_on` flag is not True are counted (every row counts when
    the column is absent). If the count exceeds `min_samples_grow`, all rows are
    marked `trained_on=True`, the CSV is rewritten locally and uploaded back to
    the bucket as `dataset.csv`.

    Args:
        dataset_bucket: Name of the GCS bucket holding the dataset.
        samples_download: Upper bound on the number of blobs listed/downloaded.
        dataset_full: Output artifact; its path is used as the download directory.
        min_samples_grow: The CSV is only flagged and re-uploaded when the number
            of new samples is strictly greater than this value.

    Returns:
        A namedtuple `Outputs(new_samples_count=<int>)`.

    NOTE(review): rows are flagged as trained here, before any downstream
    training step has run; if a later step fails they will not be counted as
    new on the next run.
    """
    import os
    import logging
    from google.cloud.storage import Client, transfer_manager
    import pandas as pd
    from collections import namedtuple

    logging.getLogger().setLevel(logging.INFO)

    def download_bucket_with_transfer_manager(
        bucket_name, destination_directory="", workers=8, max_results=1000
    ):
        """Download all of the blobs in a bucket, concurrently in a process pool.

        The filename of each blob once downloaded is derived from the blob name and
        the `destination_directory` parameter. Directories will be created
        automatically as needed, for instance to accommodate blob names that
        include slashes.

        Args:
            bucket_name: The ID of the GCS bucket.
            destination_directory: Directory prepended to each blob name to form
                the local path; an empty string means the current working
                directory. Not intended for unsanitized end-user input.
            workers: The maximum number of workers to use for the operation.
            max_results: The maximum number of results to fetch from
                `bucket.list_blobs()`; all of them are queued for download at once.
        """
        logging.info("Libs imported")

        storage_client = Client()

        logging.info("Client created")

        bucket = storage_client.bucket(bucket_name)

        logging.info("Bucket created")

        blob_names = [blob.name for blob in bucket.list_blobs(max_results=max_results)]

        logging.info("Blobs listed")

        transfer_manager.download_many_to_path(
            bucket, blob_names, destination_directory=destination_directory, max_workers=workers
        )

        logging.info("Blobs downloaded")

    # NOTE(review): `dataset.csv` has to be among the first `samples_download`
    # listed blobs, otherwise the read below fails — confirm the listing order.
    download_bucket_with_transfer_manager(dataset_bucket, dataset_full.path, max_results=samples_download)

    dataset_df_path = os.path.join(dataset_full.path, 'dataset.csv')
    df = pd.read_csv(dataset_df_path)

    if 'trained_on' in df.columns:
        # Compare against True instead of using the column as a boolean mask:
        # rows with a missing flag would make a boolean mask raise, and should
        # be counted as new samples.
        already_trained = df['trained_on'].eq(True)
        new_samples_count_val = int((~already_trained).sum())
    else:
        new_samples_count_val = len(df)

    if new_samples_count_val > min_samples_grow:
        df['trained_on'] = True

        df.to_csv(dataset_df_path, index=False)

        storage_client = Client()
        bucket = storage_client.bucket(dataset_bucket)

        # Overwrite the object in a single upload. Deleting it first (and
        # sleeping before re-creating it) would leave the bucket without
        # `dataset.csv` for a while and lose the file if the upload failed.
        bucket.blob('dataset.csv').upload_from_filename(dataset_df_path)

    outputs = namedtuple("Outputs", ["new_samples_count"])
    return outputs(new_samples_count_val)


In [218]:
@component(
    base_image=BASE_IMAGE,
)
def preprocess_data(
    dataset_full: Input[Dataset],
    CLASS2ID_data: Output[Dataset],
):
    """Resolve local file paths, drop missing files and encode labels.

    Reads `dataset.csv` from `dataset_full.path`, adds a `file_path` column that
    points into the downloaded dataset directory, keeps only rows whose file
    exists, builds a label -> integer id mapping, writes that mapping as JSON to
    `CLASS2ID_data.path`, adds a `label_id` column and overwrites `dataset.csv`
    in place.

    Args:
        dataset_full: Input artifact whose path contains `dataset.csv` and the
            downloaded files. NOTE: its `dataset.csv` is rewritten by this step.
        CLASS2ID_data: Output artifact receiving the JSON label mapping.
    """
    import os
    import json
    import logging
    import pandas as pd

    logging.getLogger().setLevel(logging.INFO)

    dataset_df_path = os.path.join(dataset_full.path, 'dataset.csv')
    df = pd.read_csv(dataset_df_path)

    logging.info("Dataframe loaded")

    # Keep what follows the third '/' of `path` and re-root it under the
    # download directory (presumably this strips a "gs://<bucket>/" prefix —
    # TODO confirm against the CSV contents).
    df['file_path'] = df['path'].apply(lambda x: f"{dataset_full.path}/{x.split('/', 3)[-1]}")

    # .copy() makes the filtered frame independent, so adding `label_id` below
    # does not write into a slice of the original frame.
    df = df.loc[df['file_path'].apply(os.path.exists)].copy()

    # Ids follow the order of first appearance of each label in the filtered data.
    CLASS2ID = {classname: i for i, classname in enumerate(df['label'].unique())}
    with open(CLASS2ID_data.path, 'w') as class2id_file:
        json.dump(CLASS2ID, class2id_file, indent=4)

    df['label_id'] = df['label'].apply(CLASS2ID.get)

    df.to_csv(dataset_df_path, index=False)

    logging.info("Dataframe saved")

In [219]:
@component(
    base_image=BASE_IMAGE,
)
def train_test_split(dataset_full: Input[Dataset],
                     dataset_train: Output[Dataset],
                     dataset_test: Output[Dataset],
                     test_size: float = 0.05,
                     random_state: int = 42):
    """Randomly split `dataset.csv` into train and test CSV files.

    Args:
        dataset_full: Input artifact whose path contains `dataset.csv`.
        dataset_train: Output artifact; the train rows are written to its path.
        dataset_test: Output artifact; the test rows are written to its path.
        test_size: Fraction of rows sampled into the test set.
        random_state: Seed for the sampling, so repeated runs on the same data
            produce the same split.
    """
    import pandas as pd
    import logging
    import os

    logging.getLogger().setLevel(logging.INFO)

    dataset_df_path = os.path.join(dataset_full.path, 'dataset.csv')
    df = pd.read_csv(dataset_df_path)

    test_rows = int(test_size * len(df))
    # On small datasets the fraction can truncate to zero rows; keep at least
    # one test row (when a row can be spared) so evaluation has data to run on.
    if test_rows == 0 and test_size > 0 and len(df) > 1:
        test_rows = 1

    df_test = df.sample(test_rows, random_state=random_state)
    df_train = df.loc[~df.index.isin(df_test.index)]

    logging.info("Dataframe splitted")

    df_train.to_csv(dataset_train.path, index=False)
    df_test.to_csv(dataset_test.path, index=False)
    logging.info("Dataframes splitted saved")

In [220]:
@component(
    base_image=BASE_IMAGE
)
def train_model(dataset_train: Input[Dataset],
                CLASS2ID_data: Input[Dataset],
                model_out: Output[Model],
                batch_size: int = 16,
                epochs_count: int = 10):
    """Train `BaselineBirdClassifier` on the train split and save its weights.

    Args:
        dataset_train: Input artifact; CSV with `file_path` and `label_id` columns.
        CLASS2ID_data: Input artifact; JSON label mapping, used here only for
            the number of classes.
        model_out: Output artifact; the model `state_dict` is saved to its path.
        batch_size: Batch size of the training DataLoader.
        epochs_count: Number of passes over the training data.
    """
    import torch
    import pandas as pd
    from torch.utils.data import DataLoader
    from tqdm import tqdm
    import logging
    import json

    from src.dataset import AudioDataset, SAMPLE_RATE
    from src.model import BaselineBirdClassifier

    logging.getLogger().setLevel(logging.INFO)

    with open(CLASS2ID_data.path, 'r') as class2id_file:
        CLASS2ID = json.load(class2id_file)

    train_df = pd.read_csv(dataset_train.path)

    train_ds = AudioDataset(train_df['file_path'].tolist(), train_df['label_id'].tolist())
    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)

    logging.info("Dataset and dataloader for train created")

    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    model = BaselineBirdClassifier(len(CLASS2ID), sr=SAMPLE_RATE).to(device)

    logging.info("Model created")

    loss_fn = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.RAdam(model.parameters(), lr=1e-3)

    # Make the training mode explicit rather than relying on the module default.
    model.train()

    for epoch in tqdm(range(epochs_count), desc='Epoch'):
        epoch_loss = 0.0
        batches_seen = 0

        for audios, labels in train_loader:
            audios = audios.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            outputs = model(audios)

            loss = loss_fn(outputs, labels)
            loss.backward()

            optimizer.step()

            epoch_loss += loss.item()
            batches_seen += 1

        # Surface training progress in the component logs.
        if batches_seen:
            logging.info("Epoch %d/%d mean train loss: %.4f",
                         epoch + 1, epochs_count, epoch_loss / batches_seen)

    logging.info("Model trained")

    torch.save(model.state_dict(), model_out.path)

    logging.info("Model saved")


In [221]:
@component(
    base_image=BASE_IMAGE,
)
def eval_model(dataset_test: Input[Dataset],
                CLASS2ID_data: Input[Dataset],
                model_out: Input[Model],
                metrics: Output[Metrics],
                main_metric: str) -> NamedTuple("Outputs", [("main_metric_val", float)]):
    """Evaluate the trained model on the test split and report its metrics.

    Args:
        dataset_test: Input artifact; CSV with `file_path` and `label_id` columns.
        CLASS2ID_data: Input artifact; JSON label mapping, used here only for
            the number of classes.
        model_out: Input artifact holding the saved model `state_dict`.
        metrics: Output artifact; every metric returned by `obtain_metrics`
            plus the mean test loss (`eval_loss`) is logged to it.
        main_metric: Key of the metric returned as the component output.

    Returns:
        A namedtuple `Outputs(main_metric_val=<float>)`.

    Raises:
        ValueError: If the test split contains no rows.
        KeyError: If `main_metric` is not among the computed metrics.
    """
    import torch
    import pandas as pd
    import numpy as np
    from torch.utils.data import DataLoader
    import logging
    import json
    from collections import namedtuple

    from src.dataset import AudioDataset, SAMPLE_RATE, obtain_metrics
    from src.model import BaselineBirdClassifier

    with open(CLASS2ID_data.path, 'r') as class2id_file:
        CLASS2ID = json.load(class2id_file)

    logging.getLogger().setLevel(logging.INFO)

    test_df = pd.read_csv(dataset_test.path)

    # Fail with a clear message instead of a ZeroDivisionError further down.
    if test_df.empty:
        raise ValueError("Test dataset is empty; cannot evaluate the model")

    test_ds = AudioDataset(test_df['file_path'].tolist(), test_df['label_id'].tolist())
    test_loader = DataLoader(test_ds, batch_size=1, shuffle=False)

    logging.info("Test dataset and dataloader created")

    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    model = BaselineBirdClassifier(len(CLASS2ID), sr=SAMPLE_RATE)
    model.load_state_dict(torch.load(model_out.path, map_location=device))
    # The batches are moved to `device` below, so the model has to live there too.
    model.to(device)

    logging.info("Model loaded")

    loss_fn = torch.nn.CrossEntropyLoss()

    model.eval()
    eval_running_loss = 0.
    outputs_list = []
    labels_list = []
    with torch.no_grad():
        for audios, labels in test_loader:
            audios = audios.to(device)
            labels = labels.to(device)

            outputs = model(audios)
            loss = loss_fn(outputs, labels)

            eval_running_loss += loss.item()
            outputs_list.append(outputs.cpu().numpy())
            labels_list.append(labels.cpu().numpy())

    logging.info("Test dataset forward pass finished")

    # batch_size is 1, so the number of batches equals the number of samples.
    eval_loss = eval_running_loss / len(test_loader.dataset)
    logging.info("Mean test loss: %.4f", eval_loss)

    all_outputs = np.concatenate(outputs_list, axis=0)
    all_labels = np.concatenate(labels_list, axis=0)

    test_metrics = obtain_metrics(all_labels, all_outputs)

    logging.info("Metrics calculated")

    # Logged first so that a metric of the same name from `obtain_metrics`
    # would take precedence.
    metrics.log_metric("eval_loss", float(eval_loss))
    for metric_name, val in test_metrics.items():
        metrics.log_metric(metric_name, float(val))

    logging.info("Test metrics logged")

    if main_metric not in test_metrics:
        raise KeyError(
            f"main_metric '{main_metric}' not found; available: {sorted(test_metrics)}"
        )

    # Cast like the logged metrics so the declared float output receives a
    # plain Python float.
    result = namedtuple("Outputs", ["main_metric_val"])
    return result(float(test_metrics[main_metric]))

In [222]:
@component(
    base_image=BASE_IMAGE,
    packages_to_install=['google-cloud-storage==2.17.0', 'google-cloud-aiplatform==1.59.0', 'onnx==1.16.1','onnxscript==0.1.0.dev20240528']
)
def deploy_model(
        serving_container_image_uri: str,
        display_name: str,
        model_endpoint: str,
        gcp_project: str,
        gcp_region: str,
        model: Input[Model],
        CLASS2ID_data: Input[Dataset],
        model_onnx: Output[Model],
        vertex_model: Output[Model],
        vertex_endpoint: Output[Model]
):
    """Export the trained model to ONNX, upload it to Vertex AI and deploy it.

    Steps: find or create the endpoint named `model_endpoint`; rebuild the
    PyTorch model from the saved `state_dict` and export it to ONNX at
    `model_onnx.path`; upload the directory containing that file as a Vertex AI
    model (as a new version when a model named `display_name` already exists);
    deploy the uploaded model to the endpoint.

    Args:
        serving_container_image_uri: Image used to serve the uploaded model.
        display_name: Display name of the Vertex AI model and deployed model.
        model_endpoint: Display name of the endpoint to reuse or create.
        gcp_project: Project used for the Vertex AI calls.
        gcp_region: Location used for the Vertex AI calls.
        model: Input artifact holding the PyTorch `state_dict`.
        CLASS2ID_data: Input artifact; JSON label mapping, used here only for
            the number of classes.
        model_onnx: Output artifact; the ONNX export is written to its path.
        vertex_model: Output artifact; its uri is set to the uploaded model's
            resource name.
        vertex_endpoint: Output artifact; its uri is set to the resource name
            returned by the deploy call.
    """
    from google.cloud import aiplatform as vertex_ai
    from pathlib import Path
    import json
    import torch
    import logging

    from src.dataset import SAMPLE_RATE, SAMPLE_LEN_SEC
    from src.model import BaselineBirdClassifier

    logging.getLogger().setLevel(logging.INFO)

    with open(CLASS2ID_data.path, 'r') as class2id_file:
        CLASS2ID = json.load(class2id_file)

    def create_endpoint():
        """Return the newest endpoint named `model_endpoint`, creating it if none exists."""
        endpoints = vertex_ai.Endpoint.list(
            filter=f'display_name="{model_endpoint}"',
            order_by='create_time desc',
            project=gcp_project,
            location=gcp_region,
        )
        if len(endpoints) > 0:
            return endpoints[0]  # most recently created
        return vertex_ai.Endpoint.create(
            display_name=model_endpoint,
            project=gcp_project,
            location=gcp_region,
        )

    endpoint = create_endpoint()

    logging.info("Endpoint created")

    def upload_model():
        """Upload the ONNX directory as a new model, or as a new version of an existing one."""
        listed_model = vertex_ai.Model.list(
            filter=f'display_name="{display_name}"',
            # Ordered like the endpoint lookup so that [0] really is the
            # most recently created model.
            order_by='create_time desc',
            project=gcp_project,
            location=gcp_region,
        )
        # Without an existing model the upload creates a new one (no parent).
        parent_model = listed_model[0].resource_name if len(listed_model) > 0 else None

        # NOTE(review): artifact_uri is derived from the artifact's local path,
        # not its gs:// uri — confirm this is what the SDK/serving image expect.
        return vertex_ai.Model.upload(
            display_name=display_name,
            parent_model=parent_model,
            artifact_uri=str(Path(model_onnx.path).parent),
            serving_container_image_uri=serving_container_image_uri,
            # Same project as the list/create calls above.
            project=gcp_project,
            location=gcp_region,
            serving_container_predict_route="/predict",
            serving_container_health_route="/health",
        )

    model_pt = BaselineBirdClassifier(len(CLASS2ID), sr=SAMPLE_RATE)
    model_pt.load_state_dict(torch.load(model.path, map_location='cpu'))
    model_pt.eval()

    logging.info("Torch model downloaded")

    # Example input used to trace the model; both of its axes are declared
    # dynamic in the export below.
    torch_input = torch.randn(8, SAMPLE_RATE*SAMPLE_LEN_SEC)
    torch.onnx.export(model_pt.cpu(),
                    torch_input,
                    model_onnx.path,
                    export_params=True,
                    do_constant_folding=True,
                    input_names = ['input'],
                    output_names = ['output'],
                    dynamic_axes={'input' : {0: 'batch_size', 1: 'sample_length'},
                                'output' : {0: 'batch_size'}}
    )

    logging.info("ONNX model created")

    uploaded_model = upload_model()

    logging.info("Model uploaded")

    # Save data to the output params
    vertex_model.uri = uploaded_model.resource_name

    # Deploy the uploaded model to the Vertex AI endpoint.
    # NOTE(review): per the Vertex AI SDK, key "0" in traffic_split denotes the
    # model deployed by this call; models deployed by earlier runs are not
    # undeployed here — confirm that is intended.
    model_deploy = uploaded_model.deploy(
        machine_type='e2-standard-4',
        endpoint=endpoint,
        traffic_split={"0": 100},
        deployed_model_display_name=display_name,
    )

    logging.info("Model deployed")

    # Save data to the output params
    vertex_endpoint.uri = model_deploy.resource_name

In [259]:
# Second-resolution timestamp taken when this cell runs; it is embedded in the
# pipeline job id further down, so re-run this cell before submitting a new job.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"

# Default display name passed to the pipeline.
DISPLAY_NAME = 'birds'

In [260]:
@dsl.pipeline(
    pipeline_root=PIPELINE_ROOT,
    name="bird-pipeline"   
)
def pipeline(
    data_filepath: str = DATA_BUCKET,
    project: str = PROJECT_ID,
    region: str = REGION, 
    display_name: str = DISPLAY_NAME,
    serving_container_image_uri: str = SERVING_IMAGE,
    test_size: float = 0.05,
    batch_size: int = 16,
    epochs_count: int = 1,
    samples_download: int = 40,
    min_samples_grow: int = -1,
    main_metric: str = 'macro_f1',
    main_metric_thresh: float = 0.75,
):
    """Data -> preprocess -> split -> train -> evaluate -> (conditionally) deploy.

    Training only runs when `obtain_data` reports more than `min_samples_grow`
    new samples, and deployment only runs when the evaluated `main_metric`
    exceeds `main_metric_thresh`.

    Args:
        data_filepath: Name of the GCS bucket holding the dataset.
        project: Project used for the Vertex AI model and endpoint.
        region: Location used for the Vertex AI model and endpoint.
        display_name: Display name of the model; the endpoint is named
            "<display_name>_endpoint".
        serving_container_image_uri: Serving image of the deployed model.
        test_size: Fraction of rows put into the test split.
        batch_size: Training batch size.
        epochs_count: Number of training epochs.
        samples_download: Maximum number of blobs downloaded from the bucket.
        min_samples_grow: Minimum (exclusive) number of new samples required
            to retrain.
        main_metric: Metric compared against `main_metric_thresh`.
        main_metric_thresh: Threshold (exclusive) the metric must exceed to deploy.
    """
    data_op = obtain_data(
        dataset_bucket=data_filepath,
        samples_download=samples_download,
        min_samples_grow=min_samples_grow,
    )
    # obtain_data reads the live bucket and rewrites dataset.csv in it; a cache
    # hit would skip both and reuse a stale sample count, so never cache it.
    data_op.set_caching_options(False)

    with dsl.Condition(data_op.outputs["new_samples_count"] > min_samples_grow, "enough-new-data"):
        data_preprocess_op = preprocess_data(dataset_full=data_op.outputs["dataset_full"])

        # preprocess_data rewrites dataset.csv inside the shared dataset
        # artifact, so the split must be ordered after it explicitly.
        train_test_split_op = train_test_split(
            dataset_full=data_op.outputs["dataset_full"],
            test_size=test_size,
        ).after(data_preprocess_op)

        train_model_op = train_model(
            dataset_train=train_test_split_op.outputs["dataset_train"],
            CLASS2ID_data=data_preprocess_op.outputs["CLASS2ID_data"],
            batch_size=batch_size,
            epochs_count=epochs_count,
        )

        model_evaluation_op = eval_model(
            dataset_test=train_test_split_op.outputs["dataset_test"],
            CLASS2ID_data=data_preprocess_op.outputs["CLASS2ID_data"],
            model_out=train_model_op.outputs["model_out"],
            main_metric=main_metric,
        )

        with dsl.Condition(model_evaluation_op.outputs['main_metric_val'] > main_metric_thresh, 'save-model-choice'):
            deploy_op = deploy_model(
                model=train_model_op.outputs['model_out'],
                gcp_project=project,
                gcp_region=region,
                serving_container_image_uri=serving_container_image_uri,
                display_name=display_name,
                model_endpoint=f"{display_name}_endpoint",
                CLASS2ID_data=data_preprocess_op.outputs["CLASS2ID_data"],
            )

  with dsl.Condition(data_op.outputs["new_samples_count"] > min_samples_grow, "enough-new-data"):
  with dsl.Condition(model_evaluation_op.outputs['main_metric_val'] > main_metric_thresh, 'save-model-choice'):


In [261]:
# Compile the pipeline function into the job spec file that the PipelineJob
# below loads via `template_path`.
compiler.Compiler().compile(
    package_path='bird-pipeline.json',
    pipeline_func=pipeline,
)

In [262]:
# Resolve Application Default Credentials; `credentials` is passed to
# aiplatform.init below, `project` is the default project reported alongside them.
credentials, project = google.auth.default()

In [263]:
# Set the Vertex AI SDK defaults (project, region, credentials) for this session.
aiplatform.init(
    credentials=credentials,
    location=REGION,
    project=PROJECT_ID,
)

In [264]:
# Describe one run of the compiled pipeline; nothing is submitted until .run().
# The keyword arguments below override the pipeline's declared defaults.
start_pipeline = aiplatform.PipelineJob(
    job_id="bird-pipeline-" + TIMESTAMP,
    display_name="pipeline-birds",
    template_path="bird-pipeline.json",
    location=REGION,
    enable_caching=True,
    parameter_values=dict(
        samples_download=100,
        main_metric='macro_precision',
        epochs_count=2,
        main_metric_thresh=0.5,
    ),
)

In [265]:
# Submit the job under the given service account; as the output below shows,
# the call keeps reporting the job state while it runs.
# NOTE(review): the service account e-mail is hardcoded — consider moving it
# to the configuration cell.
start_pipeline.run(service_account="bird-google-storage-account@bird-project-mlops-vertex.iam.gserviceaccount.com")

Creating PipelineJob
PipelineJob created. Resource name: projects/64206774286/locations/us-central1/pipelineJobs/bird-pipeline-20240720144940
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/64206774286/locations/us-central1/pipelineJobs/bird-pipeline-20240720144940')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/bird-pipeline-20240720144940?project=64206774286
PipelineJob projects/64206774286/locations/us-central1/pipelineJobs/bird-pipeline-20240720144940 current state:
3
PipelineJob projects/64206774286/locations/us-central1/pipelineJobs/bird-pipeline-20240720144940 current state:
3
PipelineJob projects/64206774286/locations/us-central1/pipelineJobs/bird-pipeline-20240720144940 current state:
3
PipelineJob projects/64206774286/locations/us-central1/pipelineJobs/bird-pipeline-20240720144940 current state:
3
PipelineJob projects/64206774286/locations/us-central1/pipelineJobs/bird-pipelin

In [None]:
# start_pipeline.create_schedule(
#   display_name="bird-schedule",
#   cron="0 9 * * *",  # optionally prefix a time zone, e.g. "TZ=Europe/Kyiv 0 9 * * *"
#   max_concurrent_run_count=2,
#   max_run_count=1000)
