In [1]:
#Calc Standard Discounted Cumulative Squared Error

import numpy as np
import pandas as pd


def sdcse(relevance_vector, predicted_vector):
    
    """ Calculate STANDARD DISCOUNTED CUMULATIVE SQUARED ERROR
    as described in the paper Evaluating Top-N Recommendations
    Using Ranked Error Approach: An Empirical Analysis


    Parameters
    ----------
    relevance_vector : relevance vector
        relevance vector for each user
    predicted_vector : predicted scores for N items
    
    Each relvec and each predvec are within [1, 5] and [0, 1]
    intervals for explicit and implicit feedback.

    Returns
    -------
    float
        the calculated SDCSE
    """
        
        
    # Sort the relevance and predicted vectors in descending order
    relvec = np.sort(relevance_vector)[::-1]
    predvec = np.sort(predicted_vector)[::-1]
    
    # Initialize variables for discounted cumulative squared error
    # at N, descending squared error, and worst DCSE at N
    DCSE_at_N = 0
    n = len(relvec)
    SE_des = []
    WDCSE_at_N = 0
    
    # Calculate the Discounted Cumulative Squared Error (DCSE) at N
    for i in range (1,n+1):
        
        squared_difference = (relvec[i - 1] - predvec[i - 1])**2
        DCSE = squared_difference / np.log2(i + 1)
        DCSE_at_N += DCSE
    
    # Calculate the Squared Error vector
    for j in range(1,n+1):
        element = (relvec[j - 1] - predvec[j - 1])**2
        SE_des.append(element)

    # Sort the Squared Error vector in descending order
    SE_des.sort(reverse=True)

    # Calculate the Worst Discounted Cumulative Squared Error (WDCSE) at N
    for k in range(1,n+1):
        element2 = SE_des[k - 1] / np.log2(k + 1)
        WDCSE_at_N += element2
    
    # Check if nominator and denominator are 0 (result in "nan")
    if DCSE_at_N == 0 and WDCSE_at_N == 0:
        return 0
    else:
            SDCSE_at_N = 1 - (DCSE_at_N / WDCSE_at_N)
            return SDCSE_at_N
        
        


In [2]:
def get_predicted_and_actual(user_id, model, plays_train, plays_test, k):
    # Get the model's top-k recommendations for the user
    ids, scores = model.recommend(user_id, plays_train[user_id], N=k)
    user_recommendations = [(x, y) for x, y in zip(ids, scores)]

    # Get the indices of the user's actual interactions in the test set
    actual_indices = set(plays_test[user_id].nonzero()[1])

    # Get the indices of the recommended items for the user
    recommended_indices = [item_id for item_id, _ in user_recommendations]

    # Get the intersection of the actual and recommended item indices (i.e., the hits)
    hit_indices = actual_indices.intersection(recommended_indices)

    # Convert the hit indices to arrays of recommended and actual items
    predicted_items = np.array([1 if i in recommended_indices else 0 for i in range(plays_train.shape[1])])
    actual_items = np.array([1 if i in actual_indices else 0 for i in range(plays_train.shape[1])])

    return predicted_items, actual_items

In [3]:
from implicit.datasets.lastfm import get_lastfm
from implicit.evaluation import train_test_split

# Get lastfm dataset
artists, users, artist_user_plays = get_lastfm()

# transpose item-user to user-item matrix
user_plays = artist_user_plays.T.tocsr()

# split user-item matrix into training set and test set (80%-20%)
train_split, test_split = train_test_split(user_plays)

In [4]:
from implicit.als import AlternatingLeastSquares

# Choose ALS as method, train the model with the training dataset
model = AlternatingLeastSquares(factors=64, regularization=0.05, alpha=2.0)
model.fit(train_split)

  0%|          | 0/15 [00:00<?, ?it/s]

In [5]:
# Calc SDCSE for a single user

user_id = 12345
K = 10

# Function call to get the numpy-array of predicted and actual items of a single user
predicted, actual = get_predicted_and_actual(user_id, model, train_split, test_split, K)

# Function call for the calulation of SDCSE of a single user
result = sdcse(actual, predicted)

# Result
print(result)

0.7210570543488701


In [6]:
# Counter for how many user the average SDCSE get calulated
user_count = 10
# Float result of a single SDCSE calculation
result = 0
# User index where to start the calculation
i = 100
# Nominator for average calculation
summ = 0
# Denominator for average calculation, where no "nan" is returned from sdcse()
successful = 0

# Calculate the average SDCSE for the given user count
for i in range(user_count):
    predicted, actual = get_predicted_and_actual(i, model, train_split, test_split, 10)
    result = sdcse(actual, predicted)
    if(result == 'nan'):
        continue
    else:
        successful += 1
        summ += result
    
average_sdcse = summ / successful

print(average_sdcse)

0.5426394045324289
