In [4]:
%load_ext autoreload
%autoreload 2
%matplotlib inline

import os

import pandas as pd
from tqdm.autonotebook import tqdm

tqdm.pandas()

  from tqdm.autonotebook import tqdm


# How to use this notebook 
- This notebook describes in words my approach to Task 3: Restaurant Recommendation System 
- It contains some EDA and is a playground for my experiments
- This notebook can be executed from top to bottom to arrive at a running recommendation service
- However, the requested solution source code is in `../pipeline/`; instructions on how to run it are in README.md

# Approach to train a Restaurant Recommender

## Choose method
- I came across the <a href="https://arxiv.org/abs/1507.08439">LightFM paper</a> months ago and was fascinated by how intuitive it is. According to that paper, LightFM is not only able to incorporate metadata from users and items, it is also capable of outputting semantic embeddings. Furthermore, LightFM is designed to exploit implicit feedback, which is the type of rating we have for this problem.
- Apart from LightFM, a content-based recommendation system also comes to mind, because the interactions are sparse: 75% of our users are one-off.
    
## Define rating
- There are three kinds of rating I have tried:
    - The number of reservations
    - The weighted number of reservations (relative to both that user's average and that restaurant's average)
    - A random rating, to estimate the impact of the rating on our performance metrics (AUC and Precision at K)
- Since the experimental results show no significant change in performance with respect to how the rating is calculated, the final model sticks with the basic one (reservation count).

## Split train test
- Since I don't use contextual information for recommending in this scope, I try to model the taste of users.
- Given that objective, I hold out a random subset of the ratings as the test set.

## Fit model
- LightFM offers a variety of hyper-parameters. Reading the docs gives me enough information to select decent defaults, including `loss='warp'` and `no_components=50`.
- I try some quick random searches and find no significant improvement.
- I try incorporating the categorical metadata of restaurants as features, but find no significant improvement either.
- Therefore, I believe the next experiment should re-examine the way we feed data into model training, then change the input and maybe combine other approaches.

## Serve model
- At the time of this writing, BentoML is a new (2 years old) and promising open-source package that helps us ship our model.
- As BentoML has not yet introduced a pre-defined framework artifact for LightFM, I implement two custom artifacts to encapsulate the LightFM modeling objects: dataset and model. Conveniently, the LightFM objects can be persisted using the Python `pickle` package (just like Scikit-Learn).
- If the input user is not known to the LightFM model, a Popular Recommender is used as a fallback.

## Shipping
- As our scripts share many environment requirements with BentoML, I extend the BentoML Docker image to build a new one that is ready to run our scripts.
- The training step writes its artifacts to a shared volume so that later processes can pick them up.

## Next step
- As mentioned above, the approach to building recommendations should be revisited. Ideally, recommendations should incorporate contextual information and can come from several different models.
- Precomputation should also be exploited whenever possible. For this type of problem, we normally know our input information in advance (unlike a service that predicts animals from images uploaded by arbitrary users). This makes it possible to precompute recommendations and reduce the workload of our service.
- To orchestrate the precomputation workflow, we should consider offline, nearline and online computing jobs.

# Load data

In [5]:
DATA_RAW_DIR = "../data/datasets/"

In [6]:
# Reservation log. `rez_id` is read as str so the ids are kept verbatim.
rez_df = pd.read_csv(DATA_RAW_DIR + "reservations.csv", dtype={"rez_id": str})

# Both timestamp columns are parsed as epoch seconds into UTC-aware datetimes.
for _ts_col in ("reservation_time", "booking_time"):
    rez_df[_ts_col] = pd.to_datetime(rez_df[_ts_col], unit="s", utc=True)

In [7]:
rez_df

Unnamed: 0,rez_id,RestaurantUID,hashed_email,reservation_time,adults,children,client_type,booking_time,country_code
0,22LSM,13gastrowine1603gas,845609244659738b8e6344566d4110c1,2017-10-06 11:30:00+00:00,2,0,v3.3.3,2017-10-02 07:50:50+00:00,SG
1,22PC3,13gastrowine1603gas,122666f2210822f97e1754882caeb6f8,2017-10-06 13:45:00+00:00,3,0,v3.3.5,2017-10-03 00:28:49+00:00,SG
2,22RLN,13gastrowine1603gas,61ffd1c30c2d23acf9f28773a4ae6a46,2017-10-06 12:45:00+00:00,6,0,Mac OS X/Safari 602.1,2017-10-03 06:22:38+00:00,SG
3,22TUY,13gastrowine1603gas,20315e9c1bc332b95c4696e3c2b89299,2017-10-05 12:00:00+00:00,2,0,Mac OS X/Safari 603.2.5,2017-10-03 10:49:02+00:00,SG
4,22UN9,13gastrowine1603gas,02cb147aef506aae301e94e588187828,2017-10-06 13:45:00+00:00,2,0,Mac OS X/Safari 604.1,2017-10-03 12:20:31+00:00,SG
...,...,...,...,...,...,...,...,...,...
517307,2IP8K,zuccchero1704zce,1576eb0c3a25b4ae32453ff04ade9efb,2018-02-17 07:15:00+00:00,2,0,Mac OS X/Safari 604.1,2017-12-26 17:48:27+00:00,HK
517308,2IT6F,zuccchero1704zce,811fd29ddfa727179fe5d47bf1f6ebf8,2017-12-28 07:15:00+00:00,2,0,Mac OS X/Safari 604.1,2017-12-27 08:07:19+00:00,HK
517309,2JDW6,zuccchero1704zce,86a5fefe284072752f4fd605a0b0442f,2017-12-31 07:30:00+00:00,2,0,Windows 7/Chrome 63.0.3239.108,2017-12-29 15:10:19+00:00,HK
517310,2JF95,zuccchero1704zce,88592eda80504476f51d7183e736bc9e,2018-01-04 06:45:00+00:00,2,0,Mac OS X/Safari 602.1,2017-12-30 03:22:26+00:00,HK


