### This notebook computes the evaluation metrics of every explanation baseline for one chosen recommender and dataset.
# Imports


In [None]:
import pandas as pd
import numpy as np
import os
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
from pathlib import Path
import pickle
from collections import defaultdict
import time
import torch
import torch.nn as nn
import copy
import torch.nn.functional as F
import optuna
import logging
import matplotlib.pyplot as plt
import ipynb
import importlib
import sys
import shap

In [None]:
# Allocator setting for PyTorch's CUDA caching allocator.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:512"
# Only cap the GPU memory share when a GPU is actually present; the call raises
# on CPU-only machines, which the rest of this notebook otherwise supports.
if torch.cuda.is_available():
    torch.cuda.set_per_process_memory_fraction(0.9, 0)  # fraction of device 0's memory PyTorch may use; adjust as needed

In [None]:
# Experiment selection.
data_name = "ML1M"  ### Can be ML1M, Yahoo, Pinterest
recommender_name = "MLP"  ### Can be MLP, VAE, NCF

# Data and checkpoint folders are resolved relative to the parent of the
# current working directory.
DP_DIR = Path("processed_data") / data_name
export_dir = Path(os.getcwd())
files_path = export_dir.parent / DP_DIR
checkpoints_path = export_dir.parent / "checkpoints"

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
# Text file to which eval_one_expl_type appends its metric summaries.
new_file_name = "NEW_FILE_NAME"

In [None]:
# Recommender name -> output type ("single" or "multiple").
output_type_dict = {
    "VAE":"multiple",
    "MLP":"single",
    "NCF": "single"}

# Dataset name -> value used as num_users.
num_users_dict = {
    "ML1M":6037,
    "Yahoo":13797, 
    "Pinterest":19155}

# Dataset name -> value used as both num_items and num_features.
num_items_dict = {
    "ML1M":3381,
    "Yahoo":4604, 
    "Pinterest":9362}


# (dataset, recommender) -> path of the recommender checkpoint passed to load_recommender.
recommender_path_dict = {
    ("ML1M","VAE"): Path(checkpoints_path, "VAE_ML1M_0.0007_128_10.pt"),
    ("ML1M","MLP"):Path(checkpoints_path, "MLP1_ML1M_0.0076_256_7.pt"),
    ("ML1M","NCF"):Path(checkpoints_path, "NCF_ML1M_5e-05_64_16.pt"),
    
    ("Yahoo","VAE"): Path(checkpoints_path, "VAE_Yahoo_0.0001_128_13.pt"),
    ("Yahoo","MLP"):Path(checkpoints_path, "MLP2_Yahoo_0.0083_128_1.pt"),
    ("Yahoo","NCF"):Path(checkpoints_path, "NCF_Yahoo_0.001_64_21_0.pt"),
    
    ("Pinterest","VAE"): Path(checkpoints_path, "VAE_Pinterest_12_18_0.0001_256.pt"),
    ("Pinterest","MLP"):Path(checkpoints_path, "MLP_Pinterest_0.0062_512_21_0.pt"),
    ("Pinterest","NCF"):Path(checkpoints_path, "NCF2_Pinterest_9e-05_32_9_10.pt"),}


# (dataset, recommender) -> hidden dimension passed to load_recommender (None for VAE).
hidden_dim_dict = {
    ("ML1M","VAE"): None,
    ("ML1M","MLP"): 32,
    ("ML1M","NCF"): 8,

    ("Yahoo","VAE"): None,
    ("Yahoo","MLP"):32,
    ("Yahoo","NCF"):8,
    
    ("Pinterest","VAE"): None,
    ("Pinterest","MLP"):512,
    ("Pinterest","NCF"): 64,
}

