In [None]:
! pip install numba

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
import itertools
import time
import numpy as np
from math import ceil
import numba as nb
from numba.experimental import jitclass

In [3]:
class Recommender:
    def __init__(self, item_factors, user_factors):
        self.item_factors = item_factors
        self.user_factors = user_factors

    def fit(self):
        pass

    def recommend(self, N: int=10,
                      num_threads: int=0, batch_size: int=0,
                      users_items_offset: int=0):
        
        factors_items = self.item_factors.T
        
        users_c = self.user_factors.shape[0]
        items_c = self.item_factors.shape[0]
        batch = num_threads * 100 if batch_size==0 else batch_size
        
        A = np.zeros((batch, items_c))
        B = A
        users_c_b = ceil(users_c / float(batch))

        # Separate all users in batches
        for u_b in range(users_c_b):
            u_low = u_b * batch
            u_high = min([(u_b + 1) * batch, users_c])
            u_len = u_high - u_low
            # Prepare array with scores for batch of users
            users_factors = np.vstack([
                self._user_factor(u + users_items_offset)
                for u
                in range(u_low, u_high, 1)
            ]).astype(np.float32)
            
            A = np.ascontiguousarray(A)
            users_factors.dot(factors_items, out=A[:u_len])
            np.sort(A, axis=1)
            B = np.concatenate([B[:, :10], A[:, :10]])
        return B
        
    def _user_factor(self, userid):
        return self.user_factors[userid]

In [11]:
class NumbaRecommender:
    def __init__(self, item_factors, user_factors):
        self.item_factors = item_factors
        self.user_factors = user_factors

    def fit(self):
        pass
    
    @nb.jit(parallel=True, fastmath=True, boundscheck=True, nogil=True)    
    def numpy_recommend(item_factors: np.ndarray, user_factors: np.ndarray, 
                        N: int=10, num_threads: int=0, batch_size: int=0, 
                        users_items_offset: int=0, users_factors: np.ndarray=np.array([[]])):
        factors_items = item_factors.T

        users_c = user_factors.shape[0]
        items_c = item_factors.shape[0]
        batch = num_threads * 100 if batch_size==0 else batch_size

        A = np.zeros((batch, items_c))
        B = np.zeros((users_c, N))
        users_c_b = ceil(users_c / float(batch))

        # Separate all users in batches
        for u_b in nb.prange(users_c_b):
            u_low = u_b * batch
            u_high = min([(u_b + 1) * batch, users_c])
            u_len = u_high - u_low
                # Prepare array with scores for batch of users
            users_factors_batch = np.vstack(([
                user_factors[u + users_items_offset]
                for u
                in range(u_low, u_high, 1)
            ],)).astype(np.float32)

                #A = np.ascontiguousarray(A)
            A[:u_len] = np.dot(users_factors_batch, factors_items)
            np.sort(A, axis=1)
            B[u_low:u_high] = A[:,:N]
        return B

    def get_numpy_recommend_function(self, item_factors: np.ndarray, 
                                     user_factors: np.ndarray, N: int=10, num_threads: int=0, 
                                     batch_size: int=0, users_items_offset: int=0, 
                                     users_factors: np.ndarray=np.array([[]])):
        return NumbaRecommender.numpy_recommend(item_factors, user_factors, N, num_threads, 
                              batch_size, users_items_offset, users_factors)

    def recommend(self, N: int=10, num_threads: int=0, batch_size: int=0, users_items_offset: int=0):
        item_factors = self.item_factors
        user_factors = self.user_factors
        return self.get_numpy_recommend_function(item_factors, user_factors, N, num_threads, batch_size, users_items_offset)
     

In [5]:
def timer(objects, users, size):
  recommender = Recommender(objects, users)
  start = time.perf_counter()
  recommender.recommend(batch_size=size)
  end = time.perf_counter()
  print(f'without numba time={end - start}')
  return end - start

def numpy_timer(objects, users, size):
  recommender = NumbaRecommender(objects, users)
  start = time.perf_counter()
  recommender.recommend(batch_size=size)
  end = time.perf_counter()
  print(f'numba + numpy={end - start}')
  return end - start

In [6]:
K = [16, 64]
number_users = [1000, 10000, 100000]
number_items = [1000, 10000, 100000]
batch_size = [100]

In [15]:
for size, k, m, n in itertools.product(batch_size, K, number_items, number_users):
  users = np.random.sample((n, k))
  objects = np.random.sample((m, k))

  print(f'k={k}, users={n}, items={m}')
  t1 = timer(objects, users, size)
  t2 = numpy_timer(objects, users, size)
  if min(t1, t2) == t1:
      print('without optimization')
  else:
      print('numba + numpy')
  print('-' * 20)

k=16, users=1000, items=1000
without numba time=0.08547721000002184
numba + numpy=0.07298515699994823
numba + numpy
--------------------
k=16, users=10000, items=1000
without numba time=0.7722835900000291
numba + numpy=0.7561513950000744
numba + numpy
--------------------
k=16, users=100000, items=1000
without numba time=9.971664871999906
numba + numpy=8.923346388000027
numba + numpy
--------------------
k=16, users=1000, items=10000
without numba time=0.9777428450000798
numba + numpy=0.948645205000048
numba + numpy
--------------------
k=16, users=10000, items=10000
without numba time=9.556801878999977
numba + numpy=9.94921835499997
without optimization
--------------------
k=16, users=100000, items=10000
without numba time=101.22378049600002
numba + numpy=102.7546512460001
without optimization
--------------------
k=16, users=1000, items=100000
without numba time=9.614500028000066
numba + numpy=10.004418488000056
without optimization
--------------------
k=16, users=10000, items=1000

KeyboardInterrupt: ignored