# Productionazing Machine Learning with Vertex AI

### Install Libraries

In [3]:
!pip3 install --user kfp --upgrade
!pip3 uninstall --user google-cloud-bigquery -y
!pip3 install --user google-cloud-bigquery --upgrade
!pip3 uninstall --user google_cloud_pipeline_components -y
!pip3 install --user google_cloud_pipeline_components --upgrade
!pip3 uninstall --user pyarrow -y
!pip3 install --user pyarrow --upgrade

ud-bigquery) (1.23.0)
Installing collected packages: google-cloud-bigquery
  Attempting uninstall: google-cloud-bigquery
    Found existing installation: google-cloud-bigquery 2.34.1
    Uninstalling google-cloud-bigquery-2.34.1:
      Successfully uninstalled google-cloud-bigquery-2.34.1
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
dbt-bigquery 1.1.0 requires google-cloud-bigquery<3,>=1.25.0, but you have google-cloud-bigquery 3.2.0 which is incompatible.[0m
Successfully installed google-cloud-bigquery-3.2.0

Usage:   
  pip3 uninstall [options] <package> ...
  pip3 uninstall [options] -r <requirements file> ...

no such option: --user
[31mERROR: Error initializing plugin EntryPoint(name='libsecret', value='keyring.backends.libsecret', group='keyring.backends').
Traceback (most recent call last):
  File "/usr/share/python-wheels/resolvelib-0.5.4-py2.

In [4]:
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"

KFP SDK version: 1.8.12


In [5]:
import json
from typing import NamedTuple

from kfp.v2 import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import component
from kfp.v2.google.client import AIPlatformClient
from google_cloud_pipeline_components import aiplatform as gcc_aip

from kfp.v2.dsl import (
    Input,
    Output,
    Artifact,
    Model,
    Dataset,
    Metrics,
    InputPath
)


FileNotFoundError: [Errno 2] No such file or directory: '/home/admin_/.local/lib/python3.9/site-packages/google_cloud_bigquery-2.34.1.dist-info/METADATA'

### Set up project ID

In [4]:
import os

PROJECT_ID = "mlops-dev-999"

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Project ID:  mlops-dev-999-c6b8


### Define Current Timestamp

In [5]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
print('Timestamp: ', TIMESTAMP)

Timestamp:  20220629151703


### Define Variables

In [6]:
BUCKET_NAME = "gs://crazy-pipelines"
REGION = "us-central1" 
ML_PROJECT_NAME = "earnings_classifier"
USER = "crazy-hippo" 

If you need to create the Bucket

In [7]:
! gsutil mb -l $REGION $BUCKET_NAME

Creating gs://crazy-pipelines/...
ServiceException: 409 A Cloud Storage bucket named 'crazy-pipelines' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.


In [8]:
! gsutil ls -al $BUCKET_NAME

                                 gs://crazy-pipelines/earnings_classifier/
                                 gs://crazy-pipelines/models/


### Set Pipeline Root Directory

In [9]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

#Define Pipeline Root
PIPELINE_ROOT = "{}/{}/{}".format(BUCKET_NAME, ML_PROJECT_NAME, USER)
PIPELINE_ROOT

env: PATH=/opt/conda/bin:/opt/conda/condabin:/opt/conda/bin:/usr/local/nvidia/bin:/usr/local/cuda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/home/jupyter/.local/bin


'gs://crazy-pipelines/earnings_classifier/crazy-hippo'

### Define Pipeline Comnponents

#### Pipeline Step 1 - Extract Data

In [10]:
@component(output_component_file='data_ingestion.yaml',
          base_image='python:3.9',
          packages_to_install=['pandas', 
                             'google-cloud-bigquery', 
                             'pyarrow' , 
                             'gcsfs',
                             'numpy',
                             'kfp',
                             'db_dtypes'
                              ])
def data_ingestion(
        INPUT_DATA : str,
        DATASET_VERSION : int,
        DATASET : Output[Dataset],
        pipeline_metrics: Output[Metrics]) -> NamedTuple(
          'ComponentOutputs',
          [
              ('dataset_name', str),
              ('dataset_version', int),
              ('num_of_examples', int),
              ('categorical_col', int),
              ('numeric_col', int)
          ]
    ):
    
    #Import libraries
    
    import pandas as pd
    import time
    import numpy
    from google.cloud.bigquery import Client, QueryJobConfig
    from google.cloud import bigquery
    import random
    
    
    
    #Initiate BigQuery Client
    
    examples = random.randint(10000, 32561)
    
    client = Client(project='mlops-dev-999-c6b8', location="us")
    query = """SELECT age, workclass, occupation, education_num, marital_status, capital_gain, label
    FROM `mlops-dev-999-c6b8.earnings_prediction.earnings_raw_data` 
    LIMIT @examples
    """
    
    #Run Query
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("examples", "INT64", examples),
        ]
)
    job = client.query(query, job_config=job_config)
    df = job.to_dataframe()
    
    #Set and calculate Dataset Metadata
    
    dataset_name = INPUT_DATA
    dataset_version = DATASET_VERSION
    num_of_examples = len(df)
    
    #Counting Data Types
    
    categorical_col = 0
    numeric_col = 0
    for col in df.columns : 
        print(type(df[col][0]))
        if type(df[col][0]) == str :  
            categorical_col += 1
        elif type(df[col][0]) == numpy.int64 :
            numeric_col += 1
            
    
    #Write data to GCS 
    
    df.to_csv(DATASET.path, index=False, header=True)
    
    # Log Metrics
    
    pipeline_metrics.log_metric('dataset_name', dataset_name)
    pipeline_metrics.log_metric('dataset_version', dataset_version)
    pipeline_metrics.log_metric('num_of_examples', num_of_examples)
    pipeline_metrics.log_metric('categorical_col', categorical_col)
    pipeline_metrics.log_metric('numeric_col', numeric_col)
    
    
    #Outputs of Component defined by named tuple
    
    from collections import namedtuple
    component_outputs = namedtuple('ComponentOutputs',
        ['dataset_name', 
         'dataset_version', 
         'num_of_examples', 
         'categorical_col', 
         'numeric_col'])
        
    #Returning outputs
    
    return component_outputs(dataset_name, 
                             dataset_version, 
                             num_of_examples, 
                             categorical_col, 
                             numeric_col)
    

