In [8]:
try:
    spark.stop()
except:
    pass

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

spark = SparkSession.builder \
    .appName("APP_2") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .getOrCreate()

schema = StructType([
    StructField("userId", IntegerType(), True),
    StructField("movieId", IntegerType(), True),
    StructField("rating", FloatType(), True)
])
# Read ratings.csv
df = spark.read.csv("ratings.csv", header=True, schema = schema).cache()

In [None]:
print("Rows:", df.count())
print("Unique users:", df.select("userId").distinct().count())
print("Unique Movies:", df.select("movieId").distinct().count())

In [None]:
# Select first n unique users
unique_users = df.select("userId").distinct().limit(1000)

# Keep only rows from these users
df = df.join(unique_users, on="userId", how="inner")

df.show(5)
print("Rows:", df.count())
print("Unique users:", df.select("userId").distinct().count())
print("Unique Movies:", df.select("movieId").distinct().count())

In [None]:
n_users = df.select("userId").distinct().count()
n_movies = df.select("movieId").distinct().count()
n_ratings = df.count()

sparsity = (n_ratings / (n_users * n_movies)) * 100
print(f"Sparsity: {sparsity:.4f}%")

In [None]:
from pyspark.sql.functions import col, round as spark_round
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.functions import vector_to_array

user_indexer = StringIndexer(inputCol="userId", outputCol="user")
movie_indexer = StringIndexer(inputCol="movieId", outputCol="movie")
assembler = VectorAssembler(inputCols=["rating"], outputCol="rating_vec")
scaler = MinMaxScaler(inputCol="rating_vec", outputCol="rating_scaled")

pipeline = Pipeline(stages=[user_indexer, movie_indexer, assembler, scaler])

pipeline_model = pipeline.fit(df)
scaled_df = pipeline_model.transform(df)

# Flatten vector -> float, then round to 1 decimal
scaled_df = scaled_df.withColumn(
    "rating_scaled",
    spark_round(vector_to_array(col("rating_scaled"))[0], 1)
)

# Keep only necessary columns
df_final = scaled_df.select("user", "movie", "rating_scaled")
df_final = df_final.cache()
df_final.show(5)

In [6]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [None]:
import torch

# 80% train, 20% test
train_df, test_df = df_final.randomSplit([0.8, 0.2], seed=42)

#Converting PySpark DF to Pandas to Tensors
train_pd = train_df.toPandas()
test_pd = test_df.toPandas()

X_train = torch.tensor(train_pd[["user", "movie"]].values)
y_train = torch.tensor(train_pd["rating_scaled"].values)

X_test = torch.tensor(test_pd[["user", "movie"]].values)
y_test = torch.tensor(test_pd["rating_scaled"].values)

print("Train count:", X_train.shape[0])
print("Train Label:",y_train.shape[0])
print("Test count:", X_test.shape[0])
print("Test Label:",y_test.shape[0])

In [8]:
from torch.utils.data import TensorDataset, DataLoader

# Create datasets
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
#print(train_dataset[2200])

# DataLoaders
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64)

In [9]:
import torch.nn as nn
import torch.nn.functional as F

class CollabFiltering(nn.Module):
    def __init__(self, n_users, n_movies, emb_dim, hidden, dropout):
        super().__init__()
        self.user_emb = nn.Embedding(n_users, emb_dim)   
        self.movie_emb = nn.Embedding(n_movies, emb_dim)  
        self.dropout_emb = 0.4

        self.mlp = nn.Sequential(
            nn.Linear(emb_dim * 2, hidden), 
            nn.BatchNorm1d(hidden),  
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden, 1),  
            nn.Sigmoid()  
        )

    def forward(self,user,movie):
        u = F.dropout(self.user_emb(user), p=self.dropout_emb, training=self.training)
        m = F.dropout(self.movie_emb(movie), p=self.dropout_emb, training=self.training)
        
        # Concatenate embeddings (instead of dot product)
        x = torch.cat([u, m], dim=1)
        
        # Pass through MLP
        return self.mlp(x).squeeze()


In [10]:
loss_fn = nn.MSELoss()

In [11]:
def adam(parameter, m_state, v_state, t, lr=0.1, beta1=0.9, beta2=0.999, epsilon=1e-8):
    # Loop over Tensors in model.parameters()
    for p in parameter:
        if p.grad is None:
            continue

        # First Time seeing this parameter
        if p not in m_state:
            m_state[p] = torch.zeros_like(p)
            v_state[p] = torch.zeros_like(p)

        g = p.grad
        # First moment
        m_state[p] = beta1 * m_state[p] + (1 - beta1) * g
        # Second moment
        v_state[p] = beta2 * v_state[p] + (1 - beta2) * (g * g)

        # Bias correction
        m_corrected = m_state[p] / (1 - beta1 ** t)
        v_corrected = v_state[p] / (1 - beta2 ** t)

        # Parameter update (in-place)
        p.data -= lr * m_corrected / (torch.sqrt(v_corrected) + epsilon)

    return m_state, v_state


In [None]:
import copy

all_epoch = []

epochs = 4
n_models = 5
n_users_global = df_final.select("user").distinct().count()
n_movies_global = df_final.select("movie").distinct().count()

