In [1]:
# Restore every variable previously persisted with %store (e.g. training_job_name, mpg_name),
# so the `'...' not in locals()` guards further down can skip work done in an earlier session.
%store -r
# List the stored variables and their values.
%store

Stored variables and their in-db values:


In [2]:
import sagemaker
from sagemaker.lineage import context, artifact, association, action
import boto3

from model_package_src.inference_specification import InferenceSpecification
import json
import numpy as np
import pandas as pd
import datetime
import time
from scipy.sparse import csr_matrix, hstack, load_npz
from sklearn.preprocessing import OneHotEncoder
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split

In [3]:
from sklearn.svm import LinearSVC
from sklearn.svm import SVC
from sklearn import svm
from sklearn.preprocessing import StandardScaler

from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split, GridSearchCV, cross_val_score
from sklearn import metrics
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
from sklearn.ensemble import BaggingClassifier
from sklearn.metrics import accuracy_score
from sklearn.ensemble import AdaBoostClassifier, GradientBoostingClassifier
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, classification_report

In [4]:
# Compare versions numerically: comparing the raw strings is lexicographic, so e.g.
# "2.100.0" >= "2.21.0" evaluates to False and would wrongly reject a newer SDK.
# Assumes a "MAJOR.MINOR.PATCH[...]" version string with numeric leading components.
sagemaker_version = tuple(int(part) for part in sagemaker.__version__.split(".")[:3])
assert sagemaker_version >= (2, 21, 0), f"sagemaker>=2.21.0 is required, found {sagemaker.__version__}"

In [5]:
# Resolve the region from the ambient AWS configuration and pin the default boto3 session
# plus an explicit session to it.
region = boto3.Session().region_name
boto3.setup_default_session(region_name=region)
boto_session = boto3.Session(region_name=region)

# Low-level clients, and a high-level SageMaker session that wraps the same SageMaker client.
s3_client = boto3.client("s3", region_name=region)
sagemaker_boto_client = boto_session.client("sagemaker")
sagemaker_session = sagemaker.session.Session(boto_session=boto_session, sagemaker_client=sagemaker_boto_client)
sagemaker_role = sagemaker.get_execution_role()

# S3 locations used throughout the notebook; training output lands under <bucket>/<prefix>/output.
bucket, prefix = 'quicksight-pavan-cjp', "personalization"
output_prefix = "s3://" + bucket + "/" + prefix + "/output"

In [6]:
# --- Prepare data for modeling ---
# Sparse feature matrices for the train/test split.
X_train = load_npz("./data/X_train.npz")
X_test = load_npz("./data/X_test.npz")

# np.load on an .npz returns an NpzFile that keeps the underlying file open until closed;
# use it as a context manager and read the (single, unnamed -> "arr_0") array eagerly.
with np.load("./data/y_train.npz") as y_train_npzfile:
    y_train = y_train_npzfile["arr_0"]
with np.load("./data/y_test.npz") as y_test_npzfile:
    y_test = y_test_npzfile["arr_0"]

In [7]:
# Sanity-check the split before training: features and labels must line up row-for-row,
# and train/test must share the same feature space.
assert X_train.shape[0] == y_train.shape[0], "X_train / y_train row counts differ"
assert X_test.shape[0] == y_test.shape[0], "X_test / y_test row counts differ"
assert X_train.shape[1] == X_test.shape[1], "train / test feature dimensions differ"
X_train.shape, X_test.shape, y_train.shape, y_test.shape

((219519, 9284), (54880, 9284), (219519,), (54880,))

In [8]:
input_dims = X_train.shape[1]
%store input_dims

Stored 'input_dims' (int)


In [9]:
# --- Train the Factorization Machines model ---
# Region-specific image URI of SageMaker's built-in Factorization Machines algorithm.
container = sagemaker.image_uris.retrieve("factorization-machines", region=boto_session.region_name)

fm = sagemaker.estimator.Estimator(
    container,
    sagemaker_role,
    sagemaker_session=sagemaker_session,
    output_path=output_prefix,
    instance_type="ml.c5.xlarge",
    instance_count=1,
)

