In [1]:
import numpy as np
import math
from math import prod
from decimal import Decimal, getcontext
getcontext().prec = 13

---


# Functions


## Compute User & item prior

In [2]:
def compute_priors(ratings,plausible_rating, alpha=0.01, R=8):
    num_users = len(ratings)
    num_items = len(ratings[0])
    # rating_values = list(range(1, R + 1))
    rating_values = plausible_rating

    prior_userbased = [[0 for _ in range(num_items)] for _ in rating_values]
    prior_itembased = [[0 for _ in range(num_users)] for _ in rating_values]

    y_index = 0
    for y in rating_values:
        y_index = y_index

        # Prior user-based (per item j)
        for j in range(num_items):
            count_y = 0
            count_nonzero = 0
            for u in range(num_users):
                r = ratings[u][j]   
                if r != 0:
                    count_nonzero += 1
                    if r == y:
                        count_y += 1
            prior_userbased[y_index][j] = (count_y + alpha) / (count_nonzero + alpha * R)

        # Prior item-based (per user u)
        for u in range(num_users):
            count_y = 0
            count_nonzero = 0
            for j in range(num_items):
                r = ratings[u][j]
                if r != 0:
                    count_nonzero += 1
                    if r == y:
                        count_y += 1
            prior_itembased[y_index][u] = (count_y + alpha) / (count_nonzero + alpha * R)
        y_index = y_index + 1

    return prior_userbased, prior_itembased

## Compute likelihood User

In [3]:
def compute_likelihood_userbased(ratings, u, i, y, alpha=0.01, R=8):
    num_users = len(ratings)
    num_items = len(ratings[0])
    Iu = [j for j in range(num_items) if j != i and ratings[u][j] != 0]
    # prob_list = []
    product = Decimal(1.0)

    for j in Iu:
        k = ratings[u][j]
        count_joint = 0
        count_cond = 0
        for v in range(num_users):
            if ratings[v][i] == y:
                if ratings[v][j] != 0:
                    count_cond += 1
                    if ratings[v][j] == k:
                        count_joint += 1
        prob = (count_joint + alpha) / (count_cond + alpha * R)
        product *= Decimal(prob)
        # prob_list.append(prob)
    # print("======")
    # print(product, end="\n\n")

    return product


## Compute likelihood Item

In [4]:
def compute_likelihood_itembased(ratings, u, i, y, alpha=0.01, R=8):
    num_users = len(ratings)
    num_items = len(ratings[0])
    Ui = [v for v in range(num_users) if v != u and ratings[v][i] != 0]
    # prob_list = []
    product = Decimal(1.0)

    for v in Ui:
        k = ratings[v][i]
        count_joint = 0
        count_cond = 0
        for j in range(num_items):
            if ratings[u][j] == y:
                if ratings[v][j] != 0:
                    count_cond += 1
                    if ratings[v][j] == k:
                        count_joint += 1
        prob = (count_joint + alpha) / (count_cond + alpha * R)
        product *= Decimal(prob)
        # print(prob)
        # print("======")
        # prob_list.append(prob)

    return product


# Predict Functions

In [5]:
def predict_rating(ratings, u, i, prior_userbased, prior_itembased,plausible_rating, alpha=0.01, mode='hybrid'):
    scores = []
    all_likelihood_user = []  
    all_likelihood_item = []
    all_combined = []
    R = len(plausible_rating)  

    y_index = 0
    for y in plausible_rating:
        # print(y_index)
        prior_user = prior_userbased[y_index][i]
        prior_item = prior_itembased[y_index][u]

        likelihood_user = compute_likelihood_userbased(ratings, u, i, y, alpha, R)
        likelihood_item = compute_likelihood_itembased(ratings, u, i, y, alpha, R)
        # print(likelihood_item)

        all_likelihood_user.append(likelihood_user)
        all_likelihood_item.append(likelihood_item)

        if mode == 'user':
            score = Decimal(prior_user) * likelihood_user
        elif mode == 'item':
            score = Decimal(prior_item) * likelihood_item
        else:  # hybrid
            len_Iu = sum(1 for j in range(len(ratings[0])) if ratings[u][j] != 0 and j != i)
            len_Ui = sum(1 for v in range(len(ratings)) if v != u and ratings[v][i] != 0)

            score_user = (Decimal(prior_user) * likelihood_user) ** Decimal(1 / (1 + len_Iu)) if len_Iu > 0 else 0
            score_item = (Decimal(prior_user) * likelihood_user) ** Decimal(1 / (1 + len_Ui)) if len_Ui > 0 else 0

            score = score_user * score_item

        scores.append(score)
        all_combined.append(score)
        y_index += 1

    # predicted_rating = scores.index(max(scores)) + 1
    predicted_rating = plausible_rating[scores.index(max(scores))]

    return predicted_rating, {
        'scores': scores,
        'likelihood_user': all_likelihood_user,
        'likelihood_item': all_likelihood_item,
        'combined_score': all_combined
    }