for i in range(1, n_models+1):
    model = CollabFiltering(n_users_global, n_movies_global, emb_dim=32, hidden=32, dropout=0.03)
    m_state, v_state, t = {}, {}, 0

    best_rmse = float("inf")
    best_epoch = -1
    best_state = None

    train_losses_epoch, test_losses_epoch = [], []
    train_rmses_epoch, test_rmses_epoch = [], []
    epoch_states = []

    for epoch in range(epochs):
        # ---------- Training ----------
        model.train()
        total_loss, total_sq_error, total_samples = 0,0,0
        for X_batch, y_batch in train_loader:
            preds = model(X_batch[:,0].long(), X_batch[:,1].long()).squeeze()
            loss = loss_fn(preds, y_batch.float())
            model.zero_grad()
            loss.backward()
            t += 1
            m_state, v_state = adam(model.parameters(), m_state, v_state, t)
            total_loss += loss.item()
            total_sq_error += torch.sum((preds - y_batch)**2).item()
            total_samples += len(y_batch)
        train_loss = total_loss / len(train_loader)
        train_rmse = (total_sq_error / total_samples)**0.5

        # ---------- Testing ----------
        model.eval()
        total_loss, total_sq_error, total_samples = 0,0,0
        with torch.no_grad():
            for X_batch, y_batch in test_loader:
                preds = model(X_batch[:,0].long(), X_batch[:,1].long()).squeeze()
                loss = loss_fn(preds, y_batch.float())
                total_loss += loss.item()
                total_sq_error += torch.sum((preds - y_batch)**2).item()
                total_samples += len(y_batch)
        test_loss = total_loss / len(test_loader)
        test_rmse = (total_sq_error / total_samples)**0.5

        # ---------- Print per-epoch summary ----------
        print(f"Model {i} | Epoch {epoch+1}/{epochs}")
        print(f"Train Loss: {train_loss:.4f}, Train RMSE: {train_rmse:.4f}")
        print(f"Test  Loss: {test_loss:.4f}, Test  RMSE: {test_rmse:.4f}\n")

        # ---------- Save per-epoch stats ----------
        train_losses_epoch.append(train_loss)
        test_losses_epoch.append(test_loss)
        train_rmses_epoch.append(train_rmse)
        test_rmses_epoch.append(test_rmse)
        epoch_states.append(copy.deepcopy(model.state_dict()))

        if test_rmse < best_rmse:
            best_rmse = test_rmse
            best_epoch = epoch+1
            best_state = copy.deepcopy(model.state_dict())

    # ---------- Save model stats ----------
    all_epoch.append({
        "id": i,
        "train_losses": train_losses_epoch,
        "test_losses": test_losses_epoch,
        "train_rmses": train_rmses_epoch,
        "test_rmses": test_rmses_epoch,
        "epoch_states": epoch_states,
        "best_epoch": best_epoch,
        "best_rmse": best_rmse
    })
    model.load_state_dict(best_state)


Model 1 | Epoch 1/4
Train Loss: 0.0490, Train RMSE: 0.2214
Test  Loss: 0.0439, Test  RMSE: 0.2094

Model 1 | Epoch 2/4
Train Loss: 0.0443, Train RMSE: 0.2106
Test  Loss: 0.0413, Test  RMSE: 0.2030

Model 1 | Epoch 3/4
Train Loss: 0.0428, Train RMSE: 0.2069
Test  Loss: 0.0426, Test  RMSE: 0.2065

Model 1 | Epoch 4/4
Train Loss: 0.0422, Train RMSE: 0.2054
Test  Loss: 0.0411, Test  RMSE: 0.2026

Model 2 | Epoch 1/4
Train Loss: 0.0507, Train RMSE: 0.2251
Test  Loss: 0.0469, Test  RMSE: 0.2164

Model 2 | Epoch 2/4
Train Loss: 0.0465, Train RMSE: 0.2156
Test  Loss: 0.0464, Test  RMSE: 0.2155

Model 2 | Epoch 3/4
Train Loss: 0.0454, Train RMSE: 0.2132
Test  Loss: 0.0439, Test  RMSE: 0.2095

Model 2 | Epoch 4/4
Train Loss: 0.0447, Train RMSE: 0.2115
Test  Loss: 0.0435, Test  RMSE: 0.2085

Model 3 | Epoch 1/4
Train Loss: 0.0508, Train RMSE: 0.2254
Test  Loss: 0.0450, Test  RMSE: 0.2120

Model 3 | Epoch 2/4
Train Loss: 0.0475, Train RMSE: 0.2179
Test  Loss: 0.0487, Test  RMSE: 0.2207

Model 3 | 

In [17]:
import utils

# Save the Base approach results
utils.save_results("results/app2_results.pkl", all_epoch)


In [None]:
import matplotlib.pyplot as plt

# Plot RMSE curves for each model
for result in all_result:
    model_id = result["model"]
    train_rmses = result["train_rmses"]
    test_rmses = result["test_rmses"]

    plt.figure(figsize=(8, 5))
    plt.plot(train_rmses, label="Train RMSE")
    plt.plot(test_rmses, label="Test RMSE")
    plt.xlabel("Epoch")
    plt.ylabel("RMSE")
    plt.title(f"RMSE Curve for Model {model_id} (Best Epoch {result['best_epoch']}, RMSE={result['best_rmse']:.4f})")
    plt.legend()
    plt.grid(True)
    plt.show()