#### Pipeline Step 2 - Transform and Prepare Data

In [11]:
@component(output_component_file='data_transformation.yaml',
          base_image='python:3.9',
          packages_to_install=['pandas', 
                             'google-cloud-bigquery', 
                             'pyarrow' , 
                             'gcsfs', 
                             'sklearn',
                             'db_dtypes'])
def data_transformation(
        DATASET : Input[Dataset],
        TRAINING_DATA : Output[Dataset],
        TEST_DATA : Output[Dataset],
        VALIDATION_DATA : Output[Dataset]
    ):
    
    #Import libraries
    import pandas as pd
    import time
    from sklearn.model_selection import train_test_split
    from google.cloud.bigquery import Client, QueryJobConfig
    
    df = pd.read_csv(DATASET.path)
    
    #Drop null values in dataset
    df = df.dropna()
    
    #Transform label to integer data type and format 1 or 0
    df['label'] = [int(1) if x == '>50K' else int(0) for x in df['label']]
    
    #Create training, test and validation datasets
    train, test = train_test_split(df, test_size=0.20, random_state=42)
    train, val = train_test_split(train, test_size=0.20, random_state=42)
    
    print(TRAINING_DATA.path)
    print(TEST_DATA.path)
    print(VALIDATION_DATA.path)

    #Write data to GCS Storage
    train.to_csv(TRAINING_DATA.path, index=False, header=True)
    test.to_csv(TEST_DATA.path, index=False, header=True)
    val.to_csv(VALIDATION_DATA.path, index=False, header=True)


#### Pipeline Step 3 - Train and Save Model

In [12]:
@component(output_component_file='model_training.yaml',
          base_image='python:3.9',
          packages_to_install=['pandas', 
                             'pyarrow' , 
                             'gcsfs' , 
                             'google-cloud-bigquery-storage',
                             'tensorflow==2.5',
                             'db_dtypes'])