# EDA

## Identify low-frequency restaurants

Assumption: Those low-frequency restaurants are not safe to recommend because we know little about them.

In [8]:
# Per-restaurant activity: number of distinct reservations and distinct users.
res_freq = rez_df.groupby("RestaurantUID").agg(
    rez_id_nunique=("rez_id", "nunique"),
    hashed_email_nunique=("hashed_email", "nunique"),
)

In [9]:
res_freq.describe(include="all", percentiles=[0.05, 0.1, 0.2, 0.5, 0.75]).T

Unnamed: 0,count,mean,std,min,5%,10%,20%,50%,75%,max
rez_id_nunique,2317.0,223.258524,499.361867,1.0,2.0,5.0,12.2,68.0,207.0,8867.0
hashed_email_nunique,2317.0,199.556323,446.767606,1.0,2.0,4.0,11.0,62.0,192.0,8045.0


If we recommend only restaurants that have been visited by more than 10 users, we drop about 20% of the restaurants.

## Identify low-frequency users

In [10]:
# Per-user activity: number of distinct reservations and distinct restaurants.
user_freq = rez_df.groupby("hashed_email").agg(
    rez_id_nunique=("rez_id", "nunique"),
    RestaurantUID_nunique=("RestaurantUID", "nunique"),
)

In [11]:
user_freq.describe(include="all", percentiles=[0.25, 0.5, 0.75, 0.8, 0.9]).T

Unnamed: 0,count,mean,std,min,25%,50%,75%,80%,90%,max
rez_id_nunique,318372.0,1.624515,2.478695,1.0,1.0,1.0,2.0,2.0,3.0,488.0
RestaurantUID_nunique,318372.0,1.452301,1.354262,1.0,1.0,1.0,1.0,2.0,2.0,84.0


If we apply LightFM only to users who have visited more than one restaurant, we will not reach more than 75% of our users! This finding also indicates that we suffer from the cold-start problem and might need to add a content-based recommender system to handle it.

# Preprocess

## Count reservations as rating

In [12]:
# One row per (user, restaurant) pair with the number of distinct reservations
# that user made at that restaurant.
rating_df = (
    rez_df.groupby(["hashed_email", "RestaurantUID"])
    .agg(rez_id_nunique=("rez_id", "nunique"))
    .reset_index()
)
rating_df

Unnamed: 0,hashed_email,RestaurantUID,rez_id_nunique
0,000016d31b46d5b64dec58ace9f0720d,4284,2
1,000016d31b46d5b64dec58ace9f0720d,4634,3
2,000016d31b46d5b64dec58ace9f0720d,basilico47fsa86a-4,1
3,000016d31b46d5b64dec58ace9f0720d,buonaterra1605buo,1
4,000016d31b46d5b64dec58ace9f0720d,cassia1510css,1
...,...,...,...
462367,fffe20dee903920274ac39c2953e85d0,pizzafabbrica1412pfb,1
462368,fffe283d87726f83f8d2aff9d351c74d,truebluecuisine1712tbc,1
462369,fffefc6fe6dc2e68448207a63b058c74,lawrystheprimerib1607lpr,1
462370,ffff8e009ade13a7a04140bb76306ae2,shinminorijapaneseks1710sks,1


In [13]:
rating_df.describe(include="all", percentiles=[0.25, 0.5, 0.75, 0.8, 0.9, 0.95]).T

Unnamed: 0,count,unique,top,freq,mean,std,min,25%,50%,75%,80%,90%,95%,max
hashed_email,462372.0,318372.0,c625da72fbbc9e72b10e6015a5c7bee8,84.0,,,,,,,,,,
RestaurantUID,462372.0,2317.0,jumboseafoodriversidepoint55asd58r9,8045.0,,,,,,,,,,
rez_id_nunique,462372.0,,,,1.118781,0.705395,1.0,1.0,1.0,1.0,1.0,1.0,2.0,105.0


Fewer than 10% of user–restaurant pairs have more than one reservation.

The first naive implicit rating is to clip `rez_id_nunique` to the range [1, 5].

In [14]:
# Implicit rating = reservation count of the pair, capped to the closed range
# [1, 5] so that a few very frequent bookers do not dominate.
rating_df["rating"] = rating_df["rez_id_nunique"].clip(lower=1, upper=5)

## Weighted average rating

