In [None]:
# Imports and configuration
import os
from pathlib import Path
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import train_test_split

from recsysNN_utils import load_data

# Reproducibility: fix TensorFlow's global random seed before any layer is created.
tf.random.set_seed(1)

# Display: show NumPy floats with 3 decimals and suppress scientific notation.
np.set_printoptions(suppress=True, precision=3)



In [None]:
# Load the prepared matrices through the project helper.
# NOTE(review): per the original note, load_data() uses MovieLens if found and
# synthetic data otherwise — confirm against recsysNN_utils.
(
    item_train,
    user_train,
    y_train,
    item_features,
    user_features,
    item_vecs,
    movie_dict,
    user_to_genre,
) = load_data()

# Leading "header" columns travel with every row but are sliced off before the
# rows are handed to the networks.
u_s = 3  # number of header cols in user_train
i_s = 1  # number of header cols in item_train

# Everything after the header columns is a model feature.
num_user_features = user_train.shape[1] - u_s
num_item_features = item_train.shape[1] - i_s

print(f"item_train shape: {item_train.shape}")
print(f"user_train shape: {user_train.shape}")
print(f"y_train shape: {y_train.shape}")
print(f"num_user_features: {num_user_features}, num_item_features: {num_item_features}")


In [None]:
# Scale features and split train/test
#
# The split is decided first, on row indices, for two reasons:
#   * item rows, user rows and targets stay aligned by construction (one shared
#     partition instead of three independent calls that merely share a seed);
#   * the scalers can be fitted on the training rows only — fitting them on every
#     row would leak test-set statistics into preprocessing.
# Assumes item_train, user_train and y_train are NumPy arrays with one row per
# rating example — TODO confirm against load_data().
if not (len(item_train) == len(user_train) == len(y_train)):
    raise ValueError(
        "item_train, user_train and y_train must have the same number of rows, got "
        f"{len(item_train)}, {len(user_train)} and {len(y_train)}"
    )

# Same train_size / shuffle / random_state as before, so the partition is unchanged.
train_idx, test_idx = train_test_split(
    np.arange(len(y_train)), train_size=0.8, shuffle=True, random_state=1
)

item_scaler = StandardScaler()
user_scaler = StandardScaler()

# Targets are mapped to [-1, 1] to match the range of the model's cosine-similarity output.
y_scaler = MinMaxScaler(feature_range=(-1, 1))

# Fit on training rows only. The header (id) columns are scaled along with the
# features so that the fitted scalers can later invert complete rows.
item_scaler.fit(item_train[train_idx])
user_scaler.fit(user_train[train_idx])
y_scaler.fit(y_train[train_idx].reshape(-1, 1))

# Full scaled matrices keep their original names for any later cell that uses them.
# Note: a test target outside the training min/max would land slightly outside [-1, 1].
item_scaled = item_scaler.transform(item_train)
user_scaled = user_scaler.transform(user_train)
y_scaled = y_scaler.transform(y_train.reshape(-1, 1))

X_item_tr, X_item_te = item_scaled[train_idx], item_scaled[test_idx]
X_user_tr, X_user_te = user_scaled[train_idx], user_scaled[test_idx]
Y_tr, Y_te = y_scaled[train_idx], y_scaled[test_idx]

print(X_item_tr.shape, X_user_tr.shape, Y_tr.shape)


In [None]:
# Build two-tower content-based model
#
# Each tower maps its own feature vector to a `num_outputs`-dimensional embedding;
# the model output is the cosine similarity of the two embeddings.
num_outputs = 32  # embedding size shared by both towers

user_NN = tf.keras.models.Sequential([
    tf.keras.layers.Input(shape=(num_user_features,)),
    tf.keras.layers.Dense(256, activation='relu'),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(num_outputs, activation='linear'),
])

item_NN = tf.keras.models.Sequential([
    tf.keras.layers.Input(shape=(num_item_features,)),
    tf.keras.layers.Dense(256, activation='relu'),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(num_outputs, activation='linear'),
])

input_user = tf.keras.layers.Input(shape=(num_user_features,))
vu = user_NN(input_user)

input_item = tf.keras.layers.Input(shape=(num_item_features,))
vm = item_NN(input_item)

# normalize=True L2-normalises both embeddings along the dot axis before the dot
# product, i.e. the output is their cosine similarity. Doing this inside a Keras
# layer (instead of calling tf.linalg.l2_normalize on the symbolic tensors) keeps
# the graph buildable on Keras 3, which rejects raw TF ops on symbolic tensors.
output = tf.keras.layers.Dot(axes=1, normalize=True)([vu, vm])
model = tf.keras.Model([input_user, input_item], output)

