# Titanic on Vertex AI - MLOps introductory lab

<table align="left">
  <td>
    <a>
       <img src="https://s9i7q5a6.rocketcdn.me/solutions/wp-content/uploads/2022/10/Vertex-AI-Logo.webp" style="max-width: 25%; height: auto;">
    </a>
    <a>
       <img src="https://cdn1.naekranie.pl/media/cache/amp/2023/12/Titanic_6576d7d208376.jpg" style="max-width: 50%; height: auto;">
    </a>
  </td> 
</table>

### Overview

The purpose of this notebook is to present the capabilities of Vertex AI for developing, serving and monitoring ML models. It covers the following components:

- Exploratory Data Analysis
- Model Experimentation
- Model Training in Vertex AI
- Model Serving
- Model Monitoring (including TensorBoard)

### Import packages
Import the following packages required to execute this notebook.

In [None]:
!pip install tensorflow-io==0.31.0

In [None]:
import datetime
import tensorflow as tf
import numpy as np
import time
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import random
import string

from google.cloud import bigquery
from tensorflow.keras import regularizers
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, f1_score, accuracy_score
from google.cloud import aiplatform as vertex_ai

# Set the seed fixed to make the output deterministic
tf.random.set_seed(42)

# Generate unique ID to help w/ unique naming of certain pieces
ID = "".join(random.choices(string.ascii_lowercase + string.digits, k=5))

### Setup your environment

Run the next cell to set your project ID and some of the other constants used in the lab.

In [None]:
PROJECT_ID = '...' #ToDo: add project ID
REGION = 'us-central1'
BUCKET_NAME = "gs://..." #ToDo: select existing GS destination
EXPERIMENT_NAME = f"titanic-experiments-{ID}"

### Initialize clients

Next you have to initialize the Vertex AI SDK and the Python BigQuery Client for your project, region and corresponding bucket.

In [None]:
vertex_ai.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=BUCKET_NAME,
    experiment=EXPERIMENT_NAME,
)

In [None]:
bq_client = bigquery.Client(project=PROJECT_ID, location=REGION)

### Data import and EDA

The data is downloaded from storage.googleapis.com, where it is already split into train and eval sets. We concatenate the two, because later on we need our own split into train, validation and test sets.

In [None]:
titanic_t = pd.read_csv('https://storage.googleapis.com/tf-datasets/titanic/train.csv')
titanic_e = pd.read_csv('https://storage.googleapis.com/tf-datasets/titanic/eval.csv')
union_data = pd.concat([titanic_t, titanic_e], ignore_index=True)

Summary of data

In [None]:
union_data.info()

Examine the label distribution

In [None]:
plt.title("Count of passengers by survived label ('survived' = 0 or 1)")
sns.countplot(x="survived", data=union_data)

And now let's take a look at continuous predictors:

In [None]:
var = list(union_data.columns)
var.remove('survived')

In [None]:
union_data[var].hist(figsize=(20, 10), grid=False)

And now for categorical ones:

In [None]:
categorical_vars = ['sex', 'class', 'deck', 'embark_town', 'alone'] 

In [None]:
# Create an empty dictionary to store the cross-tabulations
cross_tabs = {}

# Generate cross-tabulations for each categorical variable
for var in categorical_vars:
    cross_tabs[var] = pd.crosstab(index=union_data[var], columns='count')

# Print the cross-tabulations
for var, table in cross_tabs.items():
    print(f"Cross-tabulation for {var}:\n{table}\n")
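
With `columns='count'`, each cross-tabulation above reduces to a frequency count per category. As a dependency-free sketch of the same idea (the sample values below are made up for illustration and are not taken from the dataset):

```python
from collections import Counter

# Hypothetical sample of a categorical column (not real Titanic rows)
embark_town = ["Southampton", "Cherbourg", "Southampton", "Queenstown", "Southampton"]

# Count how often each category occurs, most frequent first
counts = Counter(embark_town)
for category, count in counts.most_common():
    print(f"{category}: {count}")
```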

To compute the correlation matrix, we first one-hot encode the categorical columns:

In [None]:
union_d = pd.get_dummies(union_data, columns=['sex', 'class', 'deck', 'embark_town', 'alone'])
union_d = union_d * 1
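
To make the encoding step concrete, here is a minimal, dependency-free sketch of what one-hot encoding does to a single categorical column. `pd.get_dummies` applies the same idea to every listed column at once, and multiplying by 1 turns boolean indicators into 0/1 integers. The helper name `one_hot` and the sample values are hypothetical.

```python
def one_hot(values, prefix):
    """Return one dict per row with a 0/1 indicator for every category seen."""
    categories = sorted(set(values))
    return [
        {f"{prefix}_{cat}": int(value == cat) for cat in categories}
        for value in values
    ]

# Hypothetical sample values for the 'sex' column
rows = one_hot(["male", "female", "female"], prefix="sex")
for row in rows:
    print(row)
```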

In [None]:
variables = list(union_d.columns)
variables.remove('survived')

Correlation matrix:

In [None]:
plt.figure(figsize=(8, 5))
sns.heatmap(union_d[variables].corr())

### Feature Engineering

Feature engineering can also be done in BigQuery. First, load the data into a BigQuery table:

In [None]:
# dataset schema
dataset_id = f"{PROJECT_ID}.titanic_data"

dataset = bigquery.Dataset(dataset_id)
dataset.location = "us-central1"

In [None]:
#ToDo: uncomment
# create the dataset
# dataset = bq_client.create_dataset(dataset, timeout=30)

In [None]:
# export data table
table_id = f"{PROJECT_ID}.titanic_data.titanic"

job_config = bigquery.LoadJobConfig(
    autodetect=True,  
    write_disposition="WRITE_TRUNCATE"
)

union_d['age'] = union_d['age'].astype(int)
job = bq_client.load_table_from_dataframe(
    union_d, table_id, job_config=job_config
)

job.result() 

You can then run arbitrary SQL against the table, including statements that create new variables:

In [None]:
%%bigquery

SELECT
  AVG(titanic.age) AS avg_age,
  titanic.embark_town_Cherbourg,
  titanic.embark_town_Queenstown,
  titanic.embark_town_Southampton,
  titanic.embark_town_unknown
FROM
  `... .titanic_data.titanic` AS titanic #ToDo: put project ID in front of
GROUP BY
  2,
  3,
  4,
  5;
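
As one hedged illustration of deriving a new variable in SQL, the sketch below builds a query string that adds an `is_child` flag. The helper `build_feature_query`, the column name `is_child` and the age cutoff of 18 are assumptions made for this example; the table name follows the `titanic_data.titanic` table loaded above.

```python
def build_feature_query(project_id: str) -> str:
    """Build a query that derives a hypothetical `is_child` column from `age`."""
    table = f"{project_id}.titanic_data.titanic"
    return (
        "SELECT\n"
        "  *,\n"
        "  age < 18 AS is_child  -- hypothetical derived feature\n"
        "FROM\n"
        f"  `{table}`"
    )

# 'my-project' is a placeholder project ID
print(build_feature_query("my-project"))
```

In the notebook, the resulting string could be passed to `bq_client.query(...)` instead of being printed.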

### Experimenting

Before training the model, you can set hyperparameters to improve its performance. Vertex AI Vizier can automate hyperparameter tuning, and we recommend it for real workloads. In this notebook, however, we specify a handful of hyperparameter combinations by hand for the sake of simplicity and speed.
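
As a middle ground between hand-picked values and a tuning service, candidate configurations can be enumerated from a small grid. The sketch below uses only the standard library; the key names mirror the ones logged in the experiment runs later in this notebook, while the candidate values and the name `grid_parameters` are illustrative assumptions.

```python
from itertools import product

# Candidate values per hyperparameter (illustrative, not tuned)
grid = {
    "nneur1": [64, 32],
    "dropout": [0.5, 0.7],
    "l2reg": [0.01, 0.1],
}

# Cartesian product of all candidate values -> list of parameter dicts
grid_parameters = [dict(zip(grid, combo)) for combo in product(*grid.values())]
print(len(grid_parameters), "configurations")
```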

In [None]:
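# data split: 60% train, 20% validation, 20% test (fixed seed for reproducibility)
train, test = train_test_split(union_d, test_size=0.2, random_state=42)
train, val = train_test_split(train, test_size=0.25, random_state=42)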
print(len(train), 'train examples')
print(len(val), 'validation examples')
print(len(test), 'test examples')
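
The two calls above yield roughly a 60/20/20 split: 20% of all rows are held out for testing, then 25% of the remaining 80% (that is, 20% of the total) go to validation. A small arithmetic sketch, assuming a hypothetical 1,000-row dataset and rounding each held-out part up:

```python
import math

def split_sizes(n_rows, test_frac=0.2, val_frac_of_rest=0.25):
    """Return (train, validation, test) row counts for a two-step split."""
    n_test = math.ceil(n_rows * test_frac)        # first split: hold out the test set
    n_rest = n_rows - n_test
    n_val = math.ceil(n_rest * val_frac_of_rest)  # second split: validation from the rest
    n_train = n_rest - n_val
    return n_train, n_val, n_test

print(split_sizes(1000))
```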

#### Monitoring with TensorBoard

In [None]:
model = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(len(variables),)),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dropout(0.5),
    tf.keras.layers.Dense(32, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Dense(1, activation='sigmoid')
])

In [None]:
# a utility method to create a tf.data dataset from a Pandas Dataframe
def df_to_dataset(dataframe, shuffle=True, batch_size=32):
  dataframe = dataframe.copy()
  labels = dataframe.pop('survived')
  # Convert features to a numpy array with the correct shape
  features = dataframe.values  
  ds = tf.data.Dataset.from_tensor_slices((features, labels))
  if shuffle:
    ds = ds.shuffle(buffer_size=len(dataframe))
  ds = ds.batch(batch_size)
  return ds

In [None]:
batch_size = 32 
train_dataset = df_to_dataset(train, batch_size=batch_size)
val_dataset = df_to_dataset(val, shuffle=False, batch_size=batch_size)
test_dataset = df_to_dataset(test, shuffle=False, batch_size=batch_size)

In [None]:
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

In [None]:
# Clear any logs from previous runs
!rm -rf ./logs/ 

log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

In [None]:
history_model = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=50,
    callbacks=[tensorboard_callback],
)

In [None]:
%load_ext tensorboard
%tensorboard --logdir logs/fit --port 6005

#### Registering model experiments in Vertex AI

In [None]:
parameters = [
    {"nneur1": 64, "dropout": 0.5, "nneur2": 30, "l2reg": 0.01},
    {"nneur1": 64, "dropout": 0.6, "nneur2": 20, "l2reg": 0.10},
    {"nneur1": 64, "dropout": 0.7, "nneur2": 10, "l2reg": 0.01},
    {"nneur1": 32, "dropout": 0.5, "nneur2": 30, "l2reg": 0.01},
    {"nneur1": 32, "dropout": 0.6, "nneur2": 20, "l2reg": 0.10},
    {"nneur1": 32, "dropout": 0.7, "nneur2": 10, "l2reg": 0.01},
    {"nneur1": 16, "dropout": 0.5, "nneur2": 30, "l2reg": 0.05},
    {"nneur1": 16, "dropout": 0.6, "nneur2": 20, "l2reg": 0.10},
    {"nneur1": 16, "dropout": 0.7, "nneur2": 10, "l2reg": 0.01},
]

models = {}
for i, params in enumerate(parameters):
    run_name = f"titanic-model-{ID}-{i}"
    print(run_name)
    vertex_ai.start_run(run=run_name)
    vertex_ai.log_params(params)
    # Build the network from this run's hyperparameters
    model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(len(variables),)),
        tf.keras.layers.Dense(params["nneur1"], activation='relu'),
        tf.keras.layers.Dropout(params["dropout"]),
        tf.keras.layers.Dense(params["nneur2"], activation='relu', kernel_regularizer=regularizers.l2(params["l2reg"])),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    model.fit(train_dataset,validation_data=val_dataset,epochs=50)
    models[run_name] = model
    
    # Predict on the held-out test set and threshold probabilities at 0.5
    y_pred = model.predict(test_dataset)
    y_pred = (y_pred >= 0.5).astype(int)
    y_pred_series = pd.Series(y_pred.flatten())

    # Log test-set metrics for this run
    acc_score = accuracy_score(test['survived'], y_pred_series)
    test_f1_score = f1_score(test['survived'], y_pred_series, average="weighted")
    vertex_ai.log_metrics({"acc_score": acc_score, "f1score": test_f1_score})
    vertex_ai.end_run()
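
Once every run has logged its parameters and metrics, the runs can be ranked to pick a configuration. The sketch below does this on plain dictionaries; the run names and metric values are invented for illustration, and `best_run` is a hypothetical helper rather than part of the Vertex AI SDK.

```python
def best_run(runs, metric="f1score"):
    """Return the run dict with the highest value of `metric`."""
    return max(runs, key=lambda run: run["metrics"][metric])

# Invented example records shaped like the params/metrics logged above
runs = [
    {"name": "run-0", "params": {"nneur1": 64, "dropout": 0.5},
     "metrics": {"acc_score": 0.80, "f1score": 0.79}},
    {"name": "run-1", "params": {"nneur1": 32, "dropout": 0.6},
     "metrics": {"acc_score": 0.82, "f1score": 0.81}},
]

winner = best_run(runs)
print(winner["name"], winner["params"])
```

In the notebook, `vertex_ai.get_experiment_df()` should return the logged runs as a DataFrame that can be sorted in the same way.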

### Building a ML model using Vertex AI custom training

#### Building a Vertex AI dataset

In this section, we create a managed Vertex AI dataset. Vertex AI datasets can be used to train AutoML models or custom-trained models.

In [None]:
# create Vertex AI managed dataset
dataset = vertex_ai.TabularDataset.create(
    display_name="titanic-dataset",
    bq_source=f"bq://{table_id}",
)

#### Preparing the training script for the container

In [None]:
# create a folder for all container-related files
!mkdir -p -m 777 build_training

In [None]:
%%writefile build_training/train_model.py

# Libraries --------------------------------------------------------------------------------------------------------------------------

import datetime
import argparse
import os
import tensorflow as tf
import numpy as np
import pandas as pd

from tensorflow.python.framework import ops
from tensorflow.python.framework import dtypes
from tensorflow_io.bigquery import BigQueryClient
from tensorflow_io.bigquery import BigQueryReadSession

from tensorflow.keras import regularizers
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, f1_score, accuracy_score

# Variables --------------------------------------------------------------------------------------------------------------------------

def get_args():
    parser = argparse.ArgumentParser()

    # Data files arguments
    parser.add_argument("--ID", dest="ID", type=str,
                        required=True, help="generated random ID")
    
    return parser.parse_args()

def gcs_path_to_local_path(old_path):
    new_path = old_path.replace("gs://", "/gcs/")
    return new_path

# Read command-line arguments
args = get_args()
ID = args.ID

# Data URIs set by Vertex AI when a managed dataset is attached to the training job
TRAINING_DATA_PATH = os.environ["AIP_TRAINING_DATA_URI"]
TEST_DATA_PATH = os.environ["AIP_TEST_DATA_URI"]

MODEL_DIR = gcs_path_to_local_path(os.environ["AIP_MODEL_DIR"])
MODEL_PATH = MODEL_DIR

def main():

    def features_and_labels(features):
      label = features.pop('survived') # this is what we will train for
      return features, label

    def read_dataset(client, batch_size=48):
        GCP_PROJECT_ID='...'  #ToDo: change
        COL_NAMES = ['age', 'sex_female', 'class_First', 'class_Second', 'alone_y', 'survived']
        COL_TYPES = [dtypes.int64, dtypes.int64, dtypes.int64, dtypes.int64, dtypes.int64, dtypes.int64]
        DATASET_GCP_PROJECT_ID = GCP_PROJECT_ID
        DATASET_ID, TABLE_ID,  = 'titanic_data.titanic'.split('.')
        bqsession = client.read_session(
            "projects/" + GCP_PROJECT_ID,
            DATASET_GCP_PROJECT_ID, TABLE_ID, DATASET_ID,
            COL_NAMES, COL_TYPES,
            requested_streams=2)
        dataset = bqsession.parallel_read_rows()

        def preprocess(features):
            label = features.pop('survived')
            label = tf.cast(label, tf.int64)

            # Cast every remaining column to float32, the dtype the Dense layers compute in
            numerical_features = [tf.cast(value, tf.float32) for value in features.values()]

            # Stack the per-column scalars into a single 1D feature tensor
            concatenated_features = tf.stack(numerical_features, axis=0)

            return concatenated_features, label

        transformed_ds = dataset.map(preprocess)
        return transformed_ds    

    client = BigQueryClient()

    newset = read_dataset(client, 2).batch(21)
    
    model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(5,)),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dropout(0.5),
        tf.keras.layers.Dense(32, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    
    model.fit(newset, validation_data=newset, epochs=50)

    model.save(MODEL_PATH)
    
if __name__ == "__main__":
    main()
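
Vertex AI custom training exposes Cloud Storage buckets through a `/gcs/` mount, which is why the script rewrites the `AIP_MODEL_DIR` URI before saving the model. A standalone sketch of that mapping, using a made-up bucket path and an added check that is not in the script above:

```python
def gcs_path_to_local_path(gcs_uri: str) -> str:
    """Map a gs:// URI onto the /gcs/ mount point used inside the training container."""
    prefix = "gs://"
    if not gcs_uri.startswith(prefix):
        raise ValueError(f"expected a gs:// URI, got {gcs_uri!r}")
    return "/gcs/" + gcs_uri[len(prefix):]

# Hypothetical model directory
print(gcs_path_to_local_path("gs://example-bucket/models/titanic/"))
```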

#### Building a Docker container

In [None]:
IMAGE_REPOSITORY = f"titanic-repo-{ID}"

In [None]:
IMAGE_NAME = f"classification-{ID}"
IMAGE_TAG = "v1"
IMAGE_URI = f"us-central1-docker.pkg.dev/{PROJECT_ID}/{IMAGE_REPOSITORY}/{IMAGE_NAME}:{IMAGE_TAG}"
TRAIN_COMPUTE = "e2-standard-4"
DEPLOY_COMPUTE = "n1-standard-4"

TRAIN_JOB_NAME = f"titanic-train-{ID}"
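
The image URI follows the Artifact Registry Docker naming scheme `LOCATION-docker.pkg.dev/PROJECT/REPOSITORY/IMAGE:TAG`. A small sketch that assembles it from its parts (the helper name `artifact_image_uri` and the sample values are hypothetical):

```python
def artifact_image_uri(region, project_id, repository, image, tag="v1"):
    """Compose an Artifact Registry Docker image URI from its components."""
    return f"{region}-docker.pkg.dev/{project_id}/{repository}/{image}:{tag}"

# Placeholder project, repository and image names
uri = artifact_image_uri("us-central1", "my-project", "titanic-repo-abc12", "classification-abc12")
print(uri)
```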

In [None]:
# Create image repository
!gcloud artifacts repositories create $IMAGE_REPOSITORY --repository-format=docker --location=us-central1 --description="Titanic Docker Image repository"

# List repositories under the project
!gcloud artifacts repositories list

# Get info on the repository
!gcloud artifacts repositories describe $IMAGE_REPOSITORY --location=us-central1

In [None]:
!gcloud auth configure-docker us-central1-docker.pkg.dev -q

In [None]:
%%writefile build_training/Dockerfile
# Specifies base image and tag
FROM python:3.7
WORKDIR /root

# Installs additional packages
RUN pip install tensorflow-io==0.31.0 tensorflow==2.11.0 gcsfs numpy pandas scikit-learn dask distributed xgboost --upgrade

# Copies the trainer code to the docker image.
COPY ./train_model.py /root/train_model.py

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python3", "train_model.py"]

In [None]:
!docker build -t $IMAGE_URI ./build_training/
!docker push $IMAGE_URI

#### Training model in Vertex AI

In [None]:
MODEL_SERVING_IMAGE_URI = (
    "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-11:latest"
)

In [None]:
job = vertex_ai.CustomContainerTrainingJob(
    display_name=TRAIN_JOB_NAME,
    container_uri=IMAGE_URI,
    model_serving_container_image_uri=MODEL_SERVING_IMAGE_URI,
)

CMDARGS = [
    f"--ID={ID}",
]
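
Each entry in `CMDARGS` is passed to the container's entry point as a command-line argument, where the training script's `argparse` parser picks it up. The sketch below reproduces that parsing in isolation, with an invented ID value and a hypothetical wrapper name `parse_training_args`:

```python
import argparse

def parse_training_args(argv):
    """Parse the same --ID flag that train_model.py expects."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--ID", dest="ID", type=str,
                        required=True, help="generated random ID")
    return parser.parse_args(argv)

# 'abc12' stands in for the randomly generated ID
args = parse_training_args(["--ID=abc12"])
print(args.ID)
```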

In [None]:
model_vertex = job.run(
    dataset=dataset,
    model_display_name= f"titanic-vertex-{ID}",
    args=CMDARGS,
    replica_count=1,
    machine_type=TRAIN_COMPUTE,
    accelerator_count=0,
    bigquery_destination=f"bq://{PROJECT_ID}",
)

### Deploying model to endpoint

In [None]:
ENDPOINT_NAME = f"titanic-{ID}-endpoint"

endpoint = model_vertex.deploy(
    deployed_model_display_name=ENDPOINT_NAME,
    traffic_split={"0": 100},
    machine_type=DEPLOY_COMPUTE,
    accelerator_count=0,
    min_replica_count=1,
    max_replica_count=1,
)

### Setting model monitoring

In [None]:
# Sampling rate (optional, default=.8)
LOG_SAMPLE_RATE = 1  # @param {type:"number"}

# Monitoring Interval in hours (optional, default=1).
MONITOR_INTERVAL = 1  # @param {type:"number"}

# Skew and drift thresholds.
DEFAULT_THRESHOLD = 0.003

SKEW_THRESHOLDS = {
    "age": DEFAULT_THRESHOLD,
    "alone_y": DEFAULT_THRESHOLD,
}
DRIFT_THRESHOLDS = {
    "age": DEFAULT_THRESHOLD,
    "alone_y": DEFAULT_THRESHOLD,
}
ATTRIB_SKEW_THRESHOLDS = {
    "age": DEFAULT_THRESHOLD,
    "alone_y": DEFAULT_THRESHOLD,
}
ATTRIB_DRIFT_THRESHOLDS = {
    "age": DEFAULT_THRESHOLD,
    "alone_y": DEFAULT_THRESHOLD,
}
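
Each threshold is compared against a distance score between two feature distributions. According to the Vertex AI Model Monitoring documentation, numerical features are scored with Jensen-Shannon divergence and categorical ones with L-infinity distance. The following is only an illustrative sketch of the Jensen-Shannon calculation on two made-up histograms, not the service's implementation; the logarithm base and the binning are assumptions.

```python
import math

def js_divergence(p, q):
    """Jensen-Shannon divergence between two discrete distributions (natural log)."""
    def kl(a, b):
        # Kullback-Leibler divergence; terms with zero probability contribute nothing
        return sum(x * math.log(x / y) for x, y in zip(a, b) if x > 0)

    m = [(x + y) / 2 for x, y in zip(p, q)]  # mixture distribution
    return 0.5 * kl(p, m) + 0.5 * kl(q, m)

# Made-up binned distributions of one feature at training and serving time
training = [0.25, 0.50, 0.25]
serving = [0.20, 0.50, 0.30]

score = js_divergence(training, serving)
print(f"score={score:.5f}, exceeds threshold: {score > 0.003}")
```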

In [None]:
from google.cloud.aiplatform import model_monitoring

skew_config = model_monitoring.SkewDetectionConfig(
    data_source=f"bq://{table_id}",
    skew_thresholds=SKEW_THRESHOLDS,
    attribute_skew_thresholds=ATTRIB_SKEW_THRESHOLDS,
    target_field='survived',
)

drift_config = model_monitoring.DriftDetectionConfig(
    drift_thresholds=DRIFT_THRESHOLDS,
    attribute_drift_thresholds=ATTRIB_DRIFT_THRESHOLDS,
)

objective_config = model_monitoring.ObjectiveConfig(
    skew_config, drift_config
)

# Create sampling configuration
random_sampling = model_monitoring.RandomSampleConfig(sample_rate=LOG_SAMPLE_RATE)

# Create schedule configuration
schedule_config = model_monitoring.ScheduleConfig(monitor_interval=MONITOR_INTERVAL)

# Create alerting configuration.
emails = ["..."] #ToDo: fill
alerting_config = model_monitoring.EmailAlertConfig(
    user_emails=emails, enable_logging=True
)

In [None]:
MONITORING_JOB_NAME = f"titanic-monitoring-{ID}"

job_monitoring = vertex_ai.ModelDeploymentMonitoringJob.create(
    display_name=MONITORING_JOB_NAME,
    logging_sampling_strategy=random_sampling,
    schedule_config=schedule_config,
    alert_config=alerting_config,
    objective_configs=objective_config,
    project=PROJECT_ID,
    location=REGION,
    endpoint=endpoint,
)

In [None]:
num_requests = 1000

# Sends prediction traffic in an endless loop; interrupt the kernel to stop it
while True:
    print("Simulation started...")
    for idx in range(num_requests):

        request = [[0, 0, 0, 0, 0] for _ in range(5000)]

        endpoint.predict(request)

        if idx % 100 == 0:
            print(f'{idx + 1} of {num_requests} prediction requests were invoked.')
    print("Simulation finished.")

    time.sleep(60*3)
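
For a dry run that does not require a live endpoint and terminates on its own, the traffic loop can be written against an injected prediction function. In the sketch below, `send_traffic` and `fake_predict` are hypothetical names and the random 0/1 feature values are an assumption; in the notebook, `endpoint.predict` would take the place of `fake_predict`.

```python
import random

def send_traffic(predict_fn, num_requests, batch_size=5, n_features=5, seed=42):
    """Send `num_requests` batches of random 0/1 feature rows; return rows sent."""
    rng = random.Random(seed)
    sent = 0
    for _ in range(num_requests):
        instances = [
            [rng.randint(0, 1) for _ in range(n_features)]
            for _ in range(batch_size)
        ]
        predict_fn(instances)
        sent += len(instances)
    return sent

def fake_predict(instances):
    """Stand-in for a deployed endpoint: returns a constant score per instance."""
    return [0.5 for _ in instances]

print(send_traffic(fake_predict, num_requests=10), "instances sent")
```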

In [None]:
# # Clean up resources:

# # [1] storage bucket (BUCKET_NAME already includes the gs:// prefix)
# !gsutil -m rm -r {BUCKET_NAME}/**
# !gsutil rb {BUCKET_NAME}

# # [2] models and endpoints
# def delete_all_models(PROJECT_ID, REGION):

#     vertex_ai.init(project=PROJECT_ID, location=REGION)
#     endpoints = vertex_ai.Endpoint.list()  # Get all endpoints

#     for endpoint in endpoints:
#         endpoint.undeploy_all()
#         print(f"Undeployed endpoints")
    
#     for model in vertex_ai.Model.list():
#         model.delete()
#         print(f"Deleted model: {model.name}")

# delete_all_models(PROJECT_ID, REGION)

# # [3] experiments
# experiments = vertex_ai.Experiment.list()
# for experiment in experiments:
#     experiment.delete(delete_backing_tensorboard_runs=True)

# # [4] datasets
# def delete_all_datasets():
#     try:
#         datasets = vertex_ai.TabularDataset.list() # or ImageDataset, TextDataset, etc.

#         for dataset in datasets:
#             dataset.delete()
#             print(f"Deleted dataset: {dataset.name}")

#         print("All datasets deleted successfully.")

#     except Exception as e:
#         print(f"Error deleting datasets: {e}")

# delete_all_datasets()

# # [5] featurestores

# # [6] artifact registry
# from google.cloud import artifactregistry_v1
# client = artifactregistry_v1.ArtifactRegistryClient()
# repositories = client.list_repositories(parent=f"projects/{PROJECT_ID}/locations/{REGION}")
# for repository in repositories:
#     try:
#         client.delete_repository(name=repository.name)
#         print(f"Deleted repository: {repository.name}")
#     except Exception as e:
#         print(f"Error deleting repository {repository.name}: {e}")

# # [7] metadata
# from google.cloud.aiplatform_v1 import MetadataServiceClient
# # Initialize Metadata Service Client
# metadata_client = MetadataServiceClient(client_options={"api_endpoint": f"{REGION}-aiplatform.googleapis.com"})

# def delete_artifacts(metadata_store_id="default"):  # Use 'default' for the default store
#     parent = f"projects/{PROJECT_ID}/locations/{REGION}/metadataStores/{metadata_store_id}"
#     artifacts = metadata_client.list_artifacts(parent=parent)

#     for artifact in artifacts:
#         metadata_client.delete_artifact(name=artifact.name)
#         print(f"Deleted artifact: {artifact.name}")
       
# delete_artifacts()