In [None]:
import boto3
import sagemaker


# Setup Initial Parameters

In [3]:
# Names under which the pipeline and its model package group are created.
pipeline_name = "RetrievalPipeline"
model_package_group_name = "PipelineModelPackageGroup"

# AWS / SageMaker context reused by the cells below.
sagemaker_session = sagemaker.session.Session()
region = boto3.Session().region_name
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()

In [5]:
# S3 bucket pathing
# URI of the raw reviews CSV handed to the processing job as its input.
# It is a plain literal: the string contains no `{}` placeholders, so it must
# not be passed through `str.format`.
input_reviews_uri = "s3://beer-reviews-models-pb/Rec Automation/Review Data/Initial Data/final_reviews.csv"

In [6]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat

# Define the default pipeline parameters
# These parameters can be easily changed when executing a pipeline

# raw input data
# Plain literal: the URI has no `{}` placeholders, so no `.format()` call is needed.
raw_s3_input = "s3://beer-reviews-models-pb/Rec Automation/Review Data/Initial Data"
input_data = ParameterString(name="InputData", default_value=raw_s3_input)


# processing job sizing
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.m5.xlarge"
)

# status of newly trained model in registry
# PendingManualApproval | Rejected | Approved
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")


# training step parameters + hyperparameters
# (hyperparameters are declared as string parameters; the training script
# converts them with argparse)
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.2xlarge")
training_epochs = ParameterString(name="TrainingEpochs", default_value="20")
training_recommendations = ParameterString(name="ReturnRecommendationsNumber", default_value="500")
training_learning_rate = ParameterString(name="LearningRate", default_value="0.5")

# model performance step parameters
# minimum evaluation accuracy compared against in the condition step
accuracy_threshold = ParameterFloat(name="AccuracyTop500Threshold", default_value=0.5)

# Processing Job

In [2]:
%%writefile retrieval_preprocessing.py

import argparse
import os
import warnings

import pandas as pd
import numpy as np


def users_test_train_split(df, split_ratio=0.15):
    """Cut ``df`` into a leading test slice and a trailing train slice.

    The first ``int(len(df) * split_ratio)`` rows become the test set and the
    remaining rows the train set. No shuffling happens here; the caller is
    expected to have ordered the rows as desired.

    Args:
        df: Any sliceable, sized object (a DataFrame in this script).
        split_ratio: Fraction of rows assigned to the test slice.

    Returns:
        A ``(test, train)`` tuple -- note that the test slice comes first.
    """
    cutoff = int(split_ratio * len(df))
    return df[:cutoff], df[cutoff:]


if __name__ == "__main__":
    # Fixed container paths: the reviews CSV is read from the processing input
    # directory and the two splits are written to the train/test directories.
    input_data_path_reviews = os.path.join("/opt/ml/processing/input/reviews", "final_reviews.csv")

    train_output_path = os.path.join("/opt/ml/processing/train", "train.csv")
    test_output_path = os.path.join("/opt/ml/processing/test", "test.csv")

    print("Reading review input data from {}".format(input_data_path_reviews))
    review_df = pd.read_csv(input_data_path_reviews, index_col="Unnamed: 0")

    # Shuffle the rows with a fixed seed so that re-running the job on the same
    # input yields the same train/test split.
    review_df = review_df.sample(frac=1, random_state=42)

    # Keep only users with more than 10 and fewer than 100 reviews
    review_counts = review_df['username'].value_counts()
    users_with_favorable_ratings = review_counts[
        (review_counts > 10) & (review_counts < 100)
    ].index.values

    review_df = review_df[review_df['username'].isin(users_with_favorable_ratings)]

    # Generate test train split (the helper returns the test slice first)
    test_df, train_df = users_test_train_split(review_df)

    print("Saving Train Data {}".format(train_output_path))
    train_df.to_csv(train_output_path, header=True, index=True)

    print("Saving Test Data {}".format(test_output_path))
    test_df.to_csv(test_output_path, header=True, index=True)
    

Overwriting retrieval_preprocessing.py


In [8]:
from sagemaker.sklearn.processing import SKLearnProcessor

# create the processor for the job + pass in pipeline parameters

# Processor that runs the preprocessing script; instance type and count come
# from the pipeline parameters so they can be overridden per execution.
sklearn_processing = SKLearnProcessor(
    base_job_name="retrieval-data-processing",
    framework_version="0.20.0",
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
    role=role,
)

In [10]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

In [11]:
# Inputs and outputs of the processing job.
# NOTE(review): the input source is the fixed `input_reviews_uri`; the
# `InputData` pipeline parameter is not referenced here -- confirm intended.
processing_inputs = [
    ProcessingInput(
        input_name="input review data",
        source=input_reviews_uri,
        destination="/opt/ml/processing/input/reviews",
    ),
]

