In [None]:
import os

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm

In [None]:
# Folder holding the extracted MovieLens-100K files. Set the ML100K_DIR
# environment variable to point the notebook at another location; the fallback
# is the original hard-coded folder, so an existing setup keeps working as is.
ML100K_DIR = os.environ.get("ML100K_DIR", r"C:\Users\kondu\Downloads\ml-100k\ml-100k")

# u.data is read as the ratings table and u.item as the movie catalogue.
data_path = os.path.join(ML100K_DIR, "u.data")
movie_path = os.path.join(ML100K_DIR, "u.item")

In [None]:
# Column names for the ratings file, which is read as a header-less,
# tab-separated table.
columns = ['user_id', 'item_id', 'rating', 'timestamp']
df = pd.read_csv(data_path, sep='\t', names=columns)
# Only the first two '|'-separated fields of the movie file are kept (named
# movie_id and title); latin-1 decoding is requested explicitly for this file.
movies_df = pd.read_csv(movie_path, sep='|', encoding='latin-1', header=None, usecols=[0, 1], names=['movie_id', 'title'])

In [None]:
# Show the ratings table as this cell's output.
df

In [None]:
# Show the movie_id / title lookup table as this cell's output.
movies_df

In [None]:
# The largest user and item ids found in the ratings table double as the
# dimensions of the rating matrix built from it.
num_users, num_items = df['user_id'].max(), df['item_id'].max()

In [None]:
# Dense user x item rating matrix; cells without a recorded rating stay 0.
# The "- 1" shifts the ids (assumed to start at 1) to 0-based array positions.
# All ratings are written with one vectorised assignment instead of a Python
# loop over every row of the table.
R = np.zeros((num_users, num_items))
R[df['user_id'].to_numpy() - 1, df['item_id'].to_numpy() - 1] = df['rating'].to_numpy()

In [None]:
# float32 torch copy of the rating matrix (R is float64, so the dtype
# conversion allocates new storage rather than aliasing the NumPy array).
R_tensor = torch.as_tensor(R, dtype=torch.float32)

In [None]:
class QuaternionMatrixFactorization(nn.Module):
    """Matrix factorisation whose latent factors are quaternions.

    Every user and every item owns ``K`` quaternions, stored as one flat
    embedding row of ``4 * K`` floats made of four consecutive chunks of
    length ``K`` (real part first, then the i, j and k parts).  A user/item
    pair is scored by taking the Hamilton product of matching quaternions and
    collapsing the ``K`` products into a single scalar.
    """

    def __init__(self, num_users, num_items, K):
        """Build ``4 * K``-wide embedding tables for users and items."""
        super().__init__()
        self.user_emb = nn.Embedding(num_users, 4 * K)
        self.item_emb = nn.Embedding(num_items, 4 * K)
        self.init_weights()

    def init_weights(self):
        """Re-draw both tables from a Xavier-normal distribution (users first)."""
        for table in (self.user_emb, self.item_emb):
            nn.init.xavier_normal_(table.weight)

    def forward(self, user_indices, item_indices, projection_method='radius'):
        """Score paired user/item indices.

        Args:
            user_indices: integer tensor of user rows to look up.
            item_indices: integer tensor of item rows, paired element-wise
                with ``user_indices``.
            projection_method: ``'radius'`` returns the Euclidean norm of all
                product components; ``'angle'`` returns ``atan2`` of the norm
                of the imaginary components over the summed real components.

        Returns:
            One score per looked-up pair (1-D tensor).

        Raises:
            ValueError: if ``projection_method`` is neither ``'radius'`` nor
                ``'angle'``.
        """
        k_user = self.user_emb.embedding_dim // 4
        k_item = self.item_emb.embedding_dim // 4

        # Reshape each row to (4, K) and peel off the four quaternion parts.
        a_u, b_u, c_u, d_u = self.user_emb(user_indices).view(-1, 4, k_user).unbind(dim=1)
        a_i, b_i, c_i, d_i = self.item_emb(item_indices).view(-1, 4, k_item).unbind(dim=1)

        # Hamilton product (user quaternion on the left), one per latent slot.
        real = a_u * a_i - b_u * b_i - c_u * c_i - d_u * d_i
        imag_i = a_u * b_i + b_u * a_i + c_u * d_i - d_u * c_i
        imag_j = a_u * c_i - b_u * d_i + c_u * a_i + d_u * b_i
        imag_k = a_u * d_i + b_u * c_i - c_u * b_i + d_u * a_i

        if projection_method == 'radius':
            squared_norm = (real ** 2 + imag_i ** 2 + imag_j ** 2 + imag_k ** 2).sum(dim=1)
            # The small constant keeps sqrt differentiable at zero.
            return (squared_norm + 1e-8).sqrt()

        if projection_method == 'angle':
            vector_norm = ((imag_i ** 2 + imag_j ** 2 + imag_k ** 2).sum(dim=1) + 1e-8).sqrt()
            scalar_sum = real.sum(dim=1) + 1e-8
            return torch.atan2(vector_norm, scalar_sum)

        raise ValueError("Invalid projection method. Choose 'radius' or 'angle'.")


