# Creating a Data Pipeline for a Recommender System using Blue Brain Nexus 

In this notebook, you will create a recommendation engine using Blue Brain Nexus. Using some movie rating data,
you will train a collaborative filtering model for movie recommendation using a given matrix factorization class and export the trained model to Elasticsearch. Once exported, 
you can test your recommendations by querying Elasticsearch and displaying the results.

![Movie Recommendation](assets/recommendation.png)

### _Prerequisites_

The notebook assumes you have installed [Nexus SDK](https://github.com/BlueBrain/nexus-python-sdk).


## Overview

You will work through the following steps

1. Set up the Nexus environment in Python 
2. Pull the data from Nexus
3. Prepare the data
4. Train the recommendation model
5. Push the model results to Nexus
6. Show recommendations by querying Nexus

## Step 1: Set up Nexus environment

Blue Brain Nexus provides a Python SDK to facilitate the use of Nexus, including functionalities of authentication and data access, etc. The SDK is available at https://github.com/BlueBrain/nexus-python-sdk.

Here, we assume you have installed the Nexus python SDK. Otherwise, you could pip install the sdk. Then, we will set up the Nexus environment in this notebook with a provided access token. 

In [1]:
#!pip install git+https://github.com/BlueBrain/nexus-python-sdk

Before using the SDK to ingest and access data in Nexus, you should set up your SDK environment.

First, please get your access token through Github.

Then, set the access token and your Nexus deployment endpoint in your SDK configuration.

In [2]:
import nexussdk as nexus

deployment = 'https://sandbox.bluebrainnexus.io/v1'
token = 'YOUR_ACCESS_TOKEN'

nexus.config.set_environment(deployment)
nexus.config.set_token(token)

Try listing the organization to see if the access is okay

In [4]:
nexus.organizations.list()

OrderedDict([('@context',
              ['https://bluebrain.github.io/nexus/contexts/admin.json',
               'https://bluebrain.github.io/nexus/contexts/resource.json',
               'https://bluebrain.github.io/nexus/contexts/search.json']),
             ('_total', 3),
             ('_results',
              [OrderedDict([('@id', 'https://sandbox.bluebrainnexus.io/v1/orgs/amld'),
                            ('@type', 'Organization'),
                            ('description', 'AMLD Workshop'),
                            ('_uuid', '2b8df7c8-238f-476d-8a07-5fbb6c92fa3d'),
                            ('_label', 'amld'),
                            ('_rev', 1),
                            ('_deprecated', False),
                            ('_createdAt', '2019-01-26T20:38:48.311Z'),
                            ('_createdBy',
                             'https://sandbox.bluebrainnexus.io/v1/realms/github/users/bogdanromanx'),
                            ('_updatedAt', '2019-01-26T2

## Step 2: Pull data from Nexus

Now we will start pulling data that has been ingested into your Nexus project previously. 

For building a classical recommendation system using matrix factorization, we will need a user-by-item matrix where nonzero elements of the matrix are ratings that a user has given an item. To do that, we will 

1.   Query all the rating data for building a U-I matrix
2.   Query the movie id data for the recomendation

SPARQL is an RDF query language which is able to retrieve and manipulate data stored in Resource Description Framework (RDF) format. Given that all the movielens data is put into the knowledge graph meaning they are inter-connected, it is straightforward to query the data using SPARQL.

### Setting up the environment for SPARQL
We will first need to install a python wrapper around a SPARQL service. It helps in creating the query URI and, possibly, convert the result into a more manageable format. 

In [None]:
!pip install git+https://github.com/RDFLib/sparqlwrapper

In [5]:
import pandas as pd
from itertools import repeat
import concurrent.futures
import json
import csv
import math
import requests
import time
import numpy as np
import os
from sklearn.metrics import mean_squared_error
from numpy.linalg import solve

np.random.seed(0)

from urllib.parse import urlencode, quote_plus
from collections import OrderedDict
from io import StringIO
from functools import reduce
from SPARQLWrapper import SPARQLWrapper, XML, N3, TURTLE, JSON, POST, POSTDIRECTLY, CSV

We will now set up the SPARQL client based on the Nexus setting.

In [6]:
endpoint = os.path.join(deployment, "views/amld/recommender/graph/sparql/")
headers = {}
headers["Authorization"] ="Bearer {}".format(token)
headers["Content-Type"] ="application/sparql-query"

sparql_client = SPARQLWrapper(endpoint)
sparql_client.addCustomHttpHeader("Content-Type", "application/sparql-query")
sparql_client.addCustomHttpHeader("Authorization","Bearer {}".format(token))
sparql_client.setMethod(POST)
sparql_client.setReturnFormat(JSON)
sparql_client.setRequestMethod(POSTDIRECTLY)

We now define some helper function that can convert the JSON payload from the SPARQL client to DataFrame, which can be later used to build the U-I matrix.

In [9]:
# Convert SPARQL results into a Pandas data frame
def sparql2dataframe(json_sparql_results, df):
    cols = json_sparql_results['head']['vars']
    out = []
    for row in json_sparql_results['results']['bindings']:
        item = []
        for c in cols:
            item.append(row.get(c, {}).get('value'))
        out.append(item)
    df_temp = pd.DataFrame(out, columns=cols)
    return pd.concat([df, df_temp])

#Use a client and send a query
def query(query, sparql_client):
    sparql_client.setQuery(query)
    result_object = sparql_client.query()
    return result_object._convertJSON()
   


### Query the rating data from Nexus

Here we define a function that will first perform a query to the ElasticSearch view in Nexus to fetch the number of the total rating data, which is about 100k in the case of a small Movielens data. Then, we create a SPARQL query that will fill in a dataframe with the fields of ['userId', 'movieId', 'rating']. We perform this query with a limit of 5000 entries per batch and loop over through the 100k data. This is to avoid overloading a large HTTP response. 

In [10]:
# Configure and run query
def load_rating_from_nexus(batch_size=5000):
    
    es_query = {
        "query": {
        "terms" : {"@type":["https://sandbox.bluebrainnexus.io/v1/vocabs/amld/recommender/Rating"]}
        }
    }
    
    es_payload = nexus.views.query_es(org_label="amld", project_label="recommender", \
                                      view_id='documents', query=es_query)
    total_items = es_payload['hits']['total']
        
    batches = math.ceil(float(total_items)/batch_size)
    
    df = pd.DataFrame(columns=['userId', 'movieId', 'rating'])
    
    for i in range(batches): 
        start_idx = i * batch_size
        if i == batches - 1:
            size = total_items % batch_size
        else:
            size = batch_size
            
        sparql_query = """
        PREFIX vocab: <https://sandbox.bluebrainnexus.io/v1/vocabs/amld/recommender/>

        Select ?userId ?movieId ?rating 
         WHERE  {
            ?ratingNode a vocab:Rating.
            ?ratingNode vocab:movieId ?movieId.
            ?ratingNode vocab:rating ?rating.
            ?ratingNode vocab:userId ?userId.
        }
        ORDER BY ASC(?userId) ASC (?movieId)
        OFFSET %start_idx%
        LIMIT %batch_size%""".replace('%start_idx%', str(start_idx)).replace('%batch_size%', str(size))
                
        results = query(sparql_query, sparql_client)
        df = sparql2dataframe(results, df)
            
    return df

We now will start loading the data. Depending on the infrastructure of the Nexus deployment, this might take up to a few minutes. After that, we will verify the shape of the data is correct.

In [11]:
df_rating = load_rating_from_nexus()
df_rating.shape

Now we will retrieve the movie id information using the same way. The retrieved data will be store in a dataframe with fields ['movieId', 'title'].

In [15]:
# Configure and run query
def load_movie_from_nexus(batch_size=5000):
    
    es_query = {
        "query": {
        "terms" : {"@type":["https://sandbox.bluebrainnexus.io/v1/vocabs/amld/recommender/Movie"]}
        }
    }
    
    es_payload = nexus.views.query_es(org_label="amld", project_label="recommender", \
                                      view_id='documents', query=es_query)
    total_items = es_payload['hits']['total']
        
    batches = math.ceil(float(total_items)/batch_size)
    
    df = pd.DataFrame(columns=['movieId', 'title'])
    
    for i in range(batches): 
        start_idx = i * batch_size
        if i == batches - 1:
            size = total_items % batch_size
        else:
            size = batch_size
            
        sparql_query = """
        PREFIX vocab: <https://sandbox.bluebrainnexus.io/v1/vocabs/amld/recommender/>

        Select ?movieId  ?title
         WHERE  {
            ?movieNode a vocab:Movie.
            ?movieNode vocab:movieId ?movieId.
            ?movieNode vocab:title ?title.

        }
        ORDER BY ASC(?movieId) ASC (?title)
        OFFSET %start_idx%
        LIMIT %batch_size%""".replace('%start_idx%', str(start_idx)).replace('%batch_size%', str(size))
        
        results = query(sparql_query, sparql_client)
        df = sparql2dataframe(results, df)
            
    return df

In [16]:
df_movie = load_movie_from_nexus()
df_movie.shape

(8633, 2)

## Step 3: Prepare the data

To get the recommendation right, we must construct and transform the data correctly. This is usually a very important step so that you are sure your machine learning algorithm is consuming the correct data in a good way.

In the case of collaborative filtering using matrix factorization, the preparation of the data contains the following steps:

- As in the U-I matrix we will have the user id and the item id as incremental integers, we will assign a unique number between (0, #users) to each user and do the same for movies. The mapping between the id in the U-I matrix will be stored, which can be further used to query the recommendation. 


- Then, we will create the U-I matrix by assigning each user's rating to each movie on a zero matrix created using numpy.


- Finally, we will split the data into training and testing. This is done by removing 10 ratings for each user and assign them to the test set. 


In [17]:
movie_mapping = dict( enumerate(df_rating.movieId.astype('category').cat.categories) )
inv_movie_mapping = {v: k for k, v in movie_mapping.items()}

df_rating.userId = df_rating.userId.astype('category').cat.codes.values
df_rating.movieId = df_rating.movieId.astype('category').cat.codes.values

In [18]:
n_users = df_rating.userId.unique().shape[0]
n_items = df_rating.movieId.unique().shape[0]

# Create r_{ui}, our ratings matrix
ratings = np.zeros((n_users, n_items))
for row in df_rating.itertuples():
    ratings[row[1]-1, row[2]-1] = row[3]

We also need to provide a map from movie id to title for the recommendation system.

In [20]:
idx_to_movie = {}
movie_list = df_movie.movieId.unique()
for k, v in movie_mapping.items():
    movie_v = str(int(float(v)))
    if movie_v in movie_list:
        idx_to_movie[k] = df_movie[df_movie.movieId==movie_v].title.values[0]

## Step 4: Train a recommmender model on the ratings data

Your data is now prepared as a U-I matrix and you will use it to build a collaborative filtering recommendation model.

Collaborative filtering is a recommendation approach that is effectively based on the "wisdom of the crowd". It makes the assumption that, if two people share similar preferences, then the things that one of them prefers could be good recommendations to make to the other. In other words, if user A tends to like certain movies, and user B shares some of these preferences with user A, then the movies that user A likes, that user B has not yet seen, may well be movies that user B will also like.

In a similar manner, we can think about items as being similar if they tend to be rated highly by the same people, on average.

Hence these models are based on the combined, collaborative preferences and behavior of all users in aggregate. They tend to be very effective in practice (provided you have enough preference data to train the model). The ratings data you have is a form of explicit preference data, perfect for training collaborative filtering models. 

Matrix factorization (MF) is a classical method to perform collaborative filtering model. The core idea of MF is to represent the ratings as a user-item ratings matrix. In the diagram below you will see this matrix on the left (with users as rows and movies as columns). The entries in this matrix are the ratings given by users to movies. 

You may also notice that the matrix has missing entries because not all users have rated all movies. In this situation we refer to the data as sparse.

![](assets/collaborative_filtering.png)

MF methods aim to find two much smaller matrices (one representing the users and the other the items) that, when multiplied together, re-construct the original ratings matrix as closely as possible. This is know as factorizing the original matrix, hence the name of the technique.

The two smaller matrices are called factor matrices (or latent features). The user and movie factor matrices are illustrated on the right in the diagram above. The idea is that each user factor vector is a compressed representation of the user's preferences and behavior. Likewise, each item factor vector is a compressed representation of the item. Once the model is trained, the factor vectors can be used to make recommendations, which is what you will do in the following sections.

The optimization of the MF can be done using different methods. In this example, we will use 2 popular methods:

- Alternating Least Squares (ALS)

- Stochastic Gradient Descent (SGD)


Further reading:

[Explicit Matrix Factorization: ALS, SGD, and All That Jazz](https://www.ethanrosenthal.com/2016/01/09/explicit-matrix-factorization-sgd-als/)

[ALS Implicit Collaborative Filtering
](https://medium.com/radon-dev/als-implicit-collaborative-filtering-5ed653ba39fe)


Below we have provided an explicit MF class adapted from the example of an amazing [tutorial](https://www.ethanrosenthal.com/2016/01/09/explicit-matrix-factorization-sgd-als/) by Ethan Rosenthal, which can perform MF with both ALS and SGD optimizations.

In [21]:
from sklearn.metrics import mean_squared_error

def get_mse(pred, actual):
    # Ignore nonzero terms.
    pred = pred[actual.nonzero()].flatten()
    actual = actual[actual.nonzero()].flatten()
    return mean_squared_error(pred, actual)

In [22]:
class ExplicitMF():
    def __init__(self, 
                 ratings,
                 n_factors=40,
                 learning='sgd',
                 item_fact_reg=0.0, 
                 user_fact_reg=0.0,
                 item_bias_reg=0.0,
                 user_bias_reg=0.0,
                 verbose=False):
        """
        Train a matrix factorization model to predict empty 
        entries in a matrix. The terminology assumes a 
        ratings matrix which is ~ user x item
        
        Params
        ======
        ratings : (ndarray)
            User x Item matrix with corresponding ratings
        
        n_factors : (int)
            Number of latent factors to use in matrix 
            factorization model
        learning : (str)
            Method of optimization. Options include 
            'sgd' or 'als'.
        
        item_fact_reg : (float)
            Regularization term for item latent factors
        
        user_fact_reg : (float)
            Regularization term for user latent factors
            
        item_bias_reg : (float)
            Regularization term for item biases
        
        user_bias_reg : (float)
            Regularization term for user biases
        
        verbose : (bool)
            Whether or not to printout training progress
        """
        
        self.ratings = ratings
        self.n_users, self.n_items = ratings.shape
        self.n_factors = n_factors
        self.item_fact_reg = item_fact_reg
        self.user_fact_reg = user_fact_reg
        self.item_bias_reg = item_bias_reg
        self.user_bias_reg = user_bias_reg
        self.learning = learning
        if self.learning == 'sgd':
            self.sample_row, self.sample_col = self.ratings.nonzero()
            self.n_samples = len(self.sample_row)
        self._v = verbose

    def als_step(self,
                 latent_vectors,
                 fixed_vecs,
                 ratings,
                 _lambda,
                 type='user'):
        """
        One of the two ALS steps. Solve for the latent vectors
        specified by type.
        """
        if type == 'user':
            # Precompute
            YTY = fixed_vecs.T.dot(fixed_vecs)
            lambdaI = np.eye(YTY.shape[0]) * _lambda

            for u in range(latent_vectors.shape[0]):
                latent_vectors[u, :] = solve((YTY + lambdaI), 
                                             ratings[u, :].dot(fixed_vecs))
        elif type == 'item':
            # Precompute
            XTX = fixed_vecs.T.dot(fixed_vecs)
            lambdaI = np.eye(XTX.shape[0]) * _lambda
            
            for i in range(latent_vectors.shape[0]):
                latent_vectors[i, :] = solve((XTX + lambdaI), 
                                             ratings[:, i].T.dot(fixed_vecs))
        return latent_vectors

    def train(self, n_iter=10, learning_rate=0.1):
        """ Train model for n_iter iterations from scratch."""
        # initialize latent vectors        
        self.user_vecs = np.random.normal(scale=1./self.n_factors,\
                                          size=(self.n_users, self.n_factors))
        self.item_vecs = np.random.normal(scale=1./self.n_factors,
                                          size=(self.n_items, self.n_factors))
        
        if self.learning == 'als':
            self.partial_train(n_iter)
        elif self.learning == 'sgd':
            self.learning_rate = learning_rate
            self.user_bias = np.zeros(self.n_users)
            self.item_bias = np.zeros(self.n_items)
            self.global_bias = np.mean(self.ratings[np.where(self.ratings != 0)])
            self.partial_train(n_iter)
    
    
    def partial_train(self, n_iter):
        """ 
        Train model for n_iter iterations. Can be 
        called multiple times for further training.
        """
        ctr = 1
        while ctr <= n_iter:
            if ctr % 10 == 0 and self._v:
                print ('\tcurrent iteration: {}'.format(ctr))
            if self.learning == 'als':
                self.user_vecs = self.als_step(self.user_vecs, 
                                               self.item_vecs, 
                                               self.ratings, 
                                               self.user_fact_reg, 
                                               type='user')
                self.item_vecs = self.als_step(self.item_vecs, 
                                               self.user_vecs, 
                                               self.ratings, 
                                               self.item_fact_reg, 
                                               type='item')
            elif self.learning == 'sgd':
                self.training_indices = np.arange(self.n_samples)
                np.random.shuffle(self.training_indices)
                self.sgd()
            ctr += 1

    def sgd(self):
        for idx in self.training_indices:
            u = self.sample_row[idx]
            i = self.sample_col[idx]
            prediction = self.predict(u, i)
            e = (self.ratings[u,i] - prediction) # error
            
            # Update biases
            self.user_bias[u] += self.learning_rate * \
                                (e - self.user_bias_reg * self.user_bias[u])
            self.item_bias[i] += self.learning_rate * \
                                (e - self.item_bias_reg * self.item_bias[i])
            
            #Update latent factors
            self.user_vecs[u, :] += self.learning_rate * \
                                    (e * self.item_vecs[i, :] - \
                                     self.user_fact_reg * self.user_vecs[u,:])
            self.item_vecs[i, :] += self.learning_rate * \
                                    (e * self.user_vecs[u, :] - \
                                     self.item_fact_reg * self.item_vecs[i,:])
    def predict(self, u, i):
        """ Single user and item prediction."""
        if self.learning == 'als':
            return self.user_vecs[u, :].dot(self.item_vecs[i, :].T)
        elif self.learning == 'sgd':
            prediction = self.global_bias + self.user_bias[u] + self.item_bias[i]
            prediction += self.user_vecs[u, :].dot(self.item_vecs[i, :].T)
            return prediction
    
    def predict_all(self):
        """ Predict ratings for every user and item."""
        predictions = np.zeros((self.user_vecs.shape[0], 
                                self.item_vecs.shape[0]))
        for u in range(self.user_vecs.shape[0]):
            for i in range(self.item_vecs.shape[0]):
                predictions[u, i] = self.predict(u, i)
                
        return predictions
    
    def calculate_learning_curve(self, iter_array, test, learning_rate=0.1):
        """
        Keep track of MSE as a function of training iterations.
        
        Params
        ======
        iter_array : (list)
            List of numbers of iterations to train for each step of 
            the learning curve. e.g. [1, 5, 10, 20]
        test : (2D ndarray)
            Testing dataset (assumed to be user x item).
        
        The function creates two new class attributes:
        
        train_mse : (list)
            Training data MSE values for each value of iter_array
        test_mse : (list)
            Test data MSE values for each value of iter_array
        """
        iter_array.sort()
        self.train_mse =[]
        self.test_mse = []
        iter_diff = 0
        for (i, n_iter) in enumerate(iter_array):
            if self._v:
                print ('Iteration: {}'.format(n_iter))
            if i == 0:
                self.train(n_iter - iter_diff, learning_rate)
            else:
                self.partial_train(n_iter - iter_diff)

            predictions = self.predict_all()

            self.train_mse += [get_mse(predictions, self.ratings)]
            self.test_mse += [get_mse(predictions, test)]
            if self._v:
                print ('Train mse: ' + str(self.train_mse[-1]))
                print ('Test mse: ' + str(self.test_mse[-1]))
            iter_diff = n_iter

We will first train an ALS model. The idea of ALS is that, we first hold one set of latent vectors constant. We then take the derivative of the loss function with respect to the other set of vectors. We set the derivative equal to zero  and solve for the non-constant vectors. With these new, solved-for user vectors in hand, we hold them constant, instead, and take the derivative of the loss function with respect to the previously constant vectors (the item vectors). We alternate back and forth and carry out this two-step dance until convergence. 

In [23]:
als_model = ExplicitMF(ratings, n_factors=20, learning='als', \
                            item_fact_reg=0.01, user_fact_reg=0.01)
als_model.train(50)

user_vec_als = als_model.user_vecs
movie_vec_als = als_model.item_vecs

The second model is the SGD model. The idea is also to take derivatives of the loss function. But instead we take the derivative with respect to each variable in the model. The “stochastic” aspect of the algorithm involves taking the derivative and updating feature weights one individual sample at a time. 

In [25]:
sgd_model = ExplicitMF(ratings, n_factors=40, learning='sgd', \
                            item_fact_reg=0.01, user_fact_reg=0.01, \
                            user_bias_reg=0.01, item_bias_reg=0.01)
sgd_model.train(20, learning_rate=0.001)

user_vec_sgd = sgd_model.user_vecs
movie_vec_sgd = sgd_model.item_vecs

We now save the embedding matrices into local files

In [27]:
np.save('sgd_vec.npy', movie_vec_sgd.astype(np.float16))
np.save('als_vec.npy', movie_vec_als.astype(np.float16))

## Step 5: Push the embedding matrix to Nexus

Now that we have the models trained, we can now push the embedding matrices back to Nexus. 

First, we will create a file that stores the movie embedding matrix of the SGD model in Nexus by using the SDK. We then keep the @id of the created file.

In [58]:
your_org = 'YOUR_ORG'
your_proj = 'YOUR_PROJ'

In [28]:
r = nexus.files.create(org_label=your_org, project_label=your_proj, filepath='als_vec.npy')
als_vec_file_id = r['@id']

We now create a resource of this data linking to the previously pushed file

In [29]:
als_embedding_payload = {
    'modelName': 'ALS',
    'fileId': als_vec_file_id
}
r = nexus.resources.create(org_label=your_org, project_label=your_proj, data=als_embedding_payload)
als_vec_res_id = r['@id']

## Step 6: Show recommendation by querying Nexus

Now that you have loaded your recommendation model into Nexus, we will perform some recommendation by querying the models stored in Nexus.

* fetch the embedding matrix stored in Nexus
* compute the similarity matrix of the movie
* display top k similar movies

We first use the Nexus id of the embedding matrix to fetch the file

In [31]:
r = nexus.files.fetch(org_label=your_org, project_label=your_proj, \
                      file_id = als_vec_file_id, out_filepath='./als_vec.npy')
embedding_mat = np.load('./als_vec.npy')

We can now compute the similarity matrix of the movies for both two outcomes.

In [33]:
def cosine_similarity(vecs):
    sim = vecs.dot(vecs.T)
    norms = np.array([np.sqrt(np.diagonal(sim))])
    return sim / norms / norms.T

In [34]:
sim = cosine_similarity(embedding_mat)

We define a function to retrieve top k movies which are similar to a given movie Id

In [52]:
def display_top_k_movies_name(similarity, mapper, movie_idx, k=5):
    print('The recommended films for user who likes "%s"' % (mapper[movie_idx]))
    
    movie_indices = np.argsort(similarity[movie_idx,:])[::-1]
    images = ''
    k_ctr = 0
    # Start i at 1 to not grab the input movie
    i = 1
    while k_ctr < 5:
        if movie_indices[i] in mapper.keys():
            movie = mapper[movie_indices[i]]
            print(' - ' + movie)
            k_ctr += 1
            i += 1

In [66]:
movie_id = 5 # Heat
display_top_k_movies_name(sim, idx_to_movie, movie_id)

The recommended films for user who likes "Movie 43 (2013)"
 - The Hungover Games (2014)
 - Charlie's Angels: Full Throttle (2003)
 - Behind the Candelabra (2013)
 - Best Defense (1984)
 - Crows Zero (Kurôzu zero) (2007)