# One output per split; the output names are used to look the splits up again
# when wiring later steps.
processing_outputs = [
    ProcessingOutput(source="/opt/ml/processing/train", output_name="train"),
    ProcessingOutput(source="/opt/ml/processing/test", output_name="test"),
]



In [12]:
# Create the processing step

PROCESSING_SCRIPT_LOCATION = "retrieval_preprocessing.py"
#cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")

# Pipeline step that runs the preprocessing script on the processor defined
# above. Caching is currently disabled (see the commented-out cache_config).
processing_step = ProcessingStep(
    name="ProcessData",
    processor=sklearn_processing,
    inputs=processing_inputs,
    outputs=processing_outputs,
    code=PROCESSING_SCRIPT_LOCATION,
    # cache_config=cache_config,
)

# Training Job

In [15]:
%%writefile tf_ret_train.py


import tensorflow as tf
import tensorflow_recommenders as tfrs

from typing import Dict, Text
import argparse
import numpy as np
import json
import os
import pandas as pd



# disable tf logging
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' 


def parse_args():
    """Parse hyperparameters and SageMaker environment settings.

    Hyperparameters arrive as command-line flags; data/model locations and
    host information default to the ``SM_*`` environment variables.

    When ``SM_HOSTS`` is not set (e.g. when running the script outside of a
    SageMaker training container) the host list falls back to a single-entry
    list holding the current host, instead of failing on ``json.loads(None)``.
    A ``--hosts`` value given on the command line is parsed as JSON.

    Returns:
        The ``(namespace, unknown_args)`` tuple from
        ``ArgumentParser.parse_known_args``.
    """
    parser = argparse.ArgumentParser()

    # Hyperparameters- sent by client passed as command line args to script
    parser.add_argument('--epochs', type=int, default=4)
    parser.add_argument('--learning_rate', type=float, default=0.5)
    parser.add_argument('--returned_recommendations', type=int, default=500)

    # data directories
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))

    # model directory - /opt/ml/model   by default for sagemaker
    #parser.add_argument("--model_dir", type=str)
    parser.add_argument("--sm-model-dir", type=str, default=os.environ.get("SM_MODEL_DIR"))

    # host information; tolerate a missing SM_HOSTS
    current_host = os.environ.get("SM_CURRENT_HOST")
    raw_hosts = os.environ.get("SM_HOSTS")
    default_hosts = json.loads(raw_hosts) if raw_hosts else [current_host]
    parser.add_argument("--hosts", type=json.loads, default=default_hosts)
    parser.add_argument("--current-host", type=str, default=current_host)

    return parser.parse_known_args()


def get_train_data(train_dir):
    """Load ``train.csv`` from ``train_dir`` into a DataFrame.

    The CSV column named ``"Unnamed: 0"`` is used as the index. The directory
    and the resulting frame shape are printed for the job log.
    """
    print("training_directory", train_dir)
    train_csv = os.path.join(train_dir, 'train.csv')
    df_train = pd.read_csv(train_csv, index_col="Unnamed: 0")

    print('x train: ', df_train.shape)
    return df_train



def df_to_tensor(df):
    """Turn the review frame into two ``tf.data`` datasets.

    Args:
        df: DataFrame with at least ``username`` and ``beer_name`` columns.

    Returns:
        ``(ratings, beer_list)`` where ``ratings`` yields dicts with the keys
        ``"beer_name"`` and ``"username"`` (rows containing NaN in either
        column are dropped first) and ``beer_list`` yields the unique values
        of ``df['beer_name']``.
    """
    # Candidate set: one row per distinct beer name.
    beer_frame = pd.DataFrame(df['beer_name'].unique(), columns = ['beer_name'])

    # Interactions: (user, beer) pairs without missing values.
    rating_frame = df[['username', 'beer_name']].dropna()

    # convert dataframes to tensors
    beer_ds = tf.data.Dataset.from_tensor_slices(dict(beer_frame))
    rating_ds = tf.data.Dataset.from_tensor_slices(dict(rating_frame))

    # map rows to a dictionary / a plain beer-name tensor
    ratings = rating_ds.map(
        lambda row: {"beer_name": row["beer_name"], "username": row["username"]}
    )
    beer_list = beer_ds.map(lambda row: row['beer_name'])

    print('converted df to tensors')
    return ratings, beer_list