# (dataset, recommender) -> (LXR checkpoint file name, hidden size used to build the Explainer).
LXR_checkpoint_dict = {
    ("ML1M","VAE"): ('LXR_ML1M_VAE_26_38_128_3.185652725834087_1.420642300151426.pt',128),
    ("ML1M","MLP"): ('LXR_ML1M_MLP_12_39_64_11.59908096547193_0.1414854294885049.pt',64),
    ("ML1M","NCF"): ('LXR_ML1M_NCF_17_38_64_14.950042796023537_0.1778309603009678.pt',64),

    ("Yahoo","VAE"): ('LXR_Yahoo_VAE_neg-1.5pos_combined_19_26_128_18.958765029913238_4.92235962483309.pt',128),
    ("Yahoo","MLP"):('LXR_Yahoo_MLP_neg-pos_combined_last_29_37_128_12.40692505393434_0.19367009952856118.pt',128),
    ("Yahoo","NCF"): ('LXR_Yahoo_NCF_neg-pos_combined_loss_14_14_32_16.01464392466348_6.880015038643981.pt', 32),

    ("Pinterest","VAE"): ('LXR_Pinterest_VAE_comb_4_27_32_6.3443735346179855_1.472868807603448.pt',32),
    ("Pinterest","MLP"): ('LXR_Pinterest_MLP_0_5_16_10.059416809308486_0.705778173474644.pt',16),
    ("Pinterest","NCF"): ('LXR_Pinterest_NCF_combined__neg-1.5pos_0_26_32_13.02585523498726_12.8447247971534.pt', 32)
}

In [None]:
# Resolve the settings of the selected dataset / recommender pair.
output_type = output_type_dict[recommender_name]  ### Can be single, multiple
num_users = num_users_dict[data_name]
num_items = num_features = num_items_dict[data_name]

hidden_dim = hidden_dim_dict[(data_name, recommender_name)]
recommender_path = recommender_path_dict[(data_name, recommender_name)]

# Each LXR entry is a (checkpoint file name, hidden size) pair.
lxr_path, lxr_dim = LXR_checkpoint_dict[(data_name, recommender_name)]

## Data and baselines imports

In [None]:
# Interaction matrices; the first CSV column is used as the index.
train_data = pd.read_csv(files_path / f'train_data_{data_name}.csv', index_col=0)
test_data = pd.read_csv(files_path / f'test_data_{data_name}.csv', index_col=0)
with (files_path / f'pop_dict_{data_name}.pkl').open('rb') as f:
    pop_dict = pickle.load(f)
train_array, test_array = train_data.to_numpy(), test_data.to_numpy()

# One-hot row per item, kept both as a NumPy array and as a tensor on `device`.
items_array = np.eye(num_items)
all_items_tensor = torch.Tensor(items_array).to(device)

In [None]:
# Jaccard-based item similarities, as pickled under files_path.
with (files_path / f'jaccard_based_sim_{data_name}.pkl').open('rb') as f:
    jaccard_dict = pickle.load(f)

In [None]:
# Cosine-based item similarities, as pickled under files_path.
with (files_path / f'cosine_based_sim_{data_name}.pkl').open('rb') as f:
    cosine_dict = pickle.load(f)

In [None]:
# NOTE: pop_dict is already loaded from this same file in the data-loading cell
# above (right after the train/test CSVs); this cell repeats that load.
with open(Path(files_path, f'pop_dict_{data_name}.pkl'), 'rb') as f:
    pop_dict = pickle.load(f) 

In [None]:
# Item -> cluster mapping used by the clustered SHAP baseline (find_shapley_mask).
with (files_path / f'item_to_cluster_{recommender_name}_{data_name}.pkl').open('rb') as f:
    item_to_cluster = pickle.load(f)

In [None]:
# Precomputed SHAP values; find_shapley_mask reads column 0 as the user id.
with (files_path / f'shap_values_{recommender_name}_{data_name}.pkl').open('rb') as f:
    shap_values = pickle.load(f)

In [None]:
# Mirror every (i, j) entry with i <= j onto (j, i) so that both key orders
# can be looked up in the similarity dictionaries.
for i in range(num_items):
    for j in range(i, num_items):
        for sim_dict in (jaccard_dict, cosine_dict):
            sim_dict[(j, i)] = sim_dict[(i, j)]

