In [None]:
from collections import defaultdict
from scipy.sparse import coo_array, csr_array, csc_array, lil_array
import numpy as np
from numpy import linalg as LA
import pandas as pd
from tempfile import mkdtemp
import time
import multiprocessing
import os
import pickle
import cupy as cp
import matplotlib.pyplot as plt

In [None]:
# Credentials read by the Kaggle CLI. The values below are placeholders:
# replace them locally and never commit real credentials with the notebook.
os.environ['KAGGLE_USERNAME'] = "XXXXXXXXXX"
os.environ['KAGGLE_KEY'] = "XXXXXXXXXX"
# Download the two files of the Yelp dataset used by this notebook (reviews and businesses).
!kaggle datasets download yelp-dataset/yelp-dataset -f  yelp_academic_dataset_review.json
!kaggle datasets download yelp-dataset/yelp-dataset -f  yelp_academic_dataset_business.json

Downloading yelp_academic_dataset_review.json.zip to /content
100% 2.06G/2.07G [00:20<00:00, 126MB/s]
100% 2.07G/2.07G [00:20<00:00, 110MB/s]
Downloading yelp_academic_dataset_business.json.zip to /content
 43% 9.00M/20.8M [00:00<00:00, 38.3MB/s]
100% 20.8M/20.8M [00:00<00:00, 74.8MB/s]


In [None]:
def t2if():
  '''Default factory of the token -> index dictionary: unknown tokens map to -1.'''
  return -1


def i2tf():
  '''Default factory of the index -> token dictionary: unknown indices map to 'NULL'.'''
  return 'NULL'


class BMap:
  '''
  BMap class is a bidirectional map used to maintain the relationship between
  alphanumeric ids (tokens) and numeric ids (indices).

  The two dictionaries are defaultdicts built on the module-level factories
  t2if / i2tf, so direct lookups of unknown keys yield -1 / 'NULL'.
  The methods of this class never insert entries as a side effect of a lookup.

  Parameters
  ----------
  map : list of tuples [(t, i)] or a set of tokens. When a set is given, indices
        are assigned following the iteration order of the set.

  Raises
  ------
  ValueError : if map is neither None, a list nor a set
  '''

  def __init__(self, map=None):

    self.token2index = defaultdict(t2if)
    self.index2token = defaultdict(i2tf)

    if map is None:
      return

    if isinstance(map, list):
      for entry in map:
        self.token2index[entry[0]] = entry[1]
        self.index2token[entry[1]] = entry[0]
    elif isinstance(map, (set, frozenset)):
      for index, token in enumerate(map):
        self.token2index[token] = index
        self.index2token[index] = token
    else:
      raise ValueError('map type must be a list of tuples or a set')

  def to_indices(self, tokens : list, append : bool=True) -> list:
    '''
    to_indices method returns numeric ids corresponding to the alphanumeric ids passed as parameter.

    Parameters
    ----------
    tokens : list of alphanumeric ids
    append : when set to true, if no numeric ids are found, new ids are assigned

    Returns
    -------
    List of numeric ids; unknown tokens yield -1 when append is False
    '''
    indices = []

    for token in tokens:
      # .get() does not trigger the defaultdict factory: a failed lookup must not
      # leave a (token, -1) entry behind, otherwise later ids would be skipped
      # or assigned twice.
      index = self.token2index.get(token, -1)

      if index == -1 and append:
        index = len(self.token2index)
        # Maps built from an explicit list may already use this index.
        while index in self.index2token:
          index += 1
        self.token2index[token] = index
        self.index2token[index] = token

      indices.append(index)

    return indices

  def to_tokens(self, indices : list) -> list:
    '''
    to_tokens method returns alphanumeric ids corresponding to the numeric ids passed as parameter.

    Parameters
    ----------
    indices : list of numeric ids

    Returns
    -------
    List of alphanumeric ids; unknown indices yield 'NULL'
    '''
    return [self.index2token.get(index, 'NULL') for index in indices]


In [None]:

