In [18]:
import datetime
import os
import tarfile
import time

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split

In [19]:
import sagemaker
# SageMaker SDK session; used below for S3 uploads, model deployment and endpoint clean-up.
sess = sagemaker.Session()
# IAM execution role; passed to the training job and to the serving models.
role = sagemaker.get_execution_role()

In [20]:
import boto3
from boto3_utils import *

# One boto3 client per AWS service used by this pipeline (created in the default
# session region, since no region_name is passed).
s3, db, sm, sfn, ssm_client = (
    boto3.client(service_name)
    for service_name in ("s3", "dynamodb", "sagemaker", "stepfunctions", "ssm")
)

# [Custom] AWS Resource Names

In [21]:
# AWS region forwarded to the training script as its `region` hyperparameter.
# NOTE(review): hardcoded, while the boto3 clients above use the default session
# region — confirm both point at the same region.
REGION = "ap-southeast-1"

In [22]:
# Main S3 bucket (training data under `_tmp_train/`, model artifacts under `model/`), read from SSM.
BUCKET_NAME = ssm_client.get_parameter(
    Name='ngwaf_bucket_name',
    WithDecryption=False,
)['Parameter']['Value']

In [23]:
# DynamoDB table holding the job items and the `_pipeline_status` item, read from SSM.
DYNAMO_NAME = ssm_client.get_parameter(
    Name='ngwaf_dynamodb_table_name',
    WithDecryption=False,
)['Parameter']['Value']

In [24]:
# Name of the notebook instance that is stopped at the end of the run, read from SSM.
NOTEBOOK_INSTANCE_NAME = ssm_client.get_parameter(
    Name='ngwaf_notebook_name',
    WithDecryption=False,
)['Parameter']['Value']

In [25]:
# Name of the SageMaker endpoint that is created or updated below, read from SSM.
ENDPOINT_NAME = ssm_client.get_parameter(
    Name='ngwaf_endpoint_name',
    WithDecryption=False,
)['Parameter']['Value']

In [26]:
# ARN of the state machine whose running execution is stopped at the end, read from SSM.
STEP_FUNCTION_ARN = ssm_client.get_parameter(
    Name='ngwaf_state_machine_arn',
    WithDecryption=False,
)['Parameter']['Value']

### DynamoDB Logging

For logging of training jobs:
1. User clicks on "train" with a selected dataset
    - Post to Train API with dataset location in S3
    - Lambda will:
        - Check that the pipeline is not busy: query the `PIPELINE_STATUS` item (`job_key = _pipeline_status`) in DynamoDB
        - Start a new training job: Add new item
            - job_key = `new_job`
            - status = `training`
        - Update `PIPELINE_STATUS` to busy with job_key
        - Move dataset into `{BUCKET}/_tmp_train` as `data.csv`
        - Trigger notebook bootup
2. At the end of this notebook:
    - Update the `new_job` item with status `success`
    - Set `PIPELINE_STATUS` back to available
3. If the notebook fails, this is handled by `autostop.py`, which runs after the notebook has been idle for 5 minutes:
    - It checks whether the `new_job` status is `success`. If not, it updates `new_job` with status `failure` and sets `PIPELINE_STATUS` back to available

In [None]:
# Local wall-clock start of the run; used to compute `training_duration_sec` at the end.
START_TRAIN_TIME = datetime.datetime.now()

In [None]:
# Get the key of the job this notebook is running. The Train lambda stores it as
# `pipeline_job_key` on the `_pipeline_status` item before booting the notebook.
pipeline_status_item = db.get_item(
    TableName=DYNAMO_NAME,
    Key={"job_key": {"S": "_pipeline_status"}},
).get("Item", {})
current_job_key = pipeline_status_item.get("pipeline_job_key", {}).get("S", "")

# A missing item/attribute, or the empty key written when the pipeline is released,
# means there is no job in flight. Fail here with a clear message instead of a bare
# KeyError, or later with an invalid empty DynamoDB key.
if not current_job_key:
    raise RuntimeError(
        f"No active job: the `_pipeline_status` item in table {DYNAMO_NAME} "
        "has no non-empty `pipeline_job_key`."
    )
current_job_key

In [None]:
# Coarse progress markers in DynamoDB show how far a run got when it fails.
def update_db_step_status(status_str, job_key=current_job_key, table_name=DYNAMO_NAME):
    """Write ``status_str`` to the ``step_status`` attribute of a job's item.

    Args:
        status_str: Human-readable description of the step just reached.
        job_key: ``job_key`` of the item to update. Defaults to the value of
            ``current_job_key`` when this cell was executed.
        table_name: DynamoDB table name. Defaults to ``DYNAMO_NAME``.
    """
    item_key = {"job_key": {"S": job_key}}
    step_update = {"Value": {"S": status_str}, "Action": "PUT"}
    db.update_item(
        TableName=table_name,
        Key=item_key,
        AttributeUpdates={"step_status": step_update},
    )
    print(f"Updated dynamo entry step_status to `{status_str}` for key `{job_key}`")

### Find pre-trained models
The **Keras** models are stored in the `model.tar.gz` files.
- They need to be unzipped so that the `keras/` folder can be taken out
    - The other folder (`0001`) is the TensorFlow Serving model (TensorFlow SavedModel format), which is used to deploy SageMaker endpoints. A SavedModel cannot be used to reload/retrain the Keras model with its architecture in place.
- Upload the extracted `keras/` folder to `{BUCKET_NAME}/_tmp_train/pretrained_keras` so it can be used in the training container

In [None]:
# Local scratch directory for downloads and extracted archives (removed during clean up).
# exist_ok=True keeps this cell re-runnable instead of raising FileExistsError.
os.makedirs("_tmp", exist_ok=True)

In [None]:
# # TEMP
# # Manually set the latest_model to be the pretrained model
# baseline_path = "base_model/kaggle_model_test_keras.tar.gz"

# # Download the tar.gz file to local
# with open('_tmp/pretrained_model.tar.gz', 'wb') as data_file:
#     s3.download_fileobj(BUCKET_NAME, baseline_path, data_file)

# # Unzip the tar. Will create `0001` folder (tensorflow serving) and `keras` (what we want) folder
# os.system("tar -xf _tmp/pretrained_model.tar.gz -C _tmp")

# # Upload the keras folder to s3
# sess.upload_data(path="./_tmp/keras_model_kaggle", bucket=BUCKET_NAME, key_prefix="_tmp_train/pretrained_keras")
    
# pretrained = 1

In [None]:
# Find the most recently uploaded `model.tar.gz` under the `model` prefix. If one exists,
# fine-tune from it: extract its `keras/` folder and stage it in
# `_tmp_train/pretrained_keras` for the training container.
pretrained = 0

# Paginate, because a single list call returns at most 1000 keys. Use .get because S3
# omits `Contents` entirely when nothing matches the prefix (e.g. the very first run).
model_files = [
    item
    for page in s3.get_paginator("list_objects_v2").paginate(Bucket=BUCKET_NAME, Prefix="model")
    for item in page.get("Contents", [])
    if 'model.tar.gz' in item['Key']
]

if model_files:
    # The newest upload wins.
    latest_model = max(model_files, key=lambda x: x['LastModified'])
    print(f"latest model path = {latest_model['Key']}")

    # Download the tar.gz file to local
    s3.download_file(BUCKET_NAME, latest_model['Key'], '_tmp/pretrained_model.tar.gz')
    print("downloaded latest model to local")

    # Unzip the tar. Creates the `0001` folder (TensorFlow Serving) and `keras` (what we want).
    # tarfile raises on a corrupt or truncated archive, whereas the exit status of a shelled-out
    # `tar` would be ignored. The archive presumably comes from this pipeline's own
    # `model/` uploads (see the copy step below), not from untrusted input.
    with tarfile.open('_tmp/pretrained_model.tar.gz') as archive:
        archive.extractall('_tmp')
    print("Unzipped the tar")

    if not os.path.isdir("_tmp/keras"):
        raise FileNotFoundError(
            f"{latest_model['Key']} has no `keras/` folder to fine-tune from"
        )

    # Upload the keras folder to s3
    sess.upload_data(path="./_tmp/keras", bucket=BUCKET_NAME, key_prefix="_tmp_train/pretrained_keras")
    print("Uploaded keras version of the model to _tmp_train in S3")

    # Flag forwarded to the training script as the `pretrained` hyperparameter;
    # set only once the keras model is actually staged.
    pretrained = 1

In [None]:
# Record whether training fine-tunes an existing model or starts from scratch.
update_db_step_status(
    "pretrained model loaded" if pretrained == 1 else "no pretrained model found"
)

### Split into training & testing data

In [None]:
def train_val_test_split(data, train_pct, val_pct, test_pct, random_state=42):
    """Split ``data`` into train, validation and test partitions.

    The rows are split twice with sklearn's ``train_test_split``. First,
    ``val_pct + test_pct`` of the rows are held out. That holdout is then
    divided between validation and test in the ratio ``val_pct : test_pct``.

    Args:
        data: Anything ``train_test_split`` accepts (a pandas DataFrame in this notebook).
        train_pct: Fraction of rows used for training.
        val_pct: Fraction of rows used for validation.
        test_pct: Fraction of rows used for testing.
        random_state: Seed passed to both splits so the partition is reproducible.

    Returns:
        Tuple ``(train, val, test)`` as returned by ``train_test_split``.

    Raises:
        ValueError: If any fraction is not strictly positive, or the three fractions
            do not sum to 1.
    """
    fractions = {"train_pct": train_pct, "val_pct": val_pct, "test_pct": test_pct}
    # sklearn rejects float split sizes of 0 or 1, so every partition must be non-empty.
    if any(frac <= 0 for frac in fractions.values()):
        raise ValueError(f"split fractions must all be > 0, got {fractions}")
    # train_pct is never passed to sklearn (training keeps whatever is not held out).
    # Check that it agrees with the other two fractions instead of silently ignoring it.
    if abs(sum(fractions.values()) - 1.0) > 1e-6:
        raise ValueError(f"split fractions must sum to 1, got {fractions}")

    holdout_pct = val_pct + test_pct
    testval_frac = test_pct / holdout_pct

    train, testval = train_test_split(data, test_size=holdout_pct, random_state=random_state)
    val, test = train_test_split(testval, test_size=testval_frac, random_state=random_state)

    return (train, val, test)

In [None]:
# # TEMP: To copy from the train bucket into the sagemaker bucket. Usually this is handled by the lambda.
# s3.copy_object(
#     CopySource="ngwaf-trainbucket/kaggle_v2_finetune_data.csv",
#     Bucket=BUCKET_NAME,
#     Key="_tmp_train/data.csv"
# )

In [None]:
# Fetch the dataset staged at `_tmp_train/data.csv` and split it 60/20/20.
s3.download_file(BUCKET_NAME, '_tmp_train/data.csv', '_tmp/data.csv')
data = pd.read_csv("_tmp/data.csv")
train, val, test = train_val_test_split(data, 0.6, 0.2, 0.2)

In [None]:
# Upload each split under `_tmp_train/`, the prefix used as the training job's input.
for split_name, split_df in (("training", train), ("validation", val), ("testing", test)):
    write_df_to_s3(split_df, f"{split_name}.csv", BUCKET_NAME, f"_tmp_train/{split_name}.csv")

In [None]:
# Progress marker: the three splits are in S3.
update_db_step_status(status_str="train/test/val data uploaded")

In [None]:
# Make sure the uploaded splits are present before training starts. Polling the
# `object_exists` waiter returns as soon as each file is confirmed, instead of always
# sleeping an arbitrary 10 s (S3 gives strong read-after-write consistency anyway).
split_exists_waiter = s3.get_waiter("object_exists")
for split_file in ("training.csv", "validation.csv", "testing.csv"):
    split_exists_waiter.wait(Bucket=BUCKET_NAME, Key=f"_tmp_train/{split_file}")

### Specify parameters & train model

In [None]:
# Entry-point script and the directory it is uploaded from.
MODEL_FILE = 'model_v2.py'
MODEL_SOURCE_DIR = "script"
FRAMEWORK_VERSION = "2.8"  # TensorFlow version, used for both training and serving

# Training parameters; can be increased for production.
# In the future these could come from the API call / DynamoDB entry.
# For SageMaker local mode, use INSTANCE_TYPE = "local".
INSTANCE_TYPE, INSTANCE_COUNT = "ml.m5.large", 1
N_EPOCHS, PATIENCE, BATCH_SIZE = 20, 3, 128

# Version name with a UTC timestamp; names the SageMaker model and its S3 folder under `model/`.
VERSION_NAME = f"tensorflow-{time.strftime('%Y-%m-%d-%H-%M-%S', time.gmtime())}"
VERSION_NAME

In [None]:
from sagemaker.tensorflow import TensorFlow

# Passed to the entry-point script as hyperparameters. region/db_name/job_key presumably
# let the script report to this job's DynamoDB item — confirm in script/model_v2.py.
training_hyperparameters = {
    "epochs": N_EPOCHS,
    "batch_size": BATCH_SIZE,
    "early_stop_patience": PATIENCE,
    "pretrained": pretrained,
    "region": REGION,
    "db_name": DYNAMO_NAME,
    "job_key": current_job_key,
}

# model_dir is deliberately left unset: it only controls where checkpoints are saved.
tf_estimator = TensorFlow(
    entry_point=MODEL_FILE,
    source_dir=MODEL_SOURCE_DIR,
    role=role,
    instance_count=INSTANCE_COUNT,
    instance_type=INSTANCE_TYPE,
    framework_version=FRAMEWORK_VERSION,
    py_version='py39',
    script_mode=True,
    hyperparameters=training_hyperparameters,
)

In [None]:
# Progress marker: about to start the SageMaker training job.
update_db_step_status(status_str="launching tensorflow training")

In [None]:
# Launch training with the `_tmp_train/` prefix (splits and, if found, pretrained_keras/) as input.
tf_estimator.fit(inputs=f"s3://{BUCKET_NAME}/_tmp_train")

In [None]:
# S3 URI of the trained `model.tar.gz` written by the training job.
tf_estimator.model_data

In [None]:
# Progress marker: the training job has returned.
update_db_step_status(status_str="exited tensorflow training")

### Copy objects from sagemaker bucket to main bucket

In [None]:
# Copy the training job's artifacts from the SageMaker bucket into
# `{BUCKET_NAME}/model/{VERSION_NAME}/`, where later runs look for pretrained models.
# tf_estimator.model_data has the form
#   s3://{sagemaker bucket}/{optional prefix/}{job name}/output/model.tar.gz
MODEL_DATA_SUFFIX = "/output/model.tar.gz"
sm_model_bucket, _, sm_model_key = tf_estimator.model_data.replace("s3://", "", 1).partition("/")
if not sm_model_key.endswith(MODEL_DATA_SUFFIX):
    raise ValueError(f"Unexpected model_data location: {tf_estimator.model_data}")
# The job's own folder in the SageMaker bucket (normally just the job name). Slicing keeps
# any nested output prefix, whereas splitting on "/" assumed exactly three components.
sm_model_prefix = sm_model_key[: -len(MODEL_DATA_SUFFIX)]

# Get all the bucket items
# sm_model_objects = s3.list_objects(Bucket=sm_model_bucket, Prefix=sm_model_prefix)['Contents']

# Only the model archive and the source archive are copied. S3 keys always use "/",
# so keys are built with f-strings rather than os.path.join, and CopySource is passed
# as a dict instead of a hand-joined "bucket/key" string.
artifacts_to_copy = {
    f"{sm_model_prefix}/output/model.tar.gz": f"model/{VERSION_NAME}/model.tar.gz",
    f"{sm_model_prefix}/source/sourcedir.tar.gz": f"model/{VERSION_NAME}/source/sourcedir.tar.gz",
}
for source_key, target_key in artifacts_to_copy.items():
    s3.copy_object(
        CopySource={"Bucket": sm_model_bucket, "Key": source_key},
        Bucket=BUCKET_NAME,
        Key=target_key,
    )

# for o in sm_model_objects:
#     file_name = o["Key"].split("/")
#     file_name.remove(sm_model_prefix)
#     file_name = "/".join(file_name)
#     copy_source = os.path.join(sm_model_bucket, o["Key"])
#     target_key = os.path.join("model", VERSION_NAME, file_name)
#     print(f"Copying: {file_name}\nfrom: {copy_source}\nto target bucket w filename: {target_key}...\n")
#     s3.copy_object(
#         CopySource=copy_source,
#         Bucket=BUCKET_NAME,
#         Key=target_key
#     )

In [None]:
# Progress marker: model artifacts copied.
# NOTE(review): the text says "sm bucket", but the copy destination is BUCKET_NAME. The text is
# kept verbatim in case something matches on it.
update_db_step_status(status_str="copied objects to sm bucket")

In [None]:
# S3 URI of the copied model archive; this is what the endpoint below serves.
new_artifact_path = f"s3://{BUCKET_NAME}/model/{VERSION_NAME}/model.tar.gz"
new_artifact_path

### Deployment

In [None]:
# Instance type and count used for both the temporary and the main endpoint.
DEPLOYMENT_INSTANCE_TYPE, DEPLOYMENT_INSTANCE_COUNT = 'ml.c5.large', 1

In [None]:
from sagemaker.predictor import Predictor
from sagemaker.tensorflow.model import TensorFlowModel

In [None]:
# Check whether the serving endpoint already exists. `NameContains` is only a substring
# filter, so compare names exactly. Paginate, since list_endpoints returns results in pages;
# any() stops as soon as a match is found. Reuses the `sm` client created at the top.
endpoint_exist = any(
    ep['EndpointName'] == ENDPOINT_NAME
    for page in sm.get_paginator("list_endpoints").paginate(NameContains=ENDPOINT_NAME)
    for ep in page['Endpoints']
)
endpoint_exist

In [None]:
# Progress marker: entering deployment, noting which path (update vs. fresh deploy) applies.
update_db_step_status(status_str=f"deployment. endpoint exist = {endpoint_exist}")

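If the endpoint already exists, we need to:
1. Deploy the new model to its own temporary endpoint (and run the diagnostics against it)
2. Load the existing endpoint as a `Predictor`
3. Update that predictor to use the new model
4. Delete the temporary endpoint and its endpoint config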

In [None]:
from diagnostic_utils import *

In [None]:
if endpoint_exist:
    # 1. Deploy the new model to its own temporary endpoint
    updated_model = TensorFlowModel(
        model_data=new_artifact_path,
        role=role,
        framework_version=FRAMEWORK_VERSION,
        sagemaker_session=sess,
        name=VERSION_NAME,
    )
    updated_predictor = updated_model.deploy(
        initial_instance_count=DEPLOYMENT_INSTANCE_COUNT,
        instance_type=DEPLOYMENT_INSTANCE_TYPE,
    )
    print()
    print(f"New model name = {updated_model.name}")
    print(f"Temporary endpoint name = {updated_predictor.endpoint_name}")

    # The temporary endpoint keeps running (and billing) until deleted. Tear it down
    # even if the diagnostics or the endpoint update below raise.
    try:
        # 2. Save diagnostics for model performance
        diagnostic, scores = test_diagnostics(updated_predictor, test)
        write_df_to_s3(diagnostic, VERSION_NAME + "_diagnostic.csv",
                       BUCKET_NAME, os.path.join("model", VERSION_NAME, "diagnostic.csv"))
        # Use its own local file name rather than reusing `_diagnostic.csv` (assumes the
        # second argument is the local staging file name, as with the data splits).
        write_df_to_s3(scores, VERSION_NAME + "_scores.csv",
                       BUCKET_NAME, os.path.join("model", VERSION_NAME, "scores.csv"))

        # TODO: Only update the main endpoint if the new model is better

        # 3. Load existing endpoint
        current_predictor = Predictor(
            endpoint_name=ENDPOINT_NAME,
            sagemaker_session=sess,
        )
        print(f"Before updating, {ENDPOINT_NAME} endpoint is using model {get_model_variant(ENDPOINT_NAME)}")

        # 4. Point the existing endpoint at the new model
        current_predictor.update_endpoint(
            model_name=updated_model.name,
            initial_instance_count=DEPLOYMENT_INSTANCE_COUNT,
            instance_type=DEPLOYMENT_INSTANCE_TYPE,
        )
        print()
        print(f"Updated: {ENDPOINT_NAME} endpoint is now using model {get_model_variant(ENDPOINT_NAME)}")
    finally:
        # 5. Delete the temporary endpoint and its config (config name is looked up
        # before the endpoint is deleted).
        temp_endpoint_name = updated_predictor.endpoint_name
        temp_endpoint_config_name = get_config_name(temp_endpoint_name)

        sess.delete_endpoint(temp_endpoint_name)
        print(f"Cleaned up the temporary endpoint {temp_endpoint_name}")
        sess.delete_endpoint_config(temp_endpoint_config_name)
        print(f"Cleaned up the temporary endpoint config {temp_endpoint_config_name}")

In [None]:
if not endpoint_exist:
    # No endpoint yet: deploy the new model directly under ENDPOINT_NAME.
    serving_model = TensorFlowModel(
        model_data=new_artifact_path,
        role=role,
        framework_version=FRAMEWORK_VERSION,
        sagemaker_session=sess,
        name=VERSION_NAME,
    )

    # Delete a leftover endpoint *config* named ENDPOINT_NAME, if any, so it does not
    # clash with the deployment below.
    # NOTE(review): assumes deploy() names the endpoint config after the endpoint — confirm.
    try:
        sm.describe_endpoint_config(EndpointConfigName=ENDPOINT_NAME)
        sm.delete_endpoint_config(EndpointConfigName=ENDPOINT_NAME)
        print(f"Deleted stale endpoint config {ENDPOINT_NAME}")
    except sm.exceptions.ClientError as err:
        # Expected when no such config exists. Only AWS API errors are caught, so
        # programming errors and KeyboardInterrupt still surface.
        print(f"No stale endpoint config deleted: {err}")

    predictor = serving_model.deploy(
        initial_instance_count=DEPLOYMENT_INSTANCE_COUNT,
        instance_type=DEPLOYMENT_INSTANCE_TYPE,
        endpoint_name=ENDPOINT_NAME,
    )

    # Diagnostics
    diagnostic, scores = test_diagnostics(predictor, test)
    write_df_to_s3(diagnostic, VERSION_NAME + "_diagnostic.csv",
                   BUCKET_NAME, os.path.join("model", VERSION_NAME, "diagnostic.csv"))
    # Use its own local file name rather than reusing `_diagnostic.csv` (assumes the
    # second argument is the local staging file name, as with the data splits).
    write_df_to_s3(scores, VERSION_NAME + "_scores.csv",
                   BUCKET_NAME, os.path.join("model", VERSION_NAME, "scores.csv"))

In [None]:
# Progress marker: the endpoint now serves the new model.
update_db_step_status(status_str="endpoints deployed")

### Clean up

In [None]:
# Delete everything this run staged under `_tmp_train/` in the bucket.
TMP_DIR = "_tmp_train"

# Paginate (one list call returns at most 1000 keys) and tolerate an empty listing,
# for which S3 omits `Contents`. The trailing "/" keeps sibling prefixes such as
# `_tmp_train_old/` from being matched and deleted.
tmp_items = [
    item
    for page in s3.get_paginator("list_objects_v2").paginate(Bucket=BUCKET_NAME, Prefix=f"{TMP_DIR}/")
    for item in page.get("Contents", [])
]

for item in tmp_items:
    if item['Key'] == f"{TMP_DIR}/":
        continue  # keep the folder placeholder object itself
    s3.delete_object(Bucket=BUCKET_NAME, Key=item['Key'])
    print(f"Deleted {item['Key']}")

In [None]:
# Delete the local `_tmp` scratch directory (downloaded data and extracted model archive).
# ignore_errors=True: a missing directory, or a file that cannot be removed, does not raise.
import shutil
shutil.rmtree("_tmp", ignore_errors=True)

In [None]:
# Progress marker: local and S3 scratch space cleared.
update_db_step_status(status_str="local sagemaker directory + s3 tmp bucket cleared")

### Post back to DynamoDB

In [None]:
# Wall-clock duration of the whole run, stored on the job item below.
END_TRAIN_TIME = datetime.datetime.now()
train_elapsed = END_TRAIN_TIME - START_TRAIN_TIME
train_duration_secs = train_elapsed.total_seconds()

In [None]:
def _put(value):
    """Wrap a DynamoDB-typed value as a (legacy) ``AttributeUpdates`` PUT action."""
    return {"Value": value, "Action": "PUT"}


# Final attributes for this job's item: outcome, timing, model name and evaluation results.
updated_attributes = {
    "status": _put({"S": "success"}),
    "training_completion_time": _put({"S": time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())}),
    "training_duration_sec": _put({"N": str(train_duration_secs)}),
    "model_name": _put({"S": VERSION_NAME}),
    "model_diagnostics": _put(build_dynamo_model_diagnostics(diagnostic)),
    "model_performance": _put(build_dynamo_model_performance(scores)),
}

In [None]:
# Display the attribute payload before it is written to DynamoDB.
updated_attributes

In [None]:
# Write the final status and results onto this job's item.
job_item_key = {"job_key": {"S": current_job_key}}
db.update_item(
    TableName=DYNAMO_NAME,
    Key=job_item_key,
    AttributeUpdates=updated_attributes,
)

In [None]:
# Release the pipeline: mark it available and clear the in-flight job key.
pipeline_release = {
    "pipeline_available": {"Value": {"BOOL": True}, "Action": "PUT"},
    "pipeline_job_key": {"Value": {"S": ""}, "Action": "PUT"},
}
db.update_item(
    TableName=DYNAMO_NAME,
    Key={"job_key": {"S": "_pipeline_status"}},
    AttributeUpdates=pipeline_release,
)

In [None]:
# Progress marker: bookkeeping done; the notebook is about to stop itself.
update_db_step_status(status_str="dynamodb updated, notebook shutting down")

In [29]:
# Stop the running Step Functions execution now that the notebook has finished.
# Only the first RUNNING execution returned is stopped; only one is assumed to be running.
running_executions = sfn.list_executions(
    stateMachineArn=STEP_FUNCTION_ARN,
    statusFilter='RUNNING',
)['executions']

if running_executions:
    sfn.stop_execution(
        executionArn=running_executions[0]['executionArn'],
        cause='notebook successfully completed',
    )

In [None]:
# Shut down this notebook instance now that the run has finished.
sm.stop_notebook_instance(NotebookInstanceName=NOTEBOOK_INSTANCE_NAME)

## Other notes
### Next, go to the `lambda` and enter the endpoint name
### Then, in API Gateway:
1. Create new API: REST method
2. Create new method -- POST
    - Attach the lambda you created
    - Select `Use Lambda Proxy integration`. This option ensures that the data sent to the API is passed directly to the Lambda function without any processing. It also means that the function's return value must be a proper response object, since API Gateway will not process it either.
3. Click on `Actions` --> Deploy API
4. Be sure to copy or write down the URL provided to invoke your newly created public API, as it will be needed in the next step. This URL can be found at the top of the page, highlighted in blue next to the text Invoke URL.