In [None]:
# Dense version of pop_dict: position `key` holds pop_dict[key].
pop_array = np.zeros(len(pop_dict))
for key in pop_dict:
    value = pop_dict[key]
    pop_array[key] = value

In [None]:
# Shared keyword arguments forwarded (via **kw_dict) to the helper functions.
kw_dict = dict(
    device=device,
    num_items=num_items,
    num_features=num_items,
    demographic=False,
    pop_array=pop_array,
    all_items_tensor=all_items_tensor,
    items_array=items_array,
    output_type=output_type,
    recommender_name=recommender_name,
)

# Configurations

In [None]:
# Each helper notebook is star-imported, reloaded, and star-imported again;
# presumably so that edits to the helper notebooks are picked up without
# restarting the kernel.
from ipynb.fs.defs.baselines_functions import *
importlib.reload(ipynb.fs.defs.baselines_functions)
from ipynb.fs.defs.baselines_functions import *

# Shared LIME explainer instance; find_lime_mask and find_lire_mask overwrite
# its kernel_fn on every call.
lime = LimeBase(distance_to_proximity)

from ipynb.fs.defs.recommenders_architecture import *
importlib.reload(ipynb.fs.defs.recommenders_architecture)
from ipynb.fs.defs.recommenders_architecture import *

from ipynb.fs.defs.help_functions import *
importlib.reload(ipynb.fs.defs.help_functions)
from ipynb.fs.defs.help_functions import *

In [None]:
# Build the recommender for the selected dataset/model, passing its checkpoint path.
# load_recommender is not defined in this file; presumably it comes from one of
# the star-imported helper notebooks.
recommender = load_recommender(data_name, hidden_dim, checkpoints_path, recommender_path, **kw_dict)

# Baselines functions
### Every function produces explanations for a designated baseline, resulting in a dictionary that maps items from the user's history to their explanation scores based on that baseline.

In [None]:
def find_cosine_mask(x, item_id, item_cosine):
    """Score each item in the user's history by its cosine similarity to ``item_id``.

    Args:
        x: 1-D user history (torch tensor or array-like); entries > 0 mark
            items the user interacted with. It is NOT modified.
        item_id: index of the item being explained; it is excluded from the
            returned explanation.
        item_cosine: dict mapping ``(item_a, item_b)`` pairs to a similarity.

    Returns:
        dict mapping each history item index (Python int, ascending order) to
        ``item_cosine[(index, item_id)]``, or ``0`` when the pair is missing.
    """
    # Work on a private CPU copy: the original aliased `x`, so removing the
    # explained item silently altered the caller's data.
    if torch.is_tensor(x):
        user_hist = x.detach().cpu().numpy().copy()
    else:
        user_hist = np.array(x, copy=True)
    user_hist[item_id] = 0  # remove the item we want to explain from the history

    return {
        int(i): item_cosine.get((int(i), item_id), 0)
        for i in np.flatnonzero(user_hist > 0)
    }