# Regression objective over `input_dims` features; factor size, batch size and epoch
# count are fixed choices for this notebook.
fm_hyperparameters = {
    "feature_dim": input_dims,
    "predictor_type": "regressor",
    "mini_batch_size": 1000,
    "num_factors": 64,
    "epochs": 20,
}
fm.set_hyperparameters(**fm_hyperparameters)

In [10]:
if 'training_job_name' not in locals():
    
    fm.fit({'train': 's3://quicksight-pavan-cjp/personalization/train/train.protobuf', 'test': 's3://quicksight-pavan-cjp/personalization/test/test.protobuf'})
    training_job_name = fm.latest_training_job.job_name
    %store training_job_name
    
else:
    print(f'Using previous training job: {training_job_name}')

2022-03-25 09:54:06 Starting - Starting the training job...
2022-03-25 09:54:25 Starting - Preparing the instances for trainingProfilerReport-1648202046: InProgress
......
2022-03-25 09:55:29 Downloading - Downloading input data...
2022-03-25 09:56:01 Training - Downloading the training image...
2022-03-25 09:56:35 Training - Training image download completed. Training in progress..[34mDocker entrypoint called with argument(s): train[0m
[34mRunning default environment configuration script[0m
  from collections import Mapping, MutableMapping, Sequence[0m
  """[0m
  """[0m
[34m[03/25/2022 09:56:39 INFO 139646129960768 integration.py:636] worker started[0m
[34m[03/25/2022 09:56:39 INFO 139646129960768] Reading default configuration from /opt/amazon/lib/python3.7/site-packages/algorithm/resources/default-conf.json: {'epochs': 1, 'mini_batch_size': '1000', 'use_bias': 'true', 'use_linear': 'true', 'bias_lr': '0.1', 'linear_lr': '0.001', 'factors_lr': '0.0001', 'bias_wd': '0.01', '

In [11]:
# Full DescribeTrainingJob response for the (new or restored) training job; the lineage and
# registry cells read its input channels, model artifacts, image and final metrics.
training_job_info = sagemaker_boto_client.describe_training_job(
    TrainingJobName=training_job_name
)

In [12]:
# Resolve the S3 URI of the "train" input channel by name (falling back to the first channel)
# instead of assuming the order of InputDataConfig.
input_channels = training_job_info["InputDataConfig"]
train_channel = next(
    (channel for channel in input_channels if channel["ChannelName"] == "train"),
    input_channels[0],
)
training_data_s3_uri = train_channel["DataSource"]["S3DataSource"]["S3Uri"]

# Reuse a lineage artifact already registered for this URI, or register a new Dataset artifact.
# Only the first match is used, so stop at it rather than listing every result page.
existing_data_artifact = next(
    iter(artifact.Artifact.list(source_uri=training_data_s3_uri, sagemaker_session=sagemaker_session)),
    None,
)

if existing_data_artifact is not None:
    training_data_artifact = existing_data_artifact
    print(f"Using existing artifact: {training_data_artifact.artifact_arn}")
else:
    training_data_artifact = artifact.Artifact.create(
        artifact_name="TrainingData",
        source_uri=training_data_s3_uri,
        artifact_type="Dataset",
        sagemaker_session=sagemaker_session,
    )
    print(f"Create artifact {training_data_artifact.artifact_arn}: SUCCESSFUL")

Using existing artifact: arn:aws:sagemaker:us-east-1:519852036875:artifact/6621cbd0d7bf919399afbdeab7a40571


In [13]:
# S3 location of the trained model tarball produced by the training job.
trained_model_s3_uri = training_job_info["ModelArtifacts"]["S3ModelArtifacts"]

# Look for a lineage artifact already registered for that URI; register a Model artifact if none.
matching_artifacts = list(
    artifact.Artifact.list(source_uri=trained_model_s3_uri, sagemaker_session=sagemaker_session)
)

if not matching_artifacts:
    model_artifact = artifact.Artifact.create(
        artifact_name="TrainedModel",
        source_uri=trained_model_s3_uri,
        artifact_type="Model",
        sagemaker_session=sagemaker_session,
    )
    print(f"Create artifact {model_artifact.artifact_arn}: SUCCESSFUL")
else:
    model_artifact = matching_artifacts[0]
    print(f"Using existing artifact: {model_artifact.artifact_arn}")

Using existing artifact: arn:aws:sagemaker:us-east-1:519852036875:artifact/4c8348d93e65cf398e8a98333c7a9b41


In [14]:
# --- Set artifact associations ---
# Trial component tied to the training job, looked up by the "<job-name>-aws-training-job"
# naming convention; its ARN is the destination of the lineage associations.
trial_component_name = training_job_name + "-aws-training-job"
trial_component = sagemaker_boto_client.describe_trial_component(TrialComponentName=trial_component_name)
trial_component_arn = trial_component["TrialComponentArn"]

In [15]:
# Associate each lineage artifact (source) with the training job's trial component
# (destination) using the paired association type.
artifact_list = [(training_data_artifact, "ContributedTo"), (model_artifact, "Produced")]

for art, assoc in artifact_list:
    try:
        association.Association.create(
            source_arn=art.artifact_arn,
            destination_arn=trial_component_arn,
            association_type=assoc,
            sagemaker_session=sagemaker_session,
        )
        print(f"Association with {art.artifact_type}: SUCCESSFUL")
    except sagemaker_boto_client.exceptions.ClientError as err:
        # Expected on re-runs when the association already exists. Only AWS API errors are
        # caught (a bare `except:` also hid typos, KeyboardInterrupt, ...), and the service
        # message is shown so other API failures (e.g. permissions) are not mistaken for it.
        print(f"Association not created for {art.artifact_type} (it may already exist): {err}")

Association already exists with DataSet
Association with Model: SUCCEESFUL


In [16]:
# Register a SageMaker Model for the training job unless one with exactly this name exists.
# NameContains is a substring filter (e.g. "retail-recommendations-v2" would also match),
# so compare names exactly and walk every result page.
model_name = "retail-recommendations"
model_matches = [
    summary
    for page in sagemaker_boto_client.get_paginator("list_models").paginate(NameContains=model_name)
    for summary in page["Models"]
    if summary["ModelName"] == model_name
]

if not model_matches:
    print(f"Creating model {model_name}")
    model = sagemaker_session.create_model_from_job(
        name=model_name,
        training_job_name=training_job_info["TrainingJobName"],
        role=sagemaker_role,
        image_uri=training_job_info["AlgorithmSpecification"]["TrainingImage"],
    )
else:
    print(f"Model {model_name} already exists.")

Model retail-recommendations already exists.


In [17]:
#SageMaker Model Registry
#Create Model Package Group
if 'mpg_name' not in locals():
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M')
    mpg_name = f'retail-recommendation-{timestamp}'
    %store mpg_name

print(f'Model Package Group name: {mpg_name}')

Stored 'mpg_name' (str)
Model Package Group name: retail-recommendation-2022-03-25-09-58


In [18]:
# Request arguments for CreateModelPackageGroup.
mpg_input_dict = dict(
    ModelPackageGroupName=mpg_name,
    ModelPackageGroupDescription="Recommendation for Online Retail Sales",
)

In [19]:
# NameContains is a substring filter, so keep only groups whose name matches exactly
# (across every result page) before deciding whether the group must be created.
matching_mpg = [
    summary
    for page in sagemaker_boto_client.get_paginator("list_model_package_groups").paginate(NameContains=mpg_name)
    for summary in page["ModelPackageGroupSummaryList"]
    if summary["ModelPackageGroupName"] == mpg_name
]

if matching_mpg:
    print(f"Using existing Model Package Group: {mpg_name}")
else:
    mpg_response = sagemaker_boto_client.create_model_package_group(**mpg_input_dict)
    print(f"Create Model Package Group {mpg_name}: SUCCESSFUL")

Create Model Package Group retail-recommendation-2022-03-25-09-58: SUCCESSFUL


In [20]:
# Collect the job's final metrics as {"regression_metrics": {name: {"value": v}}}.
# FinalMetricDataList is optional in the DescribeTrainingJob response, so a job that emitted
# no metrics yields an empty report instead of a KeyError.
model_metrics_report = {
    "regression_metrics": {
        metric["MetricName"]: {"value": metric["Value"]}
        for metric in training_job_info.get("FinalMetricDataList", [])
    }
}

with open("training_metrics.json", "w") as f:
    json.dump(model_metrics_report, f)

# Upload next to the job name so the model package can reference it as ModelQuality statistics.
metrics_s3_key = f"training_jobs/{training_job_info['TrainingJobName']}/training_metrics.json"
s3_client.upload_file(Filename="training_metrics.json", Bucket=bucket, Key=metrics_s3_key)

In [21]:
# Inference specification for the model package, served by the same image that trained the model.
fm_image_uri = training_job_info["AlgorithmSpecification"]["TrainingImage"]
# NOTE(review): "text/csv" is declared as the supported MIME type; confirm the helper maps this
# to response types and that the FM container actually emits it.
mp_inference_spec = InferenceSpecification().get_inference_specification_dict(
    ecr_image=fm_image_uri,
    supports_gpu=False,
    supported_content_types=["application/x-recordio-protobuf", "application/json"],
    supported_mime_types=["text/csv"],
)

# Point the first container of the spec at the trained model artifact.
container_spec = mp_inference_spec["InferenceSpecification"]["Containers"][0]
container_spec["ModelDataUrl"] = training_job_info["ModelArtifacts"]["S3ModelArtifacts"]

In [22]:
# ModelQuality statistics for the package: the metrics JSON stored at `metrics_s3_key`.
metrics_s3_uri = f"s3://{bucket}/{metrics_s3_key}"
model_metrics = {"ModelQuality": {"Statistics": {"ContentType": "application/json", "S3Uri": metrics_s3_uri}}}

In [23]:
# Register the model as a new package version, pending manual approval; the inference
# specification keys are merged in after the base fields.
mp_input_dict = {
    "ModelPackageGroupName": mpg_name,
    "ModelPackageDescription": "Factorization Machine Model to create personalized retail recommendations",
    "ModelApprovalStatus": "PendingManualApproval",
    "ModelMetrics": model_metrics,
    **mp_inference_spec,
}
mp_response = sagemaker_boto_client.create_model_package(**mp_input_dict)

In [24]:
# Poll until the model package reaches a terminal status, printing each observed status once.
# The deadline keeps the cell from hanging forever if the package never settles.
mp_poll_interval_s = 5
mp_timeout_s = 15 * 60
mp_deadline = time.monotonic() + mp_timeout_s

while True:
    mp_info = sagemaker_boto_client.describe_model_package(
        ModelPackageName=mp_response["ModelPackageArn"]
    )
    mp_status = mp_info["ModelPackageStatus"]
    print(f"model package status: {mp_status}")
    if mp_status in ("Completed", "Failed"):
        break
    if time.monotonic() >= mp_deadline:
        raise TimeoutError(
            f"Model package {mp_response['ModelPackageArn']} still '{mp_status}' after {mp_timeout_s}s"
        )
    time.sleep(mp_poll_interval_s)

model package status: Completed


In [25]:
# Approve the most recently created package in the group (presumably the one just registered).
# Sort explicitly instead of relying on the API's default order, which can return an older
# version first when the group already holds packages from earlier runs.
model_packages = sagemaker_boto_client.list_model_packages(
    ModelPackageGroupName=mpg_name,
    SortBy="CreationTime",
    SortOrder="Descending",
)["ModelPackageSummaryList"]
if not model_packages:
    raise RuntimeError(f"No model packages found in group {mpg_name}")
model_package = model_packages[0]

model_package_update = {
    "ModelPackageArn": model_package["ModelPackageArn"],
    "ModelApprovalStatus": "Approved",
}

update_response = sagemaker_boto_client.update_model_package(**model_package_update)

In [26]:
from sagemaker.lineage.visualizer import LineageTableVisualizer

# Tabulate the lineage entities and association types recorded for the training job.
viz = LineageTableVisualizer(sagemaker_session)
lineage_table = viz.show(training_job_name=training_job_name)
display(lineage_table)

Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...2-03-25-09-54-06-616/output/model.tar.gz,Input,Model,Produced,artifact
1,s3://...n-cjp/personalization/test/test.protobuf,Input,DataSet,ContributedTo,artifact
2,s3://...cjp/personalization/train/train.protobuf,Input,DataSet,ContributedTo,artifact
3,38241...1.amazonaws.com/factorization-machines:1,Input,Image,ContributedTo,artifact
4,s3://...2-03-25-09-54-06-616/output/model.tar.gz,Output,Model,Produced,artifact


In [40]:
from sagemaker.deserializers import JSONDeserializer
from sagemaker.serializers import JSONSerializer

In [41]:
class FMSerializer(JSONSerializer):
    """JSON serializer that wraps feature rows for the Factorization Machines endpoint.

    Each row of ``data`` becomes ``{"features": [...]}`` inside a top-level ``"instances"``
    list. Rows must provide ``.tolist()`` (e.g. rows of a dense NumPy array).
    """

    def serialize(self, data):
        instances = [{"features": feature_row.tolist()} for feature_row in data]
        return json.dumps({"instances": instances})


# `fm` only carries a training job if fit() ran in this kernel. When training_job_name was
# restored via `%store -r`, the training cell skips fit() and fm.deploy() would fail, so
# attach an estimator to the stored job in that case.
if fm.latest_training_job is not None:
    estimator_to_deploy = fm
else:
    estimator_to_deploy = sagemaker.estimator.Estimator.attach(
        training_job_name, sagemaker_session=sagemaker_session
    )

# Real-time endpoint used for scoring below. It is billed while it runs:
# call fm_predictor.delete_endpoint() when finished.
fm_predictor = estimator_to_deploy.deploy(
    initial_instance_count=1,
    instance_type="ml.m4.xlarge",
    serializer=FMSerializer(),
    deserializer=JSONDeserializer(),
)

-------!

In [29]:
# Find the customer who spent the most money (sum of Quantity * UnitPrice over their rows).
df = pd.read_csv("data/online_retail_preprocessed.csv")

df["invoice_amount"] = df["Quantity"] * df["UnitPrice"]
# Aggregate only the amount column: summing the whole frame also "sums" the text columns
# (depending on the pandas version: dropped with a warning, concatenated, or a TypeError).
top_customer = df.groupby("CustomerID")["invoice_amount"].sum().idxmax()
print("Customer ID of highest spend:", top_customer)

Customer ID of highest spend: 14646.0


In [35]:
def _most_common_descriptions(df):
    """Map each StockCode to its most frequent Description (a code can carry several)."""
    # Aggregate only the Description column rather than applying the lambda to every column.
    return df.groupby("StockCode")["Description"].agg(lambda s: s.value_counts().index[0])


def _build_inference_features(df, df_inference):
    """Encode candidate rows as [one-hot(StockCode, CustomerID, Country) | TF-IDF(Description) | UnitPrice].

    The encoders are fit on the full ``df`` on every call.
    NOTE(review): this must reproduce the training-time feature layout exactly; the resulting
    width is assumed to equal the model's ``feature_dim`` (input_dims) — confirm against the
    data-prep notebook.
    """
    enc = OneHotEncoder(handle_unknown="ignore")
    onehot_cols = ["StockCode", "CustomerID", "Country"]
    enc.fit(df[onehot_cols])
    onehot_output = enc.transform(df_inference[onehot_cols])

    vectorizer = TfidfVectorizer(min_df=2)
    vectorizer.fit(df["Description"].unique())
    tfidf_output = vectorizer.transform(df_inference["Description"])

    # UnitPrice as a single sparse column; explicit shape so an empty candidate set still works.
    n_rows = len(df_inference)
    unit_price = csr_matrix(
        (df_inference["UnitPrice"].values, (np.arange(n_rows), np.zeros(n_rows, dtype=int))),
        shape=(n_rows, 1),
        dtype="float32",
    )

    return hstack([onehot_output, tfidf_output, unit_price], format="csr")


def get_recommendations(df, customer_id, n_recommendations, n_ranks=100, predictor=None):
    """Recommend products for a customer by scoring popular items with the FM endpoint.

    The ``n_ranks`` most popular (StockCode, UnitPrice) pairs — popularity being the number of
    distinct customers per pair — are combined with this customer's most frequent Country,
    featurized, and scored by the predictor.

    Args:
        df: Transactions with StockCode, UnitPrice, CustomerID, Description and Country columns.
        customer_id: Customer to recommend for; must appear in ``df["CustomerID"]``.
        n_recommendations: Maximum number of distinct products to return.
        n_ranks: Number of popular (StockCode, UnitPrice) candidates to score.
        predictor: Predictor used for scoring; defaults to the notebook's ``fm_predictor``.

    Returns:
        List of ``[stock_code, description]`` pairs, highest score first. A stock code that is
        popular at several prices is listed once, at its best-scoring position.

    Raises:
        ValueError: if ``customer_id`` has no rows in ``df``.
    """
    if predictor is None:
        predictor = fm_predictor

    customer_rows = df.loc[df["CustomerID"] == customer_id]
    if customer_rows.empty:
        raise ValueError(f"customer_id {customer_id!r} does not appear in df")
    country = customer_rows["Country"].value_counts().index[0]

    popular_items = (
        df.groupby(["StockCode", "UnitPrice"])["CustomerID"]
        .nunique()
        .sort_values(ascending=False)
        .reset_index()
    )
    candidates = popular_items.iloc[:n_ranks]
    item_map = _most_common_descriptions(df)

    df_inference = pd.DataFrame(
        {
            "StockCode": candidates["StockCode"].values,
            "Description": [item_map[code] for code in candidates["StockCode"]],
            "CustomerID": customer_id,
            "Country": country,
            "UnitPrice": candidates["UnitPrice"].values,
        }
    )

    X_inference = _build_inference_features(df, df_inference)

    result = predictor.predict(X_inference.toarray())
    scores = np.array([prediction["score"] for prediction in result["predictions"]])

    # argsort is ascending, so reverse it for highest score first; dict.fromkeys drops a
    # repeated stock code (same item at another price) while keeping rank order.
    ranked_codes = df_inference["StockCode"].to_numpy()[scores.argsort()[::-1]]
    top_recs = list(dict.fromkeys(ranked_codes))[:n_recommendations]
    return [[code, item_map[code]] for code in top_recs]

In [54]:
# Five recommendations for the top spender, scored over the 10 most popular candidates.
n_top = 5
print(f"Top {n_top} recommended products:")
get_recommendations(df, top_customer, n_recommendations=n_top, n_ranks=10)

Top 5 recommended products:


[['22423', 'REGENCY CAKESTAND 3 TIER'],
 ['85123A', 'WHITE HANGING HEART T-LIGHT HOLDER'],
 ['84879', 'ASSORTED COLOUR BIRD ORNAMENT'],
 ['47566', 'PARTY BUNTING'],
 ['23298', 'SPOTTY BUNTING']]

In [33]:
# Ten product recommendations (not customers) for the highest-spending customer, scored over
# a wider pool of the 100 most popular (StockCode, UnitPrice) candidates.
print("Top 10 recommended products:")
get_recommendations(df, top_customer, n_recommendations=10, n_ranks=100)

Top 10 customers with highest spend


[['22423', 'REGENCY CAKESTAND 3 TIER'],
 ['22776', 'SWEETHEART CAKESTAND 3 TIER'],
 ['22624', 'IVORY KITCHEN SCALES'],
 ['85123A', 'WHITE HANGING HEART T-LIGHT HOLDER'],
 ['85099B', 'JUMBO BAG RED RETROSPOT'],
 ['21733', 'RED HANGING HEART T-LIGHT HOLDER'],
 ['22386', 'JUMBO BAG PINK POLKADOT'],
 ['84879', 'ASSORTED COLOUR BIRD ORNAMENT'],
 ['23084', 'RABBIT NIGHT LIGHT'],
 ['23199', 'JUMBO BAG APPLES']]