class UtilityMatrix:
  '''
  UtilityMatrix class represents the relationship between users and items.

  Parameters
  ----------
  users_map       : bidirectional map of the users
  items_map       : bidirectional map of the items
  utility_matrix  : sparse matrix where each row corresponds to a user and each column corresponds to an item

  Supported indexing
  ------------------
  um[['UID', ...], :]   -> UtilityMatrix restricted to the given users
  um[:, ['IID', ...]]   -> UtilityMatrix restricted to the given items
  um['UID', 'IID']      -> single stored value
  um['UID', 'IID'] = V
  um[['U1', 'U2'], ['I1', 'I2']] = [V1, V2]
  '''

  _GET_HELP = "Operation not supported yet, try with: um[['UID'], :], um[:, ['IID']], um['UID', 'IID']"
  _SET_HELP = "Operation not supported, try with um['UID', 'IID'] = V, um[['U1', 'U2', 'U3'], ['I1', 'I2', 'I3']] = [V1, V2, V3]"

  def __init__(self, users_map : BMap=None, items_map: BMap=None, utility_matrix : coo_array=None):
    self.users_map = users_map
    self.items_map = items_map
    self.utility_matrix = utility_matrix
    # The signature allows utility_matrix=None: do not crash on the default.
    self.shape = utility_matrix.shape if utility_matrix is not None else (0, 0)

  @staticmethod
  def _is_number(value) -> bool:
    '''True for Python and NumPy integer/float scalars (booleans excluded).'''
    if isinstance(value, (bool, np.bool_)):
      return False
    return isinstance(value, (int, float, np.integer, np.floating))

  def _select_users(self, users: list):
    '''Builds the UtilityMatrix made of the rows of the known user ids in users.'''
    user_ids = [uid for uid in users if type(uid) is str]
    rows = self.users_map.to_indices(user_ids, append=False)

    found = [(uid, row) for uid, row in zip(user_ids, rows) if row != -1]

    if len(found) == 0:
      raise ValueError('indeces out of boud')

    user_ids = [uid for uid, _ in found]
    rows = [row for _, row in found]

    coo_um = self.utility_matrix.tocsr()[rows, :].tocoo()

    # Rows are renumbered 0..k-1 in the sub-matrix, columns keep their original index.
    users_map = [(uid, i) for i, uid in enumerate(user_ids)]
    items_map = list(zip(self.items_map.to_tokens(coo_um.col), coo_um.col))

    return UtilityMatrix(BMap(users_map), BMap(items_map), coo_um)

  def _select_items(self, items: list):
    '''Builds the UtilityMatrix made of the columns of the known item ids in items.'''
    item_ids = [iid for iid in items if type(iid) is str]
    columns = self.items_map.to_indices(item_ids, append=False)

    found = [(iid, column) for iid, column in zip(item_ids, columns) if column != -1]

    if len(found) == 0:
      raise ValueError('indeces out of bound')

    item_ids = [iid for iid, _ in found]
    columns = [column for _, column in found]

    coo_um = self.utility_matrix.tocsc()[:, columns].tocoo()

    # Columns are renumbered 0..k-1 in the sub-matrix, rows keep their original index.
    items_map = [(iid, i) for i, iid in enumerate(item_ids)]
    users_map = list(zip(self.users_map.to_tokens(coo_um.row), coo_um.row))

    return UtilityMatrix(BMap(users_map), BMap(items_map), coo_um)

  def _locate(self, user_id: str, item_id: str) -> tuple:
    '''Returns the (row, column) pair of a user/item couple, -1 marks unknown ids.'''
    row = self.users_map.to_indices([user_id], append=False)[0]
    column = self.items_map.to_indices([item_id], append=False)[0]
    return row, column

  def _write(self, rows: list, columns: list, values) -> None:
    '''Stores values at the given coordinates and keeps the matrix in COO format.'''
    lil_um = self.utility_matrix.tolil()
    lil_um[rows, columns] = values
    self.utility_matrix = lil_um.tocoo()

  def __getitem__(self, key):
    '''
    Raises
    ------
    ValueError : unsupported key, or none of the requested ids is known
    '''
    if not isinstance(key, tuple) or len(key) != 2:
      raise ValueError(self._GET_HELP)

    first, second = key

    if type(first) is list and type(second) is slice:
      return self._select_users(first)

    if type(first) is slice and type(second) is list:
      return self._select_items(second)

    if type(first) is str and type(second) is str:
      row, column = self._locate(first, second)

      if row == -1 or column == -1:
        raise ValueError("index out of bound")

      return self.utility_matrix.tocsr()[[row], [column]][0]

    raise ValueError(self._GET_HELP)

  def __setitem__(self, key, value):
    '''
    Raises
    ------
    ValueError : unsupported key/value combination, or no known (user, item) pair
    '''
    if not isinstance(key, tuple) or len(key) != 2:
      raise ValueError(self._SET_HELP)

    first, second = key

    if type(first) is str and type(second) is str and self._is_number(value):
      row, column = self._locate(first, second)

      if row == -1 or column == -1:
        raise ValueError("index out of bound")

      self._write([row], [column], value)
      return

    if type(first) is list and type(second) is list and type(value) is list:
      rows = []
      columns = []
      values = []

      # Malformed or unknown entries are skipped, the remaining ones are written.
      for user_id, item_id, rating in zip(first, second, value):
        if type(user_id) is not str or type(item_id) is not str or not self._is_number(rating):
          continue

        row, column = self._locate(user_id, item_id)

        if row == -1 or column == -1:
          continue

        rows.append(row)
        columns.append(column)
        values.append(rating)

      if len(rows) == 0:
        raise ValueError('indeces out of bound')

      self._write(rows, columns, values)
      return

    raise ValueError(self._SET_HELP)

  def to_dense(self):
    '''
    to_dense method converts the sparse matrix to a dense one
    '''
    return self.utility_matrix.todense()

  def get_active_reviewers(self, n: int):
    '''
    get_active_reviewers method returns the top n active users, i.e. the users
    with the largest number of stored ratings.

    Parameters
    ----------
    n : maximum number of users returned

    Returns
    -------
    List of at most n user ids, most active first. Users without ratings are
    never returned, so the list is shorter than n when fewer users have ratings.
    '''
    csr_um = self.utility_matrix.tocsr()

    # Number of stored entries of each row, straight from the CSR row pointer.
    nz_entries = np.diff(csr_um.indptr)
    rows = np.flatnonzero(nz_entries)

    order = np.argsort(nz_entries[rows], kind='stable')[::-1]

    return self.users_map.to_tokens(rows[order][:n])

  def train_test_mask(self, train_size: float=0.8, seed=None):
    '''
    train_test_mask method builds a random boolean mask used to hide elements during the training and testing phases.

    Parameters
    ----------
    train_size : percent of training samples for each row, 0.8 is the default value
    seed       : optional seed of the random generator, pass a value to get a reproducible mask

    Returns
    -------
    Random boolean mask as a dict where the key is the numeric id of the user and
    the value has one flag (True = training) per stored entry of the CSR row
    '''
    csr_um = self.utility_matrix.tocsr()
    mask = dict()
    rng = np.random.default_rng(seed)

    nz_entries = np.diff(csr_um.indptr)

    for row in np.flatnonzero(nz_entries):
      size = nz_entries[row]

      n = round(size * train_size)
      msk = np.zeros(size, dtype='bool')
      msk[:n] = True
      rng.shuffle(msk)
      mask[row] = msk

    return mask

  def _format_entries(self, rows, cols, data) -> str:
    '''Renders (user id, item id) value lines for the given COO entries.'''
    user_ids = self.users_map.to_tokens(rows)
    items_ids = self.items_map.to_tokens(cols)

    lines = ''
    for entry in zip(user_ids, items_ids, data):
      lines += "  ({:<5}, {:>5}){:>6}\n".format(entry[0], entry[1], entry[2])

    return lines

  def __str__(self):
    um = self.utility_matrix
    footer = f'\n[{self.shape[0]} rows x {self.shape[1]} columns]'

    # Small matrices are printed in full, large ones show the first and last 10 entries.
    if um.row.shape[0] < 20:
      return self._format_entries(um.row, um.col, um.data) + footer

    head = self._format_entries(um.row[:10], um.col[:10], um.data[:10])
    tail = self._format_entries(um.row[-10:], um.col[-10:], um.data[-10:])

    return head + '\t . . . \n' + tail + footer