In [None]:
def find_lime_mask(x, item_id, min_pert, max_pert, num_of_perturbations, kernel_func, feature_selection, recommender, num_samples=10, method = 'POS', **kw_dict):
    """Explain the recommendation of ``item_id`` with the LIME baseline.

    Args:
        x: user history vector (array or tensor). It is NOT modified.
        item_id: index of the item being explained.
        min_pert, max_pert, num_of_perturbations: forwarded to ``get_lime_args``.
        kernel_func: kernel installed on the shared module-level ``lime`` object.
        feature_selection: forwarded to ``lime.explain_instance_with_data``.
        recommender: recommender model being explained.
        num_samples: forwarded to ``lime.explain_instance_with_data``.
        method: ``'POS'`` or ``'NEG'``; forwarded as ``pos_neg``.
        **kw_dict: extra keyword arguments forwarded to ``get_lime_args``.

    Returns:
        Whatever ``lime.explain_instance_with_data`` returns for the request.

    Raises:
        ValueError: if ``method`` is neither ``'POS'`` nor ``'NEG'`` (the
            original fell through and failed on an unbound local instead).
    """
    if method not in ('POS', 'NEG'):
        raise ValueError(f"method must be 'POS' or 'NEG', got {method!r}")

    # Copy before removing the explained item so the caller's history is untouched.
    user_hist = x.clone() if torch.is_tensor(x) else copy.copy(x)
    user_hist[item_id] = 0

    lime.kernel_fn = kernel_func
    neighborhood_data, neighborhood_labels, distances, item_id = get_lime_args(user_hist, item_id, recommender, all_items_tensor, min_pert = min_pert, max_pert = max_pert, num_of_perturbations = num_of_perturbations, seed = item_id, **kw_dict)
    return lime.explain_instance_with_data(neighborhood_data, neighborhood_labels, distances, item_id, num_samples, feature_selection, pos_neg=method)

In [None]:
def find_lire_mask(x, item_id, num_of_perturbations, kernel_func, feature_selection, recommender, proba=0.1, method = 'POS', **kw_dict):
    """Explain the recommendation of ``item_id`` with the LIRE baseline.

    Args:
        x: user history vector (array or tensor). It is NOT modified.
        item_id: index of the item being explained.
        num_of_perturbations: forwarded to ``get_lire_args`` and also passed
            to ``lime.explain_instance_with_data`` in its fifth position.
        kernel_func: kernel installed on the shared module-level ``lime`` object.
        feature_selection: forwarded to ``lime.explain_instance_with_data``.
        recommender: recommender model being explained.
        proba: forwarded to ``get_lire_args`` (the original ignored this
            argument and always sent ``0.1``).
        method: ``'POS'`` or ``'NEG'``; forwarded as ``pos_neg``.
        **kw_dict: extra keyword arguments forwarded to ``get_lire_args``.

    Returns:
        Whatever ``lime.explain_instance_with_data`` returns for the request.

    Raises:
        ValueError: if ``method`` is neither ``'POS'`` nor ``'NEG'``.
    """
    if method not in ('POS', 'NEG'):
        raise ValueError(f"method must be 'POS' or 'NEG', got {method!r}")

    # Copy before removing the explained item so the caller's history is untouched.
    user_hist = x.clone() if torch.is_tensor(x) else copy.copy(x)
    user_hist[item_id] = 0

    lime.kernel_fn = kernel_func
    neighborhood_data, neighborhood_labels, distances, item_id = get_lire_args(user_hist, item_id, recommender, all_items_tensor, train_array, num_of_perturbations = num_of_perturbations, seed = item_id, proba=proba, **kw_dict)
    return lime.explain_instance_with_data(neighborhood_data, neighborhood_labels, distances, item_id, num_of_perturbations, feature_selection, pos_neg=method)

In [None]:
def find_deep_shap_mask(user_tensor, item_id, explainer):
    """Return DeepSHAP scores for the recommendation of ``item_id``.

    The user tensor is made 2-D if needed, cast to float, moved to ``device``
    and marked as requiring gradients before being handed to
    ``explainer.shap_values``. The entry ``[item_id][0]`` of that result is
    turned into a dict ``{feature index: score}``; the score of ``item_id``
    itself is forced to 0.
    """
    batch = user_tensor.unsqueeze(0) if user_tensor.dim() == 1 else user_tensor
    batch = batch.float().to(device).requires_grad_(True)

    per_feature = explainer.shap_values(batch)[item_id][0]  # get deep shap values

    # .item() converts each score to a plain Python scalar.
    return {
        idx: (0 if idx == item_id else score.item())
        for idx, score in enumerate(per_feature)
    }

