In [1]:
import numpy as np
import pandas as pd
import csv
import math


In [2]:
ratings = np.random.rand(10, 1)
ratings

array([[0.56818966],
       [0.04751378],
       [0.4727467 ],
       [0.92182323],
       [0.62708224],
       [0.88296485],
       [0.50020001],
       [0.84162086],
       [0.05875582],
       [0.1517881 ]])

In [3]:
ITEM_NUM = 10

In [4]:
all_items = set(range(ITEM_NUM))

In [5]:
training_items = [4, 3, 6, 9]

In [6]:
test_items = list(all_items - set(training_items))
test_items

[0, 1, 2, 5, 7, 8]

In [47]:
import numpy as np
import heapq
import torch
from torch.utils.data import DataLoader, Dataset

# Functions to compute metrics
def ranklist_by_sorted(user_pos_test, test_items, rating, Ks):
    item_score = {i: rating[i] for i in test_items}
    K_max = max(Ks)
    K_max_item_score = heapq.nlargest(K_max, item_score, key=item_score.get)

    r = [1 if i in user_pos_test else 0 for i in K_max_item_score]
    auc = get_auc(item_score, user_pos_test)
    return r, auc

def get_auc(item_score, user_pos_test):
    item_score = sorted(item_score.items(), key=lambda kv: kv[1], reverse=True)
    item_sort = [x[0] for x in item_score]
    posterior = [x[1] for x in item_score]

    r = [1 if i in user_pos_test else 0 for i in item_sort]
    auc = AUC(ground_truth=r, prediction=posterior)
    return auc

def precision_at_k(r, k):
    r = np.asarray(r)[:k]
    return np.mean(r)

def recall_at_k(r, k, all_pos_num):
    r = np.asarray(r)[:k]
    return np.sum(r) / all_pos_num

def ndcg_at_k(r, k):
    r = np.asarray(r)[:k]
    if r.size == 0:
        return 0.
    return np.sum(r / np.log2(np.arange(2, r.size + 2)))

def hit_at_k(r, k):
    r = np.asarray(r)[:k]
    return 1.0 if np.sum(r) > 0 else 0.0

def AUC(ground_truth, prediction):
    from sklearn.metrics import roc_auc_score
    return roc_auc_score(ground_truth, prediction)

# Custom Dataset for DataLoader
class UserDataset(Dataset):
    def __init__(self, num_users, item_num):
        self.num_users = num_users
        self.item_num = item_num
        self.data = self._generate_data()

    def _generate_data(self):
        data = []
        for _ in range(self.num_users):
            preds = np.random.uniform(0.8, 1.0, self.item_num)
            reals = np.random.choice([0, 1], size=self.item_num)
            user_pos_test = [i for i, x in enumerate(reals) if x == 1]
            data.append((torch.tensor(preds, dtype=torch.float32), torch.tensor(reals, dtype=torch.int32), user_pos_test))
        return data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

# Function to pad user_pos_test lists to the same length
def pad_user_pos_test(batch):
    preds_batch, reals_batch, user_pos_test_batch = zip(*batch)
    max_len = max(len(ut) for ut in user_pos_test_batch)
    padded_user_pos_test_batch = []
    for user_pos_test in user_pos_test_batch:
        padded = user_pos_test + [-1] * (max_len - len(user_pos_test))  # use -1 as padding
        padded_user_pos_test_batch.append(padded)
    return torch.stack(preds_batch), torch.stack(reals_batch), torch.tensor(padded_user_pos_test_batch)

# Parameters
num_users = 5
batch_size = 3
Ks = [5, 10, 15]
ITEM_NUM = 10
all_items = set(range(ITEM_NUM))
training_items = [4, 3, 6, 9]
test_items = list(all_items - set(training_items))

# Dataset and DataLoader
dataset = UserDataset(num_users, ITEM_NUM)
user_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=pad_user_pos_test)

# Result dictionary
result = {'precision': np.zeros(len(Ks)), 'recall': np.zeros(len(Ks)), 'ndcg': np.zeros(len(Ks)), 'hit_ratio': np.zeros(len(Ks)), 'auc': 0.}
all_precision, all_recall, all_ndcg, all_hit_ratio, all_auc = [], [], [], [], []

# Processing batches
for batch in user_loader:
    preds_batch, reals_batch, user_pos_test_batch = batch
    batch_precision, batch_recall, batch_ndcg, batch_hit_ratio, batch_auc = [], [], [], [], []

    for i in range(len(preds_batch)):
        preds = preds_batch[i].numpy()
        reals = reals_batch[i].numpy()
        user_pos_test = [x for x in user_pos_test_batch[i].numpy() if x != -1]  # remove padding

        r, auc = ranklist_by_sorted(user_pos_test, test_items, preds, Ks)

        precision, recall, ndcg, hit_ratio = [], [], [], []
        for K in Ks:
            precision.append(precision_at_k(r, K))
            recall.append(recall_at_k(r, K, len(user_pos_test)))
            ndcg.append(ndcg_at_k(r, K))
            hit_ratio.append(hit_at_k(r, K))

        batch_precision.append(precision)
        batch_recall.append(recall)
        batch_ndcg.append(ndcg)
        batch_hit_ratio.append(hit_ratio)
        batch_auc.append(auc)

    all_precision.extend(batch_precision)
    all_recall.extend(batch_recall)
    all_ndcg.extend(batch_ndcg)
    all_hit_ratio.extend(batch_hit_ratio)
    all_auc.extend(batch_auc)

# Calculate mean values
result['precision'] = np.mean(all_precision, axis=0)
result['recall'] = np.mean(all_recall, axis=0)
result['ndcg'] = np.mean(all_ndcg, axis=0)
result['hit_ratio'] = np.mean(all_hit_ratio, axis=0)
result['auc'] = np.mean(all_auc)

result

{'precision': array([0.48      , 0.53333333, 0.53333333]),
 'recall': array([0.47      , 0.63333333, 0.63333333]),
 'ndcg': array([1.49680485, 1.7817706 , 1.7817706 ]),
 'hit_ratio': array([1., 1., 1.]),
 'auc': 0.45}

In [8]:
r, auc 

([1, 0, 0, 1, 0, 0], 0.75)