In [1]:
import os

# Pin the OpenBLAS and OpenMP thread-count variables to '1'.
# This is the first cell of the notebook, so it runs before the numerical
# libraries are imported further down.
os.environ.update({
    'OPENBLAS_NUM_THREADS': '1',
    'OMP_NUM_THREADS': '1',
})

In [None]:
import os
import json
import joblib

import numpy as np
import pandas as pd
import scipy.sparse as sp
from sklearn.preprocessing import LabelEncoder

from lightfm import LightFM



In [63]:
# Floating-point dtype that user/item feature matrices are created with or cast to
# (see `sp.identity(..., dtype=CYTHON_DTYPE)` and `_to_cython_dtype`).
CYTHON_DTYPE = np.float32
# Integer dtype used when a sequence of user ids is converted to a numpy array.
ID_DTYPE = np.int32

from typing import Tuple, Union, Dict
import multiprocessing as mp

import numpy as np
from scipy import sparse as sp

# Set of global variables for multiprocessing
_user_repr = np.array([])   # n_users, n_features
_user_repr_biases = np.array([])
_item_repr = np.ndarray([])  # n_features, n_items
_item_repr_biases = np.array([])
_pool = None
_item_chunks = {}


def _check_setup():
    if not (len(_user_repr)
        and len(_user_repr_biases)
        and len(_item_repr)
        and len(_item_repr_biases)):

        raise EnvironmentError('You must setup mode.batch_setup(item_ids) before using predict')


def _batch_setup(model: LightFM,
                 item_chunks: Dict[int, np.ndarray],
                 item_features: Union[None, sp.csr_matrix]=None,
                 user_features: Union[None, sp.csr_matrix]=None,
                 n_process: int=1):
    """
    Precompute user/item representations of `model` into the module globals.

    :param model:         model providing `user_embeddings`, `user_biases`,
                          `item_embeddings`, `item_biases` and the
                          `_construct_user_features` / `_construct_item_features` methods
    :param item_chunks:   mapping chunk id -> array of item ids; stored as `_item_chunks`
    :param item_features: item feature matrix; an identity matrix with
                          `len(model.item_biases)` rows is used when None
    :param user_features: user feature matrix; an identity matrix with
                          `len(model.user_biases)` rows is used when None
    :param n_process:     a multiprocessing pool of this size is created when > 1

    Side effects: overwrites `_user_repr`, `_user_repr_biases`, `_item_repr`
    (stored transposed), `_item_repr_biases`, `_item_chunks` and `_pool`.
    """
    global _item_repr, _user_repr
    global _item_repr_biases, _user_repr_biases
    global _pool
    global _item_chunks

    # Fall back to one indicator feature per item / per user.
    if item_features is None:
        item_features = sp.identity(len(model.item_biases), dtype=CYTHON_DTYPE, format='csr')
    if user_features is None:
        user_features = sp.identity(len(model.user_biases), dtype=CYTHON_DTYPE, format='csr')

    # User side.
    user_matrix = model._construct_user_features(user_features.shape[0], user_features)
    _user_repr, _user_repr_biases = _precompute_representation(
        user_matrix, model.user_embeddings, model.user_biases,
    )

    # Item side; the representation is kept transposed so that a user vector
    # can be multiplied with it directly.
    item_matrix = model._construct_item_features(item_features.shape[0], item_features)
    item_repr, _item_repr_biases = _precompute_representation(
        item_matrix, model.item_embeddings, model.item_biases,
    )
    _item_repr = item_repr.T

    _item_chunks = item_chunks

    # Drop any previous pool, then create the new one only after all globals
    # are in place (pool creation should go last).
    _clean_pool()
    if n_process > 1:
        _pool = mp.Pool(processes=n_process)


