In [None]:
import datetime
import numpy as np
import pandas as pd
from pytz import timezone
from scipy.sparse import csr_matrix

from recpack.algorithms import EASE, ItemKNN, Popularity, KUNN
from recpack.preprocessing.preprocessors import DataFramePreprocessor
from recpack.preprocessing.filters import MinItemsPerUser, MinUsersPerItem, NMostPopular
from recpack.util import get_top_K_values


## Checking Fit

In [None]:
def test_fit():
    """Check that fitting KUNN stores an item neighbourhood matrix.

    The stored ``knn_i_`` matrix must be square in the number of items and
    must equal the similarity matrix of an ``ItemKNN(K=1)`` fitted on the
    same interactions.
    """
    # (user, item) pairs of the binary interactions used as training data.
    interactions = [(0, 1), (0, 2), (1, 0), (1, 2), (2, 0), (2, 1), (2, 2)]
    user_ids = [user for user, _ in interactions]
    item_ids = [item for _, item in interactions]
    ones = [1] * len(interactions)
    X = csr_matrix((ones, (user_ids, item_ids)))

    algo = KUNN()
    algo.fit(X)

    # Fitting must have produced an item-by-item neighbourhood matrix.
    n_items = X.shape[1]
    assert algo.knn_i_.shape == (n_items, n_items)

    # The stored neighbourhood must match a stand-alone ItemKNN model.
    # NOTE(review): this relies on KUNN's default Ki matching K=1 (or on the
    # data being small enough for it not to matter) -- confirm.
    reference_knn = ItemKNN(K=1)
    reference_knn.fit(X)
    expected_similarities = reference_knn.similarity_matrix_.toarray()
    stored_similarities = algo.knn_i_.toarray()
    np.testing.assert_array_equal(expected_similarities, stored_similarities)
    

In [None]:
def test_predict_k_1():
    """Check KUNN's score for (user 3, item 2) with Ku = Ki = 1.

    The expected value is computed by hand as

        s(u, i) = (s_U(u, i) + s_I(u, i)) / sqrt(c(u) * c(i))

    where c(u) is the number of items user u interacted with and c(i) is
    the number of users that interacted with item i. Users 0-2 are training
    users (``test_matrix``); users 3-4 only appear in ``pred_matrix``.
    """
    kunn = KUNN(Ku=1, Ki=1)

    values = [1, 1, 1, 1, 1, 1, 1]
    users = [0, 0, 1, 1, 2, 2, 2]
    items = [1, 2, 0, 2, 0, 1, 2]
    test_matrix = csr_matrix((values, (users, items)), shape=(5, 3))

    kunn.fit(test_matrix)

    values_pred = [1, 1, 1, 1]
    users_pred = [3, 3, 4, 4]
    items_pred = [0, 1, 1, 2]
    pred_matrix = csr_matrix((values_pred, (users_pred, items_pred)), shape=test_matrix.shape)

    prediction = kunn.predict(pred_matrix)

    # Manual computation of the formulas in the paper.
    # We compute the score for user 3, item 2.
    u = 3
    i = 2

    ## USER SIMILARITY ##

    # Use the ItemKNN class to compute user neighbours by fitting it on the
    # transpose of the combination of the test and pred matrices.
    # In this case test_matrix and pred_matrix are fully disjoint.
    # TODO: add a test with non fully disjoint test and pred matrices,
    #       to make sure it can be used with the other splitters as well.
    userknn = ItemKNN(K=1)
    userknn.fit((test_matrix + pred_matrix).T.tocsr())

    # Get the one user most similar to user u.
    v = np.argmax(userknn.similarity_matrix_[u])
    # We are "lucky": the most similar user is one of the training users,
    # so no extra work is needed to remove unwanted similarities.
    assert v == 2

    # User 2 has seen item 2, the item we are trying to predict
    # -> the R_v_i term in the formula is 1.
    # c(v): history length of the neighbour.
    c_v = test_matrix[v].nnz

    # Second summation, over all items v and u have in common.
    # User 3 and user 2 have 2 items in common (0 and 1).
    # u's interactions are not in test_matrix, so each item's occurrence
    # count is increased by 1 to account for them.
    c_j_0 = test_matrix[:, 0].nnz + 1
    c_j_1 = test_matrix[:, 1].nnz + 1

    # User similarity:
    # 1/sqrt(c(v)) * sum(j in [0, 1]) 1/sqrt(c(j))
    s_u = (1 / c_v**0.5) * ((1 / c_j_0**0.5) + (1 / c_j_1**0.5))

    ## ITEM SIMILARITY ##

    # Fit a reference item model locally; the one created in test_fit is not
    # in scope here, so relying on it raised a NameError.
    itemknn = ItemKNN(K=1)
    itemknn.fit(test_matrix)

    # K = 1 -> argmax gives us the most similar item.
    j = np.argmax(itemknn.similarity_matrix_[i])
    assert j == 0

    # c(j) in the paper; only needed for the single neighbour j.
    # User u has interacted with item 0 -> R_u_j = 1.
    c_j = test_matrix[:, j].nnz

    # 2 users have seen both items 0 and 2 => users 1 and 2.
    np.testing.assert_array_equal(
        test_matrix[:, j].multiply(test_matrix[:, i]).toarray().nonzero()[0],
        np.array([1, 2])
    )

    # History lengths of those two users.
    c_v_1 = test_matrix[1].nnz
    c_v_2 = test_matrix[2].nnz

    # Item similarity:
    # 1/sqrt(c(j)) * sum(v in [1, 2]) 1/sqrt(c(v))
    s_i = (1 / c_j**0.5) * ((1 / c_v_1**0.5) + (1 / c_v_2**0.5))

    # Final score:
    # (s_u + s_i) / sqrt(c(u) * c(i))
    # c(i) is the number of users that saw item i, i.e. the nnz of COLUMN i
    # (row i would be the history of user 2, which is only equal by accident).
    c_u = pred_matrix[u].nnz
    c_i = test_matrix[:, i].nnz
    s_u_i = (s_u + s_i) / (c_u * c_i)**0.5

    np.testing.assert_almost_equal(prediction[u, i], s_u_i)