## Load Data Full

Untuk justifikasi tain test split

In [6]:
import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split

In [7]:

def load_filmtrust_full(path):
    df = pd.read_csv(path, sep=' ', engine='python', names=['user', 'item', 'rating'])

    num_users = df['user'].nunique()
    num_items = df['item'].nunique()

    user_map = {uid: idx for idx, uid in enumerate(df['user'].unique())}
    item_map = {iid: idx for idx, iid in enumerate(df['item'].unique())}

    ratings = np.zeros((num_users, num_items))
    for _, row in df.iterrows():
        u = user_map[row['user']]
        i = item_map[row['item']]
        ratings[u][i] = row['rating']

    return ratings

ratings_full = load_filmtrust_full("./film-trust/ratings.txt")


In [8]:
ratings_full

array([[2. , 4. , 3.5, ..., 0. , 0. , 0. ],
       [0. , 0. , 0. , ..., 0. , 0. , 0. ],
       [0. , 0. , 0. , ..., 0. , 0. , 0. ],
       ...,
       [0. , 0. , 0. , ..., 0. , 0. , 0. ],
       [0. , 4. , 0. , ..., 0. , 0. , 0. ],
       [1.5, 3. , 2. , ..., 0. , 0. , 0. ]], shape=(1508, 2071))

# Train Test Split


In [9]:


# # Load the original ratings.txt file
# df = pd.read_csv('./film-trust/ratings.txt', sep=' ', names=['user', 'item', 'rating'])

# print(f"Dataset loaded: {len(df)} ratings")
# print(f"Users: {df['user'].nunique()}")
# print(f"Items: {df['item'].nunique()}")
# print(f"Rating range: {df['rating'].min()} - {df['rating'].max()}")

# # Split into train (80%) and test (20%)
# train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)

# print(f"\nTrain set: {len(train_df)} ratings ({len(train_df)/len(df)*100:.1f}%)")
# print(f"Test set: {len(test_df)} ratings ({len(test_df)/len(df)*100:.1f}%)")

# # Save train and test sets
# train_df.to_csv('./film-trust/train.txt', sep=' ', header=False, index=False)
# test_df.to_csv('./film-trust/test.txt', sep=' ', header=False, index=False)

# print("\nFiles saved:")
# print("- train.txt")
# print("- test.txt")

## Get Plausible Ratings

In [10]:
temp_df = pd.read_csv("./film-trust/ratings.txt", sep=' ', engine='python', names=['rating'])
plausible_rating = temp_df['rating'].unique()
plausible_rating.sort()
plausible_rating

array([0.5, 1. , 1.5, 2. , 2.5, 3. , 3.5, 4. ])

In [11]:
pR = plausible_rating
num_r = len(pR)
num_r

8

## Load train and test data

In [12]:

def load_filmtrust_train_make_matrix(path, full_ratings):
    df = pd.read_csv(path, sep=' ', engine='python', names=['user', 'item', 'rating'])
    df_full = pd.read_csv(full_ratings, sep=' ', engine='python', names=['user', 'item', 'rating'])

    num_users = df_full['user'].nunique()
    num_items = df_full['item'].nunique()

    user_map = {uid: idx for idx, uid in enumerate(df['user'].unique())}
    item_map = {iid: idx for idx, iid in enumerate(df['item'].unique())}

    ratings = np.zeros((num_users, num_items))
    for _, row in df.iterrows():
        u = user_map[row['user']]
        i = item_map[row['item']]
        ratings[u][i] = row['rating']

    return ratings, user_map, item_map