In [None]:

def load_reviews(path : str, chunksize: int) -> UtilityMatrix:
  '''
  load_reviews function reads the reviews in the file and stores them into a utility matrix.
  When multiple reviews made by the same user for the same item are found, the corresponding entry
  in the matrix is the average value of all of them.

  Parameters
  ----------
  path      : dataset file path (JSON lines with user_id, business_id and stars fields)
  chunksize : number of lines read per chunk

  Returns
  -------
  The utility matrix
  '''
  users_map = BMap()
  items_map = BMap()

  rows = []
  columns = []
  totals = []   # sum of the stars of each (user, item) pair
  counts = []   # number of reviews of each (user, item) pair
  entries = dict()

  with pd.read_json(path, lines=True, chunksize=int(chunksize)) as reader:

    for chunk in reader:
      for user_id, business_id, stars in zip(chunk['user_id'], chunk['business_id'], chunk['stars']):

        row = users_map.to_indices([user_id])[0]
        column = items_map.to_indices([business_id])[0]

        entry_idx = entries.get((row, column))

        if entry_idx is None:
          entries[(row, column)] = len(totals)
          rows.append(row)
          columns.append(column)
          totals.append(stars)
          counts.append(1)
        else:
          # Keep sum and count: averaging pairwise ((old + new) / 2) would weight
          # the latest review as much as all the previous ones together.
          totals[entry_idx] += stars
          counts[entry_idx] += 1

  data = np.asarray(totals, dtype='float64') / np.asarray(counts, dtype='float64')

  return UtilityMatrix(users_map, items_map, utility_matrix=coo_array((data, (rows, columns)), dtype='float32'))


In [None]:

def load_business(path: str, chunksize: int) -> tuple:
  '''
  load_business function reads the business info from file. Businesses are stored into a pandas data frame,
  the features selected for each item (city, state and categories) are stored into a dictionary,
  unique features are collected into a set.

  Parameters
  ----------
  path      : dataset file path (JSON lines)
  chunksize : number of lines read per chunk

  Returns
  -------
  A tuple containing:
    - business: dataframe with business_id, name, address, city, state and postal_code
    - items features: a dict where the key is the business id and the value is the list of its features
    - features set: the set of all possible features
  '''
  info_columns = ['business_id', 'name', 'address', 'city', 'state', 'postal_code']

  item_features = defaultdict(list)
  features_set = set()
  frames = []

  with pd.read_json(path, lines=True, chunksize=int(chunksize)) as reader:

    for chunk in reader:

      # Chunks are collected and concatenated once at the end: growing the
      # dataframe inside the loop copies all the previous rows at every chunk.
      frames.append(chunk[info_columns])

      for business_id, city, state, categories in zip(chunk['business_id'], chunk['city'], chunk['state'], chunk['categories']):

        features = item_features[business_id]
        features.append(city)
        features.append(state)

        features_set.add(city)
        features_set.add(state)

        # Missing categories are not strings (None or NaN depending on the parser).
        if isinstance(categories, str):
          names = categories.split(', ')
          features.extend(names)
          features_set.update(names)

  business = pd.concat(frames) if frames else pd.DataFrame()

  return business, item_features, features_set


In [None]:

class LSH:
    '''
    LSH class defines the Locality Sensitive Hashing technique based on random hyperplanes.

    The signature of a datapoint has one bit per hyperplane (1 when the datapoint
    lies on its positive side). The signature is split in bands and each band,
    read as a binary number, selects one bucket of that band.

    Parameters:
    -----------
    bands     : number of desired bands
    sig_bits  : number of random hyperplanes
    ndim      : dimensions of datapoints
    seed      : optional seed of the hyperplanes; when omitted the hyperplanes are
                drawn from the global NumPy random state, as before

    Raises
    ------
    ValueError : bands is not a positive divisor of sig_bits, or a band is too wide
                 to be encoded as a 64 bit integer
    '''
    def __init__(self, bands: int, sig_bits: int, ndim: int, seed=None):
        if bands <= 0:
            raise ValueError('The number of bands must be positive')

        if sig_bits % bands != 0:
            raise ValueError('The number of bands must be a divisor of sig_bits')

        bits_per_band = sig_bits // bands
        if bits_per_band > 62:
            # 2 ** 63 does not fit a signed 64 bit integer: keys would silently wrap.
            raise ValueError('sig_bits // bands must not exceed 62')

        self.bands = bands
        self.sig_bits = sig_bits
        self.ndim = ndim

        if seed is None:
            self.sig_hyperplanes = np.random.randn(sig_bits, ndim)
        else:
            self.sig_hyperplanes = np.random.default_rng(seed).standard_normal((sig_bits, ndim))

        self.buckets = {i: defaultdict(set) for i in range(bands)}
        # Weights turning the bits of a band into an integer, most significant bit first.
        self.powers = np.power(2, np.arange(bits_per_band, dtype='int64'))[::-1]

    def _signature(self, hyperplanes: np.array, dtpoint: np.array):
        '''Returns one 0/1 flag per hyperplane: 1 when the dot product is positive.'''
        return (np.dot(hyperplanes, dtpoint) > 0).astype('int8')

    def _band_keys(self, dtpoint: np.array):
        '''Returns the bucket key of each band for the given datapoint.'''
        signature = self._signature(self.sig_hyperplanes, dtpoint)
        return signature.reshape(self.bands, -1).dot(self.powers)

    def hash(self, _id: int, dtpoint: np.array):
        '''
        hash method hashes the datapoint, then the numeric id is put into the bucket where the hashed band falls

        Parameters
        ----------
        _id     : numeric unique id of the datapoint
        dtpoint : datapoint to be hashed
        '''
        for band, key in enumerate(self._band_keys(dtpoint)):
            self.buckets[band][key].add(_id)

    def search(self, query) -> set:
        '''
        search method searches similar datapoints in the dataset. The query is hashed
        exactly like the stored datapoints and the numeric ids found in the bucket
        selected by each band are returned. The hope is that similar datapoints fall in the same buckets.

        Parameters
        ----------
        query : query to execute

        Returns
        -------
        Set of the numeric ids of the candidate similar datapoints
        '''
        query_rs = set()

        for band, key in enumerate(self._band_keys(query)):
            # .get() keeps the lookup free of side effects on the defaultdict.
            query_rs.update(self.buckets[band].get(key, ()))

        return query_rs

    def merge(self, other):
        '''
        merge method merges hashed datapoints from another LSH object with the same configuration

        Parameters
        ----------
        other : LSH object
        '''
        for band, bucket in other.buckets.items():
            for hash_val, itemset in bucket.items():
                self.buckets[band][hash_val].update(itemset)