def get_unique_beers_and_users(ratings, beer_list):
    """Collect the distinct usernames and beer names from the datasets.

    Args:
        ratings: Dataset whose elements expose a ``'username'`` entry.
        beer_list: Dataset of beer names.

    Returns:
        ``(unique_users, unique_beers)`` as arrays produced by ``np.unique``.
    """
    def _distinct(dataset):
        # Materialise the dataset in batches of 1000 and deduplicate.
        return np.unique(np.concatenate(list(dataset.batch(1000))))

    unique_users = _distinct(ratings.map(lambda row: row['username']))
    unique_beers = _distinct(beer_list)

    print("unique users: ", len(unique_users), "unique_beers: ", len(unique_beers))
    return unique_users, unique_beers

    
def test_train_split(ratings, df):
    """Shuffle ``ratings`` with a fixed seed and cut it 80/20.

    Sizes are derived from ``len(df)``.
    NOTE(review): ``ratings`` may hold fewer rows than ``df`` if it was built
    after dropping NaN rows -- confirm the sizes are meant to come from ``df``.

    Returns:
        ``(test, train)`` -- the test split is returned first.
    """
    tf.random.set_seed(42)

    n_rows = len(df)
    n_train = int(n_rows*0.8)
    n_test = int(n_rows*0.2)

    # reshuffle_each_iteration=False keeps the order stable across iterations,
    # so take/skip always see the same partition.
    shuffled = ratings.shuffle(n_rows, seed=42, reshuffle_each_iteration=False)
    train = shuffled.take(n_train)
    test = shuffled.skip(n_train).take(n_test)

    print("test data len: ", len(test), "train data len: ", len(train))
    return test, train
    
    
# extend the tfrs class
class BeerRetreival(tfrs.Model):
    """Retrieval model with one embedding tower for users and one for beers.

    The constructor takes no arguments; it reads the module-level names
    ``unique_users``, ``unique_beers`` and ``beer_list``, which therefore must
    be defined before the model is instantiated.
    """

    def __init__(self):
        super().__init__()

        latent_dim = 32

        # Towers are built in the order user -> beer.
        self.user_model = self._embedding_tower(unique_users, latent_dim)
        self.beer_model = self._embedding_tower(unique_beers, latent_dim)

        # Candidate embeddings used by the top-K metric.
        candidate_embeddings = beer_list.batch(128).cache().map(self.beer_model)
        self.task = tfrs.tasks.Retrieval(
            metrics=tfrs.metrics.FactorizedTopK(candidates=candidate_embeddings)
        )

    @staticmethod
    def _embedding_tower(vocabulary, latent_dim):
        """String lookup followed by an embedding (one extra row beyond the vocabulary)."""
        return tf.keras.Sequential([
            tf.keras.layers.StringLookup(vocabulary=vocabulary, mask_token=None),
            tf.keras.layers.Embedding(len(vocabulary)+1, latent_dim),
        ])

    def compute_loss(self, features: Dict[Text, tf.Tensor], training=False) -> tf.Tensor:
        """Embed the ``username`` / ``beer_name`` features and return the task loss.

        ``training`` is accepted but not used by this implementation.
        """
        query_embeddings = self.user_model(features['username'])
        candidate_embeddings = self.beer_model(features['beer_name'])
        return self.task(query_embeddings, candidate_embeddings)
        
        

if __name__ == "__main__":

    args, _ = parse_args()

    print('Training data location: {}'.format(args.train))

    df_train = get_train_data(args.train)

    # These module-level names are read by BeerRetreival.__init__.
    ratings, beer_list = df_to_tensor(df_train)
    unique_users, unique_beers = get_unique_beers_and_users(ratings, beer_list)
    test, train = test_train_split(ratings, df_train)

    returned_recommendations = args.returned_recommendations
    epochs = args.epochs
    learning_rate = args.learning_rate

    print('returned reccomendations = {}, epochs = {}, learning rate = {}'
          .format(returned_recommendations, epochs, learning_rate))

    # create + train model
    model = BeerRetreival()
    optimizer = tf.keras.optimizers.Adagrad(learning_rate)
    model.compile(optimizer)
    model.fit(train.batch(8192),
             validation_data = test.batch(512),
             validation_freq = 2,
             epochs = epochs,
             verbose = 0)

    # Eval model
    scores = model.evaluate(test.batch(8192), return_dict=True, verbose=0)
    print("top 100 score: ", scores['factorized_top_k/top_100_categorical_accuracy'])

    # Build the serving index. The number of results returned per query follows
    # the --returned_recommendations hyperparameter (default 500) rather than a
    # hard-coded constant.
    brute_force = tfrs.layers.factorized_top_k.BruteForce(
        model.user_model, k=returned_recommendations)
    brute_force.index_from_dataset(
        beer_list.batch(128).map(lambda beer_name: (beer_name, model.beer_model(beer_name)))
    )

    #save model - need to call first
    _ = brute_force(np.array(["pblackburn"]))

    # Only the first host in the host list writes the model.
    if args.current_host == args.hosts[0]:

        print("Host arg:", args.hosts[0])
        # save the index under the model directory, in sub-directory '01', in Tensorflow SavedModel Format
        print("Saving Model to: ", args.sm_model_dir)
        tf.saved_model.save(
          brute_force,
          os.path.join(args.sm_model_dir, "01"))
        print("Saved Model to: ", args.sm_model_dir)
        