def model_training(TRAINING_DATA: Input[Dataset], 
                TEST_DATA: Input[Dataset], 
                VALIDATION_DATA: Input[Dataset],
                MODEL: Output[Model]
               ):
    
    
    import pandas as pd
    import time
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import layers
    from tensorflow.keras.layers.experimental import preprocessing
    
    #VARIABLES AND TRAINING PARAMETERS
    TRAIN_DATA = pd.read_csv(TRAINING_DATA.path)
    TEST_DATA = pd.read_csv(TEST_DATA.path)
    VAL_DATA = pd.read_csv(VALIDATION_DATA.path)
    
    BATCH_SIZE = 32
    
    print(tf.__version__)
    
    print(MODEL.path)

    #TENSORFLOW DATASET FUNCTION
    def helperfunc_create_dataset(dataframe, shuffle=True, batch_size=5):
        dataframe = dataframe.copy()
        labels = dataframe.pop('label')
        ds = tf.data.Dataset.from_tensor_slices((dict(dataframe), labels))
        if shuffle:
            ds = ds.shuffle(buffer_size=len(dataframe))
        ds = ds.batch(batch_size)
        ds = ds.prefetch(batch_size)
        return ds
    
    #NORMALIZATION FUNCTION
    def helperfunc_get_normalization_layer(name, dataset):
        # Create a Normalization layer for our feature.
        normalizer = preprocessing.Normalization()

        # Prepare a Dataset that only yields our feature.
        feature_ds = dataset.map(lambda x, y: x[name])

        # Learn the statistics of the data.
        normalizer.adapt(feature_ds)

        return normalizer
    
    #CATEGORY ENCODING FUNCTION
    def helperfunc_get_category_encoding_layer(name, dataset, dtype, max_tokens=None):
        # Create a StringLookup layer which will turn strings into integer indices
        if dtype == 'string':
            index = preprocessing.StringLookup(max_tokens=max_tokens)
        else:
            index = preprocessing.IntegerLookup(max_values=max_tokens)

        # Prepare a Dataset that only yields our feature
        feature_ds = dataset.map(lambda x, y: x[name])

        # Learn the set of possible values and assign them a fixed integer index.
        index.adapt(feature_ds)

        # Create a Discretization for our integer indices.
        encoder = preprocessing.CategoryEncoding(max_tokens=index.vocab_size())

        # Prepare a Dataset that only yields our feature.
        feature_ds = feature_ds.map(index)

        # Learn the space of possible indices.
        encoder.adapt(feature_ds)

        # Apply one-hot encoding to our indices. The lambda function captures the
        # layer so we can use them, or include them in the functional model later.
        return lambda feature: encoder(index(feature))
    
    #CREATE TENSORFLOW DATASETS
    TRAIN_DS = helperfunc_create_dataset(TRAIN_DATA, batch_size=BATCH_SIZE)
    VALIDATION_DS = helperfunc_create_dataset(VAL_DATA, shuffle=False, batch_size=BATCH_SIZE)
    TESTING_DS = helperfunc_create_dataset(TEST_DATA, shuffle=False, batch_size=BATCH_SIZE)
    
    #CREATE PREPROCESSING LAYERS
    ALL_INPUTS = []
    ENCODED_FEATURES = []

    NUMERICAL = ['age' , 'capital_gain']
    CATEGORICAL_INT_COLS = ['education_num']
    CATEGORICAL_STRING_COLS = ['occupation', 
                               'workclass', 
                               'marital_status']
    TARGET = ['label']
    
    # Numeric features.
    for header in NUMERICAL:
        numeric_col = tf.keras.Input(shape=(1,), name=header)
        normalization_layer = helperfunc_get_normalization_layer(header, TRAIN_DS)
        encoded_numeric_col = normalization_layer(numeric_col)
        ALL_INPUTS.append(numeric_col)
        ENCODED_FEATURES.append(encoded_numeric_col)
        
    # Categorical features encoded as integers.
    for header in CATEGORICAL_INT_COLS:
        categorical_int_col = tf.keras.Input(shape=(1,), name=header, dtype='int64')
        encoding_layer = helperfunc_get_category_encoding_layer(header, TRAIN_DS, dtype='int64', max_tokens=5)
        encoded_categorical_int_col = encoding_layer(categorical_int_col)
        ALL_INPUTS.append(categorical_int_col)
        ENCODED_FEATURES.append(encoded_categorical_int_col)
    
    # Categorical features encoded as string.
    for header in CATEGORICAL_STRING_COLS:
        categorical_string_col = tf.keras.Input(shape=(1,), name=header, dtype='string')
        encoding_layer = helperfunc_get_category_encoding_layer(header, TRAIN_DS, dtype='string', max_tokens=5)
        encoded_categorical_string_col = encoding_layer(categorical_string_col)
        ALL_INPUTS.append(categorical_string_col)
        ENCODED_FEATURES.append(encoded_categorical_string_col)
    
        
    #CREATE and COMPILE MODEL
    all_features = tf.keras.layers.concatenate(ENCODED_FEATURES)
    x = tf.keras.layers.Dense(32, activation="relu")(all_features)
    x = tf.keras.layers.Dropout(0.5)(x)
    output = tf.keras.layers.Dense(1)(x)
    model = tf.keras.Model(ALL_INPUTS, output)
    model.compile(optimizer='adam',
                  loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
                  metrics=["accuracy"])
    
    #TRAIN MODEL
    history = model.fit(TRAIN_DS, epochs=10, validation_data=VALIDATION_DS)
    
    
    #Define Bucket in GCS for Model Storage
    BUCKET = 'gs://crazy-pipelines/models/'
    
    #Define MODEL PATH 
    MODEL_PATH = BUCKET + 'earnings_model{}'.format(str(int(time.time())))
    
    MODEL.uri = MODEL_PATH 
    
    
    #Save model to Artifact Store for Project
    model.save(MODEL.path)
    
    print('Model saved to: ' + MODEL.path)

    


