# MNIST Training Pipeline Implementation

- 1. Load Data Component
- 2. Train Linear Component
- 3. Deploy Model Component
- 4. Pipeline function
- 5. Run on Vertex AI

<img src="demo2.png" width="50%"/>

### 0. Import Libraries

In [1]:
import kfp
import matplotlib.pyplot as plt
import pandas as pd
import requests

from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, ClassificationMetrics, Metrics, component)

from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
from typing import NamedTuple

# We'll use this beta library for metadata querying
from google.cloud import aiplatform_v1beta1
from datetime import datetime

### 1. Load Data Component

In [2]:
@component(
    packages_to_install=["tensorflow", "numpy"]
)
def load_data(
    dataset: Output[Dataset]
):
    """Download MNIST, scale pixels to [0, 1], and save all splits as one .npz.

    Args:
        dataset: Output artifact. The arrays ``x_train``, ``y_train``,
            ``x_test`` and ``y_test`` are written to ``dataset.path`` with
            ``np.savez``.
    """
    import tensorflow as tf
    import numpy as np

    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    # Scale in float32. Dividing the uint8 images by a Python float directly
    # would yield float64 arrays, doubling the artifact size, while Keras'
    # default float type is float32 anyway.
    x_train = x_train.astype(np.float32) / 255.0
    x_test = x_test.astype(np.float32) / 255.0

    with open(dataset.path, "wb") as f:
        np.savez(
            f,
            x_train=x_train,
            y_train=y_train,
            x_test=x_test,
            y_test=y_test
        )
    print(f"Saved On : {dataset.path}")
    

### 2. Train Linear Component

In [19]:
# TensorFlow is pinned below 2.16: TF 2.16+ ships Keras 3, whose Model.save()
# rejects a path without a ".keras"/".h5" suffix. With Keras 2, save() to a
# bare path writes a TensorFlow SavedModel, which is what the TF serving
# container used by deploy_model loads.
@component(
    packages_to_install=["tensorflow<2.16", "numpy"]
)
def train(
    dataset: Input[Dataset],
    model: Output[Model],
    metrics: Output[Metrics],
):
    """Train a small dense MNIST classifier, log metrics, and save the model.

    Args:
        dataset: Input artifact holding an .npz with ``x_train``, ``y_train``,
            ``x_test`` and ``y_test`` arrays (as written by ``load_data``).
        model: Output artifact; the trained Keras model is saved to
            ``model.path``.
        metrics: Output metrics; test accuracy (in percent), a few descriptive
            labels, and the training-set size are logged here.
    """
    import tensorflow as tf
    import numpy as np

    # Read into a separate name so the ``dataset`` artifact parameter is not
    # shadowed; NpzFile closes the underlying file when the block exits.
    with np.load(dataset.path) as data:
        x_train, y_train = data["x_train"], data["y_train"]
        x_test, y_test = data["x_test"], data["y_test"]

    # Despite the "LinearModel" label logged below, this network has a ReLU
    # hidden layer, so it is not a linear model.
    classifier = tf.keras.Sequential(
        [
            tf.keras.layers.Flatten(input_shape=(28, 28)),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dense(10, activation='softmax')
        ]
    )
    classifier.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    classifier.fit(x_train, y_train, epochs=3)

    _, acc = classifier.evaluate(x_test, y_test)
    print(f"acc : {acc}")

    metrics.log_metric("accuracy", (acc * 100.0))
    metrics.log_metric("framework", "Tensorflow")
    metrics.log_metric("Model", "LinearModel")
    metrics.log_metric("dataset_size", len(x_train))

    classifier.save(model.path)
    print(f"Model saved on : {model.path}")

### 3. Deploy Model Component

In [26]:
@component(
    # Only the Vertex AI SDK is imported below; installing TensorFlow and
    # NumPy in this step just lengthened its startup.
    packages_to_install=["google-cloud-aiplatform"]
)
def deploy_model(
    model: Input[Model],
    project: str,
    region: str,
    vertex_endpoint: Output[Artifact],
    vertex_model: Output[Model]
):
    """Upload the trained model to Vertex AI and deploy it to an endpoint.

    Args:
        model: Trained model artifact; its ``uri`` is used as the
            ``artifact_uri`` of the uploaded Vertex AI model.
        project: Google Cloud project to upload and deploy into.
        region: Vertex AI region (location) to use.
        vertex_endpoint: Output artifact whose ``uri`` is set to the resource
            name of the endpoint returned by ``deploy``.
        vertex_model: Output artifact whose ``uri`` is set to the resource
            name of the uploaded model.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project, location=region)

    # NOTE(review): the serving image runs TF 2.6; confirm SavedModels written
    # by the training step's TensorFlow version load in it.
    uploaded_model = aiplatform.Model.upload(
        display_name="mnist-model-pipeline",
        artifact_uri=model.uri,
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest"
    )
    # No endpoint is passed, so deploy() creates one and returns it.
    endpoint = uploaded_model.deploy(machine_type="n1-standard-4")

    # Record the created resources on the output artifacts.
    vertex_endpoint.uri = endpoint.resource_name
    vertex_model.uri = uploaded_model.resource_name


### 4. Pipeline function

In [27]:
import os

# Replace the placeholders or set the environment variables before running.
# PIPELINE_ROOT is the Cloud Storage location (e.g. "gs://my-bucket/root")
# where Vertex AI Pipelines writes run artifacts.
PIPELINE_ROOT = os.environ.get("PIPELINE_ROOT", "<BUCKET_URI>")
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT", "<PROJECT_ID>")
REGION = os.environ.get("GOOGLE_CLOUD_REGION", "us-central1")

In [28]:
@dsl.pipeline(
    # A name for the pipeline.
    name="mnist-pipeline",
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(
    project: str = PROJECT_ID,
    region: str = REGION
):
    """Chain the MNIST steps: load data, train, then upload and deploy.

    Args:
        project: Google Cloud project forwarded to the deploy step.
        region: Vertex AI region forwarded to the deploy step.
    """
    mnist_data = load_data()
    trained = train(dataset=mnist_data.output)
    # Nothing consumes the deploy step's outputs here; calling the component
    # inside the pipeline function is what adds it to the graph.
    deploy_model(
        region=region,
        project=project,
        model=trained.outputs["model"],
    )


### 5. Run on Vertex AI

In [29]:
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="mnist_pipeline.json"
)

# Submit explicitly into the project/region the pipeline deploys to, instead
# of relying on whatever the ambient gcloud/ADC defaults happen to be.
run1 = pipeline_jobs.PipelineJob(
    display_name="mnist-pipeline",
    template_path="mnist_pipeline.json",
    job_id="mnist-pipeline-small-{0}".format(TIMESTAMP),
    parameter_values={},
    enable_caching=True,
    project=PROJECT_ID,
    location=REGION,
)

# run() waits for the pipeline to finish, logging its state along the way.
run1.run()


INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/697793444829/locations/us-central1/pipelineJobs/mnist-pipeline-small-20211031101853
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/697793444829/locations/us-central1/pipelineJobs/mnist-pipeline-small-20211031101853')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/mnist-pipeline-small-20211031101853?project=697793444829
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/697793444829/locations/us-central1/pipelineJobs/mnist-pipeline-small-20211031101853 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/697793444829/locations/us-central