Overwriting tf_ret_train.py


In [16]:
%%writefile requirements.txt
tensorflow-recommenders
pandas

Overwriting requirements.txt


In [17]:
from sagemaker.tensorflow import TensorFlow
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.step_collections import RegisterModel
import time

# S3 location where the training job writes its model artifact.
prefix = 'retrieval-model'
bucket = sagemaker_session.default_bucket()

model_path = f"s3://{bucket}/{prefix}/model/"

# Hyperparameters are pipeline parameters, so they can be overridden per
# execution; the keys match the flags declared in tf_ret_train.py.
hyperparameters = {
    'epochs': training_epochs,
    'returned_recommendations': training_recommendations,
    'learning_rate': training_learning_rate,
}

# Reuse the execution role resolved in the setup cell (as the other steps do)
# instead of looking it up a second time.
retrieval_estimator = TensorFlow(
    entry_point='tf_ret_train.py',
    dependencies=['requirements.txt'],
    instance_type=training_instance_type,
    instance_count=1,
    hyperparameters=hyperparameters,
    role=role,
    framework_version='2.5',
    py_version='py37',
    base_job_name="tensorflow-train-model",
    output_path=model_path,
)

In [19]:
# The "train" channel is wired straight to the "train" output of the
# processing step, which makes the training step depend on it.
train_channel = TrainingInput(
    s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
    content_type="text/csv",
)

step_train_model = TrainingStep(
    name="TrainRetrievalModel",
    estimator=retrieval_estimator,
    inputs={"train": train_channel},
)

# Validation Step

In [21]:
%%writefile evaluate.py

import os
import json
import sys
import numpy as np
import pandas as pd
import pathlib
import tarfile
import tensorflow as tf

