In [35]:
import os
import urllib
import zipfile
import pandas as pd
from pathlib import Path

In [36]:
data_dir = Path.home() / 'data/fastai/lesson4'
model_dir = Path('/tmp/fastai/lesson4')

if not data_dir.is_dir():
    data_dir.mkdir(parents=True)

if not model_dir.is_dir():
    model_dir.mkdir(parents=True)
    
movielens_folder = data_dir / 'ml-latest-small'

Download the dataset if we don't have it locally

In [37]:
fallback_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

if not movielens_folder.is_dir():
    local_zip_path = data_dir / os.path.basename(fallback_url)

    if not local_zip_path.is_file():
        # Download zip file
        urllib.request.urlretrieve(fallback_url, local_zip_path)
    # Unzip file
    with zipfile.ZipFile(local_zip_path, 'r') as z:
        z.extractall(data_directory)

## Data setup

In [38]:
ratings_path = movielens_folder / 'ratings.csv'
ratings = pd.read_csv(ratings_path)
ratings = ratings.drop(['timestamp'], axis=1)
ratings.head()

Unnamed: 0,userId,movieId,rating
0,1,31,2.5
1,1,1029,3.0
2,1,1061,3.0
3,1,1129,2.0
4,1,1172,4.0


In [39]:
len(ratings)

100004

In [40]:
movies_path = movielens_folder / 'movies.csv'
movie_names = pd.read_csv(movies_path, index_col='movieId')['title']

In [41]:
users = ratings['userId'].unique()
movies = movie_names.index

In [42]:
userid2idx = { userId: index for index, userId in enumerate(users) }
movieid2idx = { movieId: index for index, movieId in enumerate(movies) }

In [43]:
# Write index to movie label metadata to model folder for tensorboard to use and display
# Use by loading into tensorboard as described by https://www.tensorflow.org/programmers_guide/embedding#metadata
movie_names.to_csv(model_dir / 'movies.tsv', sep='\t', index=False, header=['MovieName'])

Update movie and user Ids in ratings to be the index so we have a contiguous integer range for embeddings

In [44]:
ratings['userId'] = ratings['userId'].apply(userid2idx.get)
ratings['movieId'] = ratings['movieId'].apply(movieid2idx.get)

In [45]:
ratings.head()

Unnamed: 0,userId,movieId,rating
0,0,30,2.5
1,0,833,3.0
2,0,859,3.0
3,0,906,2.0
4,0,931,4.0


What if we also used the genre?

In [46]:
movies_extd = pd.read_csv(movies_path, index_col='movieId')

In [47]:
x = movies_extd['genres'].str.split('|', expand=True).stack().reset_index(level=1, drop=True).to_frame('genre')
pd.get_dummies(x, prefix='', prefix_sep='', columns=['genre']).groupby(level=0).sum().join(movies_extd['title']).head()

Unnamed: 0_level_0,(no genres listed),Action,Adventure,Animation,Children,Comedy,Crime,Documentary,Drama,Fantasy,...,Horror,IMAX,Musical,Mystery,Romance,Sci-Fi,Thriller,War,Western,title
movieId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1,0,0,1,1,1,1,0,0,0,1,...,0,0,0,0,0,0,0,0,0,Toy Story (1995)
2,0,0,1,0,1,0,0,0,0,1,...,0,0,0,0,0,0,0,0,0,Jumanji (1995)
3,0,0,0,0,0,1,0,0,0,0,...,0,0,0,0,1,0,0,0,0,Grumpier Old Men (1995)
4,0,0,0,0,0,1,0,0,1,0,...,0,0,0,0,1,0,0,0,0,Waiting to Exhale (1995)
5,0,0,0,0,0,1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,Father of the Bride Part II (1995)


## Neural Net

In [48]:
import tensorflow as tf

In [15]:
tf.__version__

'1.4.0'

In [16]:
from sklearn.model_selection import train_test_split

train, test = train_test_split(ratings, test_size=0.2)

In [26]:
x_train = train[['userId', 'movieId']]
y_train = train['rating']
x_eval = test[['userId', 'movieId']]
y_eval = test['rating']

In [30]:
train_input_fn = tf.estimator.inputs.pandas_input_fn(
    x_train, 
    y=y_train, 
    target_column='rating', 
    batch_size=128,
    shuffle=True, 
    num_epochs=None)

eval_input_fn = tf.estimator.inputs.pandas_input_fn(
    x_eval, 
    y=y_eval, 
    target_column='rating', 
    shuffle=False, 
    num_epochs=None)

In [31]:
user_embedding = tf.feature_column.embedding_column(
    categorical_column=tf.feature_column.categorical_column_with_identity('userId', num_buckets=len(users), default_value=0), 
    dimension=50, 
    max_norm=1e-4)