In [15]:
# Attach the average rating of each row's user and of each row's restaurant
# as `<key>_mean_rating` columns (user first, then restaurant).
for _key in ("hashed_email", "RestaurantUID"):
    _mean_by_key = rating_df.groupby([_key])["rating"].transform("mean")
    rating_df[f"{_key}_mean_rating"] = _mean_by_key

In [16]:
# Rating relative to the user's own average and to the restaurant's average;
# the weighted rating is the mean of the two ratios.
_rel_to_user = rating_df["rating"] / rating_df["hashed_email_mean_rating"]
_rel_to_restaurant = rating_df["rating"] / rating_df["RestaurantUID_mean_rating"]
rating_df["rating_wa"] = (_rel_to_user + _rel_to_restaurant) / 2

In [17]:
rating_df.describe(include="all").T

Unnamed: 0,count,unique,top,freq,mean,std,min,25%,50%,75%,max
hashed_email,462372.0,318372.0,c625da72fbbc9e72b10e6015a5c7bee8,84.0,,,,,,,
RestaurantUID,462372.0,2317.0,jumboseafoodriversidepoint55asd58r9,8045.0,,,,,,,
rez_id_nunique,462372.0,,,,1.118781,0.705395,1.0,1.0,1.0,1.0,105.0
rating,462372.0,,,,1.107548,0.41689,1.0,1.0,1.0,1.0,5.0
hashed_email_mean_rating,462372.0,,,,1.107548,0.316258,1.0,1.0,1.0,1.0,5.0
RestaurantUID_mean_rating,462372.0,,,,1.107548,0.081543,1.0,1.066879,1.094375,1.123288,5.0
rating_wa,462372.0,,,,1.0,0.238732,0.362319,0.940998,0.95767,0.971264,4.246429


## Random rating

To test the impact of implementing implicit rating

In [18]:
import numpy as np

In [19]:
# Candidate ratings span the same closed range [1, 5] as the clipped `rating`
# column; `range` excludes its upper bound, hence 6.
random_range = list(range(1, 6))
# A dedicated, seeded generator makes this baseline reproducible on
# Restart & Run All without touching NumPy's global random state.
_rating_rng = np.random.RandomState(13)
rating_df["rating_random"] = _rating_rng.choice(random_range, size=rating_df.shape[0])

# Getting restaurant metadata

## Load res_cats

In [20]:
# Category dictionary and restaurant-to-category links; every column is read
# as str.
cat_df, res_cats_rel_df = (
    pd.read_csv(DATA_RAW_DIR + _fname, dtype=str)
    for _fname in ("restaurant_category.csv", "res_cats_relationship.csv")
)

In [21]:
cat_df

Unnamed: 0,id,parent_id,name,country_code
0,1,0,CUISINE,SG
1,2,0,LOCATION,SG
2,6,1,American,SG
3,8,1,Bar,SG
4,11,2,Dempsey,SG
...,...,...,...,...
626,1550,1196,BBQ,PHUKET
627,1551,1196,Indonesian,PHUKET
628,1554,614,Samphantawong-|-|-|สัมพันธวงศ์|,BANGKOK
629,1555,1,BBQ,SG


In [22]:
# Self-join the category table to attach each category's parent name,
# upper-cased, as `cat_parent_name`.
_parent_names = cat_df[["id", "name"]].rename(
    columns={"id": "parent_id", "name": "cat_parent_name"}
)
cat_cross_df = cat_df.merge(_parent_names, how="left", on="parent_id").assign(
    cat_parent_name=lambda df: df["cat_parent_name"].str.upper()
)

In [23]:
# Denormalise: one row per restaurant-category link, enriched with the
# category attributes (prefixed `cat_`) and a lower-cased category name.
_cat_renames = {
    "parent_id": "cat_parent_id",
    "name": "cat_name",
    "country_code": "cat_country_code",
}
cat_denom_df = (
    res_cats_rel_df.merge(cat_cross_df, how="left", left_on="cat_id", right_on="id")
    .drop(columns=["id"])
    .rename(columns=_cat_renames)
    .assign(cat_name=lambda df: df["cat_name"].str.lower())
)

In [24]:
# Distinct (restaurant, country code) pairs; the count is only a vehicle for
# the group-by and is dropped right away.
res_agg_df = (
    cat_denom_df.groupby(["RestaurantUID", "cat_country_code"], as_index=False)["cat_id"]
    .count()
    .drop(columns=["cat_id"])
)

# Every restaurant must map to exactly one country code.
assert not res_agg_df["RestaurantUID"].duplicated().any()

In [25]:
res_agg_df

Unnamed: 0,RestaurantUID,cat_country_code
0,1,SG
1,111,SG
2,112,SG
3,1128bar1311bar,SG
4,116,SG
...,...,...
3787,zsgroupmoimoi1707mmn,HK
3788,zsofitapasbar1501ztb,SG
3789,zuccchero1704zce,HK
3790,zuma1503bkk,BANGKOK


## Format res_cats

In [26]:
res_features = res_agg_df.to_dict(orient="records")

# Prepare rating format 

In [27]:
rating_col = "rating"
cols = ["hashed_email", "RestaurantUID", rating_col]
rating_dicts = rating_df[cols].to_dict(orient="records")

