# Book Recommender

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import warnings


from keras.layers import Input, Embedding, Flatten, Dot, Dense, Reshape, Concatenate, Dropout
from keras.models import Model, load_model
from keras.callbacks import ModelCheckpoint, TensorBoard, EarlyStopping
from keras import regularizers

from sklearn.model_selection import train_test_split

#warnings.filterwarnings('ignore')
%matplotlib inline
# TIMESTAMP IMPORT MUST BE CHANGED WHEN USING PANDAS 23.0+
#from pandas.lib import Timestamp
# CHANGE TO THE FOLLOWING
from pandas._libs.tslibs.timestamps import Timestamp

## Import book rating data

In [4]:
# Folder inside the DBFS mount point that was created when the cluster was set up.
container = "data/5.Recommenders"
# Build the path to ratings.csv under /dbfs/mnt/<container>/ and load it with pandas.
inputFilePath = "/dbfs/mnt/{}/{}".format(container, 'ratings.csv') 
dataset = pd.read_csv(inputFilePath)
# Preview the first rows of the ratings table.
dataset.head()

Let's have a look at how many data points we have

In [6]:
# (number of rows, number of columns) of the ratings table
dataset.shape

Let's check how many users and items we have

In [8]:
# Count the distinct users and books that occur in the ratings table.
# Both counts are passed to the model builder later to size its embedding tables.
n_users = len(dataset["user_id"].unique())
n_items = len(dataset["book_id"].unique())
print("Number of users: ", n_users)
print("Number of items: ", n_items)

To visualize the model, you can use Netron https://lutzroeder.github.io/netron/ in a browser.
Download the h5 file locally and point Netron to it.

Read book data

In [11]:
# Load books.csv from the same mounted container.
# `inputFilePath` is reused here, so from now on it points at the books file, not the ratings file.
inputFilePath = "/dbfs/mnt/{}/{}".format(container, 'books.csv') 
books = pd.read_csv(inputFilePath)
# Preview the first ten book records.
books.head(10)

In [12]:
# Show every field of the row labelled 0 (the first record under read_csv's default index).
books.loc[0]

## Add item metadata
We extend the model with data pertaining to the items. This requires a numerical embedding of the book titles.

In [14]:
# Keep only the book identifier and its original title, and rename `id` to
# `book_id` so the key matches the column name used in the ratings table.
books_subset = (
    books[['id', 'original_title']]
    .rename(columns={'id': 'book_id'})
)

books_subset.head(10)

In [15]:
# Create a new dataset by attaching each rating's book title: the `book_id` column of
# `dataset` is matched against `books_subset` re-indexed by `book_id`. `join` defaults to
# a left join, so every rating row is kept (the title is NaN when no book matches).
new_dataset=dataset.join(books_subset.set_index('book_id'), on='book_id')
# NOTE: this is not the last expression of the cell, so its value is not displayed.
new_dataset.head()
# Number of distinct values in `original_title`; a missing title (NaN), if present, counts as one value.
n_metadata=len(new_dataset.original_title.unique())
n_metadata

Embedding only works on integer vectors. We need to encode the titles.

In [17]:
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences

# The Tokenizer expects plain strings. str() also converts a missing title (NaN)
# into the literal text "nan", so every row yields a string.
titles=[str(i) for i in new_dataset.original_title]

# Learn the word -> integer index mapping from all titles.
t=Tokenizer()
t.fit_on_texts(titles)

# +1 because Tokenizer word indices start at 1; index 0 is reserved and is the
# value pad_sequences uses for padding.
vocab_size = len(t.word_index) + 1
# integer encode the documents
encoded_titles = t.texts_to_sequences(titles)

# Force every title to exactly `max_length` tokens: shorter titles are zero-padded
# at the end (padding='post'); longer ones are truncated (Keras' default
# truncating='pre' drops tokens from the front).
max_length = 10
padded_titles = pad_sequences(encoded_titles,
                              maxlen=max_length,
                              padding='post')
padded_titles

In [18]:
# Join the encoded titles to the dataset.
# `padded_titles` has one row per row of `new_dataset`, in the same positional order,
# so the token frame is built on `new_dataset.index`. This keeps the index-based join
# row-for-row correct even if the index is not the default 0..n-1 range.
# The new columns are labelled 0..max_length-1 and are appended after the existing
# columns; later cells select them by position.
# NOTE: re-running this cell without rebuilding `new_dataset` raises, because the
# token columns would already exist (overlapping column names).
new_dataset=new_dataset.join(pd.DataFrame(padded_titles, index=new_dataset.index))
new_dataset.head(10)