def _precompute_representation(
        features: sp.csr_matrix,
        feature_embeddings: np.ndarray,
        feature_biases: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    :param: features           csr_matrix         [n_objects, n_features]
    :param: feature_embeddings np.ndarray(float)  [n_features, no_component]
    :param: feature_biases     np.ndarray(float)  [n_features]

    :return:
    TODO:
    tuple of
    - representation    np.ndarray(float)  [n_objects, no_component+1]
    - bias repr
    """

    representation = features.dot(feature_embeddings)
    representation_bias = features.dot(feature_biases)
    return representation, representation_bias


def _get_top_k_scores(scores: np.ndarray, k: int, item_ids: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    :return: indices of items, top_k scores. All in score decreasing order.
    """

    if k:
        top_indices = np.argpartition(scores, -k)[-k:]
        scores = scores[top_indices]
        sorted_top_indices = np.argsort(-scores)
        scores = scores[sorted_top_indices]
        top_indices = top_indices[sorted_top_indices]
    else:
        top_indices = np.arange(len(scores))

    if len(item_ids):
        top_indices = item_ids[top_indices]

    return top_indices, scores


def _batch_predict_for_user(user_id: int, top_k: int=50, chunk_id: int=None, item_ids=None) -> Tuple[np.ndarray, np.ndarray]:
    """
    Score items for one user from the precomputed module-level representations.

    :param user_id:  row index into `_user_repr` / `_user_repr_biases`
    :param top_k:    forwarded to `_get_top_k_scores` as `k`
    :param chunk_id: when not None, the item ids are taken from `_item_chunks[chunk_id]`
                     (this overrides `item_ids`)
    :param item_ids: explicit item ids, used only when `chunk_id` is None

    :raises UserWarning: when neither `chunk_id` nor `item_ids` is given.
    :return: indices of items, top_k scores. All in score decreasing order.
    """
    user_vector = _user_repr[user_id, :]

    if chunk_id is not None:
        item_ids = _item_chunks[chunk_id]
    elif item_ids is None:
        raise UserWarning('Supply item chunks at setup or item_ids in predict')

    # An empty (or missing) id list means "score against every item".
    score_all_items = item_ids is None or len(item_ids) == 0
    item_matrix = _item_repr if score_all_items else _item_repr[:, item_ids]
    item_biases = _item_repr_biases if score_all_items else _item_repr_biases[item_ids]

    # score = <user vector, item vector> + user bias + item bias
    scores = user_vector.dot(item_matrix)
    scores += _user_repr_biases[user_id]
    scores += item_biases
    return _get_top_k_scores(scores, k=top_k, item_ids=item_ids)


def _clean_pool():
    global _pool
    if _pool is not None:
        _pool.close()
        _pool = None


def _batch_cleanup():
    global _item_ids, _item_repr, _user_repr, _pool, _item_chunks
    _item_chunks = {}
    _user_repr = np.array([])
    _item_repr = np.ndarray([])
    _clean_pool()

In [None]:
### FILL IN MISSING CODE FROM https://github.com/dmitryhd/lightfm
import itertools
import time

class LightFMWrapper(LightFM):
    """
    LightFM model with batch top-k prediction helpers.

    The batch helpers keep their precomputed state in the notebook's
    module-level variables (`_user_repr`, `_item_repr`, `_item_chunks`,
    `_pool`, ...). Typical use:

        model.batch_setup(item_chunks, item_features, user_features)
        recs = model.batch_predict(chunk_id, user_ids, top_k)
        model.batch_cleanup()
    """

    def __init__(self, no_components=200, loss='warp', learning_rate=0.02,
                 max_sampled=400, random_state=1, user_alpha=1e-05, **kwargs):
        """
        Build the underlying LightFM model.

        The arguments are forwarded to `LightFM.__init__` (previously they
        were accepted but silently replaced by hard-coded values; the defaults
        here are those former hard-coded values). Extra keyword arguments are
        forwarded as well.
        """
        super().__init__(
            no_components=no_components,
            loss=loss,
            learning_rate=learning_rate,
            max_sampled=max_sampled,
            random_state=random_state,
            user_alpha=user_alpha,
            **kwargs
        )
        # Overwritten by batch_setup(); single-process until then.
        self.n_process = 1

    @staticmethod
    def _to_cython_dtype(mat):
        """Return `mat` as CYTHON_DTYPE, converting only when its dtype differs."""
        if mat.dtype != CYTHON_DTYPE:
            return mat.astype(CYTHON_DTYPE)
        return mat

    def _prepare_feature_matrix(self, n_rows, features, embeddings, kind):
        """
        Shared implementation of the user/item feature-matrix construction.

        :param n_rows:     expected number of users / items
        :param features:   feature matrix or None (-> identity with `n_rows` rows)
        :param embeddings: already-estimated feature embeddings, or None
        :param kind:       'user' or 'item'; only used in error messages
        :raises Exception:  if `features` has fewer than `n_rows` rows
        :raises ValueError: if `features` has more columns than `embeddings` has rows
        :return: CSR matrix of dtype CYTHON_DTYPE
        """
        if features is None:
            features = sp.identity(n_rows, dtype=CYTHON_DTYPE, format='csr')
        else:
            features = features.tocsr()

        if n_rows > features.shape[0]:
            raise Exception('Number of {0} feature rows does not equal the number of {0}s'.format(kind))

        # If we already have embeddings, verify that
        # we have them for all the supplied features
        if embeddings is not None and embeddings.shape[0] < features.shape[1]:
            raise ValueError(
                'The {} feature matrix specifies more features than there are estimated '
                'feature embeddings: {} vs {}.'.format(kind, embeddings.shape[0], features.shape[1])
            )

        return self._to_cython_dtype(features)

    def _construct_item_features(self, n_items: int, item_features) -> sp.csr_matrix:
        """Validate `item_features` (identity when None) and return it as a CSR matrix of CYTHON_DTYPE."""
        return self._prepare_feature_matrix(n_items, item_features, self.item_embeddings, 'item')

    def _construct_user_features(self, n_users, user_features):
        """Validate `user_features` (identity when None) and return it as a CSR matrix of CYTHON_DTYPE."""
        return self._prepare_feature_matrix(n_users, user_features, self.user_embeddings, 'user')

    def batch_setup(self, item_chunks, item_features, user_features, n_process: int=1):
        """
        Precompute the representations needed by `batch_predict`.

        Delegates to the module-level `_batch_setup`, which fills the
        module-level state and, for `n_process > 1`, creates the worker pool
        that `batch_predict` uses.

        :param item_chunks:   mapping chunk id -> array of item ids
        :param item_features: item feature matrix, or None for identity features
        :param user_features: user feature matrix, or None for identity features
        :param n_process:     number of worker processes (1 = no pool)
        """
        self.n_process = n_process
        _batch_setup(
            model=self,
            item_chunks=item_chunks,
            item_features=item_features,
            user_features=user_features,
            n_process=n_process,
        )

    def batch_predict(self, chunk_id, user_ids, top_k: int=50):
        """
        Compute top-k recommendations for every user in `user_ids`.

        :param chunk_id: key into the item chunks given to `batch_setup`
        :param user_ids: sequence or array of user ids
        :param top_k:    number of items to return per user
        :raises EnvironmentError: if `batch_setup` has not been called, or if
                                  multiprocessing was requested but no pool exists
        :return: dict user_id -> (item ids, scores)
        """
        self._check_initialized()
        # Fail with a clear message before touching the module-level state.
        _check_setup()
        print('Batch predict: user_ids: {:,}, item_ids: {:,}'.format(len(user_ids), len(_item_chunks[chunk_id])))

        if not isinstance(user_ids, np.ndarray):
            user_ids = np.array(user_ids, dtype=ID_DTYPE)

        # getattr: instances unpickled from an older class version may lack the attribute.
        n_process = getattr(self, 'n_process', 1)
        btime = time.time()

        if n_process <= 1:
            print('Start recommending: using single process')
            recommendations = {
                user_id: _batch_predict_for_user(user_id=user_id, top_k=top_k, chunk_id=chunk_id)
                for user_id in user_ids
            }
        else:
            if _pool is None:
                raise EnvironmentError(
                    'No worker pool available: call batch_setup(..., n_process=N) with N > 1 first'
                )
            print('Start recommending: using multiprocessing')
            # NOTE(review): the workers read the module-level representations,
            # which presumably relies on a fork start method — confirm on your platform.
            recs_list = _pool.starmap(
                _batch_predict_for_user,
                zip(user_ids, itertools.repeat(top_k), itertools.repeat(chunk_id)),
            )
            recommendations = dict(zip(user_ids, recs_list))

        elapsed_sec = time.time() - btime
        # Guard against an empty user list.
        elapsed_sec_by_user = elapsed_sec / len(user_ids) if len(user_ids) else 0.0
        print('Recommendations for chunk {:,} done in {:.3f}s. {:.4f} s by user'.format(
            chunk_id, elapsed_sec, elapsed_sec_by_user,
        ))
        return recommendations

    def batch_cleanup(self):
        """Release the module-level batch state and the worker pool."""
        _batch_cleanup()

def _precompute_representation(features, feature_embeddings, feature_biases):
    representation = features.dot(feature_embeddings)
    representation_bias = features.dot(feature_biases)
    return representation, representation_bias

In [4]:
# Create the output directory for saved models; exist_ok makes the cell safe to re-run.
os.makedirs('models', exist_ok=True)

In [5]:
# Load the track, playlist and playlist-info tables, plus the test-set
# playlist tables, from HDF files under df_data/. Every file is read with key 'abc'.
df_tracks = pd.read_hdf('df_data/df_tracks.hdf', 'abc')
df_playlists = pd.read_hdf('df_data/df_playlists.hdf', 'abc')
df_playlists_info = pd.read_hdf('df_data/df_playlists_info.hdf', 'abc')
df_playlists_test = pd.read_hdf('df_data/df_playlists_test.hdf', 'abc')
df_playlists_test_info = pd.read_hdf('df_data/df_playlists_test_info.hdf', 'abc')

# display(df_tracks)
# Show the playlist metadata table as a quick sanity check of the load.
display(df_playlists_info)

Unnamed: 0,collaborative,duration_ms,modified_at,name,num_albums,num_artists,num_edits,num_followers,num_tracks,pid
0,False,4007017,1470355200,Funk,15,11,4,1,16,10000
1,False,5823244,1412985600,Childhood,19,14,3,1,28,10001
2,False,4891462,1421971200,Old Country,19,12,3,1,23,10002
3,False,3498401,1492646400,april showers,14,14,6,1,14,10003
4,False,8765977,1496534400,DnB,21,14,6,1,31,10004
...,...,...,...,...,...,...,...,...,...,...
4995,False,30149399,1509408000,EdM,124,87,75,2,135,100995
4996,False,71434251,1505520000,EME,31,12,17,3,210,100996
4997,False,13524857,1428796800,run!,40,31,11,1,46,100997
4998,False,3875542,1453680000,driving,15,15,6,1,15,100998


In [6]:
# Training and validation interaction tables; both are later grouped by `pid`
# to collect the set of `tid` values per playlist.
train = pd.read_hdf('df_data/train.hdf')
val = pd.read_hdf('df_data/val1.hdf')
# Playlist ids to evaluate on.
# NOTE(review): joblib.load unpickles the file — only load files from a trusted source.
val1_pids = joblib.load('df_data/val1_pids.pkl')

In [7]:
# pid -> set of track ids the playlist contains in the training split.
user_seen = (
    train.groupby('pid')['tid']
    .apply(set)
    .to_dict()
)
# pid -> set of track ids the playlist contains in the validation split.
val_tracks = (
    val.groupby('pid')['tid']
    .apply(set)
    .to_dict()
)

In [11]:
# Show the test-playlist metadata table.
display(df_playlists_test_info)

Unnamed: 0,name,num_holdouts,num_samples,num_tracks,pid
0,disney,89,100,189,1000
1,Indie Electro,65,100,165,1001
2,,7,0,17,1002
3,vibes,125,100,225,1003
4,Indie,65,100,165,1004
...,...,...,...,...,...
995,woo,146,0,146,1995
996,NEW YEARS,38,0,38,1996
997,JESUS,40,0,40,1997
998,yep,29,0,29,1998


In [17]:
# Playlist ids (pid) are used directly as row indices of the interaction
# matrix and of the user representations, so the user dimension must be
# (largest pid + 1). The previous `len(train.pid) + 1` counted interaction
# ROWS rather than playlists, which only worked by accident and inflated the
# number of user embeddings. The validation pids are included so that every
# pid passed to batch_predict is a valid row index.
max_pid = max(int(train.pid.max()), int(max(val1_pids, default=-1)))
num_playlists = max_pid + 1

config = {
    'num_playlists': num_playlists,
    # Track ids (tid) index the columns in the same way.
    'num_tracks': int(df_tracks.tid.max()) + 1,
}

In [19]:
# Playlist x track interaction matrix: one entry of weight 1 per row of `train`,
# placed at (pid, tid).
X_train = sp.coo_matrix(
    (np.ones(len(train)), (train['pid'], train['tid'])),
    shape=(config['num_playlists'], config['num_tracks']),
)

In [20]:
# Where the best-scoring model is written by the training loop.
config.update({'model_path': 'models/lightfm_model.pkl'})

In [65]:
model = LightFMWrapper(no_components=200, loss='warp', learning_rate=0.02, max_sampled=400, random_state=1, user_alpha=1e-05)

# Train one epoch at a time; after each epoch score the validation playlists
# and keep the model with the best mean score on disk.
best_score = 0
for i in range(60):
    print(f'iteration: {i}')

    model.fit_partial(X_train, epochs=1, num_threads=8)

    # All tracks form a single chunk (id 0); identity features on both sides.
    model.batch_setup(
        item_chunks={0: np.arange(config['num_tracks'])},
        item_features=None,
        user_features=None,
        n_process=1,
    )

    res = model.batch_predict(chunk_id=0, user_ids=val1_pids, top_k=600)
    model.batch_cleanup()

    # Per playlist: drop tracks already in the training split, keep as many
    # recommendations as there are held-out tracks, and measure the fraction
    # of them that are actually held-out tracks.
    hit_rates = []
    for pid in val1_pids:
        held_out = val_tracks[pid]
        seen = user_seen.get(pid, set())  # looked up once per playlist, not once per track
        candidates = [tid for tid in res[pid][0] if tid not in seen][:len(held_out)]
        hits = sum(tid in held_out for tid in candidates)
        hit_rates.append(hits / len(held_out))

    score = np.mean(hit_rates)
    print(score)
    if score > best_score:
        # Context manager so the file is flushed and closed after the dump.
        with open(config['model_path'], 'wb') as model_file:
            joblib.dump(model, model_file)
        best_score = score

iteration: 0
Batch predict: user_ids: 992, item_ids: 113,331
Start recommending: using single process
Recommendations for chunk 0 done in 11.895s. 0.0120 s by user


AttributeError: 'LightFMWrapper' object has no attribute 'batch_cleanup'