In [28]:
from lightfm.data import Dataset

# Register every user and restaurant id seen in the ratings so LightFM can
# map them to internal integer indices.
dataset = Dataset()
dataset.fit(
    users=(row["hashed_email"] for row in rating_dicts),
    items=(row["RestaurantUID"] for row in rating_dicts),
)



In [29]:
# Also register the restaurants and country codes found in the category data.
# This can add restaurants that never appear in the reservation data, so the
# item axis may be larger than the number of reserved restaurants.
dataset.fit_partial(
    items=(x["RestaurantUID"] for x in res_features),
    item_features=(x["cat_country_code"] for x in res_features),
)

In [30]:
num_users, num_items = dataset.interactions_shape()
print("Num users: {}, Num restaurants: {}.".format(num_users, num_items))

Num users: 318372, Num restaurants: 3853.


In [31]:
# Feed (user, restaurant, rating) triples to LightFM; it returns the
# interaction matrix and a matching matrix of rating weights.
_rating_triples = (
    (row["hashed_email"], row["RestaurantUID"], row[rating_col])
    for row in rating_dicts
)
interactions, weights = dataset.build_interactions(_rating_triples)

print(repr(interactions))

<318372x3853 sparse matrix of type '<class 'numpy.int32'>'
	with 462372 stored elements in COOrdinate format>


In [32]:
# Fraction of user x restaurant cells that hold a stored rating. Despite the
# variable name this is the matrix *density*; sparsity would be 1 - this value.
sparsity = weights.nnz / (weights.shape[0] * weights.shape[1])

In [33]:
sparsity

0.0003769273505718714

#### Build item features

In [34]:
# Each restaurant is given one declared feature: its country code.
# NOTE(review): `item_features` is not passed to the `fit_partial` call in the
# WARP cell of this notebook - confirm whether that is intended.
item_features = dataset.build_item_features(
    ((x["RestaurantUID"], [x["cat_country_code"]]) for x in res_features)
)

In [35]:
print(repr(item_features))

<3853x3863 sparse matrix of type '<class 'numpy.float32'>'
	with 7645 stored elements in Compressed Sparse Row format>


# Split train test

In [36]:
from lightfm.cross_validation import random_train_test_split

In [37]:
train, test = random_train_test_split(weights, test_percentage=0.2, random_state=13)

# Fit model

In [38]:
from lightfm import LightFM
from lightfm.evaluation import precision_at_k, auc_score

## WARP

In [39]:
# Seed the model so that Restart & Run All reproduces the metrics printed below.
model = LightFM(
    no_components=50,
    learning_schedule="adagrad",
    learning_rate=0.01,
    loss="warp",
    random_state=13,
)

# `train` holds the rating values, so it serves both as the interaction matrix
# and as the per-interaction sample weights.
model.fit_partial(train, epochs=10, sample_weight=train, verbose=True)

train_precision = precision_at_k(model, train, k=10).mean()
# Training interactions are excluded from the test ranking so that already
# known pairs do not count as recommendations.
test_precision = precision_at_k(
    model, test, k=10, train_interactions=train, check_intersections=True,
).mean()

train_auc = auc_score(model, train).mean()
test_auc = auc_score(model, test, train_interactions=train).mean()

print("Precision: train %.2f, test %.2f." % (train_precision, test_precision))
print("AUC: train %.2f, test %.2f." % (train_auc, test_auc))

Epoch: 100%|██████████| 10/10 [00:04<00:00,  2.15it/s]


Precision: train 0.08, test 0.01.
AUC: train 0.96, test 0.92.


# Hyper-param tuning

In [54]:
import itertools


def sample_hyperparameters():
    """
    Endlessly yield randomly drawn LightFM hyperparameter dictionaries.

    Every dictionary holds the LightFM constructor arguments plus
    ``num_epochs`` (the number of training epochs). Values are drawn from
    NumPy's global random state.
    """

    schedules = ["adagrad", "adadelta"]
    losses = ["bpr", "warp", "warp-kos"]

    while True:
        # Draw in a fixed order so a seeded run yields a stable sequence.
        no_components = np.random.randint(16, 64)
        learning_schedule = np.random.choice(schedules)
        loss = np.random.choice(losses)
        learning_rate = np.random.exponential(0.05)
        item_alpha = np.random.exponential(1e-8)
        user_alpha = np.random.exponential(1e-8)
        max_sampled = np.random.randint(5, 15)
        num_epochs = np.random.randint(5, 50)

        yield {
            "no_components": no_components,
            "learning_schedule": learning_schedule,
            "loss": loss,
            "learning_rate": learning_rate,
            "item_alpha": item_alpha,
            "user_alpha": user_alpha,
            "max_sampled": max_sampled,
            "num_epochs": num_epochs,
        }