In [19]:
# Split for training: hold out 10% of the rows as the test set.
# The fixed random_state makes the split reproducible across runs.
train, test = train_test_split(new_dataset, test_size=0.1, random_state=42)


## Distributed training with Horovod

In [21]:
import time
# Base directory (under /dbfs) for Horovod/Keras artifacts.
FUSE_MOUNT_LOCATION = '/dbfs/horovod_keras/'
# One sub-directory per run, named after the current Unix timestamp.
checkpoint_dir = FUSE_MOUNT_LOCATION + '{}/'.format(time.time())
# Create the directory together with any missing parents; raises if it already exists.
# NOTE(review): checkpoint_dir is not referenced by the later cells shown here —
# the ModelCheckpoint callback writes to outputFilePath instead.
os.makedirs(checkpoint_dir)

### Build a new hybrid recommender model
Use embedded item metadata in addition to user, item and rating information.

In [23]:
def shard_data(train_data, rank=0, size=1):
  """Return the slice of `train_data` assigned to one worker.

  Takes every `size`-th element starting at position `rank`, so workers with
  ranks 0..size-1 receive disjoint, interleaved shards that together cover the
  whole input. With the defaults (rank=0, size=1) the full input is returned.

  Args:
    train_data: any object supporting extended slicing (e.g. a DataFrame or list).
    rank: position of the first element to keep.
    size: step between kept elements.

  Returns:
    `train_data[rank::size]`.
  """
  worker_rows = slice(rank, None, size)
  return train_data[worker_rows]

In [24]:
def get_model(n_items, n_users, embedding_size=5):
  """Build the (uncompiled) hybrid recommender network.

  Three branches -- item id, user id and padded title tokens -- are each
  embedded and flattened, then concatenated and fed through dropout, one hidden
  dense layer and a single-unit output layer.

  Reads the notebook-level globals `max_length` (length of the title-token
  input) and `vocab_size` (rows of the title-token embedding table).

  Args:
    n_items: number of distinct items; the item embedding table has n_items+1 rows.
    n_users: number of distinct users; the user embedding table has n_users+1 rows.
    embedding_size: output width shared by all three embeddings.

  Returns:
    A Keras Model whose inputs are, in order, [user, item, metadata].
  """
  def embedded_branch(width, table_rows, input_name, embedding_name, flatten_name):
    """Create one Input -> Embedding -> Flatten branch; return (input, flat vector)."""
    branch_input = Input(shape=[width], name=input_name)
    embedded = Embedding(input_dim=table_rows,
                         output_dim=embedding_size,
                         name=embedding_name)(branch_input)
    return branch_input, Flatten(name=flatten_name)(embedded)

  # Branches are created in the order item, user, metadata.
  item_input, item_vec = embedded_branch(
      1, n_items+1, "Item-Input", "Item-Embedding", "Flatten-Items")
  user_input, user_vec = embedded_branch(
      1, n_users+1, "User-Input", "User-Embedding", "Flatten-Users")
  metadata_input, metadata_vec = embedded_branch(
      max_length, vocab_size, "Metadata-Input", "Metadata-Embedding", "Flatten-Metadata")

  # Merge the three feature vectors; dropout on the merged vector and the L1
  # activity penalty on the hidden layer are the regularization against overfitting.
  merged = Dropout(0.5)(Concatenate()([user_vec, item_vec, metadata_vec]))
  hidden_layer = Dense(128,
                       activation='relu',
                       kernel_initializer='random_normal',
                       activity_regularizer=regularizers.l1(10e-4))
  hidden = hidden_layer(merged)

  # Single linear output unit: the predicted rating.
  rating = Dense(1)(hidden)

  return Model([user_input, item_input, metadata_input], rating)

In [25]:
## train new model with sampling
nb_epoch = 12
batch_size = 128 # batch is bigger because execution is split across GPUs
outputFilePath = "/dbfs/mnt/{}/{}".format(container, 'hybrid_deep_model.h5')

In [26]:
import keras
import horovod.keras as hvd

from keras import backend as K
import tensorflow as tf