model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
              loss=tf.keras.losses.MeanSquaredError())

# Strip the header columns once and reuse the same inputs for fit and evaluate.
train_inputs = [X_user_tr[:, u_s:], X_item_tr[:, i_s:]]
val_inputs = [X_user_te[:, u_s:], X_item_te[:, i_s:]]

history = model.fit(train_inputs, Y_tr, epochs=15, batch_size=1024,
                    validation_data=(val_inputs, Y_te), verbose=1)

# No extra metrics are compiled, so evaluate() returns the scalar MSE loss.
print("Final val MSE:", float(model.evaluate(val_inputs, Y_te, verbose=0)))


In [None]:
# Inference utilities

def predict_user_item_scores(user_feature_row: np.ndarray, item_matrix: np.ndarray) -> np.ndarray:
    """Score every row of ``item_matrix`` against a single user.

    Both arguments are passed straight to the towers, so they must already be
    scaled and hold only the feature columns the towers were built for.

    Args:
        user_feature_row: 1-D feature vector for one user.
        item_matrix: 2-D array with one item feature vector per row.

    Returns:
        1-D array with one score per item row: the dot product of the
        L2-normalised user and item embeddings.
    """
    def _unit_rows(embeddings):
        # Rescale each row to unit L2 norm.
        return tf.linalg.l2_normalize(embeddings, axis=1)

    # The tower expects a batch, so lift the single user row to shape (1, n_features).
    user_batch = user_feature_row[None, :]
    user_unit = tf.squeeze(_unit_rows(user_NN(user_batch)), axis=0)

    items_unit = _unit_rows(item_NN(item_matrix))

    # Matrix-vector product: one dot product per item row.
    return tf.linalg.matvec(items_unit, user_unit).numpy()


def recommend_top_k_for_user(user_row_scaled: np.ndarray, item_scaled: np.ndarray, k: int = 10) -> np.ndarray:
    """Return the row indices of the ``k`` best-scoring items for one user.

    Args:
        user_row_scaled: One scaled user row, including its ``u_s`` leading
            header columns (they are stripped here).
        item_scaled: Scaled candidate items, one per row, including their
            ``i_s`` leading header columns (they are stripped here).
        k: Maximum number of items to return.

    Returns:
        Indices into the rows of ``item_scaled``, best score first. Holds
        ``min(k, number of items)`` entries; empty when ``k`` is 0 or there
        are no candidate items.

    Raises:
        ValueError: If ``k`` is negative.
    """
    if k < 0:
        raise ValueError(f"k must be non-negative, got {k}")
    # Guard k == 0 explicitly: the slice [-0:] would select *every* item.
    if k == 0 or len(item_scaled) == 0:
        return np.empty(0, dtype=np.intp)

    scores = predict_user_item_scores(user_row_scaled[u_s:], item_scaled[:, i_s:])
    # argsort is ascending; take the last k and reverse so the best item comes first.
    top_k_idx = np.argsort(scores)[-k:][::-1]
    return top_k_idx


def squared_distance(a: np.ndarray, b: np.ndarray) -> float:
    """Return the squared Euclidean distance between ``a`` and ``b``.

    The element-wise difference follows NumPy broadcasting; all squared
    differences are summed into a single Python float.
    """
    delta = a - b
    total = np.sum(np.square(delta))
    return float(total)



In [None]:
# Example: recommend for an existing user
user_idx = 0
user_row_scaled = X_user_te[user_idx]

top_idx = recommend_top_k_for_user(user_row_scaled, X_item_te, k=10)

# top_idx indexes rows of X_item_te (the shuffled test split), not rows of
# item_train, so looking the ids up in item_train would return unrelated movies.
# Recover the ids from the very rows that were scored by undoing the scaling;
# column 0 is the movie-id header column.
top_items_raw = item_scaler.inverse_transform(X_item_te[top_idx])
# Round before casting: the scale/unscale round trip can leave e.g. 41.9999999.
movie_ids = np.rint(top_items_raw[:, 0]).astype(int)
# NOTE(review): if the item matrix holds one row per rating, the same movie may
# appear more than once in the top-k — confirm and de-duplicate if needed.

titles = [movie_dict.get("id_to_title", {}).get(int(mid), f"Movie {int(mid)}") for mid in movie_ids]
list(zip(movie_ids, titles))[:10]