def random_search(train, test, num_samples=10, num_threads=1):
    """
    Sample random hyperparameters, fit a LightFM model, and evaluate it
    on the test set.

    Parameters
    ----------

    train: np.float32 coo_matrix of shape [n_users, n_items]
        Training data.
    test: np.float32 coo_matrix of shape [n_users, n_items]
        Test data.
    num_samples: int, optional
        Number of hyperparameter choices to evaluate.
    num_threads: int, optional
        Number of threads used for fitting and for both evaluation metrics.


    Returns
    -------

    generator of (precision_at_10, auc_score, hyperparameter dict, fitted model)
        The hyperparameter dict includes ``num_epochs``.

    """

    candidates = itertools.islice(sample_hyperparameters(), num_samples)
    for i, hyperparams in enumerate(candidates, start=1):
        print(f"{i} - Evaluating {hyperparams}...")

        # `num_epochs` is an argument of fit(), not of the LightFM constructor.
        model_params = {
            key: value for key, value in hyperparams.items() if key != "num_epochs"
        }

        model = LightFM(**model_params)
        model.fit(train, epochs=hyperparams["num_epochs"], num_threads=num_threads)

        score_auc = auc_score(
            model, test, train_interactions=train, num_threads=num_threads
        ).mean()
        score_pak = precision_at_k(
            model,
            test,
            k=10,
            train_interactions=train,
            check_intersections=True,
            num_threads=num_threads,
        ).mean()

        yield (score_pak, score_auc, hyperparams, model)

In [55]:
# Run 20 random trials. Results are appended as they are produced, so an
# interrupted run still keeps the trials finished so far.
random_search_results = []
rs = random_search(train, test, num_samples=20, num_threads=2)
random_search_results.extend(rs)