def train_model(train, n_items, n_users):
  """Train the hybrid recommender on one Horovod worker.

  Intended to be executed once per worker process. Each worker initializes
  Horovod, pins one GPU, trains on its own shard of `train`, and synchronizes
  gradients through the Horovod-wrapped optimizer. Only rank 0 writes the model
  file to `outputFilePath`.

  Reads the notebook-level globals `nb_epoch`, `batch_size`, `max_length` and
  `outputFilePath`, and calls `shard_data` and `get_model`.

  Args:
    train: training DataFrame with `user_id`, `book_id` and `rating` columns and
      the padded title-token columns at positions 4 .. 4+max_length-1.
    n_items: number of distinct items, forwarded to `get_model`.
    n_users: number of distinct users, forwarded to `get_model`.

  Returns:
    None.
  """
  # Horovod: initialize Horovod.
  hvd.init()

  # Horovod: pin GPU to be used to process local rank (one GPU per process)
  config = tf.ConfigProto()
  config.gpu_options.allow_growth = True
  config.gpu_options.visible_device_list = str(hvd.local_rank())
  K.set_session(tf.Session(config=config))

  # Each worker trains on its own interleaved shard of the data.
  train_data=shard_data(train, hvd.rank(), hvd.size())
  new_model=get_model(n_items, n_users, 5)

  # Horovod: adjust learning rate based on number of GPUs.
  learning_rate=1.0
  optimizer = keras.optimizers.Adadelta(learning_rate * hvd.size())

  # Horovod: Wrap optimizer with Horovod DistributedOptimizer.
  optimizer = hvd.DistributedOptimizer(optimizer)

  new_model.compile(optimizer=optimizer, loss='mean_squared_error')

  callbacks = [
      # Horovod: Broadcast initial variable states from rank 0 to all other
      # processes. This is necessary to ensure consistent initialization of
      # all workers when training is started with random weights or restored
      # from a checkpoint.
      hvd.callbacks.BroadcastGlobalVariablesCallback(0),
      # Horovod: average metrics among workers at the end of every epoch.
      # Every worker validates on a different shard, so without this each
      # worker's EarlyStopping would watch a different val_loss and could stop
      # at a different epoch, leaving the remaining workers waiting in the
      # collective gradient exchange. Must come before any metric-based callback.
      hvd.callbacks.MetricAverageCallback(),
  ]

  # Horovod: Save checkpoints only on worker 0 to prevent
  # other workers from overwriting and corrupting them.
  if hvd.rank() == 0:
    callbacks.append(keras.callbacks.ModelCheckpoint(filepath=outputFilePath, save_best_only=True))

  # Stop once the (worker-averaged) validation loss has not improved for 5 epochs
  # and roll back to the best weights seen.
  earlystop = EarlyStopping(monitor='val_loss',
                           patience=5,
                           verbose=1,
                           restore_best_weights=True
                           )
  callbacks.append(earlystop)

  # Inputs follow the model's order: user id, item id, padded title tokens.
  # The token columns are selected by position.
  # NOTE(review): assumes exactly four columns precede them in `train` — TODO confirm.
  new_model.fit([train_data.user_id, train_data.book_id, train_data.iloc[:,4:4+max_length]], train_data.rating,
                      epochs=nb_epoch,
                      batch_size=batch_size,
                      shuffle=True,
                      validation_split=0.1,
                      verbose=2,
                      callbacks=callbacks)

In [27]:
from sparkdl import HorovodRunner

# Launch train_model through HorovodRunner with np=2 (two parallel training processes).
# The keyword arguments given to run() are forwarded to train_model.
hr = HorovodRunner(np=2)
hr.run(train_model, train=train, n_items=n_items, n_users=n_users)

In [28]:
# Reload the model from outputFilePath, the file the rank-0 ModelCheckpoint callback writes during training.
new_model= load_model(outputFilePath)

In [29]:
# Evaluate on the held-out test set. Inputs follow the model's order: user id,
# item id, then the padded title-token columns (selected by position, as in training).
# The model was compiled with loss='mean_squared_error', so eval_loss is presumably the test MSE.
eval_loss=new_model.evaluate([test.user_id, test.book_id, test.iloc[:, 4:4+max_length]], test.rating)
print("Evaluation loss: ", eval_loss)

Less overfitting and a better MSE overall!