In [13]:
ratings_train, user_map, item_map = load_filmtrust_train_make_matrix("./film-trust/train.txt","./film-trust/ratings.txt")

In [14]:
ratings_train

array([[4. , 0. , 3. , ..., 0. , 0. , 0. ],
       [0. , 4. , 0. , ..., 0. , 0. , 0. ],
       [1. , 0. , 2.5, ..., 0. , 0. , 0. ],
       ...,
       [0. , 0. , 0. , ..., 0. , 0. , 0. ],
       [0. , 0. , 0. , ..., 0. , 0. , 0. ],
       [0. , 0. , 0. , ..., 0. , 0. , 0. ]], shape=(1508, 2071))

In [15]:

def load_test_ratings(path):
    data = np.loadtxt(path, dtype={'names': ('u', 'i', 'r'), 'formats': (int, int, float)})
    test = np.array([[int(row[0])-1, int(row[1])-1, float(row[2])] for row in data])

    return test

In [16]:
test_set = load_test_ratings("./film-trust/test.txt")

In [17]:
test_set

array([[ 470. ,    5. ,    3.5],
       [1033. ,  902. ,    3. ],
       [1248. ,  256. ,    3. ],
       ...,
       [1082. ,  256. ,    2. ],
       [ 570. ,   11. ,    4. ],
       [ 525. ,    3. ,    3. ]], shape=(7100, 3))

In [None]:
# y_index = 0
# all_likelihood_user = []
# all_likelihood_item = []
# for y in pR:
#     likelihood_user = compute_likelihood_userbased(ratings_train, 0, 10, y, 0.01, 8)
#     likelihood_item = compute_likelihood_itembased(ratings_train, 0, 10, y, 0.01, 8)
#     all_likelihood_user.append(likelihood_user)
#     all_likelihood_item.append(likelihood_item)
#     y_index = y_index + 1


## Precompute User & Item Prior

In [None]:

prior_userbased, prior_itembased = compute_priors(ratings_train,pR)

In [None]:
prior_userbased

In [None]:
prior_itembased

In [None]:
# pred, _ = predict_rating(ratings_train, 0, 7, prior_userbased, prior_itembased,plausible_rating= pR, mode='hybrid')
# _
# pred

## Loop prediction for each data test 

In [None]:


# Prediksi dan evaluasi
from tqdm import tqdm

y_true = []
y_pred = []

for u, i, actual in tqdm(test_set):
    pred, _ = predict_rating(ratings_train, u, i, prior_userbased, prior_itembased,plausible_rating= pR, mode='hybrid')
    y_true.append(actual)
    y_pred.append(pred)

In [None]:
from sklearn.metrics import mean_absolute_error

mae = mean_absolute_error(y_true, y_pred)
print("MAE :", mae)


## Export to evaluation

In [None]:
import pandas as pd

df_results = pd.DataFrame({'y_true': y_true, 'y_pred': y_pred})
df_results.to_csv('predictions.csv', index=False)

In [None]:
df_test = pd.DataFrame(test_set)
df_test.to_csv('test_set.csv', index=False)

In [None]:

df_full = pd.DataFrame(ratings_full)
df_full.to_csv('ratings_full.csv', index=False)

df_train = pd.DataFrame(ratings_train)
df_train.to_csv('ratings_train.csv', index=False)

df_test = pd.DataFrame(test_set)
df_test.to_csv('test_set.csv', index=False)



In [None]:
df_prior_userbased = pd.DataFrame(prior_userbased)
df_prior_userbased.to_csv('prior_userbased.csv', index=False)

df_prior_itembased = pd.DataFrame(prior_itembased)
df_prior_itembased.to_csv('prior_itembased.csv', index=False)