#### Pipeline Step 4 - Evaluate Saved Model

In [13]:
@component(output_component_file='model_evaluation.yaml',
          base_image='python:3.9',
          packages_to_install=['pandas',
                         'google-cloud-bigquery',
                         'pyarrow', 
                         'gcsfs',
                         'tensorflow==2.5',
                         'google-cloud-aiplatform',
                         'db_dtypes'])
def model_evaluation(MODEL : Input[Model], 
                            TEST_DATA: Input[Dataset], 
                            num_of_examples: int,
                            categorical_col: int,
                            numeric_col: int,
                            pipeline:str, 
                            framework:str,
                            input_path:str,
                            dataset_version:int,
                            pipeline_metrics: Output[Metrics]) ->  NamedTuple(
                                                                    'ComponentOutputs',
                                                                              [
                                                                                  ('accuracy', float),
                                                                                  ('loss', float),
                                                                                  ('dep_decision',str)
                                                                              ]):
    
    import pandas as pd
    import tensorflow as tf
    from tensorflow import keras
    
    #HELPER FUNCTION - TENSORFLOW DATASET FUNCTION
    def helperfunc_create_dataset(dataframe, shuffle=True, batch_size=5):
        dataframe = dataframe.copy()
        labels = dataframe.pop('label')
        ds = tf.data.Dataset.from_tensor_slices((dict(dataframe), labels))
        if shuffle:
            ds = ds.shuffle(buffer_size=len(dataframe))
        ds = ds.batch(batch_size)
        ds = ds.prefetch(batch_size)
        return ds
    
    #LOAD TRAINED MODEL FROM ARTIFACT STORE
    reloaded_model = tf.keras.models.load_model(MODEL.path)
    
    #READ TESTING DATASET
    TESTING_DATA = pd.read_csv(TEST_DATA.path)

    #SET BATCG SIZE
    BATCH_SIZE = 32
    
    #CALL HELPER FUNCTION TO CREATE TENSORFLOW DATASET
    TESTING_DS = helperfunc_create_dataset(TESTING_DATA, shuffle=False, batch_size=BATCH_SIZE)
    
    #EVALUATE MODEL WITH TEST DATA
    loss, accuracy = reloaded_model.evaluate(TESTING_DS)
    
    accuracy = float(accuracy)
    loss = float(loss)
    dep_decision = 'false'
    
    #PRINT ACCURACY METRIC
    print("Accuracy", accuracy)
    print("Loss", loss)
    
    
    from tensorflow.python.lib.io import file_io    
    
    #Write Metrics to BigQuery Table for Validation and possible promotion to Deployment
    from google.cloud.bigquery import Client, QueryJobConfig
    
    #Initiate BigQuery Client
    client = Client(project='mlops-dev-999-c6b8', location="us")
    
    print('Sending Metrics into BigQuery')
    
    #Define DML Query to Insert Metrics into BugQuery
    query = """INSERT `mlops-dev-999-c6b8.earnings_prediction.model_metrics_history` (model_name, pipeline, framework, accuracy, loss)
    VALUES ("{}", "{}", "{}", {}, {})  
    """.format(MODEL.path, pipeline, framework, accuracy, loss)
    
    #Run Query
    job = client.query(query)
    
    print('Metrics sent to BigQuery!')
    
    # Export two metrics
    pipeline_metrics.log_metric('accuracy', accuracy)
    pipeline_metrics.log_metric('loss', loss)

    from collections import namedtuple
    
    component_outputs = namedtuple('ComponentOutputs',
        ['accuracy', 'loss'])
    
    
    from datetime import datetime
    
    TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
    MY_PROJECT = 'mlops-dev-999-c6b8'
    REGION = 'us-central1'
    EXPERIMENT_NAME = 'earnings-classifier-ver1'
    RUN_NAME = "tensorflow-dl-model-" + TIMESTAMP
    
    
    #Store Experiment Metrics in Vertex AI
    from google.cloud import aiplatform

    aiplatform.init(project=MY_PROJECT, location=REGION, experiment=EXPERIMENT_NAME)
    aiplatform.start_run(run=RUN_NAME)
    
    PARAMETERS = {
        #"Model" : MODEL,
        #"Pipeline" : pipeline,
        #"Dataset" : input_path,
        "Dataset Version" : dataset_version
    }
    
    aiplatform.log_params(PARAMETERS)
    
    METRICS = {
        'Num_of_examples': num_of_examples,
        'Categorical_col': categorical_col,
        'Numeric_col': numeric_col,
        "Accuracy": accuracy, 
        "Loss": loss
    }
    
    aiplatform.log_metrics(METRICS)
    
    from collections import namedtuple
    
    component_outputs = namedtuple('ComponentOutputs', 
                                   ['accuracy', 'loss', 'dep_decision'])
    
        
    return component_outputs(accuracy, loss, dep_decision)