In [None]:
def test_predict_k_2():
    """Check KUNN's score for (user 3, item 2) with Ku = Ki = 2.

    The expected value is built iteratively from the double sums of the
    user-based and item-based parts, then normalised:

        s(u, i) = (s_U(u, i) + s_I(u, i)) / sqrt(c(u) * c(i))

    where c(u) counts the items of user u and c(i) counts the users of
    item i. Users 0-2 are training users (``test_matrix``); users 3-4 only
    appear in ``pred_matrix``.
    """
    kunn = KUNN(Ku=2, Ki=2)

    values = [1, 1, 1, 1, 1, 1, 1]
    users = [0, 0, 1, 1, 2, 2, 2]
    items = [1, 2, 0, 2, 0, 1, 2]
    test_matrix = csr_matrix((values, (users, items)), shape=(5, 3))

    kunn.fit(test_matrix)

    values_pred = [1, 1, 1, 1]
    users_pred = [3, 3, 4, 4]
    items_pred = [0, 1, 1, 2]
    pred_matrix = csr_matrix((values_pred, (users_pred, items_pred)), shape=test_matrix.shape)

    predictions = kunn.predict(pred_matrix)

    # Fit the reference KNN models: items on the training data, users on the
    # transpose of training + prediction interactions.
    itemknn = ItemKNN(K=2)
    userknn = ItemKNN(K=2)
    itemknn.fit(test_matrix)
    userknn.fit((test_matrix + pred_matrix).T.tocsr())

    # Predict score for user 3, item 2.
    u = 3
    i = 2

    # V is the set of users that are neighbours of u.
    V = userknn.similarity_matrix_[u].nonzero()[1]
    # We are "lucky": our new users are not similar to the other new users
    #     -> no leakage of info.
    np.testing.assert_array_equal(V, np.array([1, 2]))

    # J is the set of items that are neighbours of i.
    J = itemknn.similarity_matrix_[i].nonzero()[1]

    # 1/sqrt(c(v)) for each neighbour user v: c(v) is the nnz of ROW v.
    one_over_sqrt_v = {v: 1 / test_matrix[v].nnz ** 0.5 for v in V}
    # 1/sqrt(c(j)) for each neighbour item j: c(j) is the nnz of COLUMN j.
    # (Indexing row j would count the history of user j instead.)
    one_over_sqrt_j = {j: 1 / test_matrix[:, j].nnz ** 0.5 for j in J}

    ## USER SIMILARITY ##
    # Iteratively compute the user-based part.
    score = 0
    for v in V:  # 1st sum: v in KNN(u)
        if test_matrix[v, i] != 0:  # R_v_i in the first sum
            # Compute transitive part.
            trans_sum = 0
            for j in pred_matrix[u].nonzero()[1]:  # 2nd sum, items with R_u_j = 1

                if test_matrix[v, j] != 0:  # R_v_j = 1 clause in the sum

                    c_j = test_matrix[:, j].nnz

                    # "HACK": count the interaction of user u, which only
                    # lives in the pred matrix.
                    if test_matrix[u, j] == 0:
                        c_j += 1
                    # End of "HACK"

                    trans_sum += (1 / c_j**0.5)

            score += trans_sum * one_over_sqrt_v[v]

    user_sim = score

    ## ITEM SIMILARITY ##
    # Iteratively compute the item-based part.
    score = 0
    for j in J:  # 1st sum: j in KNN(i)
        if pred_matrix[u, j] != 0:  # R_u_j value
            trans_sum = 0
            for v in test_matrix[:, j].nonzero()[0]:  # 2nd sum, users with R_v_j = 1
                if test_matrix[v, i] != 0:  # R_v_i clause in second sum
                    c_v = test_matrix[v, :].nnz
                    trans_sum += 1 / c_v**0.5

            score += trans_sum * one_over_sqrt_j[j]

    item_sim = score

    ## FINAL SCORE ##
    final_score = (user_sim + item_sim) / (pred_matrix[u].nnz * test_matrix[:, i].nnz)**0.5

    # Compare against the matrix returned by predict (named `predictions`).
    np.testing.assert_almost_equal(predictions[u, i], final_score)

In [None]:
# Execute the Ku = Ki = 2 prediction check; any failing assertion raises here.
test_predict_k_2()