In [None]:
def train_model(model, R, lr, reg, epochs, projection_method='radius'):
    """Fit ``model`` to the observed entries of ``R`` with full-batch AdamW.

    Entries of ``R`` that are greater than 0 are treated as observed ratings;
    everything else is ignored.  Each epoch is a single optimisation step over
    all observed entries at once, and a progress line is written per epoch.

    Args:
        model: module called as ``model(user_indices, item_indices,
            projection_method)`` and returning one prediction per pair.
        R: 2-D rating tensor indexed as ``R[user, item]``.
        lr: AdamW learning rate.
        reg: AdamW ``weight_decay``.
        epochs: number of optimisation steps to run.
        projection_method: forwarded unchanged to ``model``.

    Returns:
        ``(rmse, mae)`` as Python floats, measured on the observed entries
        during the last epoch (from the forward pass made before that epoch's
        parameter update).  With ``epochs == 0`` nothing is measured and both
        values are NaN.
    """
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=reg)
    criterion = nn.MSELoss()

    user_indices, item_indices = torch.where(R > 0)
    R_nonzero = R[user_indices, item_indices]

    # Defined up front so that a zero-epoch call returns a value instead of
    # failing on unbound locals at the return statement.
    rmse = mae = float('nan')

    for epoch in tqdm(range(epochs), desc="Training Progress"):
        model.train()
        optimizer.zero_grad()
        predictions = model(user_indices, item_indices, projection_method)
        loss = criterion(predictions, R_nonzero)
        loss.backward()
        optimizer.step()

        # Metrics are for reporting only; keep them out of the autograd graph.
        with torch.no_grad():
            rmse = torch.sqrt(loss).item()
            mae = torch.mean(torch.abs(predictions - R_nonzero)).item()
        tqdm.write(f"Epoch {epoch+1}: Loss = {loss.item():.4f}, RMSE = {rmse:.4f}, MAE = {mae:.4f}")

    return rmse, mae

In [None]:
def normalize_ratings(R):
    """Min-max scale ``R`` so its smallest entry maps to 0 and its largest to 1.

    Args:
        R: array-like object exposing ``min()`` / ``max()`` and elementwise
            arithmetic (the notebook passes a torch tensor).

    Returns:
        ``(R - R.min()) / (R.max() - R.min())``.  When every entry of ``R`` is
        the same value the range is zero; the result is then all zeros rather
        than the NaNs a division by zero would produce.
    """
    lowest = R.min()
    span = R.max() - lowest
    if span == 0:
        # Constant input: divide by 1 so the output keeps the usual dtype.
        span = 1
    return (R - lowest) / span

In [None]:
embedding_sizes_quaternion = [4, 8, 16, 32]  # values of K to try; each embedding row is 4 * K floats wide
learning_rate = 0.0005  # step size handed to the AdamW optimizer in train_model
regularization = 0.001  # passed to train_model, which uses it as AdamW's weight_decay
epochs = 10  # number of full-batch optimisation steps per training run

In [None]:
results = []  # one (type, K, projection, rmse, mae) tuple per trained configuration

# Train on the real rating matrix R built from the ratings file, not on a
# synthetic random matrix. A dense random matrix marks practically every
# user/item pair as "rated" and has no relation to the titles in movies_df,
# which leaves the recommendation step with nothing meaningful to suggest.
num_users, num_items = R.shape

# Unrated cells are 0, which is also the matrix minimum, so min-max scaling
# keeps them at exactly 0 and the "R > 0 means observed" convention used by
# train_model and recommend_top_n still holds after normalisation.
R_tensor = normalize_ratings(torch.tensor(R, dtype=torch.float32))

In [None]:
# Sweep every embedding size with both projection modes, training a fresh
# model for each combination and recording the metrics of its final epoch.
# NOTE(review): K, projection, model, rmse and mae are ordinary module-level
# names, so they keep their last values once the loop finishes - take care
# when reusing any of them in later cells.
for K in embedding_sizes_quaternion:
    for projection in ['radius', 'angle']:
        print(f"\nTraining Quaternion Embeddings with K={K} ({projection.capitalize()} Projection)")
        model = QuaternionMatrixFactorization(num_users, num_items, K)
        rmse, mae = train_model(model, R_tensor, learning_rate, regularization, epochs, projection_method=projection)
        # Row layout consumed by the summary table: (type, K, projection, rmse, mae).
        results.append(("Quaternion", K, projection, rmse, mae))

In [None]:
# Fixed-width summary table: a title, a header row, a rule, then one line per
# trained configuration with the metrics shown to four decimals.
header_row = f"{'Type':<20} {'K':<10} {'Projection':<15} {'RMSE':<10} {'MAE':<10}"
print("\n--- Optimized Comparison of Vector Embeddings ---")
print(header_row)
print("=" * 65)
for emb_type, emb_size, proj_name, rmse_value, mae_value in results:
    print(f"{emb_type:<20} {emb_size:<10} {proj_name:<15} {rmse_value:<10.4f} {mae_value:<10.4f}")