In [None]:
def find_shapley_mask(user_tensor, user_id, model, shap_values, item_to_cluster):
    """Score the user's history items with precomputed, cluster-level SHAP values.

    ``shap_values`` is a 2-D array whose first column is the user id and whose
    remaining columns are indexed by cluster. Every item whose (integer-cast)
    history entry equals 1 receives the value of its cluster, taken from the
    first row that matches ``user_id``. ``model`` is unused.
    """
    user_rows = shap_values[shap_values[:, 0].astype(int) == user_id]
    per_cluster = user_rows[:, 1:].T  # one row per cluster
    history = user_tensor.cpu().detach().numpy().astype(int)

    return {
        idx: per_cluster[int(item_to_cluster[idx])][0]
        for idx in np.nonzero(history == 1)[0]
    }

In [None]:
def find_accent_mask(user_tensor, user_id, item_tensor, item_id, recommender_model, top_k):
    """Compute ACCENT-style scores for the items in the user's history.

    For the first ``top_k`` items returned by ``get_top_k``, the influence of
    every history item is obtained from ``find_fia_mask``. The returned score
    of an item is ``-((top_k - 1) * I_top1 - sum(I_other))``, i.e. the negated
    sum of the differences between its influence on the top-1 item and its
    influence on each other top-k item.

    NOTE(review): the original first-iteration step was
    ``items_accent[key] *= factor`` on a fresh ``defaultdict(float)``, which
    always yields 0 and dropped the top-1 influence. This version adds
    ``factor * influence`` instead, matching the stated intent; confirm
    against previously reported numbers.

    Args:
        user_tensor: user history tensor.
        user_id, item_tensor, item_id: unused; kept for interface compatibility.
        recommender_model: recommender being explained.
        top_k: number of top recommended items to compare.

    Returns:
        ``defaultdict(float)`` mapping history item keys to their scores.
    """
    items_accent = defaultdict(float)
    factor = top_k - 1
    user_accent_hist = user_tensor.cpu().detach().numpy().astype(int)

    # Get the top-k items (slicing also covers top_k == 1).
    ranked_items = list(get_top_k(user_tensor, user_tensor, recommender_model, **kw_dict).keys())
    top_k_indices = ranked_items[:top_k]

    for rank, item_k_id in enumerate(top_k_indices):
        # Zero this top-k item in the working history; removals accumulate
        # across iterations, as in the original.
        user_accent_hist[item_k_id] = 0
        masked_user_tensor = torch.FloatTensor(user_accent_hist).to(device)
        item_k_tensor = torch.FloatTensor(items_array[item_k_id]).to(device)

        # Influence of the history items on this specific top-k item.
        fia_dict = find_fia_mask(masked_user_tensor, item_k_tensor, item_k_id, recommender_model)

        # The top-1 item is weighted by (top_k - 1); every other item is subtracted once.
        weight = factor if rank == 0 else -1
        for key, influence in fia_dict.items():
            items_accent[key] += weight * influence

    for key in items_accent:
        items_accent[key] *= -1

    return items_accent

In [None]:
def find_lxr_mask(x, item_tensor, explainer):
    """Return the non-zero LXR explanation scores of the user's history.

    ``explainer(x, item_tensor)`` yields one score per entry of ``x``; the
    scores are multiplied element-wise with ``x`` and every position whose
    product is non-zero is returned as ``{position: product}`` (the products
    are left as tensor elements).
    """
    masked_scores = x * explainer(x, item_tensor)
    return {
        pos: masked_scores[pos]
        for pos, is_nonzero in enumerate(masked_scores != 0)
        if is_nonzero
    }