In [14]:
@component(output_component_file='model_deploy.yaml',
          base_image='python:3.9',
          packages_to_install=['pandas',
                         'google-cloud-bigquery',
                         'pyarrow', 
                         'gcsfs',
                         'tensorflow==2.5',
                         'google-cloud-aiplatform',
                         'db_dtypes'])
def model_deploy():
    
    
    print("Model Deployed to Endpoint")
     

### Define Pipeline

In [15]:
@dsl.pipeline(
  name='earnings-classifier',
  description='Binary Classification Model with Tensorflow Deep Learning and Connected Pre-processing Layers'
)
def binary_classifier_earnings_v2(
    pipeline: str = 'DL Version 2 (Tensorflow)',
    framework: str = 'Tensorflow',
    input_path: str = 'mlops-dev-999-c6b8.earnings_prediction.earnings_raw_data',
    dataset_version: int = 2
    ):
    
    first_step = data_ingestion(input_path,
                              dataset_version)
   
    second_step = data_transformation(first_step.outputs['DATASET'])

   
    third_step = model_training(second_step.outputs['TRAINING_DATA'], 
                             second_step.outputs['TEST_DATA'], 
                             second_step.outputs['VALIDATION_DATA'])
    

    fourth_step = model_evaluation(third_step.outputs['MODEL'], 
                                          second_step.outputs['TEST_DATA'],
                                          first_step.outputs['num_of_examples'],
                                          first_step.outputs['categorical_col'],
                                          first_step.outputs['numeric_col'],
                                          pipeline, 
                                          framework, 
                                          input_path, 
                                          dataset_version,
                                  )
    
    with dsl.Condition(
        fourth_step.outputs["dep_decision"] == "true",
        name="deploy_decision",
     ):
        
        fift_step = model_deploy()
        
      


Compile through KFP compiler

In [16]:
from kfp.v2 import compiler  

compiler.Compiler().compile(
    pipeline_func=binary_classifier_earnings_v2, package_path="earnings_pipeline.json"
)



### Test Pipeline in Vertex AI

Set run parameters

In [1]:
DISPLAY_NAME = "earnings-classifier-pipeline"
COMPILED_PIPELINE_PATH = "earnings_pipeline.json"
JOB_ID = ""
PIPELINE_ROOT_PATH = PIPELINE_ROOT
PIPELINE_PARAMETERS = {}
ENABLE_CACHING = False
SERVICE_ACCOUNT = 'ml1-dev-sa@mlops-dev-999-c6b8.iam.gserviceaccount.com'

NameError: name 'PIPELINE_ROOT' is not defined

Run pipeline in Vertex AI

In [18]:
from google.cloud import aiplatform

job = aiplatform.PipelineJob(display_name = DISPLAY_NAME,
                             template_path = COMPILED_PIPELINE_PATH,
                             job_id = JOB_ID,
                             pipeline_root = PIPELINE_ROOT_PATH,
                             parameter_values = PIPELINE_PARAMETERS,
                             enable_caching = ENABLE_CACHING,
                             project = PROJECT_ID,
                             location = REGION)

job.submit(
    service_account=SERVICE_ACCOUNT
)

Creating PipelineJob
PipelineJob created. Resource name: projects/965234628650/locations/us-central1/pipelineJobs/earnings-classifier-20220629151728
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/965234628650/locations/us-central1/pipelineJobs/earnings-classifier-20220629151728')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/earnings-classifier-20220629151728?project=965234628650