if __name__ == "__main__":

    # Unpack the trained model artifact and load the SavedModel stored under "01".
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path, "r:gz") as tar:
        tar.extractall("./model")

    model = tf.saved_model.load("./model/01")

    test_path = "/opt/ml/processing/test/"
    test_df = pd.read_csv(os.path.join(test_path, "test.csv"))

    # Per-user hit rate: the share of a user's test beers that appear among the
    # beers returned by the model for that user. Grouping once avoids
    # re-filtering the whole frame for every user; rows with a missing
    # username are skipped by groupby.
    accuracy = []
    for username, true_beers in test_df.groupby('username')['beer_name']:
        # Get predictions (as tensors)
        _, beers = model(tf.constant([username]))
        # Convert tensors to a list of decoded beer names
        beer_list = [x.decode('UTF-8') for x in beers[0].numpy()]
        user_accuracy = np.isin(true_beers.values, beer_list)
        accuracy.append(np.count_nonzero(user_accuracy)/len(user_accuracy))

    # An empty test set would otherwise yield NaN, which is not valid JSON;
    # report 0.0 in that case.
    top_500_accuracy = float(np.mean(accuracy)) if accuracy else 0.0
    print("Top 500 Accuracy: ", top_500_accuracy)

    # Available metrics to add to model:
    # https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-metrics.html
    report_dict = {
        "multiclass_classification_metrics": {
            "accuracy": {"value": top_500_accuracy, "standard_deviation": "NaN"},
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Overwriting evaluate.py


In [22]:
from sagemaker.workflow.properties import PropertyFile
from sagemaker.sklearn.processing import ScriptProcessor

# Resolve the TensorFlow training image used to run the evaluation script.
# The image lookup happens when this cell runs, so it is given the concrete
# default instance type rather than the pipeline parameter object itself.
# NOTE(review): overriding TrainingInstanceType at execution time does not
# change the image chosen here -- confirm that is acceptable.
tf_eval_image_uri = sagemaker.image_uris.retrieve(
    framework="tensorflow",
    region=region,
    version='2.5',
    image_scope="training",
    py_version="py37",
    instance_type=training_instance_type.default_value,
)

# Generic script processor on that image; the instance type stays a pipeline
# parameter here so it can still be overridden per execution.
evaluate_model_processor = ScriptProcessor(
    role=role,
    image_uri=tf_eval_image_uri,
    command=['python3'],
    instance_count=1,
    instance_type=training_instance_type,
)


In [23]:
# A PropertyFile lets later steps (for instance a condition step) read values
# out of a file written by a processing step.
# Docs: https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html
# Here it points at "evaluation.json" inside the processing output named "evaluation".
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

In [24]:
# Evaluation step: runs evaluate.py with the evaluate_model_processor.
# Its two inputs are the trained model artifact and the "test" split produced
# by the processing step; its single output is the "evaluation" directory,
# which the EvaluationReport property file reads from.
step_evaluate_model = ProcessingStep(
    name="EvaluateRetrievalPerformance",
    processor=evaluate_model_processor,
    code="evaluate.py",
    inputs=[
        ProcessingInput(
            source=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=processing_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(source="/opt/ml/processing/evaluation", output_name="evaluation"),
    ],
    property_files=[evaluation_report],
)

# Register The Model

In [26]:
from sagemaker.model import Model
from sagemaker import PipelineModel

# Resolve the TensorFlow inference image for the registered model.
# The image lookup happens when this cell runs, so it is given the concrete
# default instance type rather than the pipeline parameter object itself.
tf_model_image_uri = sagemaker.image_uris.retrieve(
    framework="tensorflow",
    region=region,
    version='2.5',
    image_scope="inference",
    py_version="py37",
    instance_type=training_instance_type.default_value,
)

# Model backed by the artifact produced by the training step.
tf_model = Model(
    image_uri=tf_model_image_uri,
    model_data=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=role,
)

# Single-container pipeline model; this is the object handed to RegisterModel.
pipeline_model = PipelineModel(
    models=[tf_model],
    role=role,
    sagemaker_session=sagemaker_session
)

In [27]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel

# S3 URI of evaluation.json, built from the first output of the evaluation step.
evaluation_output_config = step_evaluate_model.arguments["ProcessingOutputConfig"]["Outputs"]
evaluation_s3_uri = "{}/evaluation.json".format(
    evaluation_output_config[0]["S3Output"]["S3Uri"]
)

# Attach the evaluation report to the model package as its model statistics.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        content_type="application/json",
        s3_uri=evaluation_s3_uri,
    )
)

# Registers the pipeline model in the model package group with the approval
# status taken from the ModelApprovalStatus pipeline parameter.
step_register_pipeline_model = RegisterModel(
    name="RegisterRetrievalModel",
    model=pipeline_model,
    model_package_group_name=model_package_group_name,
    model_metrics=model_metrics,
    approval_status=model_approval_status,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
)

### Create condition step -  to check accuracy + conditionally register a model

In [28]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo, ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)

# Use the current JsonGet from sagemaker.workflow.functions; the class of the
# same name in sagemaker.workflow.condition_step is deprecated. This import
# shadows the deprecated one for the rest of the cell.
from sagemaker.workflow.functions import JsonGet

# Models whose evaluation accuracy is below the AccuracyTop500Threshold
# parameter (default 0.5) will not be registered with the model registry.
# I use the multiclass classification metric in the eval script
# since I didn't see any metrics for retrieval models, but the metrics could be anything
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_evaluate_model.name,
        property_file=evaluation_report,
        json_path="multiclass_classification_metrics.accuracy.value",
    ),
    right=accuracy_threshold,
)
# Backward-compatible alias: the condition was previously bound to this
# (misleading) name even though it is a greater-than-or-equal check.
cond_lte = cond_gte

# Conditionally register the model, otherwise pass
step_cond = ConditionStep(
    name="Top-K-Greater-Than-Condition",
    conditions=[cond_gte],
    if_steps=[step_register_pipeline_model],
    else_steps=[],
)

The class JsonGet has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


# Define the full pipeline

In [29]:
from sagemaker.workflow.pipeline import Pipeline

# Assemble the SageMaker Pipeline.
# Execution order follows the data dependencies between steps, not the order
# in which the steps are listed here.
pipeline_parameters = [
    processing_instance_type,
    processing_instance_count,
    training_instance_type,
    input_data,
    model_approval_status,
    training_epochs,
    training_recommendations,
    training_learning_rate,
    accuracy_threshold,
]
pipeline_steps = [processing_step, step_train_model, step_evaluate_model, step_cond]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
)

# Submit Pipeline to SageMaker + start execution

In [3]:
#pipeline.upsert(role_arn=role)

In [4]:
#execution = pipeline.start()

In [5]:
#execution.wait()