# Movie recommendation on Amazon SageMaker with Factorization Machines

### This notebook is an extension of the notebook from my colleague Julien Simon's blog post https://aws.amazon.com/blogs/machine-learning/build-a-movie-recommender-with-factorization-machines-on-amazon-sagemaker/

The original notebook works fine for the MovieLens data. However, you might encounter a "ConnectionClosedError" if your data has too many columns (e.g., with the Netflix recommendation data, the number of columns would exceed 2 million).
The main contribution of this notebook is to send inference requests in the sparse JSON format, which solves the "ConnectionClosedError" problem.

Please check out https://docs.aws.amazon.com/sagemaker/latest/dg/cdf-inference.html to see more on data format requirements for inference. 
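For reference, one row in the sparse JSON request format pairs the indices of the non-zero features (`keys`) with their values (`values`) and the total feature dimension (`shape`), wrapped as `{'data': {'features': ...}}` inside the `instances` list. The helper `sparse_instance` below is a hypothetical sketch of that layout in plain Python, mirroring the payload printed later in this notebook; the linked documentation remains the authoritative description of the format.

```python
import json

def sparse_instance(keys, values, dim):
    """Hypothetical helper: build one instance in the sparse JSON request layout."""
    return {'data': {'features': {'keys': list(keys),
                                  'shape': [dim],
                                  'values': [float(v) for v in values]}}}

# One user/movie pair: columns 100 and 1164 are set to 1.0
# in a feature vector of length 1,002,625.
payload = {'instances': [sparse_instance([100, 1164], [1, 1], 1002625)]}
body = json.dumps(payload)
print(body)
```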

### Download ml-100k dataset

In [7]:
!wget http://files.grouplens.org/datasets/movielens/ml-100k.zip
!unzip -o ml-100k.zip

--2019-10-05 13:56:59--  http://files.grouplens.org/datasets/movielens/ml-100k.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4924029 (4.7M) [application/zip]
Saving to: ‘ml-100k.zip.1’


2019-10-05 13:57:00 (18.3 MB/s) - ‘ml-100k.zip.1’ saved [4924029/4924029]

Archive:  ml-100k.zip
  inflating: ml-100k/allbut.pl       
  inflating: ml-100k/mku.sh          
  inflating: ml-100k/README          
  inflating: ml-100k/u.data          
  inflating: ml-100k/u.genre         
  inflating: ml-100k/u.info          
  inflating: ml-100k/u.item          
  inflating: ml-100k/u.occupation    
  inflating: ml-100k/u.user          
  inflating: ml-100k/u1.base         
  inflating: ml-100k/u1.test         
  inflating: ml-100k/u2.base         
  inflating: ml-100k/u2.test         
  inflating: ml-100k/u3.base         
  inflating:

In [8]:
%cd ml-100k
!shuf ua.base -o ua.base.shuffled
!head -10 ua.base.shuffled

/home/ec2-user/SageMaker/Recommendation/dlnotebooks/sagemaker/ml-100k
76	1071	3	882606017
749	843	3	878848998
718	289	3	883348391
1	144	4	875073180
781	318	3	879634124
201	202	3	884112747
693	1135	3	875482689
782	905	4	891498791
236	133	5	890116059
374	651	4	880395320


In [9]:
!head -10 ua.test

1	20	4	887431883
1	33	4	878542699
1	61	4	878542420
1	117	3	874965739
1	155	2	878542201
1	160	4	875072547
1	171	5	889751711
1	189	3	888732928
1	202	5	875072442
1	265	4	878542441
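Both files use the MovieLens 100k layout: one rating per line, with user id, movie id, rating (1 to 5), and a Unix timestamp separated by tabs. The ids are 1-based, which is why the loading code subtracts 1 from them. A minimal sketch that parses the first three `ua.test` lines shown above with the standard `csv` module:

```python
import csv
import io

# The first three lines of ua.test, as printed above.
sample = "1\t20\t4\t887431883\n1\t33\t4\t878542699\n1\t61\t4\t878542420\n"

rows = []
for user_id, movie_id, rating, timestamp in csv.reader(io.StringIO(sample), delimiter='\t'):
    # Convert every field to int; ids stay 1-based here.
    rows.append((int(user_id), int(movie_id), int(rating), int(timestamp)))

print(rows[0])  # (1, 20, 4, 887431883)
```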


In [10]:
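# This notebook uses SageMaker Python SDK v1 APIs (e.g. json_deserializer,
# train_instance_type, RealTimePredictor), which were renamed or removed in SDK v2.
import sagemaker
import sagemaker.amazon.common as smac
from sagemaker import get_execution_role
from sagemaker.predictor import json_deserializer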

import boto3, csv, io, json
import numpy as np
from scipy.sparse import lil_matrix

### Build training set and test set

In [11]:
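nbUsers=943
nbMovies=1682
# Add 1,000,000 extra columns that are never set, to simulate a much wider
# feature space such as the Netflix data mentioned above
nbFeatures=nbUsers+nbMovies+1000000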

nbRatingsTrain=90570
nbRatingsTest=9430
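Each rating becomes one row with exactly two non-zero entries: the user's one-hot column and the movie's one-hot column, with movie columns placed after the `nbUsers` user columns. The remaining 1,000,000 columns of `nbFeatures` are never set by `loadDataset`. The plain-Python arithmetic below is illustrative only; it shows the resulting width, where a given id lands, and why the matrices must stay sparse: a dense float32 training matrix would need hundreds of gigabytes.

```python
nbUsers, nbMovies = 943, 1682
nbFeatures = nbUsers + nbMovies + 1000000
nbRatingsTrain = 90570

def user_col(user_id):
    # 1-based user id -> column in [0, nbUsers)
    return user_id - 1

def movie_col(movie_id):
    # 1-based movie id -> column in [nbUsers, nbUsers + nbMovies)
    return nbUsers + movie_id - 1

print(nbFeatures)                              # 1002625
print(user_col(1), movie_col(20))              # 0 962
dense_bytes = nbRatingsTrain * nbFeatures * 4  # float32 = 4 bytes per cell
print(dense_bytes / 1e9)                       # about 363 GB if stored densely
sparse_values = nbRatingsTrain * 2             # two non-zeros per row
print(sparse_values)                           # 181140
```

The last figure matches the 181140 stored elements reported for `X_train` below.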

In [12]:
# For each user, build a list of rated movies.
# We'd need this to add random negative samples.
moviesByUser = {}
for userId in range(nbUsers):
    moviesByUser[str(userId)]=[]
 
with open('ua.base.shuffled','r') as f:
    samples=csv.reader(f,delimiter='\t')
    for userId,movieId,rating,timestamp in samples:
        moviesByUser[str(int(userId)-1)].append(int(movieId)-1) 
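`moviesByUser` is not used further in the cells shown here. One common way to draw random negative samples is to pick, for a given user, movies that user has not rated. The function `sample_negatives` below is a hypothetical sketch of that idea using only the standard library; it assumes a dictionary shaped like `moviesByUser` (0-based user id as a string key, list of 0-based movie ids as the value).

```python
import random

def sample_negatives(movies_by_user, user_key, nb_movies, k, seed=None):
    """Hypothetical helper: draw up to k distinct movie ids (0-based) the user has not rated."""
    rated = set(movies_by_user[user_key])
    candidates = [m for m in range(nb_movies) if m not in rated]
    rng = random.Random(seed)
    return rng.sample(candidates, min(k, len(candidates)))

# Toy example: user '0' rated movies 0, 1 and 2 out of 6.
toy = {'0': [0, 1, 2]}
negatives = sample_negatives(toy, '0', nb_movies=6, k=2, seed=42)
print(negatives)  # two distinct ids drawn from {3, 4, 5}
```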

In [13]:
def loadDataset(filename, lines, columns):
    # Features are one-hot encoded in a sparse matrix
    X = lil_matrix((lines, columns)).astype('float32')
    # Labels are stored in a vector
    Y = []
    line=0
    with open(filename,'r') as f:
        samples=csv.reader(f,delimiter='\t')
        for userId,movieId,rating,timestamp in samples:
            X[line,int(userId)-1] = 1
            X[line,int(nbUsers)+int(movieId)-1] = 1
            if int(rating) >= 4:
                Y.append(1)
            else:
                Y.append(0)
            line=line+1
            
    Y=np.array(Y).astype('float32')
    return X,Y
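To see what `loadDataset` produces, the sketch below applies the same encoding to three `ua.test` lines printed earlier, using the real user and movie counts but leaving out the 1,000,000 padding columns. It is a self-contained illustration rather than part of the pipeline: ratings of 4 or 5 become label 1, anything lower becomes 0.

```python
import csv
import io

import numpy as np
from scipy.sparse import lil_matrix

nbUsers, nbMovies = 943, 1682
sample = "1\t20\t4\t887431883\n1\t117\t3\t874965739\n1\t155\t2\t878542201\n"

X = lil_matrix((3, nbUsers + nbMovies), dtype='float32')
Y = []
reader = csv.reader(io.StringIO(sample), delimiter='\t')
for line, (userId, movieId, rating, _) in enumerate(reader):
    X[line, int(userId) - 1] = 1              # user one-hot column
    X[line, nbUsers + int(movieId) - 1] = 1   # movie one-hot column
    Y.append(1 if int(rating) >= 4 else 0)    # binarize the rating
Y = np.array(Y, dtype='float32')

print(X[0].nonzero()[1].tolist())  # [0, 962]
print(Y)                           # [1. 0. 0.]
```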

In [14]:
X_train, Y_train = loadDataset('ua.base.shuffled', nbRatingsTrain, nbFeatures)
X_test, Y_test = loadDataset('ua.test',nbRatingsTest,nbFeatures)

In [15]:
X_train

<90570x1002625 sparse matrix of type '<class 'numpy.float32'>'
	with 181140 stored elements in LInked List format>

In [16]:
print(X_train.shape)
print(Y_train.shape)
assert X_train.shape == (nbRatingsTrain, nbFeatures)
assert Y_train.shape == (nbRatingsTrain, )
# np.count_nonzero counts the 1 labels, not the 0 labels
one_labels = np.count_nonzero(Y_train)
print("Training labels: %d zeros, %d ones" % (nbRatingsTrain-one_labels, one_labels))

print(X_test.shape)
print(Y_test.shape)
assert X_test.shape  == (nbRatingsTest, nbFeatures)
assert Y_test.shape  == (nbRatingsTest, )
one_labels = np.count_nonzero(Y_test)
print("Test labels: %d zeros, %d ones" % (nbRatingsTest-one_labels, one_labels))

(90570, 1002625)
(90570,)
Training labels: 40664 zeros, 49906 ones
(9430, 1002625)
(9430,)
Test labels: 3961 zeros, 5469 ones


### Convert to protobuf and save to S3

In [17]:
bucket = '<Your bucket name>'
prefix = 'sagemaker/fm-movielens'

train_key      = 'train.protobuf'
train_prefix   = '{}/{}'.format(prefix, 'train3')

test_key       = 'test.protobuf'
test_prefix    = '{}/{}'.format(prefix, 'test3')

output_prefix  = 's3://{}/{}/output'.format(bucket, prefix)

In [58]:
def writeDatasetToProtobuf(X, Y, bucket, prefix, key):
    buf = io.BytesIO()
    smac.write_spmatrix_to_sparse_tensor(buf, X, Y)
    buf.seek(0)
    obj = '{}/{}'.format(prefix, key)
    boto3.resource('s3').Bucket(bucket).Object(obj).upload_fileobj(buf)
    return 's3://{}/{}'.format(bucket,obj)
    
train_data = writeDatasetToProtobuf(X_train, Y_train, bucket, train_prefix, train_key)    
test_data  = writeDatasetToProtobuf(X_test, Y_test, bucket, test_prefix, test_key)    
  
print(train_data)
print(test_data)
print('Output: {}'.format(output_prefix))

### Run training job

In [17]:
containers = {'us-west-2': '174872318107.dkr.ecr.us-west-2.amazonaws.com/factorization-machines:latest',
              'us-east-1': '382416733822.dkr.ecr.us-east-1.amazonaws.com/factorization-machines:latest',
              'us-east-2': '404615174143.dkr.ecr.us-east-2.amazonaws.com/factorization-machines:latest',
              'eu-west-1': '438346466558.dkr.ecr.eu-west-1.amazonaws.com/factorization-machines:latest'}

In [60]:
fm = sagemaker.estimator.Estimator(containers[boto3.Session().region_name],
                                   get_execution_role(), 
                                   train_instance_count=1, 
                                   train_instance_type='ml.c4.xlarge',
                                   output_path=output_prefix,
                                   sagemaker_session=sagemaker.Session())

fm.set_hyperparameters(feature_dim=nbFeatures,
                      predictor_type='binary_classifier',
                      mini_batch_size=1000,
                      num_factors=64,
                      epochs=100)

fm.fit({'train': train_data, 'test': test_data})
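A factorization machine scores a row $x$ as $\hat{y}(x) = w_0 + \sum_i w_i x_i + \sum_{i<j} \langle v_i, v_j \rangle x_i x_j$, where each factor vector $v_i$ has `num_factors` (here 64) entries. Because every row here has exactly two ones (one user, one movie), the pairwise term reduces to a single dot product $\langle v_u, v_m \rangle$. The NumPy sketch below uses small, randomly generated parameters (not the trained model) to check that reduction and the standard $O(kn)$ rewrite of the pairwise sum. With `predictor_type='binary_classifier'` the endpoint reports a `score` between 0 and 1; the sketch assumes a logistic (sigmoid) link for that step.

```python
import numpy as np

rng = np.random.default_rng(0)
n, k = 8, 4                      # toy feature count and num_factors (made-up sizes)
w0 = 0.1                         # global bias
w = rng.normal(size=n)           # linear weights, one per feature
V = rng.normal(size=(n, k))      # factor vectors v_i, one row per feature

def fm_naive(x):
    # Direct O(k n^2) sum over all feature pairs i < j.
    pair = sum(V[i] @ V[j] * x[i] * x[j] for i in range(n) for j in range(i + 1, n))
    return w0 + w @ x + pair

def fm_fast(x):
    # Equivalent O(k n) form: 0.5 * sum_f [(sum_i v_if x_i)^2 - sum_i v_if^2 x_i^2]
    s = V.T @ x
    s2 = (V ** 2).T @ (x ** 2)
    return w0 + w @ x + 0.5 * np.sum(s ** 2 - s2)

# One-hot row: "user" feature 1 and "movie" feature 5 are set.
x = np.zeros(n)
x[[1, 5]] = 1.0

score = 1.0 / (1.0 + np.exp(-fm_fast(x)))   # assumed logistic link for binary classification
print(fm_naive(x), fm_fast(x), score)
```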

### Deploy model

In [21]:
fm_predictor = fm.deploy(instance_type='ml.c4.xlarge', initial_instance_count=1)

--------------------------------------------------------------------------------------------------!

If you already have an endpoint running, use the code below instead.

In [26]:
#fm_predictor = sagemaker.predictor.RealTimePredictor(endpoint='factorization-machines-2019-10-05-12-57-06-735')

In [48]:
def fm_serializer(data):
    js = {'instances': []}
    for row in data:
        js['instances'].append({'features': row.tolist()})
    #print (js)
    return json.dumps(js)

fm_predictor.content_type = 'application/json'
fm_predictor.serializer = fm_serializer
fm_predictor.deserializer = json_deserializer
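`fm_serializer` sends every feature value, zeros included, as `{'instances': [{'features': [...]}]}`. The sketch below reproduces the same logic on a tiny, made-up 2 x 4 NumPy array to show the shape of the dense request body.

```python
import json

import numpy as np

def fm_serializer(data):
    # Same logic as the cell above: one dense 'features' list per row.
    js = {'instances': []}
    for row in data:
        js['instances'].append({'features': row.tolist()})
    return json.dumps(js)

toy = np.array([[1, 0, 0, 1],
                [0, 1, 1, 0]], dtype='float32')
body = fm_serializer(toy)
print(body)
# {"instances": [{"features": [1.0, 0.0, 0.0, 1.0]}, {"features": [0.0, 1.0, 1.0, 0.0]}]}
```

Every column costs a few characters whether or not it is zero, so the body grows linearly with `nbFeatures`.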

In [49]:
def fm_serializer_sparse(data):
    js = {'instances': []}
    for row in data:
        shape = [row.shape[1]]
        keys = row.nonzero()[1].tolist()
        
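        # For a lil_matrix row, row.data[0] is the list of stored values.
        # These are numpy.float32, which json.dumps cannot serialize, so
        # convert them to native Python floats. See
        # https://stackoverflow.com/questions/27050108/convert-numpy-type-to-python
        values = [float(i) for i in row.data[0]]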
        
        data_row = {'data':{'features':{'keys':keys,
                                        'shape': shape,
                                        'values':values
                                       }
                           }
                   }
        #print(data_row)
        js['instances'].append(data_row)
        
    print (js)
    return json.dumps(js)

fm_predictor.content_type = 'application/json'
fm_predictor.deserializer = json_deserializer
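`fm_serializer_sparse` relies on a `lil_matrix` detail: `row.data[0]` is the list of stored values for that row. If the input could be another SciPy sparse type, one option is to convert the whole matrix to CSR once and read each row's slice of `indices` and `data`; `.tolist()` also turns the NumPy float32 values into plain Python floats. The function name `sparse_to_fm_json` is hypothetical, and this is a sketch of an alternative, not the author's code.

```python
import json

from scipy.sparse import csr_matrix, lil_matrix

def sparse_to_fm_json(matrix):
    """Hypothetical alternative to fm_serializer_sparse for any SciPy sparse matrix."""
    m = csr_matrix(matrix)            # normalize to CSR once
    dim = m.shape[1]
    instances = []
    for r in range(m.shape[0]):
        start, end = m.indptr[r], m.indptr[r + 1]
        instances.append({'data': {'features': {
            'keys': m.indices[start:end].tolist(),   # column indices of stored entries
            'shape': [dim],
            'values': m.data[start:end].tolist(),    # numpy float32 -> Python float
        }}})
    return json.dumps({'instances': instances})

# Usage on a small, made-up 2 x 6 matrix.
toy = lil_matrix((2, 6), dtype='float32')
toy[0, 1] = 1
toy[0, 4] = 1
toy[1, 0] = 1
toy[1, 3] = 1
body = sparse_to_fm_json(toy)
print(body)
```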

### Run predictions

In [52]:
fm_predictor.serializer = fm_serializer
result = fm_predictor.predict(X_test[1000:1010].toarray())
print(result)
print (Y_test[1000:1010])

ConnectionClosedError: Connection was closed before we received a valid response from endpoint URL: "https://runtime.sagemaker.us-east-1.amazonaws.com/endpoints/factorization-machines-2019-10-05-12-57-06-735/invocations".

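As you can see, the request fails with a "ConnectionClosedError". This is because the request payload is far larger than the maximum payload size that a SageMaker real-time endpoint accepts (on the order of 5 MB per request).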

In [56]:
import sys
sys.getsizeof(X_test[1000:1010].toarray())

40105112
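`sys.getsizeof` reports the in-memory size of the NumPy array (10 rows x 1,002,625 columns x 4 bytes, plus a small header), not the size of the HTTP request. What actually goes over the wire is the JSON body, which can be measured directly. The sketch below builds a single row in each format, using the same two non-zero columns as the first test row in the sparse payload printed later, and compares the lengths of the serialized strings.

```python
import json

dim = 1002625            # nbFeatures in this notebook
keys = [100, 1164]       # the two non-zero columns of one test row

# Dense format: every column is sent, roughly 5 characters each ("0.0, ").
dense_row = [0.0] * dim
for k in keys:
    dense_row[k] = 1.0
dense_body = json.dumps({'instances': [{'features': dense_row}]})

# Sparse format: only the non-zero indices and their values are sent.
sparse_body = json.dumps({'instances': [{'data': {'features': {
    'keys': keys, 'shape': [dim], 'values': [1.0, 1.0]}}}]})

print(len(dense_body))   # roughly 5 MB for a single row
print(len(sparse_body))  # about 100 characters
```

With the 10 rows sent in the failing call, the dense body is about ten times larger again, roughly 50 MB, while the sparse body stays around 1 KB.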

If we instead pass the sparse matrix to the predictor and serialize it with the sparse JSON format, the request succeeds.

In [53]:
fm_predictor.serializer = fm_serializer_sparse
result = fm_predictor.predict(X_test[1000:1010])
print(result)
print (Y_test[1000:1010])

{'instances': [{'data': {'features': {'keys': [100, 1164], 'shape': [1002625], 'values': [1.0, 1.0]}}}, {'data': {'features': {'keys': [100, 1194], 'shape': [1002625], 'values': [1.0, 1.0]}}}, {'data': {'features': {'keys': [100, 1223], 'shape': [1002625], 'values': [1.0, 1.0]}}}, {'data': {'features': {'keys': [100, 1224], 'shape': [1002625], 'values': [1.0, 1.0]}}}, {'data': {'features': {'keys': [100, 1246], 'shape': [1002625], 'values': [1.0, 1.0]}}}, {'data': {'features': {'keys': [100, 1311], 'shape': [1002625], 'values': [1.0, 1.0]}}}, {'data': {'features': {'keys': [100, 1347], 'shape': [1002625], 'values': [1.0, 1.0]}}}, {'data': {'features': {'keys': [100, 1413], 'shape': [1002625], 'values': [1.0, 1.0]}}}, {'data': {'features': {'keys': [100, 1538], 'shape': [1002625], 'values': [1.0, 1.0]}}}, {'data': {'features': {'keys': [100, 1771], 'shape': [1002625], 'values': [1.0, 1.0]}}}]}
{'predictions': [{'score': 0.6554774045944214, 'predicted_label': 1.0}, {'score': 0.2011590600