In [None]:

class ShardsHandler:
  '''
  ShardsHandler class is used to independently manage shard files where items and user profiles are written.
  A logical matrix is split by rows into fixed-size files named
  <path>/<prefix>-<shard id>.dat, accessed through numpy.memmap.

  Parameters
  ----------
  path    : directory of the shard files
  prefix  : prefix of the file name
  dtype   : file content's data type
  shape   : shape of the data (number of rows and columns)
  nbytes  : maximum number of bytes per shard

  Attributes
  ----------
  conf    : (rows per shard, columns)
  shape   : logical shape, the number of rows grows when rows beyond it are written

  Raises
  ------
  ValueError : nbytes is smaller than the size of a single row
  '''
  def __init__(self, path: str, prefix: str, dtype: str, shape: tuple, nbytes: int):
    self.path = path
    self.prefix = prefix
    self.dtype = np.dtype(dtype)
    self.nbytes = nbytes
    self.conf = (nbytes // (shape[1] * self.dtype.itemsize), shape[1])
    self.shape = shape

    if self.conf[0] == 0:
      raise ValueError('nbytes is too small to store a single row')

  def _shard_path(self, shard_id) -> str:
    '''Returns the file name of a shard.'''
    return f'{self.path}/{self.prefix}-{shard_id:03d}.dat'

  @staticmethod
  def _as_index_array(indices) -> np.ndarray:
    '''Normalizes an int, a list or an array of row numbers into an index array.'''
    if isinstance(indices, (int, np.integer)):
      indices = np.array([indices])

    if isinstance(indices, list):
      indices = np.array(indices)

    if not isinstance(indices, np.ndarray):
      raise ValueError('indices must be a array like object')

    # An empty list becomes a float array: force an integer dtype.
    if indices.size == 0:
      indices = indices.astype('int64')

    return indices

  def _split_by_shard(self, indices: np.ndarray) -> tuple:
    '''
    Sorts the indices and groups them by shard.

    Returns the sorting permutation, the shard ids, the positions where each
    shard starts in the sorted sequence and the row numbers local to the shards.
    '''
    asc_order = indices.argsort(kind='stable')
    ordered = indices[asc_order]
    shards, first = np.unique(ordered // self.conf[0], return_index=True)
    return asc_order, shards, first[1:], ordered % self.conf[0]

  def __setitem__(self, indices, values):
    '''
    Writes rows into the shards, creating the missing shard files.

    Parameters
    ----------
    indices : row number, or list/array of row numbers
    values  : scalar written to every cell, or list/array with one row per index

    Raises
    ------
    ValueError : indices or values are not array like objects
    '''
    indices = self._as_index_array(indices)

    if isinstance(values, (int, float, np.number)):
      values = np.full(shape=(indices.size, self.conf[1]), fill_value=values, dtype=self.dtype)

    if isinstance(values, list):
      values = np.array(values)

    if not isinstance(values, np.ndarray):
      raise ValueError('values must be a array like object')

    if indices.size == 0:
      return

    # A single row may be passed as a flat vector.
    if indices.size == 1 and values.ndim == 1:
      values = values.reshape(1, -1)

    asc_order, shards, bounds, local_indices = self._split_by_shard(indices)
    values = values[asc_order]

    for shard_id, index, value in zip(shards, np.split(local_indices, bounds), np.split(values, bounds)):
      fname = self._shard_path(shard_id)
      mode = 'r+' if os.path.exists(fname) else 'w+'
      shard = np.memmap(fname, mode=mode, dtype=self.dtype, shape=self.conf)

      shard[index] = value
      shard.flush()
      del shard

    # Row numbers are 0-based: the row count is the largest index plus one.
    self.shape = (max(self.shape[0], int(indices.max()) + 1), self.shape[1])

  def __getitem__(self, indices):
    '''
    Reads rows from the shards.

    Parameters
    ----------
    indices : row number, or list/array of row numbers

    Returns
    -------
    Array with one row per index, in the same order as indices (a single row
    number yields an array of shape (1, columns))

    Raises
    ------
    ValueError : indices is not an array like object
    '''
    indices = self._as_index_array(indices)

    if indices.size == 0:
      return np.empty((0, self.conf[1]), dtype=self.dtype)

    asc_order, shards, bounds, local_indices = self._split_by_shard(indices)

    submat = []
    for shard_id, index in zip(shards, np.split(local_indices, bounds)):
      shard = np.memmap(self._shard_path(shard_id), mode='r', dtype=self.dtype, shape=self.conf)
      submat.append(np.asarray(shard[index]))
      del shard

    rows = submat[0] if len(submat) == 1 else np.concatenate(submat, axis=0)

    # Rows were read in ascending order: always restore the order of the request,
    # also when a single shard is involved.
    return rows[asc_order.argsort()]


In [None]:

class OHEProcess(multiprocessing.Process):
  '''
  OHEProcess class is a process launched by the recommendation system object to compute one-hot-encoded items profiles.
  Each process fills one shard file: row i of the shard is the encoding of item_ids[i].

  Parameters
  ----------
  oheh     : handler of the one-hot-encoded shards (gives file naming, dtype and shard shape)
  shard_id : id of the shard written by this process
  rps      : number of rows encoded in the shard
  item_ids : alphanumeric ids of the items to encode
  itemf    : dictionary of the features of each item
  fmap     : bidirectional map of the features, the numeric id is the column
  '''
  def __init__(self, oheh: ShardsHandler, shard_id: int, rps: int, item_ids: list, itemf: defaultdict, fmap: BMap):
    super(OHEProcess, self).__init__()
    self.fname = f'{oheh.path}/{oheh.prefix}-{shard_id:03d}.dat'
    # mode 'w+' creates (or overwrites) the shard file.
    # NOTE(review): the memmap is opened by the parent and used by the child,
    # which presumably relies on the fork start method - confirm on other platforms.
    self.shard = np.memmap(self.fname, mode='w+', dtype=oheh.dtype, shape=oheh.conf)
    self.shard_id = shard_id
    self.shape = (rps, oheh.conf[1])
    self.item_ids = item_ids
    self.itemf = itemf
    self.fmap = fmap

  def run(self):
    '''
    run method defines the behavior of the process
    '''
    print(f'{self.name} encoding: shard_id: {self.shard_id}, shape: {self.shape}, dumping into {self.fname}')

    exec_t = time.time()

    for i, item_id in enumerate(self.item_ids):
      features = self.itemf.get(item_id, [])
      # -1 marks a feature missing from the map: used as a column it would
      # silently set the last feature of the row.
      cols = [col for col in self.fmap.to_indices(features, append=False) if col != -1]

      if cols:
        self.shard[i, cols] = 1

    self.shard.flush()
    print(f'{self.name}: {self.fname} encoded in {round((time.time() - exec_t), 4)} (s)')


In [None]:

class LSHProcess(multiprocessing.Process):
  '''
  LSHProcess class is a process launched by the recommendation system object to build the LSH index of items.
  Each process hashes a batch of rows of one shard and sends its LSH object back through the queue.

  Parameters
  ----------
  itemh    : handler of the item profile shards (gives file naming, dtype and shard shape)
  shard_id : id of the shard read by this process
  bounds   : (first, last + 1) rows of the batch, relative to the shard
  lsh      : LSH object used to hash the rows
  queue    : queue where the LSH object is put once the batch is hashed
  '''
  def __init__(self, itemh: ShardsHandler, shard_id: int, bounds: tuple, lsh: LSH, queue: multiprocessing.Queue):
    super(LSHProcess, self).__init__()
    self.fname = f'{itemh.path}/{itemh.prefix}-{shard_id:03d}.dat'
    self.batch = np.memmap(self.fname, mode='r+', dtype=itemh.dtype, shape=itemh.conf)
    self.shard_id = shard_id
    self.bounds = bounds
    # Global row number of the first row of the shard.
    self.offset = shard_id * itemh.conf[0]
    self.lsh = lsh
    self.queue = queue

  def run(self):
    '''
    run method defines the behavior of the process
    '''
    print(f'{self.name}: building LSH index: shard_id: {self.shard_id}, batch: {self.bounds}')

    exec_t = time.time()

    first = self.offset + self.bounds[0]

    for i, row in enumerate(self.batch[self.bounds[0] : self.bounds[1]]):
      # Rows are indexed by their global item number: shard-local numbers
      # would collide between shards.
      self.lsh.hash(first + i, row)

    self.queue.put(self.lsh)
    self.queue.close()

    print(f'{self.name}: LSH index built in {round((time.time() - exec_t), 4)} (s)')


In [None]:

class UserProcess(multiprocessing.Process):
  '''
  UserProcess class is a process launched by the recommendation system object to compute user profiles.
  Row i of the shard written by the process is the profile of the user index[i]:
  the sum of the profiles of the items selected by the mask of the user,
  each weighted by the rating minus the mean of the selected ratings.
  '''
  def __init__(self, userh: ShardsHandler, itemh: ShardsHandler, shard_id: int, index: np.array, csr_um: csr_array, mask: dict):
    super(UserProcess, self).__init__()
    self.shard_id = shard_id
    self.index = index
    self.itemh = itemh
    self.csr_um = csr_um
    self.mask = mask
    self.shape = (index.size, userh.conf[1])
    self.fname = f'{userh.path}/{userh.prefix}-{shard_id:03d}.dat'
    self.shard = np.memmap(self.fname, mode='w+', dtype=userh.dtype, shape=userh.conf)

  def run(self):
    '''
    run method defines the behavior of the process
    '''
    print(f'{self.name} encoding: shard_id: {self.shard_id}, shape: {self.shape}, dumping into {self.fname}')

    exec_t = time.time()
    indptr = self.csr_um.indptr

    for slot, user_indx in enumerate(self.index):
      # Entries of the CSR row of the user.
      lo, hi = indptr[user_indx], indptr[user_indx + 1]

      # Keep only the entries flagged as training samples.
      cols = self.csr_um.indices[lo : hi][self.mask[user_indx]]
      ratings = self.csr_um.data[lo : hi][self.mask[user_indx]]

      item_profiles = self.itemh[cols]

      # Mean-centered ratings weight the item profiles (no normalization is applied).
      self.shard[slot] = np.dot(ratings - ratings.mean(), item_profiles)

    self.shard.flush()
    print(f'{self.name}: {self.fname} encoded in {round((time.time() - exec_t), 4)} (s)')


In [22]:
class RecommendationSystem:
  '''
  RecommendationSystem class represents the content-based recommendation system

  Parameters
  ----------
  utility_matrix : utility matrix containing the relationship between users and items
  items          : dataframe containing basic info about items
  item_features  : dictionary containing features of each items (the key is the items id)
  features_set   : set of all possible features
  bpsh           : maximum number of bytes per shard
  ncomps         : dimensionality of reduced items

  Items and users are always addressed by the column and row numbers of the
  utility matrix, so the shard handlers are sized on the shape of the utility matrix.
  '''
  def __init__(self, utility_matrix: UtilityMatrix, items: pd.DataFrame, item_features: defaultdict, features_set : set, bpsh: int, ncomps: int):
    self.utility_matrix = utility_matrix
    self.items = items
    self.item_features = item_features
    self.features_map = BMap(features_set)
    self.checkpoint = f'checkpoint-{round(time.time())}'
    self.shards_dir = f'{self.checkpoint}/shards'

    n_users, n_items = utility_matrix.shape[0], utility_matrix.shape[1]

    self.ohe_handler = ShardsHandler(path=self.shards_dir, prefix='ohe', dtype='int8', shape=(n_items, len(features_set)), nbytes=bpsh)
    self.items_handler = ShardsHandler(path=self.shards_dir, prefix='itemp', dtype='float32', shape=(n_items, ncomps), nbytes=bpsh)
    self.users_handler = ShardsHandler(path=self.shards_dir, prefix='userp', dtype='float32', shape=(n_users, ncomps), nbytes=bpsh)
    self.lsh = LSH(bands=30, sig_bits=180, ndim=ncomps)
    # Train/test mask of each user, computed by transform().
    self.mask = dict()

    os.makedirs(self.shards_dir, exist_ok=True)

  @staticmethod
  def _shard_rows(handler: ShardsHandler):
    '''
    Yields (shard id, number of valid rows, global row numbers) for every shard
    of the handler. Only the last shard can be partially filled and no empty
    shard is produced when the rows exactly fill the shards.
    '''
    rows_per_shard = handler.conf[0]
    total = handler.shape[0]
    n_shard = -(-total // rows_per_shard)   # ceiling division

    for shard_id in range(n_shard):
      first = shard_id * rows_per_shard
      n_rows = min(rows_per_shard, total - first)
      yield shard_id, n_rows, first + np.arange(n_rows)

  def _reduce_dimensionality(self):
    '''
    Projects the one-hot-encoded items on the first principal components.
    The components are computed once from the covariance of all the items,
    accumulated shard by shard, so every shard is projected on the same basis.
    '''
    pname = multiprocessing.current_process().name
    print(f'{pname}: reducing dimensionality: (n, {self.ohe_handler.shape[1]}) -> (n, {self.items_handler.shape[1]})')

    exec_t = time.time()

    n_features = self.ohe_handler.shape[1]
    n_comps = self.items_handler.shape[1]

    # First pass: accumulate X^T X and the column sums over all the shards.
    gram = cp.zeros((n_features, n_features), dtype='float64')
    col_sum = cp.zeros(n_features, dtype='float64')
    n_samples = 0

    for _, n_rows, index in self._shard_rows(self.ohe_handler):
      shard_gpu = cp.asarray(self.ohe_handler[index], dtype='float32')
      gram += cp.dot(shard_gpu.T, shard_gpu)
      col_sum += shard_gpu.sum(axis=0)
      n_samples += n_rows

    if n_samples == 0:
      return

    mean = col_sum / n_samples
    cov = (gram - n_samples * cp.outer(mean, mean)) / max(n_samples - 1, 1)

    eigenvalues, eigenvectors = cp.linalg.eigh(cov)
    sorted_indices = cp.argsort(eigenvalues)[::-1]
    eigenvectors = eigenvectors[:, sorted_indices]
    eigenvalues = eigenvalues[sorted_indices]

    cum_expl_var = cp.asnumpy(cp.cumsum(eigenvalues / cp.sum(eigenvalues)))
    # Position of the first component reaching the threshold, turned into a count.
    optimal_components = min(int(np.searchsorted(cum_expl_var, 0.8)) + 1, cum_expl_var.size)

    basis = eigenvectors[:, : n_comps].astype('float32')

    # Second pass: project every shard (data is not centered, as in the one-hot space).
    for _, _, index in self._shard_rows(self.ohe_handler):
      shard_gpu = cp.asarray(self.ohe_handler[index], dtype='float32')
      self.items_handler[index] = cp.asnumpy(cp.dot(shard_gpu, basis)).astype(self.items_handler.dtype)

    print(f'Number of optimal components with cumulative explained variance greater than 0.8: {optimal_components}')
    print(f'{pname}: dimensionality reduced in {round((time.time() - exec_t), 4)} (s)')

  def _encode_items(self):
    '''
    Computes the item profiles: one-hot encoding (one process per shard),
    dimensionality reduction and LSH indexing (one process per batch of rows).
    '''
    items_map = self.utility_matrix.items_map

    processes = []
    for shard_id, n_rows, index in self._shard_rows(self.ohe_handler):
      item_ids = items_map.to_tokens(index)

      p = OHEProcess(self.ohe_handler, shard_id, n_rows, item_ids, self.item_features, self.features_map)
      p.start()

      processes.append(p)

    for p in processes:
      p.join()

    self._reduce_dimensionality()

    queue = multiprocessing.Queue()
    ncpu = os.cpu_count() or 1

    workers = []
    for shard_id, n_rows, _ in self._shard_rows(self.items_handler):
      batch_size = -(-n_rows // ncpu)

      # Batches never exceed the valid rows of the shard: the padding rows of
      # the last shard must not be indexed.
      for lower in range(0, n_rows, batch_size):
        bounds = (lower, min(lower + batch_size, n_rows))

        p = LSHProcess(self.items_handler, shard_id, bounds, self.lsh, queue)
        p.start()

        workers.append(p)

    # Drain the queue before joining: a worker cannot exit while its result is still queued.
    for _ in workers:
      self.lsh.merge(queue.get())

    for p in workers:
      p.join()

  def _encode_users(self):
    '''Computes the user profiles, one process per shard.'''
    csr_um = self.utility_matrix.utility_matrix.tocsr()

    processes = []
    for shard_id, _, index in self._shard_rows(self.users_handler):
      p = UserProcess(self.users_handler, self.items_handler, shard_id, index, csr_um, self.mask)
      p.start()

      processes.append(p)

    for p in processes:
      p.join()

  def transform(self):
    '''
    transform method computes item profiles and user profiles. Items are encoded in one-hot representation, and then,
    their dimensionality is reduced using PCA technique. User profiles are computed from item profiles
    rated by the user. Before encoding user profiles, a random boolean mask is computed, which is useful for masking items
    during the training and evaluation phase.
    '''
    self._encode_items()
    self.mask = self.utility_matrix.train_test_mask(0.7)
    self._encode_users()

  def _cosine_similarity(self, user_profile: np.array, item_profiles: np.array) -> np.array:
    '''
    Cosine similarity between the user profile and each item profile.
    A zero vector has no direction: its similarity is defined as 0 instead of NaN,
    which would otherwise be ranked before every valid score.
    '''
    dots = np.dot(item_profiles, user_profile)
    norms = np.linalg.norm(user_profile) * np.linalg.norm(item_profiles, axis=1)

    scores = np.zeros(dots.shape, dtype=np.result_type(dots.dtype, norms.dtype))
    np.divide(dots, norms, out=scores, where=norms > 0)

    return scores

  def _user_context(self, csr_um, user_id: str) -> tuple:
    '''
    Returns (numeric id, profile, rated item numbers, ratings) of a user.

    Raises
    ------
    ValueError : the user id is unknown
    '''
    user_indx = self.utility_matrix.users_map.to_indices([user_id], append=False)[0]

    if user_indx == -1:
      raise ValueError(f'unknown user id: {user_id}')

    user_profile = self.users_handler[user_indx][0]

    start = csr_um.indptr[user_indx]
    end = csr_um.indptr[user_indx + 1]

    return user_indx, user_profile, csr_um.indices[start : end], csr_um.data[start : end]

  def _candidates(self, user_profile: np.array, items_rated, exclude: bool) -> np.array:
    '''
    Returns the sorted item numbers found by the LSH search of the user profile,
    without the given items when exclude is True, restricted to them otherwise.
    '''
    found = self.lsh.search(user_profile)
    selected = set(int(item) for item in items_rated)
    kept = found - selected if exclude else found & selected

    return np.array(sorted(int(item) for item in kept), dtype='int64')

  def recommend(self, user_ids: list, n=10) -> dict:
    '''
    recommend method recommends to a user a list of interesting items, based on his/her preferences

    Parameters
    ----------
    user_ids : list of users ids
    n        : maximum number of recommended items

    Returns
    -------
    A dict where the key is the user id and the value is the list of
    (item description, score) of the recommended items, best first.
    Scores are cosine similarities rescaled to [0, 1].

    Raises
    ------
    ValueError : a user id is unknown
    '''
    result_set = dict()
    csr_um = self.utility_matrix.utility_matrix.tocsr()
    items_map = self.utility_matrix.items_map
    info_columns = ['name', 'address', 'city', 'state', 'postal_code']

    # Item numbers are columns of the utility matrix, not positions of the
    # dataframe: descriptions are looked up through the alphanumeric id.
    items_by_id = None
    if 'business_id' in self.items.columns:
      items_by_id = self.items.drop_duplicates(subset='business_id').set_index('business_id')[info_columns]

    for user_id in user_ids:

      _, user_profile, items_rated, _ = self._user_context(csr_um, user_id)

      items_indx = self._candidates(user_profile, items_rated, exclude=True)

      if items_indx.size == 0:
        result_set[user_id] = []
        continue

      itemps = self.items_handler[items_indx]

      scores = self._cosine_similarity(user_profile, itemps)

      argscores = scores.argsort()[::-1][:n]

      scores = (scores[argscores] + 1) / 2
      top_items_indx = items_indx[argscores]

      if items_by_id is not None:
        top_items = items_by_id.reindex(items_map.to_tokens(top_items_indx))
      else:
        top_items = self.items.iloc[top_items_indx, :][info_columns]

      result_set[user_id] = [('; '.join(str(value) for value in row[1]), round(scores[i], 8)) for i, row in enumerate(top_items.iterrows())]

    return result_set

  def _precision_recall_at_k(self, csr_um, user_indx, user_profile, items_rated, ratings, step):
    '''
    Computes precision and recall of the test items of the user found by the LSH search.
    An item is relevant when its rating is above the mean of the training ratings
    and it is considered recommended when its score is positive.

    Returns
    -------
    (k values, precision at each k, recall at each k)
    '''
    train = self.mask[user_indx]
    mean = ratings[train].mean()

    common_items = self._candidates(user_profile, items_rated[~train], exclude=False)

    k_range = np.arange(step, len(common_items), step)

    if common_items.size == 0:
      return k_range, [], []

    common_items_ratings = np.asarray(csr_um[[user_indx], list(common_items)]).ravel()
    ground_truth = (common_items_ratings - mean > 0)

    itemps = self.items_handler[common_items]

    scores = self._cosine_similarity(user_profile, itemps)
    desc_order = scores.argsort()[::-1]

    relevant = ground_truth[desc_order]
    cum_hits = np.cumsum((scores[desc_order] > 0) & relevant)
    cum_relevant = np.cumsum(relevant)

    precision = []
    recall = []
    for k in k_range:
      rel_items = cum_hits[k - 1]
      tot_rel_items = cum_relevant[k - 1]
      precision.append(rel_items / k)
      # NOTE(review): recall is relative to the relevant items of the top k only,
      # not to all the relevant test items - confirm this is the intended metric.
      recall.append(rel_items / tot_rel_items if tot_rel_items > 0 else float('nan'))

    return k_range, precision, recall

  def _diversity(self, recomended_items):
    '''
    Mean Jaccard distance between the one-hot-encoded profiles of the recommended items.
    Two profiles without features are considered identical; fewer than two items give 0.
    '''
    profiles = (np.asarray(recomended_items) != 0).astype('int32')
    n_items = profiles.shape[0]

    if n_items < 2:
      return 0.0

    intersection = profiles @ profiles.T
    sizes = profiles.sum(axis=1)
    union = sizes[:, None] + sizes[None, :] - intersection

    similarity = np.ones(intersection.shape, dtype='float64')
    np.divide(intersection, union, out=similarity, where=union > 0)

    jd_mat = 1 - similarity

    return float(np.mean(jd_mat.sum(axis=1) / (n_items - 1)))

  def _diversity_at_k(self, user_profile, items_rated, step, top_n):
    '''
    Computes the diversity of the first k recommended items, for k multiple of step,
    among the top_n best unseen items.

    Returns
    -------
    (k values, diversity at each k)
    '''
    items_indx = self._candidates(user_profile, items_rated, exclude=True)

    if items_indx.size == 0:
      return np.arange(0), []

    item_profiles = self.items_handler[items_indx]

    scores = self._cosine_similarity(user_profile, item_profiles)
    argscores = scores.argsort()[::-1][:top_n]

    # Only the one-hot profiles of the best items are read.
    ohe_profiles = self.ohe_handler[items_indx[argscores]]

    k_range = np.arange(step, ohe_profiles.shape[0] , step)
    diversity = []
    for k in k_range:
      diversity.append(self._diversity(ohe_profiles[:k]))

    return k_range, diversity

  def evaluate_query(self, user_id: str, step: int=5, n: int=200):
    '''
    evaluate_query method tries to evaluate a query computing the Precision@K, Recall@K and Diversity@K

    Parameters
    ----------
    user_id : alphanumeric user id
    step    : step size for metric computation
    n       : maximum number of test items

    Raises
    ------
    ValueError : the user id is unknown
    '''
    csr_um = self.utility_matrix.utility_matrix.tocsr()

    user_indx, user_profile, items_rated, ratings = self._user_context(csr_um, user_id)

    k_range, precision, recall = self._precision_recall_at_k(csr_um, user_indx, user_profile, items_rated, ratings, step)

    k_range1, diveristy = self._diversity_at_k(user_profile, items_rated, step, n)

    fig, (ax1, ax2) = plt.subplots(2, 1)
    fig.suptitle(f'Query: {user_id}', fontsize=12)
    fig.subplots_adjust(hspace=0.5)

    ax1.set_title('Precision & Recall @ K', fontsize=10)
    ax1.plot(k_range, precision, label='Precision')
    ax1.plot(k_range, recall, label='Recall')
    ax1.legend(loc='upper right')
    ax1.set_ylim(0, 1.1)
    ax1.set_xlabel('K')
    ax1.set_ylabel('Precision & Recall')
    ax1.grid(True)

    ax2.set_title('Diversity @ K', fontsize=10)
    ax2.plot(k_range1, diveristy, label='Diveristy')
    ax2.legend(loc='upper right')
    ax2.set_ylim(0, 1.1)
    ax2.set_xlabel('K')
    ax2.set_ylabel('Diversity')
    ax2.grid(True)

    plt.show()

  def evaluate_system(self, user_ids, step: int=10, n: int=180):
    '''
    evaluate_system method tries to evaluate the overall performance of the recommendation system
    computing the MAP@K metric

    Parameters
    ----------
    user_ids : alphanumeric user ids
    step     : step size for metric computation
    n        : maximum number of test items

    Raises
    ------
    ValueError : a user id is unknown
    '''
    csr_um = self.utility_matrix.utility_matrix.tocsr()
    m_range = np.arange(step, n, step)

    apk = []
    for user_id in user_ids:
      user_indx, user_profile, items_rated, ratings = self._user_context(csr_um, user_id)

      train = self.mask[user_indx]

      test_items = self._candidates(user_profile, items_rated[~train], exclude=False)

      if test_items.size == 0:
        apk.append([0.0] * len(m_range))
        continue

      test_ratings = np.asarray(csr_um[[user_indx], list(test_items)]).ravel()
      ground_truth = (test_ratings - ratings[train].mean() > 0)

      itemps = self.items_handler[test_items]

      scores = self._cosine_similarity(user_profile, itemps)
      desc_order = scores.argsort()[::-1]

      relevant = ground_truth[desc_order]
      hits = (scores[desc_order] > 0) & relevant

      # AP@m = 1/R * sum over k = 1..m of P(k) * rel(k), where rel(k) is the
      # indicator of the item ranked k-th (position k - 1 of the arrays).
      precision_at_rank = np.cumsum(hits) / np.arange(1, hits.size + 1)
      cum_ap = np.cumsum(precision_at_rank * relevant)
      cum_relevant = np.cumsum(relevant)

      user_apk = []
      for m in m_range:
        cutoff = min(m, hits.size)
        n_relevant = cum_relevant[cutoff - 1]
        user_apk.append(cum_ap[cutoff - 1] / n_relevant if n_relevant > 0 else 0.0)

      apk.append(user_apk)

    fig, ax = plt.subplots(1, 1, figsize=(6.3, 2))
    fig.suptitle(f'Mean Average Precision @ K', fontsize=12)

    ax.plot(m_range, np.mean(apk, axis=0), label='MAP@K')
    ax.legend(loc='upper right')
    ax.set_xlabel('K')
    ax.set_ylabel('MAP')
    ax.grid(True)

    # Save before showing: some backends release the figure once it is shown.
    fig.savefig('map.pdf')
    plt.show()


In [None]:
# Build the user x item utility matrix from the review file downloaded above.
# Note: 10e4 is the float 100000.0, i.e. chunks of 100,000 lines.
print('Building utility matrix...')
um = load_reviews(path='yelp_academic_dataset_review.json.zip', chunksize=10e4)

Building utility matrix...


In [None]:
# Load the business dataframe, the features of each business and the set of all features.
print('Loading business...')
bus, itemfs, fset = load_business(path='yelp_academic_dataset_business.json.zip', chunksize=10e4)

Loading business...


In [None]:
# Build the recommender and compute item and user profiles.
# bpsh = 100 * 2 ** 20 bytes (100 MiB) per shard, ncomps = 210 components per item profile.
print('Transforming...')
rs = RecommendationSystem(utility_matrix=um, items=bus, item_features=itemfs, features_set=fset, bpsh=100 * 2 ** 20, ncomps=210)
rs.transform()

In [24]:
# Ids of the 1000 most active reviewers, used as queries in the cells below.
top_k = um.get_active_reviewers(1000)

In [None]:
# Plot the precision, recall and diversity curves of the most active reviewer (step 5, n 180).
rs.evaluate_query(top_k[0], 5, 180)

In [None]:
# Plot the MAP@K curve over the 200 most active reviewers (the figure is also saved as map.pdf).
rs.evaluate_system(top_k[:200])

In [None]:
# Recommend up to 100 items to each of the 10 most active reviewers.
p = rs.recommend(top_k[:10], 100)

# One line per user: the user id followed by its list of (item description, score).
for q in p:
  print(q, p[q])