In [None]:
def predict_full_matrix(model, num_users, num_items, projection_method='radius', device='cpu', batch_size=512):
    """Compute the predicted rating matrix for all user-item pairs.

    Users are processed in chunks so that at most ``batch_size * num_items``
    pairs are scored per forward pass.

    Args:
        model: module called as ``model(user_indices, item_indices,
            projection_method)`` and returning one score per pair.
        num_users: number of rows (user indices ``0 .. num_users - 1``).
        num_items: number of columns (item indices ``0 .. num_items - 1``).
        projection_method: forwarded unchanged to ``model``.
        device: device on which the index tensors and the result buffer are
            created.  The model itself is not moved by this function.
        batch_size: number of users scored per forward pass.

    Returns:
        A ``(num_users, num_items)`` tensor of predictions on the CPU.

    Raises:
        ValueError: if ``batch_size`` is smaller than 1.

    Note:
        The model is switched to evaluation mode and left in that mode.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    model.eval()
    with torch.no_grad():
        all_item_idx = torch.arange(num_items, device=device)
        predictions = torch.zeros((num_users, num_items), device=device)

        for start in range(0, num_users, batch_size):
            stop = min(start + batch_size, num_users)
            rows = stop - start
            users_batch = torch.arange(start, stop, device=device)

            # Pair every user of the chunk with every item, user-major:
            # users -> u0 u0 ... u1 u1 ...   items -> 0 1 ... 0 1 ...
            user_batch = users_batch.repeat_interleave(num_items)
            item_batch = all_item_idx.repeat(rows)

            preds = model(user_batch, item_batch, projection_method)
            predictions[start:stop] = preds.view(rows, num_items)

    return predictions.cpu()

In [None]:
def recommend_top_n(pred_matrix, R_original, movies_df, user_id, n=5):
    """Print the top-N not-yet-rated movies for one user.

    Args:
        pred_matrix: 2-D tensor of predicted scores, one row per user.
        R_original: rating matrix with the same layout; entries greater than
            0 mark items the user has already rated.
        movies_df: DataFrame with a ``title`` column.  Rows are selected by
            position, so row ``p`` is assumed to describe item column ``p``
            (NOTE(review): confirm the catalogue is ordered that way).
        user_id: 1-based user id (row ``user_id - 1`` of the matrices).
        n: maximum number of recommendations.  Fewer are printed when the
            user has fewer than ``n`` unrated items.

    Returns:
        None; the ranked titles are printed.

    Raises:
        IndexError: if ``user_id`` is not in ``1 .. pred_matrix.shape[0]``.
    """
    total_users = pred_matrix.shape[0]
    if not 1 <= user_id <= total_users:
        # Without this check user_id=0 would silently wrap to the last user.
        raise IndexError(f"user_id must be between 1 and {total_users}, got {user_id}")
    user_idx = user_id - 1  # because indices start at 0

    user_ratings = pred_matrix[user_idx].clone()

    # Mask out movies the user has already rated. -inf (rather than a fixed
    # sentinel such as -1) guarantees a rated item can never outrank an
    # unrated one, whatever range the predictions fall in.
    rated_mask = torch.as_tensor(R_original[user_idx] > 0, device=user_ratings.device)
    user_ratings[rated_mask] = float('-inf')

    # Never request more items than there are unrated candidates; otherwise
    # topk would either fail or hand back masked (already rated) entries.
    num_candidates = int((~rated_mask).sum())
    k = max(0, min(n, num_candidates))

    top_indices = torch.topk(user_ratings, k).indices
    top_movies = movies_df.iloc[top_indices.cpu().numpy()]

    print(f"\n\U0001F3A5 Top {k} Recommendations for User {user_id}:")
    for rank, title in enumerate(top_movies['title'], start=1):
        print(f"{rank}. {title}")


In [None]:
# Model used for the recommendation demo. The embedding size is picked
# explicitly (the largest configured one) instead of reading a leftover loop
# variable, so this cell gives the same model regardless of which other cells
# ran before it.
final_K = embedding_sizes_quaternion[-1]
model = QuaternionMatrixFactorization(num_users, num_items, final_K)
# Last expression of the cell: the returned (rmse, mae) pair is shown as output.
train_model(model, R_tensor, learning_rate, regularization, epochs, projection_method='radius')


In [None]:
# Predict all ratings: score every (user, item) pair with the current `model`.
pred_matrix = predict_full_matrix(model, num_users, num_items, projection_method='radius')

# Recommend top movies for a sample user. user_id is 1-based, and items whose
# entry in R_tensor is greater than 0 count as already rated and are skipped.
recommend_top_n(pred_matrix, R_tensor, movies_df, user_id=1, n=5)