# Introduction

Recommender systems are well studied and have proven to provide tremendous value to internet businesses and their consumers. In this project, I use Amazon SageMaker to build a movie recommendation system that provides the top 5 recommended movies for each user.


I first created a Factorization Machines model and then extended it to a k-nearest-neighbors (KNN) model using Amazon SageMaker's built-in algorithms. I deployed the KNN model using Batch Transform, which produces results for all users.

I then tried to use ALS in PySpark MLlib to build another recommendation model. SageMaker has no built-in image for ALS, so I would need to build my own algorithm container with Docker, push the image to Amazon ECR, and run it on SageMaker. I did not manage to get the container running successfully on SageMaker, but I still include the code that I used to build the ALS recommender.

# Content
* [1. Binary Classifier: Factorization Machines](#1.-Binary-Classifier:-Factorization-Machines)
    * [1.1 Set up environment ](#1.1-Set-up-environment)
    * [1.2 Data Preparation ](#1.2-Data-Preparation)
    * [1.3 Build a Factorization Machine (FM) Model ](#1.3-Build-a-Factorization-Machine-(FM)-Model)
    * [1.4 Convert to Protobuf ](#1.4-Convert-to-Protobuf)
    * [1.5 Run the job and save it on S3 ](#1.5-Run-the-job-and-save-it-on-S3)    
* [2. KNN model](#2.-KNN-model)
    * [2.1 Download the model data ](#2.1-Download-the-model-data)
    * [2.2 Extract model data to create user and item latent matrices ](#2.2-Extract-model-data-to-create-user-and-item-latent-matrices)
    * [2.3 Build K-Nearest-Neighbour Model ](#2.3-Build-K-Nearest-Neighbour-Model)
    * [2.4 Deploy with Batch Transformation ](#2.4-Deploy-with-Batch-Transformation)
    * [2.5 User predictions ](#2.5-User-predictions)       
* [3. ALS Model](#3.-ALS-Model)
    * [3.1 Use Sagemaker Pyspark SDK](#3.1-Use-Sagemaker-Pyspark-SDK)
    * [3.2 Data preparation](#3.2-Data-preparation)
    * [3.3 ALS model selection and Evaluation](#3.3-ALS-model-selection-and-Evaluation) 
    * [3.4 Model Testing](#3.4-Model-Testing)

## Binary Classifier: Factorization Machines

### Set up environment

In [1]:
# import libraries
import boto3, re, sys, math, json, os, io, sagemaker, urllib.request
from sagemaker import get_execution_role
import numpy as np                                
import pandas as pd                                                          
from time import gmtime, strftime                 
from sagemaker.predictor import csv_serializer
import sagemaker.amazon.common as smac
from sagemaker.amazon.amazon_estimator import get_image_uri
from scipy.sparse import lil_matrix

# Define IAM role
role = get_execution_role()

containers = {'us-west-2': '174872318107.dkr.ecr.us-west-2.amazonaws.com/factorization-machines:latest',
              'us-east-1': '382416733822.dkr.ecr.us-east-1.amazonaws.com/factorization-machines:latest',
              'us-east-2': '404615174143.dkr.ecr.us-east-2.amazonaws.com/factorization-machines:latest',
              'eu-west-1': '438346466558.dkr.ecr.eu-west-1.amazonaws.com/factorization-machines:latest'}

# set the region of the instance
my_region = boto3.session.Session().region_name
print("Success - the MySageMakerInstance is in the " + my_region + " region. You will use the " + containers[my_region] + " container for your SageMaker endpoint.")

Success - the MySageMakerInstance is in the us-east-2 region. You will use the 404615174143.dkr.ecr.us-east-2.amazonaws.com/factorization-machines:latest container for your SageMaker endpoint.


### Data Preparation
Fetch the MovieLens 100k data from GroupLens.

In [3]:
#download data
!wget http://files.grouplens.org/datasets/movielens/ml-100k.zip
!unzip -o ml-100k.zip

--2020-04-06 06:51:44--  http://files.grouplens.org/datasets/movielens/ml-100k.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4924029 (4.7M) [application/zip]
Saving to: ‘ml-100k.zip.1’


2020-04-06 06:51:45 (5.38 MB/s) - ‘ml-100k.zip.1’ saved [4924029/4924029]

Archive:  ml-100k.zip
  inflating: ml-100k/allbut.pl       
  inflating: ml-100k/mku.sh          
  inflating: ml-100k/README          
  inflating: ml-100k/u.data          
  inflating: ml-100k/u.genre         
  inflating: ml-100k/u.info          
  inflating: ml-100k/u.item          
  inflating: ml-100k/u.occupation    
  inflating: ml-100k/u.user          
  inflating: ml-100k/u1.base         
  inflating: ml-100k/u1.test         
  inflating: ml-100k/u2.base         
  inflating: ml-100k/u2.test         
  inflating: ml-100k/u3.base         
  inflating:

The ml-100k archive contains several data files. In this section, I use ua.base to train a Factorization Machines (FM) model and ua.test to test it. According to the dataset README, ua.test contains exactly 10 ratings per user and ua.base contains the rest. Before loading the training set, I shuffle it.

In [2]:
%cd ml-100k
!shuf ua.base -o ua.base.shuffled

/home/ec2-user/SageMaker/ml-100k


Load the training and test data into pandas DataFrames.

In [3]:
# Load training data
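# The file has 4 tab-separated columns; keep all but the timestamp
user_movie_ratings_train = pd.read_csv('ua.base.shuffled', sep='\t',
                 names=['user_id', 'movie_id', 'rating', 'timestamp'],
                 usecols=['user_id', 'movie_id', 'rating'])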
user_movie_ratings_train.head(5)

|   | user_id | movie_id | rating |
|---|---|---|---|
| 0 | 198 | 237 | 2 |
| 1 | 460 | 137 | 5 |
| 2 | 458 | 762 | 3 |
| 3 | 289 | 125 | 2 |
| 4 | 450 | 781 | 4 |


In [4]:
# Load test data
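# The file has 4 tab-separated columns; keep all but the timestamp
user_movie_ratings_test = pd.read_csv('ua.test', sep='\t',
                 names=['user_id', 'movie_id', 'rating', 'timestamp'],
                 usecols=['user_id', 'movie_id', 'rating'])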
user_movie_ratings_test.head(5)

|   | user_id | movie_id | rating |
|---|---|---|---|
| 0 | 1 | 20 | 4 |
| 1 | 1 | 33 | 4 |
| 2 | 1 | 61 | 4 |
| 3 | 1 | 117 | 3 |
| 4 | 1 | 155 | 2 |


In [5]:
# Preview of the dataset
nb_users= user_movie_ratings_train['user_id'].max()
nb_movies=user_movie_ratings_train['movie_id'].max()
nb_features=nb_users+nb_movies
nb_ratings_test=len(user_movie_ratings_test.index)
nb_ratings_train=len(user_movie_ratings_train.index)
print(" # of users: ", nb_users)
print(" # of movies: ", nb_movies)
print(" Training Count: ", nb_ratings_train)
print(" Test Count: ", nb_ratings_test)
print(" Features (# of users + # of movies): ", nb_features)

 # of users:  943
 # of movies:  1682
 Training Count:  90570
 Test Count:  9430
 Features (# of users + # of movies):  2625


### Build a Factorization Machine (FM) Model
We start with a simple model: a Factorization Machine (FM) configured as a binary classifier, which predicts whether a user likes a movie or not. The input is a sparse matrix in which each rating becomes a one-hot encoded row (one column per user and one column per movie). Each label is 1 for ratings of 4 and above and 0 for ratings of 3 and below.
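To make the encoding concrete, here is a minimal NumPy sketch for a single row in a toy setting with 3 users and 4 movies. The helper names `one_hot_row` and `binary_label` are hypothetical. They mirror the column layout used by `loadDataset` below: user `u` sets column `u-1`, and movie `m` sets column `nb_users+m-1`.

```python
import numpy as np

def one_hot_row(user_id, movie_id, nb_users, nb_movies):
    """Encode one (user_id, movie_id) pair as a one-hot FM feature vector.

    IDs are 1-based, as in MovieLens: user u sets column u-1 and
    movie m sets column nb_users + m - 1.
    """
    x = np.zeros(nb_users + nb_movies, dtype=np.float32)
    x[user_id - 1] = 1.0
    x[nb_users + movie_id - 1] = 1.0
    return x

def binary_label(rating, threshold=4):
    """Ratings >= threshold count as 'liked' (1); everything else is 0."""
    return 1.0 if rating >= threshold else 0.0

# Toy setting: 3 users and 4 movies -> 7 feature columns
x = one_hot_row(user_id=2, movie_id=3, nb_users=3, nb_movies=4)
print(x)                                  # [0. 1. 0. 0. 0. 1. 0.]
print(binary_label(5), binary_label(3))   # 1.0 0.0
```

Every row therefore has exactly two non-zero entries, which is why a sparse matrix is a natural fit for the 2,625-column MovieLens input.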

In [6]:
def loadDataset(df, lines, columns):
    # Features are one-hot encoded in a sparse matrix:
    # column user_id-1 for the user, column nb_users+movie_id-1 for the movie
    X = lil_matrix((lines, columns), dtype='float32')
    # Labels are stored in a vector: 1 if rating >= 4, else 0
    Y = []
    line = 0
    for index, row in df.iterrows():
        X[line, row['user_id']-1] = 1
        X[line, nb_users+(row['movie_id']-1)] = 1
        if int(row['rating']) >= 4:
            Y.append(1)
        else:
            Y.append(0)
        line = line+1

    Y = np.array(Y).astype('float32')
    return X, Y


X_train, Y_train = loadDataset(user_movie_ratings_train, nb_ratings_train, nb_features)
X_test, Y_test = loadDataset(user_movie_ratings_test, nb_ratings_test, nb_features)

In [7]:
print(X_train.shape)
print(Y_train.shape)
assert X_train.shape == (nb_ratings_train, nb_features)
assert Y_train.shape == (nb_ratings_train, )
one_labels = np.count_nonzero(Y_train)   # count_nonzero counts the 1 labels (rating >= 4)
print("Training labels: %d zeros, %d ones" % (nb_ratings_train-one_labels, one_labels))

print(X_test.shape)
print(Y_test.shape)
assert X_test.shape  == (nb_ratings_test, nb_features)
assert Y_test.shape  == (nb_ratings_test, )
one_labels = np.count_nonzero(Y_test)
print("Test labels: %d zeros, %d ones" % (nb_ratings_test-one_labels, one_labels))

(90570, 2625)
(90570,)
Training labels: 40664 zeros, 49906 ones
(9430, 2625)
(9430,)
Test labels: 3961 zeros, 5469 ones


### Convert to Protobuf

In [9]:
bucket = 'movie-recom-sys-0420'
prefix = 'fm'

train_key      = 'train.protobuf'
train_prefix   = '{}/{}'.format(prefix, 'train')

test_key       = 'test.protobuf'
test_prefix    = '{}/{}'.format(prefix, 'test')

output_prefix  = 's3://{}/{}/output'.format(bucket, prefix)

In [10]:
def writeDatasetToProtobuf(X, bucket, prefix, key, d_type, Y=None):
    buf = io.BytesIO()
    if d_type == "sparse":
        smac.write_spmatrix_to_sparse_tensor(buf, X, labels=Y)
    else:
        smac.write_numpy_to_dense_tensor(buf, X, labels=Y)
        
    buf.seek(0)
    obj = '{}/{}'.format(prefix, key)
    boto3.resource('s3').Bucket(bucket).Object(obj).upload_fileobj(buf)
    return 's3://{}/{}'.format(bucket,obj)
    
fm_train_data_path = writeDatasetToProtobuf(X_train, bucket, train_prefix, train_key, "sparse", Y_train)    
fm_test_data_path  = writeDatasetToProtobuf(X_test, bucket, test_prefix, test_key, "sparse", Y_test)    
  
print("Training data S3 path: ",fm_train_data_path)
print("Test data S3 path: ",fm_test_data_path)
print("FM model output S3 path: {}".format(output_prefix))

Training data S3 path:  s3://movie-recom-sys-0420/fm/train/train.protobuf
Test data S3 path:  s3://movie-recom-sys-0420/fm/test/test.protobuf
FM model output S3 path: s3://movie-recom-sys-0420/fm/output


### Run the job
To run the training job on SageMaker, I first create an Estimator based on the FM container available in my AWS Region. Then I set the FM-specific hyperparameters:

- feature_dim: the number of features in each sample (2,625 in this case).
- predictor_type: 'binary_classifier'.
- num_factors: the dimension of the latent factor vectors learned for every user and movie (64 here).
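For reference, the standard second-order Factorization Machine (Rendle, 2010) scores a feature vector $\mathbf{x} \in \mathbb{R}^n$ as shown below, with $n$ = `feature_dim` and $k$ = `num_factors`. I am assuming SageMaker's implementation follows this textbook formulation. That assumption is consistent with the parameter names (`w0_weight`, `w1_weight`, `v`) extracted from the trained model later in this notebook.

$$
\hat{y}(\mathbf{x}) = w_0 + \sum_{j=1}^{n} w_j x_j + \sum_{j=1}^{n} \sum_{j'=j+1}^{n} \langle \mathbf{v}_j, \mathbf{v}_{j'} \rangle \, x_j x_{j'}, \qquad \mathbf{v}_j \in \mathbb{R}^{k}
$$

With the one-hot encoding used here, only the user column $u$ and the movie column $m$ are non-zero, so the pairwise sum collapses to a single term:

$$
\hat{y}(u, m) = w_0 + w_u + w_m + \langle \mathbf{v}_u, \mathbf{v}_m \rangle
$$

For `predictor_type='binary_classifier'`, this score is presumably turned into a probability with a sigmoid, given the cross-entropy loss reported in the training log.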

After 50 epochs, the test accuracy is 69.5% and the F1 score (a common metric for binary classifiers, where 1 indicates a perfect classifier) is 0.736. Not great, but this is a first, untuned baseline. Note that the job below is configured with `epochs=100`, and its truncated log shows only training metrics.
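The F1 score (reported as `binary_f_1.000` in the training log) is the harmonic mean of precision $P$ and recall $R$ on the positive class: $F_1 = 2PR/(P+R)$. SageMaker computes these metrics itself. The pure-Python sketch below uses made-up labels only to show how F1 relates to accuracy; `binary_metrics` is a hypothetical helper.

```python
def binary_metrics(y_true, y_pred):
    """Accuracy, precision, recall and F1 for 0/1 labels (positive class = 1)."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)   # true positives
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)   # false positives
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)   # false negatives
    accuracy = sum(1 for t, p in pairs if t == p) / len(pairs)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return accuracy, precision, recall, f1

# Made-up labels, not taken from the MovieLens run
y_true = [1, 1, 0, 0, 1]
y_pred = [1, 0, 0, 1, 1]
print(binary_metrics(y_true, y_pred))
```

Here accuracy is 0.6, while precision, recall and F1 are all 2/3. F1 ignores true negatives, so it can be higher than accuracy when the positive class is predicted well.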

In [11]:
instance_type='ml.m4.xlarge'
fm = sagemaker.estimator.Estimator(get_image_uri(boto3.Session().region_name, "factorization-machines"),
                                   get_execution_role(), 
                                   train_instance_count=1, 
                                   train_instance_type=instance_type,
                                   output_path=output_prefix,
                                   sagemaker_session=sagemaker.Session())

fm.set_hyperparameters(feature_dim=nb_features,
                      predictor_type='binary_classifier',
                      mini_batch_size=1000,
                      num_factors=64,
                      epochs=100)

fm.fit({'train': fm_train_data_path, 'test': fm_test_data_path})

2020-04-14 02:33:06 Starting - Starting the training job...
2020-04-14 02:33:07 Starting - Launching requested ML instances...
2020-04-14 02:34:05 Starting - Preparing the instances for training......
2020-04-14 02:34:54 Downloading - Downloading input data......
2020-04-14 02:35:57 Training - Training image download completed. Training in progress.
Docker entrypoint called with argument(s): train
Running default environment configuration script
  from numpy.testing import nosetester
[04/14/2020 02:35:59 INFO 139834076616512] Reading default configuration from /opt/amazon/lib/python2.7/site-packages/algorithm/resources/default-conf.json: {u'factors_lr': u'0.0001', u'linear_init_sigma': u'0.01', u'epochs': 1, u'_wd': u'1.0', u'_num_kv_servers': u'auto', u'use_bias': u'true', u'factors_init_sigma': u'0.001', u'_log_level': u'info', u'bias_init_method': u'normal', u'linear_init_method': u'normal', u'linear_lr': u'0.001', u'factors_init_method': u'normal', u'_tuni

[2020-04-14 02:36:02.256] [tensorio] [info] epoch_stats={"data_pipeline": "/opt/ml/input/data/train", "epoch": 6, "duration": 882, "num_examples": 91, "num_bytes": 5796480}
[04/14/2020 02:36:02 INFO 139834076616512] #quality_metric: host=algo-1, epoch=2, train binary_classification_accuracy <score>=0.600428571429
[04/14/2020 02:36:02 INFO 139834076616512] #quality_metric: host=algo-1, epoch=2, train binary_classification_cross_entropy <loss>=0.664291426774
[04/14/2020 02:36:02 INFO 139834076616512] #quality_metric: host=algo-1, epoch=2, train binary_f_1.000 <score>=0.728054626908
#metrics {"Metrics": {"update.time": {"count": 1, "max": 886.2810134887695, "sum": 886.2810134887695, "min": 886.2810134887695}}, "EndTime": 1586831762.257128, "Dimensions": {"Host": "algo-1", "Operation": "training", "Algorithm": "factorization-machines"}, "StartTime": 1586831761.369665}
[04/14/2020 02:36:02 INFO 139834076616512] #progress_metric: host=algo-1

[2020-04-14 02:36:11.930] [tensorio] [info] epoch_stats={"data_pipeline": "/opt/ml/input/data/train", "epoch": 34, "duration": 689, "num_examples": 91, "num_bytes": 5796480}
[04/14/2020 02:36:11 INFO 139834076616512] #quality_metric: host=algo-1, epoch=16, train binary_classification_accuracy <score>=0.725252747253
[04/14/2020 02:36:11 INFO 139834076616512] #quality_metric: host=algo-1, epoch=16, train binary_classification_cross_entropy <loss>=0.585919196831
[04/14/2020 02:36:11 INFO 139834076616512] #quality_metric: host=algo-1, epoch=16, train binary_f_1.000 <score>=0.768979154347
#metrics {"Metrics": {"update.time": {"count": 1, "max": 691.436767578125, "sum": 691.436767578125, "min": 691.436767578125}}, "EndTime": 1586831771.931336, "Dimensions": {"Host": "algo-1", "Operation": "training", "Algorithm": "factorization-machines"}, "StartTime": 1586831771.239032}
[04/14/2020 02:36:11 INFO 139834076616512] #progress_metric: host=algo-

[2020-04-14 02:36:22.058] [tensorio] [info] epoch_stats={"data_pipeline": "/opt/ml/input/data/train", "epoch": 64, "duration": 682, "num_examples": 91, "num_bytes": 5796480}
[04/14/2020 02:36:22 INFO 139834076616512] #quality_metric: host=algo-1, epoch=31, train binary_classification_accuracy <score>=0.733648351648
[04/14/2020 02:36:22 INFO 139834076616512] #quality_metric: host=algo-1, epoch=31, train binary_classification_cross_entropy <loss>=0.555751251556
[04/14/2020 02:36:22 INFO 139834076616512] #quality_metric: host=algo-1, epoch=31, train binary_f_1.000 <score>=0.769885122947
#metrics {"Metrics": {"update.time": {"count": 1, "max": 685.0631237030029, "sum": 685.0631237030029, "min": 685.0631237030029}}, "EndTime": 1586831782.059601, "Dimensions": {"Host": "algo-1", "Operation": "training", "Algorithm": "factorization-machines"}, "StartTime": 1586831781.373685}
[04/14/2020 02:36:22 INFO 139834076616512] #progress_metric: host=al

[2020-04-14 02:36:31.606] [tensorio] [info] epoch_stats={"data_pipeline": "/opt/ml/input/data/train", "epoch": 92, "duration": 652, "num_examples": 91, "num_bytes": 5796480}
[04/14/2020 02:36:31 INFO 139834076616512] #quality_metric: host=algo-1, epoch=45, train binary_classification_accuracy <score>=0.742692307692
[04/14/2020 02:36:31 INFO 139834076616512] #quality_metric: host=algo-1, epoch=45, train binary_classification_cross_entropy <loss>=0.540877646645
[04/14/2020 02:36:31 INFO 139834076616512] #quality_metric: host=algo-1, epoch=45, train binary_f_1.000 <score>=0.774957471143
#metrics {"Metrics": {"update.time": {"count": 1, "max": 654.0749073028564, "sum": 654.0749073028564, "min": 654.0749073028564}}, "EndTime": 1586831791.60694, "Dimensions": {"Host": "algo-1", "Operation": "training", "Algorithm": "factorization-machines"}, "StartTime": 1586831790.951872}
[04/14/2020 02:36:31 INFO 139834076616512] #progress_metric: host=alg

[2020-04-14 02:36:41.736] [tensorio] [info] epoch_stats={"data_pipeline": "/opt/ml/input/data/train", "epoch": 122, "duration": 687, "num_examples": 91, "num_bytes": 5796480}
[04/14/2020 02:36:41 INFO 139834076616512] #quality_metric: host=algo-1, epoch=60, train binary_classification_accuracy <score>=0.745230769231
[04/14/2020 02:36:41 INFO 139834076616512] #quality_metric: host=algo-1, epoch=60, train binary_classification_cross_entropy <loss>=0.530325803987
[04/14/2020 02:36:41 INFO 139834076616512] #quality_metric: host=algo-1, epoch=60, train binary_f_1.000 <score>=0.776198934281
#metrics {"Metrics": {"update.time": {"count": 1, "max": 689.4540786743164, "sum": 689.4540786743164, "min": 689.4540786743164}}, "EndTime": 1586831801.736684, "Dimensions": {"Host": "algo-1", "Operation": "training", "Algorithm": "factorization-machines"}, "StartTime": 1586831801.046382}
[04/14/2020 02:36:41 INFO 139834076616512] #progress_metric: host=a

[2020-04-14 02:36:51.760] [tensorio] [info] epoch_stats={"data_pipeline": "/opt/ml/input/data/train", "epoch": 152, "duration": 655, "num_examples": 91, "num_bytes": 5796480}
[04/14/2020 02:36:51 INFO 139834076616512] #quality_metric: host=algo-1, epoch=75, train binary_classification_accuracy <score>=0.747549450549
[04/14/2020 02:36:51 INFO 139834076616512] #quality_metric: host=algo-1, epoch=75, train binary_classification_cross_entropy <loss>=0.522786696549
[04/14/2020 02:36:51 INFO 139834076616512] #quality_metric: host=algo-1, epoch=75, train binary_f_1.000 <score>=0.777873393733
#metrics {"Metrics": {"update.time": {"count": 1, "max": 657.61399269104, "sum": 657.61399269104, "min": 657.61399269104}}, "EndTime": 1586831811.761323, "Dimensions": {"Host": "algo-1", "Operation": "training", "Algorithm": "factorization-machines"}, "StartTime": 1586831811.102761}
[04/14/2020 02:36:51 INFO 139834076616512] #progress_metric: host=algo-1,

[2020-04-14 02:37:01.875] [tensorio] [info] epoch_stats={"data_pipeline": "/opt/ml/input/data/train", "epoch": 182, "duration": 668, "num_examples": 91, "num_bytes": 5796480}
[04/14/2020 02:37:01 INFO 139834076616512] #quality_metric: host=algo-1, epoch=90, train binary_classification_accuracy <score>=0.74956043956
[04/14/2020 02:37:01 INFO 139834076616512] #quality_metric: host=algo-1, epoch=90, train binary_classification_cross_entropy <loss>=0.516941042387
[04/14/2020 02:37:01 INFO 139834076616512] #quality_metric: host=algo-1, epoch=90, train binary_f_1.000 <score>=0.77949571376
#metrics {"Metrics": {"update.time": {"count": 1, "max": 671.3879108428955, "sum": 671.3879108428955, "min": 671.3879108428955}}, "EndTime": 1586831821.8766, "Dimensions": {"Host": "algo-1", "Operation": "training", "Algorithm": "factorization-machines"}, "StartTime": 1586831821.204333}
[04/14/2020 02:37:01 INFO 139834076616512] #progress_metric: host=algo-


2020-04-14 02:37:15 Uploading - Uploading generated training model
2020-04-14 02:37:15 Completed - Training job completed
Training seconds: 141
Billable seconds: 141


The FM model only scores a given (user, movie) pair. It cannot take a user_id as input and return a list of recommended movie_ids. To meet the requirement of this coursework (top 5 movies per user), we need to extend it further.

## KNN model

To build on this recommender, I extend the FM model to predict the top 5 recommendations for each user, using SageMaker's built-in KNN algorithm and Batch Transform.

In [31]:
! pip install mxnet

Collecting mxnet
  Downloading https://files.pythonhosted.org/packages/81/f5/d79b5b40735086ff1100c680703e0f3efc830fa455e268e9e96f3c857e93/mxnet-1.6.0-py2.py3-none-any.whl (68.7MB)
    100% |████████████████████████████████| 68.7MB 704kB/s eta 0:00:01
Collecting graphviz<0.9.0,>=0.8.1 (from mxnet)
  Downloading https://files.pythonhosted.org/packages/53/39/4ab213673844e0c004bed8a0781a0721a3f6bb23eb8854ee75c236428892/graphviz-0.8.4-py2.py3-none-any.whl
Collecting numpy<2.0.0,>1.16.0 (from mxnet)
  Downloading https://files.pythonhosted.org/packages/07/08/a549ba8b061005bb629b76adc000f3caaaf881028b963c2e18f811c6edc1/numpy-1.18.2-cp36-cp36m-manylinux1_x86_64.whl (20.2MB)
    100% |████████████████████████████████| 20.2MB 2.5MB/s eta 0:00:01
Installing collected packages: graphviz, numpy, mxnet
  Found existing installation: numpy 1.14.3
    Uninstalling numpy-1.14.3:
      Successfully uninstalled numpy-1.14.3
Successfully installed graphviz-0.8.4 mxnet-1.6.0 numpy-1

### Download the model data
Since the trained FM model artifacts are already saved in S3, we can download them and repackage the learned parameters as input for a KNN model.

In [12]:
# Define model path
import mxnet as mx
model_file_name = "model.tar.gz"
model_full_path = fm.output_path +"/"+ fm.latest_training_job.job_name +"/output/"+model_file_name
#model_full_path = 's3://movie-recom-sys-0420/fm/output/factorization-machines-2020-04-06-06-55-06-671/output/model.tar.gz'
print("Model Path: ", model_full_path)

Model Path:  s3://movie-recom-sys-0420/fm/output/factorization-machines-2020-04-14-02-33-06-151/output/model.tar.gz


In [13]:
#Download FM model 
import os
%cd ..
os.system('aws s3 cp '+ model_full_path + ' ./')

/home/ec2-user/SageMaker


0

In [14]:
#Extract model file for loading to MXNet
os.system('tar xzvf '+model_file_name)
os.system("unzip -o model_algo-1")
os.system("mv symbol.json model-symbol.json")
os.system("mv params model-0000.params")

0
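The shell calls above can also be written with the Python standard library. The advantage is that a missing file raises an exception, whereas `os.system` only returns a non-zero exit code that is easy to miss. This is a sketch: `extract_fm_artifacts` is a hypothetical helper, and the archive layout (a zip named `model_algo-1` holding `symbol.json` and `params`) is assumed to be the one the cell above relies on.

```python
import os
import tarfile
import zipfile

def extract_fm_artifacts(archive_path, workdir="."):
    """Unpack a SageMaker FM model.tar.gz into MXNet checkpoint files.

    Produces '<workdir>/model-symbol.json' and '<workdir>/model-0000.params'
    and returns the checkpoint prefix for mx.module.Module.load(prefix, 0).
    """
    # model.tar.gz -> model_algo-1 (trusted archive produced by our own job)
    with tarfile.open(archive_path, "r:gz") as tar:
        tar.extractall(workdir)
    # model_algo-1 is itself a zip holding symbol.json and params
    with zipfile.ZipFile(os.path.join(workdir, "model_algo-1")) as zf:
        zf.extractall(workdir)
    # Rename to the <prefix>-symbol.json / <prefix>-0000.params convention
    os.replace(os.path.join(workdir, "symbol.json"),
               os.path.join(workdir, "model-symbol.json"))
    os.replace(os.path.join(workdir, "params"),
               os.path.join(workdir, "model-0000.params"))
    return os.path.join(workdir, "model")
```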

### Extract model data to create user and item latent matrices

In [15]:
#Extract model data
m = mx.module.Module.load('./model', 0, False, label_names=['out_label'])
V = m._arg_params['v'].asnumpy()
w = m._arg_params['w1_weight'].asnumpy()
b = m._arg_params['w0_weight'].asnumpy()

# item latent matrix - concat(V[i], w[i]).  
knn_item_matrix = np.concatenate((V[nb_users:], w[nb_users:]), axis=1)
knn_train_label = np.arange(1,nb_movies+1)

#user latent matrix - concat (V[u], 1) 
ones = np.ones(nb_users).reshape((nb_users, 1))
knn_user_matrix = np.concatenate((V[:nb_users], ones), axis=1)
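Why does this construction work? With one-hot input, the FM score for user $u$ and movie $m$ reduces to $w_0 + w_u + w_m + \langle \mathbf{v}_u, \mathbf{v}_m \rangle$. The inner product of the user vector $[\mathbf{v}_u, 1]$ with the item vector $[\mathbf{v}_m, w_m]$ is $\langle \mathbf{v}_u, \mathbf{v}_m \rangle + w_m$. This differs from the FM score only by $w_0 + w_u$, a constant for a given user. Ranking movies by this inner product therefore gives the same order as ranking by FM score, and a monotonic sigmoid on top does not change that order either. This is why the KNN model below uses `index_metric="INNER_PRODUCT"`. The NumPy sketch checks the claim on random toy parameters, not on the trained model.

```python
import numpy as np

rng = np.random.default_rng(0)
n_users, n_movies, k = 4, 6, 3            # toy sizes, not the MovieLens ones
V = rng.normal(size=(n_users + n_movies, k)).astype(np.float32)   # latent vectors
w = rng.normal(size=(n_users + n_movies, 1)).astype(np.float32)   # linear weights
w0 = 0.1                                                           # global bias

def fm_score(u, m):
    """FM score for 0-based user row u and movie index m (one-hot input)."""
    return w0 + w[u, 0] + w[n_users + m, 0] + V[u] @ V[n_users + m]

# Same construction as the cell above
item_matrix = np.concatenate((V[n_users:], w[n_users:]), axis=1)   # (n_movies, k+1)
user_matrix = np.concatenate((V[:n_users], np.ones((n_users, 1), dtype=np.float32)), axis=1)

for u in range(n_users):
    knn_scores = item_matrix @ user_matrix[u]        # <v_u, v_m> + w_m for every movie
    fm_scores = np.array([fm_score(u, m) for m in range(n_movies)])
    # The two differ only by the per-user constant w0 + w_u ...
    assert np.allclose(fm_scores - knn_scores, w0 + w[u, 0], atol=1e-5)
    # ... so they rank the movies identically
    assert (np.argsort(-knn_scores) == np.argsort(-fm_scores)).all()
print("inner-product ranking matches FM ranking")
```

The global bias $w_0$ (`b` in the cell above) is not needed, because it shifts every movie's score for every user equally.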

### Build K-Nearest-Neighbour Model

We upload the item matrix to S3, train a KNN model on it, and save the result as a SageMaker model. Saving the model lets us reference it by name in a batch transform job later, or deploy it as an endpoint for real-time inference.

In [26]:
print('KNN train features shape = ', knn_item_matrix.shape)
knn_prefix = 'knn'
knn_output_prefix  = 's3://{}/{}/output'.format(bucket, knn_prefix)
knn_train_data_path = writeDatasetToProtobuf(knn_item_matrix, bucket, knn_prefix, train_key, "dense", knn_train_label)
print('uploaded KNN train data: {}'.format(knn_train_data_path))

nb_recommendations = 5

# set up the estimator
knn = sagemaker.estimator.Estimator(get_image_uri(boto3.Session().region_name, "knn"),
    get_execution_role(),
    train_instance_count=1,
    train_instance_type=instance_type,
    output_path=knn_output_prefix,
    sagemaker_session=sagemaker.Session())

knn.set_hyperparameters(feature_dim=knn_item_matrix.shape[1], k=nb_recommendations, index_metric="INNER_PRODUCT", predictor_type='classifier', sample_size=200000)
fit_input = {'train': knn_train_data_path}
knn.fit(fit_input)
knn_model_name =  knn.latest_training_job.job_name
print("created model: ", knn_model_name)

# save the model so that we can reference it in the next step during batch inference
sm = boto3.client(service_name='sagemaker')
primary_container = {
    'Image': knn.image_name,
    'ModelDataUrl': knn.model_data,
}

knn_model = sm.create_model(
        ModelName = knn.latest_training_job.job_name,
        ExecutionRoleArn = knn.role,
        PrimaryContainer = primary_container)
print("saved the model")

KNN train features shape =  (1682, 65)
uploaded KNN train data: s3://movie-recom-sys-0420/knn/train.protobuf
2020-04-14 02:50:59 Starting - Starting the training job...
2020-04-14 02:51:00 Starting - Launching requested ML instances......
2020-04-14 02:52:24 Starting - Preparing the instances for training......
2020-04-14 02:53:29 Downloading - Downloading input data
2020-04-14 02:53:29 Training - Downloading the training image......
2020-04-14 02:54:28 Uploading - Uploading generated training model
Docker entrypoint called with argument(s): train
Running default environment configuration script
[04/14/2020 02:54:20 INFO 140145955112768] Reading default configuration from /opt/amazon/lib/python2.7/site-packages/algorithm/resources/default-conf.json: {u'index_metric': u'L2', u'_tuning_objective_metric': u'', u'_num_gpus': u'auto', u'_log_level': u'info', u'faiss_index_ivf_nlists': u'auto', u'epochs': u'1', u'index_type': u'faiss.Flat', u'_faiss_index_nprobe': u'5',


2020-04-14 02:54:34 Completed - Training job completed
Training seconds: 71
Billable seconds: 71
created model:  knn-2020-04-14-02-50-59-332
saved the model


### Deploy with Batch Transformation
We use SageMaker Batch Transform to predict the top 5 movies for all users in a single batch job.
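Conceptually, for each user vector the batch job returns the `k` movie labels with the largest inner product. For a catalog of 1,682 movies this can also be reproduced locally by brute force, which is handy for sanity-checking the batch output. `top_k_inner_product` is a hypothetical helper, and SageMaker's KNN may order results or break ties differently.

```python
import numpy as np

def top_k_inner_product(user_matrix, item_matrix, item_labels, k=5):
    """Return the k item labels with the largest inner product for each user.

    Brute-force equivalent of an exact inner-product nearest-neighbor search:
    scores[u, m] = <user_matrix[u], item_matrix[m]>. Requires k <= n_items.
    """
    scores = user_matrix @ item_matrix.T                        # (n_users, n_items)
    # argpartition finds the k best items per row without a full sort ...
    top = np.argpartition(-scores, kth=k - 1, axis=1)[:, :k]
    # ... then only those k are sorted by descending score
    order = np.take_along_axis(scores, top, axis=1).argsort(axis=1)[:, ::-1]
    top_sorted = np.take_along_axis(top, order, axis=1)
    return item_labels[top_sorted]

# Toy data: 2 users, 4 items with 1-based labels (like knn_train_label)
users = np.array([[1.0, 0.0], [0.0, 1.0]])
items = np.array([[0.9, 0.1], [0.2, 0.8], [0.5, 0.5], [0.1, 0.3]])
labels = np.arange(1, 5)
print(top_k_inner_product(users, items, labels, k=2))
```

With this toy data, the first user gets items 1 and 3 and the second user gets items 2 and 3. Applied to `knn_user_matrix`, `knn_item_matrix` and `knn_train_label` with `k=5`, the same function should give a local approximation of the batch transform output.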

In [28]:
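#upload inference data to S3
# Note: this reuses train_key, so it overwrites knn/train.protobuf (the KNN training input);
# the trained model artifacts are stored separately, so the model itself is not affected.
knn_batch_data_path = writeDatasetToProtobuf(knn_user_matrix, bucket, knn_prefix, train_key, "dense")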
print("Batch inference data path: ",knn_batch_data_path)

# Initialize the transformer object
transformer =sagemaker.transformer.Transformer(
    base_transform_job_name="knn",
    model_name=knn_model_name,
    instance_count=1,
    instance_type=instance_type,
    output_path=knn_output_prefix,
    accept="application/jsonlines; verbose=true"
)

# Start a transform job:
transformer.transform(knn_batch_data_path, content_type='application/x-recordio-protobuf')
transformer.wait()


#Download predictions 
results_file_name = "inference_output"
inference_output_file = "knn/output/train.protobuf.out"
s3_client = boto3.client('s3')
s3_client.download_file(bucket, inference_output_file, results_file_name)
with open(results_file_name) as f:
    results = f.readlines()

Batch inference data path:  s3://movie-recom-sys-0420/knn/train.protobuf
........................
.
Docker entrypoint called with argument(s): serve
Running default environment configuration script
Docker entrypoint called with argument(s): serve
Running default environment configuration script
[04/14/2020 03:03:48 INFO 140166981982016] loaded entry point class algorithm.serve.server_config:config_api
[04/14/2020 03:03:48 INFO 140166981982016] loading entry points
[04/14/2020 03:03:48 INFO 140166981982016] loaded request iterator text/csv
[04/14/2020 03:03:48 INFO 140166981982016] loaded request iterator application/x-recordio-protobuf
[04/14/2020 03:03:48 INFO 140166981982016] loaded request iterator application/json
[04/14/2020 03:03:48 INFO 140166981982016] loaded request iterator application/jsonlines
[04/14/2020 03:03:48 INFO 140166981982016] loaded response encoder application/x-recordio

### User predictions

We now have predictions for all users. The same KNN model could also be deployed as an endpoint for real-time inference.
To see the top 5 movies for the users listed in *user_predictions.csv*, we load that list and look up each user in the JSON Lines batch output.

In [39]:
# User prediction path
user_pred_path = 's3://movie-recom-sys-0420/user_predictions.csv'

# Load data
user_pred = pd.read_csv(user_pred_path, header = None)
user_pred.rename(columns = {0:'user_id'}, inplace = True)
user_pred

|   | user_id |
|---|---|
| 0 | 198 |
| 1 | 11 |
| 2 | 314 |
| 3 | 184 |
| 4 | 163 |
| 5 | 710 |
| 6 | 881 |
| 7 | 504 |
| 8 | 267 |
| 9 | 653 |


In [50]:
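import json
test_user_idx = list(user_pred.user_id)
for idx in test_user_idx:
    # Line i of the batch output corresponds to row i of knn_user_matrix, i.e. user i + 1,
    # so the predictions for user idx are on line idx - 1.
    # (The output below was generated with results[idx], so each list belongs to user idx + 1.)
    pred = json.loads(results[idx - 1])
    movie_Ids = [int(movie_id) for movie_id in pred['labels']]
    print("Recommended movie Ids for user #{} : {}".format(idx, movie_Ids))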

Recommended movie Ids for user #198 : [357, 98, 479, 127, 483]
Recommended movie Ids for user #11 : [22, 79, 50, 174, 64]
Recommended movie Ids for user #314 : [64, 98, 127, 479, 483]
Recommended movie Ids for user #184 : [98, 50, 479, 64, 483]
Recommended movie Ids for user #163 : [125, 332, 328, 117, 121]
Recommended movie Ids for user #710 : [98, 127, 64, 479, 483]
Recommended movie Ids for user #881 : [172, 98, 174, 50, 64]
Recommended movie Ids for user #504 : [332, 328, 117, 121, 300]
Recommended movie Ids for user #267 : [357, 479, 98, 127, 483]
Recommended movie Ids for user #653 : [172, 79, 50, 174, 64]
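The batch output has one line per row of `knn_user_matrix`, and row 0 holds user 1, so the line for user `u` is at index `u - 1`. A small helper makes this mapping explicit and easy to test. `recommendations_by_user` is a hypothetical name, and the toy lines below are made up. It assumes each line is a JSON object with a `"labels"` list, as requested with `accept="application/jsonlines; verbose=true"`.

```python
import json

def recommendations_by_user(result_lines, user_ids):
    """Map 1-based user IDs to recommended movie IDs from batch transform output.

    Assumes one JSON object per line, in the same order as the rows of
    knn_user_matrix, so user u is on line u - 1.
    """
    recs = {}
    for user_id in user_ids:
        pred = json.loads(result_lines[user_id - 1])
        recs[user_id] = [int(label) for label in pred["labels"]]
    return recs

# Made-up output for 3 users
lines = ['{"labels": [5.0, 2.0]}\n', '{"labels": [7.0, 1.0]}\n', '{"labels": [3.0, 9.0]}\n']
print(recommendations_by_user(lines, [1, 3]))   # {1: [5, 2], 3: [3, 9]}
```

In the notebook this would be called as `recommendations_by_user(results, test_user_idx)`.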


## ALS Model

### Use Sagemaker Pyspark SDK
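The SageMaker PySpark SDK provides a PySpark interface to Amazon SageMaker. It lets you train using the Spark Estimator API, host models on Amazon SageMaker, and make predictions using the Spark Transformer API. In this section, it is only used to add its JARs to the Spark driver classpath; the ALS model itself comes from Spark MLlib (`pyspark.mllib.recommendation.ALS`).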

In [63]:
import sagemaker_pyspark
from pyspark.sql import SparkSession

classpath = ":".join(sagemaker_pyspark.classpath_jars())
spark = SparkSession.builder.config("spark.driver.extraClassPath", classpath).getOrCreate()

In [61]:
# spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction, explode, desc
from pyspark.sql.types import StringType, ArrayType
from pyspark.mllib.recommendation import ALS

### Data preparation
The KNN model suffers from popularity bias (movies such as 98, 64, 479 and 483 recur across many of the users' lists above), the cold-start problem, and scalability issues. Therefore, we try to improve on it with an ALS model.
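One quick way to see the popularity bias is to count how often each movie appears in the ten recommendation lists printed in the previous section. The sketch below simply copies those lists. It is a rough diagnostic, not a formal diversity metric.

```python
from collections import Counter

# The ten KNN recommendation lists printed above (copied from the notebook output)
recommendation_lists = [
    [357, 98, 479, 127, 483], [22, 79, 50, 174, 64], [64, 98, 127, 479, 483],
    [98, 50, 479, 64, 483], [125, 332, 328, 117, 121], [98, 127, 64, 479, 483],
    [172, 98, 174, 50, 64], [332, 328, 117, 121, 300], [357, 479, 98, 127, 483],
    [172, 79, 50, 174, 64],
]

# How many lists each movie appears in
counts = Counter(movie for recs in recommendation_lists for movie in recs)
print(counts.most_common(5))
print(len(counts), "distinct movies in", sum(map(len, recommendation_lists)), "slots")
```

On these ten lists, only 17 distinct movies fill the 50 recommendation slots, and movies 98 and 64 each appear in six of them.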
#### Load data to RDD

In [64]:
# load data
movie_rating = spark.read.csv("ml-100k/u.data", sep = '\\t')

# preprocess data -- only need ["userId", "movieId", "rating"]
movie_rating = movie_rating.rdd
rating_data = movie_rating.map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2]))).cache()
# check three rows
rating_data.take(3)

[(196, 242, 3.0), (186, 302, 3.0), (22, 377, 1.0)]

#### Split data
Now we split the data into training, validation, and test sets using a 6:2:2 ratio.

In [65]:
train, validation, test = rating_data.randomSplit([6, 2, 2], seed=42)
# cache data
train.cache()
validation.cache()
test.cache()

PythonRDD[17] at RDD at PythonRDD.scala:52

### ALS model selection and Evaluation
For the ALS model, we use **grid search** over the rank (number of latent factors) and the regularization parameter, and keep the model with the lowest validation RMSE.
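The selection criterion is the root-mean-square error on the validation set. Let $\mathcal{D}$ be the set of validation (user, movie) pairs that the model can score, and let $r_{ui}$ and $\hat{r}_{ui}$ be the true and predicted ratings. Then:

$$
\mathrm{RMSE} = \sqrt{\frac{1}{|\mathcal{D}|} \sum_{(u,i) \in \mathcal{D}} \left( r_{ui} - \hat{r}_{ui} \right)^2}
$$

In `train_ALS` below, the `join` on `(user, movie)` keys restricts the sum to pairs for which `predictAll` returned a prediction. As far as I know, MLlib omits pairs whose user or movie did not appear in the training set. With three ranks and three regularization values, the grid search trains 3 × 3 = 9 models.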

In [16]:
def train_ALS(train_data, validation_data, num_iters, reg_param, ranks):
    """
    Grid Search Function to select the best model based on RMSE of hold-out data
    """
    # initial
    min_error = float('inf')
    best_rank = -1
    best_regularization = 0
    best_model = None
    for rank in ranks:
        for reg in reg_param:
            # train ALS model
            model = ALS.train(
                ratings=train_data,    # (userID, productID, rating) tuple
                iterations=num_iters,
                rank=rank,
                lambda_=reg,           # regularization param
                seed=42)
            # make prediction
            valid_data = validation_data.map(lambda p: (p[0], p[1]))
            predictions = model.predictAll(valid_data).map(lambda r: ((r[0], r[1]), r[2]))
            # get the rating result
            ratesAndPreds = validation_data.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
            # get the RMSE
            MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
            error = math.sqrt(MSE)
            print('{} latent factors and regularization = {}: validation RMSE is {}'.format(rank, reg, error))
            if error < min_error:
                min_error = error
                best_rank = rank
                best_regularization = reg
                best_model = model
    print('\nThe best model has {} latent factors and regularization = {}'.format(best_rank, best_regularization))
    return best_model

In [17]:
import time
# hyper-param config
num_iterations = 5
ranks = [10, 15, 20]
reg_params = [0.01, 0.05, 0.1]

# grid search and select best model
start_time = time.time()
final_model = train_ALS(train, validation, num_iterations, reg_params, ranks)

print ('Total Runtime: {:.2f} seconds'.format(time.time() - start_time))

10 latent factors and regularization = 0.01: validation RMSE is 1.1619561194228456
10 latent factors and regularization = 0.05: validation RMSE is 1.0104148079567132
10 latent factors and regularization = 0.1: validation RMSE is 0.9627032179598456
15 latent factors and regularization = 0.01: validation RMSE is 1.2023807327615241
15 latent factors and regularization = 0.05: validation RMSE is 1.0096420541467912
15 latent factors and regularization = 0.1: validation RMSE is 0.9571341850451307
20 latent factors and regularization = 0.01: validation RMSE is 1.232445293827424
20 latent factors and regularization = 0.05: validation RMSE is 1.0142326998434243
20 latent factors and regularization = 0.1: validation RMSE is 0.9566770702589341

The best model has 20 latent factors and regularization = 0.1
Total Runtime: 24.98 seconds


### Model test
Finally, we make predictions with the selected ALS model on the held-out test set and compute the out-of-sample RMSE.

In [20]:
# make prediction using test data
test_data = test.map(lambda p: (p[0], p[1]))
predictions = final_model.predictAll(test_data).map(lambda r: ((r[0], r[1]), r[2]))
# get the rating result
ratesAndPreds = test.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
# get the RMSE
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
error = math.sqrt(MSE)
print('The out-of-sample RMSE of rating predictions is', round(error, 4))

The out-of-sample RMSE of rating predictions is 0.9539


SageMaker does not provide a built-in ALS algorithm. Running ALS on SageMaker would require building a custom Docker image. That means converting the code above into a .py training script, adding inference code and other required files to the container, pushing the image to Amazon ECR, and letting SageMaker run the container. I tried this several times but did not get the data flow to run successfully. I will revisit it if more time is available.
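For reference, a custom SageMaker training container reads its inputs from fixed paths under `/opt/ml`. The image is started with the argument `train`, as seen in the "Docker entrypoint called with argument(s): train" lines of the logs above. The sketch below shows only the part of a training script that would locate those inputs before running the ALS code. The ALS training itself is omitted. The helper names and the hyperparameter names (`rank`, `reg`, `iterations`) are hypothetical.

```python
import json
import os

def load_training_config(prefix="/opt/ml"):
    """Locate hyperparameters, input channels and the model directory
    inside a SageMaker training container.

    SageMaker mounts each input channel at <prefix>/input/data/<channel>,
    writes hyperparameters (all values as strings) to
    <prefix>/input/config/hyperparameters.json, and packages whatever the
    script saves under <prefix>/model into model.tar.gz.
    """
    hp_path = os.path.join(prefix, "input", "config", "hyperparameters.json")
    hyperparams = {}
    if os.path.exists(hp_path):
        with open(hp_path) as f:
            hyperparams = json.load(f)

    data_dir = os.path.join(prefix, "input", "data")
    channels = {}
    if os.path.isdir(data_dir):
        for name in sorted(os.listdir(data_dir)):
            channels[name] = os.path.join(data_dir, name)

    return hyperparams, channels, os.path.join(prefix, "model")


def parse_als_hyperparams(hp):
    """Convert hypothetical ALS hyperparameters from strings to numbers.

    Defaults are the best values found by the grid search above.
    """
    return {
        "rank": int(hp.get("rank", "20")),
        "reg": float(hp.get("reg", "0.1")),
        "iterations": int(hp.get("iterations", "5")),
    }
```

A `train` entry point could then read the ratings from `channels["train"]` and call `ALS.train(...)` with the parsed values. It would save the resulting `MatrixFactorizationModel` (for example with its `save(sc, path)` method) under the returned model directory so that SageMaker uploads it to S3.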