1 - Evaluating {'no_components': 44, 'learning_schedule': 'adagrad', 'loss': 'bpr', 'learning_rate': 0.01783667941844756, 'item_alpha': 7.053406626813405e-09, 'user_alpha': 1.3465427828987647e-08, 'max_sampled': 7, 'num_epochs': 32}...
2 - Evaluating {'no_components': 34, 'learning_schedule': 'adagrad', 'loss': 'warp-kos', 'learning_rate': 0.06246996438405816, 'item_alpha': 1.3141647881324948e-08, 'user_alpha': 6.222805462453687e-09, 'max_sampled': 6, 'num_epochs': 48}...
3 - Evaluating {'no_components': 58, 'learning_schedule': 'adadelta', 'loss': 'warp', 'learning_rate': 0.04024858379095012, 'item_alpha': 2.3228927538001914e-10, 'user_alpha': 2.3522321504531383e-08, 'max_sampled': 9, 'num_epochs': 43}...
4 - Evaluating {'no_components': 48, 'learning_schedule': 'adagrad', 'loss': 'bpr', 'learning_rate': 0.0775034083741008, 'item_alpha': 2.4430031596261906e-08, 'user_alpha': 1.793743634771059e-08, 'max_sampled': 10, 'num_epochs': 47}...
5 - Evaluating {'no_components': 25, 'learning_s

In [64]:
# One flat record per trial: the two metrics followed by that trial's
# hyperparameters. The fitted model (4th tuple element) is left out.
rs_result_df = pd.DataFrame(
    [
        {"precision_at_10": score_pak, "auc": score_auc, **hyperparams}
        for score_pak, score_auc, hyperparams, _model in random_search_results
    ]
)

In [66]:
rs_result_df.sort_values(["precision_at_10"], ascending=False)

Unnamed: 0,precision_at_10,auc,no_components,learning_schedule,loss,learning_rate,item_alpha,user_alpha,max_sampled,num_epochs
13,0.011633,0.91952,46,adadelta,warp,0.153603,1.456578e-09,2.04807e-09,8,5
17,0.011542,0.917115,52,adagrad,warp-kos,0.008964,3.149371e-09,5.173231e-09,9,19
7,0.011461,0.898658,25,adagrad,warp,0.052366,1.988822e-09,2.059722e-09,10,34
9,0.011376,0.898121,31,adagrad,warp,0.060298,6.675086e-09,2.49618e-09,11,49
6,0.011312,0.822987,27,adadelta,warp,0.027463,1.577195e-08,1.995959e-09,6,21
10,0.011149,0.75255,51,adadelta,warp,0.01177,1.319344e-08,2.29008e-10,9,34
8,0.011125,0.817457,37,adadelta,warp-kos,0.030532,5.544554e-10,2.231982e-09,6,29
18,0.011091,0.897756,40,adagrad,warp,0.091658,5.337662e-09,1.807101e-08,10,49
5,0.011049,0.711607,17,adadelta,warp,0.086658,3.006179e-09,7.612106e-10,12,37
1,0.010874,0.893465,34,adagrad,warp-kos,0.06247,1.314165e-08,6.222805e-09,6,48


Hyper-parameter tuning does not help increase the model performance.

# Popular recommender

In [69]:
# Popularity of a restaurant = number of distinct reservations it received.
pop_res = rez_df.groupby("RestaurantUID").agg(rez_id_nunique=("rez_id", "nunique"))

In [89]:
pop_rec_df = pop_res.sort_values(["rez_id_nunique"], ascending=False).iloc[:10]

In [90]:
pop_rec = list(pop_rec_df.index)

In [91]:
pop_rec

['jumboseafoodriversidepoint55asd58r9',
 'jumboseafoodgallery35dfs43r6',
 'jumboseafoodeastcoast1508jse',
 'blukouzina82fps33a-4',
 'wildhoneymg35dfs95r2',
 'nationalkitchenbyviolet1511nkv',
 'level33872668a-4',
 'fatcow1706ftc',
 'newubinseafood89joa12a-4',
 'shinminorijapaneserestaurant1503smj']

# Persist model

In [135]:
import pickle

OUTPUT_MODEL_DIR = "../models/lfm/"

# Create the target directory up front so a fresh checkout can run this cell.
os.makedirs(OUTPUT_MODEL_DIR, exist_ok=True)

# Persist the fitted model and the id-mapping dataset side by side.
for _fname, _artifact in (("lfm_model.pkl", model), ("lfm_dataset.pkl", dataset)):
    with open(OUTPUT_MODEL_DIR + _fname, "wb") as f:
        pickle.dump(_artifact, f)

## Load

In [143]:
# Reload the fitted model from disk.
# SECURITY: pickle.load can execute arbitrary code - only load files that were
# written by this notebook / pipeline.
with open(OUTPUT_MODEL_DIR + "lfm_model.pkl", "rb") as f:
    model = pickle.load(f)

In [146]:
# Reload the LightFM dataset (id mappings) from disk.
# SECURITY: pickle.load can execute arbitrary code - only load files that were
# written by this notebook / pipeline.
with open(OUTPUT_MODEL_DIR + "lfm_dataset.pkl", "rb") as f:
    dataset = pickle.load(f)

# Predict

## Predict individual

In [83]:
# mapping() returns (user ids, user features, item ids, item features).
# Turn the two id dictionaries into arrays of external ids so an internal
# index can be translated back with plain array indexing.
_user_id_map, _, _item_id_map, _ = dataset.mapping()
items_map = np.array(list(_item_id_map))
users_map = np.array(list(_user_id_map))

In [84]:
users_map

array(['000016d31b46d5b64dec58ace9f0720d',
       '000034f9763e2b654481b97d8752db51',
       '00005ad3e1efe6ca52efe1d67578048f', ...,
       'fffefc6fe6dc2e68448207a63b058c74',
       'ffff8e009ade13a7a04140bb76306ae2',
       'ffffd8a2426ea89ee020c00c71f90653'], dtype='<U32')

In [85]:
def sample_recommendation(model, user_ids, users_map, items_map, k: int):
    """Print the `k` best-scoring items for each internal user index.

    Parameters
    ----------
    model: object exposing ``predict(user_id, item_ids)`` that returns one
        score per item id.
    user_ids: iterable of internal (integer) user indices.
    users_map: array translating an internal user index to its external id.
    items_map: array translating an internal item index to its external id.
    k: number of items printed per user.
    """
    candidate_items = np.arange(len(items_map))
    for user_idx in user_ids:
        item_scores = model.predict(user_idx, candidate_items)
        # Negate the scores so argsort orders from best to worst.
        ranking = np.argsort(-item_scores)
        best_items = items_map[ranking[:k]]
        user = users_map[user_idx]
        print(f"Recommendations for user {user}:")

        for item_label in best_items:
            print("   %s" % item_label)

In [86]:
sample_recommendation(model, [10], users_map, items_map, k=10)

Recommendations for user 0001d5416821f2fc4ac68862a87c581c:
   jumboseafoodriversidepoint55asd58r9
   jumboseafoodgallery35dfs43r6
   jumboseafoodeastcoast1508jse
   blukouzina82fps33a-4
   wildhoneymg35dfs95r2
   nationalkitchenbyviolet1511nkv
   level33872668a-4
   shinminorijapaneserestaurant1503smj
   newubinseafood89joa12a-4
   fatcow1706ftc


# Package model with BentoML

In [80]:
%%writefile pop_rec_artifact.py

import os
from bentoml.utils import cloudpickle
from bentoml.service.artifacts import BentoServiceArtifact


class PopRecArtifact(BentoServiceArtifact):
    """BentoML artifact that keeps one Python object in memory and persists
    it with cloudpickle as ``<name>.pkl``."""

    def __init__(self, name):
        super().__init__(name)
        self._model = None

    def _file_path(self, base_path):
        """Return the pickle path of this artifact inside `base_path`."""
        pickle_name = self.name + ".pkl"
        return os.path.join(base_path, pickle_name)

    def pack(self, model, metadata=None):
        """Store `model` on the artifact and return the artifact itself.

        `metadata` is accepted but not used.
        """
        self._model = model
        return self

    def get(self):
        """Return the packed object (None until something is packed)."""
        return self._model

    def save(self, directory):
        """Write the packed object to its pickle file under `directory`."""
        with open(self._file_path(directory), "wb") as handle:
            cloudpickle.dump(self._model, handle)

    def load(self, path):
        """Read the pickle file under `path` and pack its content."""
        with open(self._file_path(path), "rb") as handle:
            restored = cloudpickle.load(handle)
        return self.pack(restored)

Writing pop_rec_artifact.py


In [81]:
%%writefile bento_lfm_artifacts.py

import os
from bentoml.utils import cloudpickle
from bentoml.service.artifacts import BentoServiceArtifact


class LightFMModelArtifact(BentoServiceArtifact):
    """BentoML artifact that keeps one object in memory and persists it with
    cloudpickle as ``<name>.pkl``."""

    def __init__(self, name):
        super().__init__(name)
        self._model = None

    def _file_path(self, base_path):
        """Return the pickle path of this artifact inside `base_path`."""
        pickle_name = self.name + ".pkl"
        return os.path.join(base_path, pickle_name)

    def pack(self, model, metadata=None):
        """Store `model` on the artifact and return the artifact itself.

        `metadata` is accepted but not used.
        """
        self._model = model
        return self

    def get(self):
        """Return the packed object (None until something is packed)."""
        return self._model

    def save(self, directory):
        """Write the packed object to its pickle file under `directory`."""
        with open(self._file_path(directory), "wb") as handle:
            cloudpickle.dump(self._model, handle)

    def load(self, path):
        """Read the pickle file under `path` and pack its content."""
        with open(self._file_path(path), "rb") as handle:
            restored = cloudpickle.load(handle)
        return self.pack(restored)


class LightFMDatasetArtifact(LightFMModelArtifact):
    """Artifact for the LightFM dataset object.

    Persistence is inherited unchanged from LightFMModelArtifact, whose
    constructor already initialises the held object to None.
    """

Overwriting bento_lfm_artifacts.py


In [160]:
%%writefile bento_lfm_service.py

from bento_lfm_artifacts import LightFMModelArtifact, LightFMDatasetArtifact
from pop_rec_artifact import PopRecArtifact
from bentoml import BentoService, env, api, artifacts
from bentoml.adapters import JsonInput, JsonOutput
from bentoml.exceptions import BadInput

import numpy as np
from string import punctuation


@env(infer_pip_packages=True)
@artifacts(
    [
        LightFMModelArtifact(name="model"),
        LightFMDatasetArtifact(name="dataset"),
        PopRecArtifact(name="pop_rec"),
    ]
)
class LightFMRecService(BentoService):
    """Serve restaurant recommendations for a hashed e-mail.

    Known users are scored with the packed LightFM model; unknown users
    receive the head of the packed popular-restaurant list.
    """

    @api(input=JsonInput(), output=JsonOutput(), batch=True)
    def recommend(self, input_data):
        """Return one response per request payload.

        Parameters
        ----------
        input_data: list of parsed JSON payloads (the API is declared with
            ``batch=True``), or a single dict when called directly from Python.
            Each payload is expected to look like ``{"hashed_email": "<id>"}``.

        Returns
        -------
        list with one entry per payload, in input order:
        ``{"meta": payload, "data": {"recommendations": [...]}}`` on success,
        ``{"message": "<reason>"}`` when the payload is rejected. Messages are
        plain strings so the response is always JSON-serialisable.
        """
        model = self.artifacts.model
        pop_rec = self.artifacts.pop_rec
        # mapping() returns (user ids, user features, item ids, item features).
        users_map, _, items_map, _ = self.artifacts.dataset.mapping()

        # Direct Python calls may pass a single payload instead of a batch.
        if isinstance(input_data, dict):
            input_data = [input_data]

        results = []
        for payload in input_data:
            error = self._validate(payload)
            if error is not None:
                results.append({"message": error})
                continue
            recommendations = self._recommend(
                model, payload["hashed_email"], users_map, items_map, k=3, pop_rec=pop_rec
            )
            results.append(
                {"meta": payload, "data": {"recommendations": recommendations}}
            )
        return results

    @staticmethod
    def _validate(payload):
        """Return an error message for an unusable payload, or None if it is fine."""
        if not isinstance(payload, dict):
            return "input type is not allowed"
        hashed_email = payload.get("hashed_email")
        # Reject missing / non-string ids and ids containing punctuation.
        if not isinstance(hashed_email, str) or any(
            p in hashed_email for p in punctuation
        ):
            return f"input {hashed_email} is not accepted"
        return None

    @staticmethod
    def _recommend(model, user_id, users_map, items_map, k: int, pop_rec):
        """Return the `k` best restaurant ids for `user_id` as a list of str.

        Falls back to the first `k` entries of `pop_rec` when `user_id` is not
        present in `users_map`.
        """
        _user_id = users_map.get(user_id)
        if _user_id is None:
            return list(pop_rec[:k])

        items_arr = np.array(list(items_map.keys()))
        scores = model.predict(_user_id, np.arange(len(items_arr)))
        # Negate the scores so argsort orders from best to worst.
        best = np.argsort(-scores)[:k]
        return items_arr[best].tolist()


Overwriting bento_lfm_service.py


In [156]:
from bento_lfm_service import LightFMRecService

lfm_service = LightFMRecService()

lfm_service.pack("model", model)
lfm_service.pack("dataset", dataset)
lfm_service.pack("pop_rec", pop_rec)

<bento_lfm_service.LightFMRecService at 0x7fa15409d940>

In [157]:
lfm_service.recommend({'hashed_email': 'a'})

[{'meta': {'hashed_email': 'a'},
  'data': {'recommendations': ['jumboseafoodriversidepoint55asd58r9',
    'jumboseafoodgallery35dfs43r6',
    'jumboseafoodeastcoast1508jse']}}]

# Save predict service to disk

In [158]:
saved_path = lfm_service.save(version="v0.1")

[2021-03-31 22:03:56,436] INFO - BentoService bundle 'LightFMRecService:v0.1' saved to: /Users/quydv1/bentoml/repository/LightFMRecService/v0.1


# REST API Model Serving

In [159]:
!bentoml serve LightFMRecService:v0.1

[2021-03-31 22:04:01,554] INFO - Starting BentoML API proxy in development mode..
[2021-03-31 22:04:01,673] INFO - Your system nofile limit is 4096, which means each instance of microbatch service is able to hold this number of connections at same time. You can increase the number of file descriptors for the server process, or launch more microbatch instances to accept more concurrent connection.
(Press CTRL+C to quit)
[2021-03-31 22:04:02,005] INFO - Starting BentoML API server in development mode..
 * Serving Flask app "LightFMRecService" (lazy loading)
 * Environment: production
[2m   Use a production WSGI server instead.[0m
 * Debug mode: off
 * Running on http://127.0.0.1:54018/ (Press CTRL+C to quit)
127.0.0.1 - - [31/Mar/2021 22:04:06] "[37mGET / HTTP/1.1[0m" 200 -
127.0.0.1 - - [31/Mar/2021 22:04:07] "[37mGET /docs.json HTTP/1.1[0m" 200 -
[2021-03-31 22:04:11,739] ERROR - Error caught in API function:
Traceback (most recent call last):
  File "/Users/quydv1/miniconda3/env

# Predict for all users

In [41]:
def chunks(lst, n):
    """Lazily split `lst` into consecutive slices of at most `n` elements.

    The last slice is shorter when ``len(lst)`` is not a multiple of `n`.
    """
    slice_starts = range(0, len(lst), n)
    yield from (lst[start:start + n] for start in slice_starts)

In [103]:
# Number of users scored per prediction batch.
CHUNK_SIZE = 10000

# Internal item indices: 0 .. n_items - 1.
_item_ids = np.arange(dataset.item_features_shape()[0])
# Internal user indices split into batches. The batches are materialised in a
# list (not left as a one-shot generator) so the prediction cell can be re-run
# and tqdm can show a total.
_user_ids = list(chunks(np.arange(dataset.user_features_shape()[0]), CHUNK_SIZE))

In [43]:
def lfm_predict(model, user_ids, item_ids, k: int):
    """Score every (user, item) pair and keep the `k` best items per user.

    Parameters
    ----------
    model: object exposing ``predict(user_ids, item_ids)`` on two aligned
        int32 arrays and returning one score per pair.
    user_ids: sequence of internal user indices.
    item_ids: sequence of internal item indices.
    k: number of rows kept per user.

    Returns
    -------
    DataFrame with columns ``user_id``, ``item_id`` and ``score``, ordered by
    descending score across all users.
    """
    n_users, n_items = len(user_ids), len(item_ids)
    # Cartesian product, user-major: each user repeated once per item, and the
    # whole item sequence tiled once per user.
    user_col = np.repeat(np.int32(user_ids), n_items)
    item_col = np.int32(np.tile(item_ids, n_users))
    scored = pd.DataFrame(
        {
            'user_id': user_col,
            'item_id': item_col,
            'score': model.predict(user_col, item_col),
        }
    )
    ranked = scored.sort_values('score', ascending=False)
    return ranked.groupby(['user_id']).head(k)

In [44]:
OUTPUT_PRED_DIR = "../data/output"
# Create the target directory up front so a fresh checkout can run this cell.
os.makedirs(OUTPUT_PRED_DIR, exist_ok=True)

# Score one batch of users at a time and write each batch to its own parquet
# file, numbered in batch order.
for chunk_idx, _uids in enumerate(tqdm(_user_ids)):
    _output = lfm_predict(model, list(_uids), _item_ids, k=10)
    output_fpath = f'{OUTPUT_PRED_DIR}/output_predictions_{str(chunk_idx).zfill(4)}.parquet'
    print(f"Saving {output_fpath}")
    _output.to_parquet(output_fpath)

0it [00:00, ?it/s]

Saving ../data/output_0000.parquet
Saving ../data/output_0001.parquet
Saving ../data/output_0002.parquet
Saving ../data/output_0003.parquet
Saving ../data/output_0004.parquet
Saving ../data/output_0005.parquet
Saving ../data/output_0006.parquet
Saving ../data/output_0007.parquet
Saving ../data/output_0008.parquet
Saving ../data/output_0009.parquet
Saving ../data/output_0010.parquet
Saving ../data/output_0011.parquet
Saving ../data/output_0012.parquet
Saving ../data/output_0013.parquet
Saving ../data/output_0014.parquet
Saving ../data/output_0015.parquet
Saving ../data/output_0016.parquet
Saving ../data/output_0017.parquet
Saving ../data/output_0018.parquet
Saving ../data/output_0019.parquet
Saving ../data/output_0020.parquet
Saving ../data/output_0021.parquet
Saving ../data/output_0022.parquet
Saving ../data/output_0023.parquet
Saving ../data/output_0024.parquet
Saving ../data/output_0025.parquet
Saving ../data/output_0026.parquet
Saving ../data/output_0027.parquet
Saving ../data/outpu