movie_embedding = tf.feature_column.embedding_column(
    categorical_column=tf.feature_column.categorical_column_with_identity('movieId', num_buckets=len(movies), default_value=0), 
    dimension=50, 
    max_norm=1e-4)

In [32]:
feature_columns = [user_embedding, movie_embedding]

estimator = tf.estimator.DNNRegressor(
    feature_columns=feature_columns,
    hidden_units=[70], 
    dropout=0.3,
    optimizer=tf.train.AdamOptimizer(learning_rate=0.001),
    model_dir=str(model_dir)
)

INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/fastai/lesson4', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': None, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x12133d278>, '_task_type': 'worker', '_task_id': 0, '_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}


In [34]:
train_spec = tf.estimator.TrainSpec(train_input_fn, max_steps=12*len(x_train)//128)
eval_spec = tf.estimator.EvalSpec(eval_input_fn, start_delay_secs=0)

tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after 600 secs (eval_spec.throttle_secs) or training is finished.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Restoring parameters from /tmp/fastai/lesson4/model.ckpt-5000
INFO:tensorflow:Saving checkpoints for 5001 into /tmp/fastai/lesson4/model.ckpt.
INFO:tensorflow:loss = 127.056, step = 5001
INFO:tensorflow:global_step/sec: 178.97
INFO:tensorflow:loss = 145.584, step = 5101 (0.560 sec)
INFO:tensorflow:global_step/sec: 226.492
INFO:tensorflow:loss = 133.102, step = 5201 (0.442 sec)
INFO:tensorflow:global_step/sec: 228.335
INFO:tensorflow:loss = 190.577, step = 5301 (0.438 sec)
INFO:tensorflow:global_step/sec: 227.743
INFO:tensorflow:loss = 114.187, step = 5401 (0.439 sec)
INFO:tensorflow:global_step/sec: 229.532
INFO:tensorflow:loss = 143.556, step = 5501 (0.435 sec)
INFO:tensorflow:global_step/sec: 229.91
INFO:tensorflow:

## Custom model

A custom model function allows us a little more customisation than the `DNNClassifier` i.e. being able to implement multiple dropout rates throughout the model

In [58]:
def model_fn(features, labels, params, mode):
    is_training = mode == tf.estimator.ModeKeys.TRAIN
    loss, train_op, export_outputs = None, None, None
    
    feature_columns = [user_embedding, movie_embedding]
    
    input_layer = tf.feature_column.input_layer(features, feature_columns)
    
    dropout1 = tf.layers.dropout(input_layer, rate=0.3)
    hidden1 = tf.layers.dense(dropout1, units=70, activation=tf.nn.relu)
    dropout2 = tf.layers.dropout(hidden1, rate=0.75)
    logits = tf.layers.dense(dropout2, units=1)
    logits = tf.squeeze(logits)
    
    predictions = {
        'ratings': logits
    }
    
    if mode in [tf.estimator.ModeKeys.TRAIN, tf.estimator.ModeKeys.EVAL]:
        loss = tf.losses.mean_squared_error(labels, logits)
        optimizer = tf.train.AdamOptimizer(learning_rate=1e-3)
        train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
        
    if mode == tf.estimator.ModeKeys.PREDICT:
        export_outputs = {
            tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: logits,
            tf.saved_model.signature_constants.REGRESS_OUTPUTS: logits,
            tf.saved_model.signature_constants.PREDICT_OUTPUTS: tf.estimator.export.PredictOutput(predictions)
        }
        
    return tf.estimator.EstimatorSpec(
        mode, 
        predictions=predictions, 
        loss=loss, 
        train_op=train_op, 
        export_outputs=export_outputs)

In [59]:
estimator = tf.estimator.Estimator(model_fn, model_dir=str(model_dir))

INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/fastai/lesson4', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': None, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x1264203c8>, '_task_type': 'worker', '_task_id': 0, '_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}


In [60]:
train_spec = tf.estimator.TrainSpec(train_input_fn, max_steps=12*len(x_train)//128)
eval_spec = tf.estimator.EvalSpec(eval_input_fn, start_delay_secs=0)

tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after 600 secs (eval_spec.throttle_secs) or training is finished.
INFO:tensorflow:Skipping training since max_steps has already saved.
INFO:tensorflow:Starting evaluation at 2017-12-21-03:33:33
INFO:tensorflow:Restoring parameters from /tmp/fastai/lesson4/model.ckpt-7500
INFO:tensorflow:Evaluation [1/100]
INFO:tensorflow:Evaluation [2/100]
INFO:tensorflow:Evaluation [3/100]
INFO:tensorflow:Evaluation [4/100]
INFO:tensorflow:Evaluation [5/100]
INFO:tensorflow:Evaluation [6/100]
INFO:tensorflow:Evaluation [7/100]
INFO:tensorflow:Evaluation [8/100]
INFO:tensorflow:Evaluation [9/100]
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [11/100]
INFO:tensorflow:Evaluation [12/100]
INFO:tensorflow:Evaluation [13/100]
INFO:tensorflow:Evaluation [14/100]
INFO:tensorflow:Evaluation [15/100]
INFO:tensorflow:Evaluation [16/100]
INFO:tensorflo

## Serving

In [23]:
feature_spec = tf.feature_column.make_parse_example_spec(feature_columns)

export_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec)

estimator.export_savedmodel('exports', export_input_fn)

INFO:tensorflow:Restoring parameters from /tmp/fastai/lesson4/model.ckpt-12502
INFO:tensorflow:Assets added to graph.
INFO:tensorflow:No assets to write.
INFO:tensorflow:SavedModel written to: b"exports/temp-b'1509431920'/saved_model.pb"


b'exports/1509431920'

## Prediction w/Serving

Build the server docker image. Will include outputs from export above

In [None]:
!docker build -t movie-rating-tfserving .

Run the docker image and expose `localhost:8500` for the grpc server

In [5]:
!docker run --rm -d -p 8500:8500 movie-rating-tfserving

db45f0c8097afc7e7fab2d2a0b42c07400db1d6543536113cff10934f42a7af4


Generate the python grpc client if needed

In [17]:
serving_src_root = os.path.expanduser('~/developer/serving')

In [18]:
!python3 -m grpc_tools.protoc -I{serving_src_root} -I{serving_src_root}/tensorflow --python_out={os.getcwd()} --grpc_python_out={os.getcwd()} {serving_src_root}/tensorflow_serving/apis/*.proto

In [15]:
import grpc
import tensorflow as tf
from tensorflow_serving.apis.prediction_service_pb2_grpc import PredictionServiceStub
from tensorflow_serving.apis.predict_pb2 import PredictRequest
from tensorflow_serving.apis.model_pb2 import ModelSpec

In [3]:
channel = grpc.insecure_channel('localhost:8500')
prediction_service = PredictionServiceStub(channel)

In [4]:
_int_feature = lambda value: tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

In [22]:
def make_request(stub, userId:int, movieId:int):
    request = PredictRequest(model_spec=ModelSpec(name='default', signature_name='serving_default'))

    feature_dict = {
        'userId': _int_feature(userId),
        'movieId': _int_feature(movieId)
    }

    example = tf.train.Example(features=tf.train.Features(feature=feature_dict))
    serialized = example.SerializeToString()

    request.inputs['inputs'].CopyFrom(
        tf.contrib.util.make_tensor_proto(serialized, shape=[1]))

    result_future = stub.Predict.future(request, 5.0)
    
    prediction = result_future.result()
    return prediction

In [50]:
%time fut = make_request(prediction_service, userId=0, movieId=30)

CPU times: user 2.46 ms, sys: 1.07 ms, total: 3.53 ms
Wall time: 3.75 ms


In [51]:
fut.outputs['outputs'].float_val[0]

1.4991036653518677

In [52]:
ratings[(ratings['userId'] == 0)]

Unnamed: 0,userId,movieId,rating
0,0,30,2.5
1,0,833,3.0
2,0,859,3.0
3,0,906,2.0
4,0,931,4.0
5,0,1017,2.0
6,0,1041,2.0
7,0,1047,2.0
8,0,1083,3.5
9,0,1087,2.0


In [72]:
def make_batch_request(stub, userId:int, *movieIds):
    request = PredictRequest(model_spec=ModelSpec(name='default', signature_name='serving_default'))
    
    features = []
    for movieId in movieIds:
        feature_dict = {
            'userId': _int_feature(userId),
            'movieId': _int_feature(movieId)
        }
        example = tf.train.Example(features=tf.train.Features(feature=feature_dict))
        features.append(example.SerializeToString())

    request.inputs['inputs'].CopyFrom(
        tf.contrib.util.make_tensor_proto(features, shape=[len(movieIds)]))

    result_future = stub.Predict.future(request, 5.0)
    
    prediction = result_future.result()
    return prediction

In [74]:
batch_fut = make_batch_request(stub=prediction_service, userId=0, *ratings[(ratings['userId'] == 0)]['movieId'].values)

In [76]:
user_ratings = ratings[(ratings['userId'] == 0)].copy()
user_ratings['predicted_rating'] = batch_fut.outputs['outputs'].float_val
user_ratings

Unnamed: 0,userId,movieId,rating,predicted_rating
0,0,30,2.5,1.499104
1,0,833,3.0,2.314406
2,0,859,3.0,1.206314
3,0,906,2.0,2.585009
4,0,931,4.0,2.260231
5,0,1017,2.0,1.185354
6,0,1041,2.0,3.26863
7,0,1047,2.0,2.005588
8,0,1083,3.5,1.462286
9,0,1087,2.0,1.960999