In [None]:
class Explainer(nn.Module):
    """LXR explainer network.

    The user vector and the item vector are each projected to ``hidden_size``
    by their own linear layer. The two projections are concatenated and passed
    through Tanh -> Linear -> Tanh -> Linear -> Sigmoid, producing
    ``user_size`` scores in (0, 1). All layers live on ``device``.
    """

    def __init__(self, user_size, item_size, hidden_size):
        super().__init__()

        self.users_fc = nn.Linear(user_size, hidden_size).to(device)
        self.items_fc = nn.Linear(item_size, hidden_size).to(device)

        merge_layer = nn.Linear(hidden_size * 2, hidden_size).to(device)
        score_layer = nn.Linear(hidden_size, user_size).to(device)
        self.bottleneck = nn.Sequential(
            nn.Tanh(),
            merge_layer,
            nn.Tanh(),
            score_layer,
            nn.Sigmoid(),
        ).to(device)

    def forward(self, user_tensor, item_tensor):
        """Return the explanation scores for a user/item pair."""
        encoded = [
            self.users_fc(user_tensor.float()),
            self.items_fc(item_tensor.float()),
        ]
        return self.bottleneck(torch.cat(encoded, dim=-1)).to(device)

In [None]:
def load_explainer(fine_tuning=False, lambda_pos=None, lambda_neg=None, alpha=None):
    """Build the LXR ``Explainer`` and load its trained weights.

    The checkpoint ``lxr_path`` under ``checkpoints_path`` is loaded with
    ``map_location=device`` so that a checkpoint saved on a GPU can also be
    loaded when this notebook runs on the CPU. The returned module is in eval
    mode with all parameters frozen.

    Args:
        fine_tuning, lambda_pos, lambda_neg, alpha: unused; kept for
            interface compatibility.

    Returns:
        The loaded ``Explainer`` instance.
    """
    explainer = Explainer(num_features, num_items, lxr_dim)
    lxr_checkpoint = torch.load(Path(checkpoints_path, lxr_path), map_location=device)
    explainer.load_state_dict(lxr_checkpoint)
    explainer.eval()
    for param in explainer.parameters():
        param.requires_grad_(False)
    return explainer

# Evaluation help functions

In [None]:
def single_user_expl(user_vector, user_tensor, item_id, item_tensor, num_items, recommender_model, user_id = None, mask_type = None, explainer=None):
    '''
    This function invokes various explanation functions
    and returns a dictionary of explanations, sorted by their scores.

    Args:
        user_vector: user history as an array; its sum bounds the number of
            returned explanation items.
        user_tensor: the same history as a tensor.
        item_id: index of the recommended item being explained.
        item_tensor: tensor of the recommended item.
        num_items: unused; kept for interface compatibility.
        recommender_model: recommender being explained.
        user_id: needed by the 'shap' and 'accent' baselines.
        mask_type: one of 'lime', 'lire', 'cosine', 'shap', 'deep_shap',
            'accent', 'lxr'.
        explainer: DeepSHAP explainer for 'deep_shap'. For 'lxr', an already
            loaded LXR explainer may be passed to avoid reloading the
            checkpoint on every call; when None it is loaded here.

    Returns:
        For 'lime' / 'lire', the result of the corresponding mask function;
        otherwise a list of (item, score) pairs sorted by descending score,
        truncated to the size of the user's history.

    Raises:
        ValueError: if ``mask_type`` is not one of the supported names.
    '''
    # Plain int so the value can safely be used as a slice bound.
    user_hist_size = int(np.sum(user_vector))

    if mask_type == 'lime':
        return find_lime_mask(user_vector, item_id, 50, 100, 150, distance_to_proximity,'highest_weights', recommender_model, num_samples=user_hist_size, **kw_dict)
    if mask_type == 'lire':
        return find_lire_mask(user_vector, item_id, user_hist_size, distance_to_proximity, 'highest_weights', recommender_model,proba = 0.1, **kw_dict)

    if mask_type == 'cosine':
        sim_items = find_cosine_mask(user_tensor, item_id, cosine_dict)
    elif mask_type == 'shap':
        sim_items = find_shapley_mask(user_tensor, user_id, recommender_model, shap_values, item_to_cluster)
    elif mask_type == 'deep_shap':
        user_tensor = user_tensor.float().to(device).requires_grad_(True)
        sim_items = find_deep_shap_mask(user_tensor, item_id, explainer)
    elif mask_type == 'accent':
        sim_items = find_accent_mask(user_tensor, user_id, item_tensor, item_id, recommender_model, 5)
    elif mask_type == 'lxr':
        if explainer is None:
            explainer = load_explainer(False)
        sim_items = find_lxr_mask(user_tensor, item_tensor, explainer)
    else:
        raise ValueError(f"unsupported mask_type: {mask_type!r}")

    return sorted(sim_items.items(), key=lambda item: item[1], reverse=True)[:user_hist_size]

In [None]:
create_dictionaries = True  # set to True the first time the explanations are generated

include_deep_shap = True  # set to True to also create the explanations of the DeepSHAP baseline

if include_deep_shap:  # create the explainer
    # Ten randomly drawn training users serve as the DeepSHAP background set.
    subset_indices = np.random.choice(train_array.shape[0], size=10, replace=False)
    background_data = train_array[subset_indices]
    background_tensor = torch.FloatTensor(background_data).requires_grad_(True).to(device)
    all_items_tensor = all_items_tensor.float().to(device).requires_grad_(True)
    if recommender_name == "MLP":
        model = MLPWrapper(recommender, all_items_tensor)
    elif recommender_name == "NCF":
        model = NCFWrapper(recommender, all_items_tensor)
    else:  # recommender_name == "VAE"
        model = VAEWrapper(recommender)
    deep_shap_explainer = shap.DeepExplainer(model, background_tensor)


if create_dictionaries:
    torch.cuda.empty_cache()
    recommender.eval()  # Evaluate the model on the test set
    recommender.to(device)

    cosine_expl_dict = {}
    lime_expl_dict = {}
    lire_expl_dict = {}
    accent_expl_dict = {}
    shap_expl_dict = {}
    deep_shap_expl_dict = {}
    lxr_expl_dict = {}

    for i in range(test_array.shape[0]):
        if i % 500 == 0:
            print(i)
        # test_array[i] is a view: zeroing the recommended item below also
        # writes into test_array itself.
        user_vector = test_array[i]
        user_tensor = torch.FloatTensor(user_vector).to(device)
        user_id = int(test_data.index[i])

        item_id = int(get_user_recommended_item(user_tensor, recommender, **kw_dict).detach().cpu().numpy())
        item_vector = items_array[item_id]
        item_tensor = torch.FloatTensor(item_vector).to(device)

        user_vector[item_id] = 0
        user_tensor[item_id] = 0

        common_args = (user_vector, user_tensor, item_id, item_tensor, num_items, recommender)

        cosine_expl_dict[user_id] = single_user_expl(*common_args, mask_type='cosine')
        lime_expl_dict[user_id] = single_user_expl(*common_args, mask_type='lime')
        lire_expl_dict[user_id] = single_user_expl(*common_args, mask_type='lire')
        accent_expl_dict[user_id] = single_user_expl(*common_args, mask_type='accent')
        shap_expl_dict[user_id] = single_user_expl(*common_args, mask_type='shap', user_id=user_id)
        # The DeepSHAP explainer only exists when include_deep_shap is set;
        # using it unconditionally raised a NameError otherwise.
        if include_deep_shap:
            deep_shap_expl_dict[user_id] = single_user_expl(user_vector, user_tensor, item_id, all_items_tensor, num_items, recommender, mask_type='deep_shap', explainer=deep_shap_explainer)
        lxr_expl_dict[user_id] = single_user_expl(*common_args, mask_type='lxr')

    # Persist one pickle per baseline. The DeepSHAP file is only written when
    # that baseline was computed, so an existing file is not replaced by an
    # empty dictionary.
    expl_dicts_to_save = [
        ('cosine', cosine_expl_dict),
        ('lime', lime_expl_dict),
        ('lire', lire_expl_dict),
        ('accent', accent_expl_dict),
        ('shap', shap_expl_dict),
    ]
    if include_deep_shap:
        expl_dicts_to_save.append(('deep_shap', deep_shap_expl_dict))
    expl_dicts_to_save.append(('lxr', lxr_expl_dict))

    for saved_name, saved_dict in expl_dicts_to_save:
        with open(Path(files_path, f'{recommender_name}_{saved_name}_expl_dict.pkl'), 'wb') as handle:
            pickle.dump(saved_dict, handle)

In [None]:
def eval_one_expl_type(expl_name):
    '''
    This function aggregates explanations for all test users
    and computes the average metric values across the entire test set.

    The explanations are read from
    '{recommender_name}_{expl_name}_expl_dict.pkl' under files_path, and one
    summary line (DEL, INS, NDCG, POS@5, POS@10, POS@20) is appended to the
    file named by new_file_name.

    Raises:
        ValueError: if the test set is empty (the original failed on an
            unbound loop variable in that case).
    '''

    print(f' ============ Start explaining {data_name} {recommender_name} by {expl_name} ============')
    with open(Path(files_path,f'{recommender_name}_{expl_name}_expl_dict.pkl'), 'rb') as handle:
        expl_dict = pickle.load(handle)
    recommender.eval()     # Evaluate the model on the test set

    num_test_users = test_array.shape[0]
    if num_test_users == 0:
        raise ValueError("test set is empty; nothing to evaluate")

    # The accumulators have 11 entries while single_user_metrics is called
    # with 10 bins, exactly as in the original code.
    accumulator_size = 11
    num_of_bins = 10

    users_DEL = np.zeros(accumulator_size)
    users_INS = np.zeros(accumulator_size)
    NDCG = np.zeros(accumulator_size)
    POS_at_5 = np.zeros(accumulator_size)
    POS_at_10 = np.zeros(accumulator_size)
    POS_at_20 = np.zeros(accumulator_size)

    with torch.no_grad():
        for i in range(num_test_users):
            # test_array[i] is a view: zeroing the recommended item below
            # also writes into test_array itself.
            user_vector = test_array[i]
            user_tensor = torch.FloatTensor(user_vector).to(device)
            user_id = int(test_data.index[i])

            item_id = int(get_user_recommended_item(user_tensor, recommender, **kw_dict).detach().cpu().numpy())
            item_vector = items_array[item_id]
            item_tensor = torch.FloatTensor(item_vector).to(device)
            user_vector[item_id] = 0
            user_tensor[item_id] = 0

            user_expl = expl_dict[user_id]
            res = single_user_metrics(user_vector, user_tensor, item_id, item_tensor, num_of_bins, recommender, user_expl, **kw_dict)

            users_DEL += res[0]
            users_INS += res[1]
            NDCG += res[2]
            POS_at_5 += res[3]
            POS_at_10 += res[4]
            POS_at_20 += res[5]

    a = num_test_users

    # Mode 'a' creates the file when it does not exist, so no existence check is needed.
    with open(new_file_name, 'a') as file:
        file.write(f' ============ This stats are for {data_name} dataset ============\n')
        file.write(f' ============ & for the recommender {recommender_name} ============\n')
        file.write(f' ============ {expl_name} ============\n')
        file.write(f"{np.mean(users_DEL)/a}, {np.mean(users_INS)/a}, {np.mean(NDCG)/a}, {np.mean(POS_at_5)/a}, {np.mean(POS_at_10)/a}, {np.mean(POS_at_20)/a}\n")
        file.write(f"\n")

In [None]:
# Each listed name needs a '{recommender_name}_{name}_expl_dict.pkl' file under files_path,
# because eval_one_expl_type loads the explanations from that pickle.
expl_names_list = ['cosine', 'accent', 'lime', 'lire', 'shap', 'deep_shap', 'lxr'] # specify the names of the baselines for which you wish to calculate the metrics values.

In [None]:
# Evaluate every selected baseline; each call appends its summary to new_file_name.
for expl_name in expl_names_list:
    eval_one_expl_type(expl_name)