### nn boop

In [5]:
import torch
from torch import nn
import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint, TQDMProgressBar
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from torch.multiprocessing import set_start_method

# Initialize multiprocessing method
try:
    set_start_method('spawn')
except RuntimeError:
    pass

In [7]:
# Initialize Spark
spark = SparkSession.builder \
    .appName("Neural_Recommender") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .master("local[*]") \
    .getOrCreate()

* Load as PyTorch dataset

In [8]:
class MovieLensDataset(Dataset):
    def __init__(self, ratings_data):
        self.users = torch.LongTensor(ratings_data['user_idx'].values)
        self.movies = torch.LongTensor(ratings_data['movie_idx'].values)
        self.ratings = torch.FloatTensor(ratings_data['rating'].values)
        
    def __len__(self):
        return len(self.ratings)
    
    def __getitem__(self, idx):
        return {
            'user_idx': self.users[idx],
            'movie_idx': self.movies[idx],
            'rating': self.ratings[idx]
        }

* NCF Model

In [11]:
class NCF(pl.LightningModule):
    def __init__(self, num_users, num_items, embedding_dim=64, layers=[256, 128, 64], lr=1e-3):
        super().__init__()
        self.save_hyperparameters()
        
        # GMF part
        self.user_embedding_gmf = nn.Embedding(num_users, embedding_dim)
        self.item_embedding_gmf = nn.Embedding(num_items, embedding_dim)
        
        # MLP part
        self.user_embedding_mlp = nn.Embedding(num_users, embedding_dim)
        self.item_embedding_mlp = nn.Embedding(num_items, embedding_dim)
        
        # MLP layers
        mlp_layers = []
        input_dim = embedding_dim * 2
        for layer_size in layers:
            mlp_layers.extend([
                nn.Linear(input_dim, layer_size),
                nn.ReLU(),
                nn.BatchNorm1d(layer_size),
                nn.Dropout(0.2)
            ])
            input_dim = layer_size
        
        self.mlp_layers = nn.Sequential(*mlp_layers)
        self.final_layer = nn.Linear(layers[-1] + embedding_dim, 1)
        self.lr = lr
        
    def forward(self, user_indices, item_indices):
        # GMF part
        user_embed_gmf = self.user_embedding_gmf(user_indices)
        item_embed_gmf = self.item_embedding_gmf(item_indices)
        gmf_output = user_embed_gmf * item_embed_gmf
        
        # MLP part
        user_embed_mlp = self.user_embedding_mlp(user_indices)
        item_embed_mlp = self.item_embedding_mlp(item_indices)
        mlp_input = torch.cat([user_embed_mlp, item_embed_mlp], dim=-1)
        mlp_output = self.mlp_layers(mlp_input)
        
        combined = torch.cat([gmf_output, mlp_output], dim=-1)
        return self.final_layer(combined).squeeze()
    
    def training_step(self, batch, batch_idx):
        user_indices = batch['user_idx']
        item_indices = batch['movie_idx']
        ratings = batch['rating']
        
        predictions = self(user_indices, item_indices)
        loss = nn.MSELoss()(predictions, ratings)
        
        self.log('train_loss', loss, prog_bar=True)
        return loss
    
    def validation_step(self, batch, batch_idx):
        user_indices = batch['user_idx']
        item_indices = batch['movie_idx']
        ratings = batch['rating']
        
        predictions = self(user_indices, item_indices)
        loss = nn.MSELoss()(predictions, ratings)
        
        self.log('val_loss', loss, prog_bar=True)
        return loss
    
    def test_step(self, batch, batch_idx):
        user_indices = batch['user_idx']
        item_indices = batch['movie_idx']
        ratings = batch['rating']
        
        predictions = self(user_indices, item_indices)
        loss = nn.MSELoss()(predictions, ratings)
        
        self.log('test_loss', loss, prog_bar=True)
        return loss
    
    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=self.lr)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, mode='min', factor=0.1, patience=5, verbose=True
        )
        return {
            'optimizer': optimizer,
            'lr_scheduler': scheduler,
            'monitor': 'val_loss'
        }

In [13]:
def prepare_data(sample_size):
    ratings_df = spark.read.csv("ml-32m/ratings.csv", header=True) \
        .withColumn("rating", col("rating").cast("double")) \
        .withColumn("userId", col("userId").cast("integer")) \
        .withColumn("movieId", col("movieId").cast("integer"))
    
    ratings_df = ratings_df.sample(False, fraction=sample_size/ratings_df.count(), seed=42)
    
    ratings_pd = ratings_df.toPandas()
    movies_df = spark.read.csv("ml-32m/movies.csv", header=True).toPandas()
    
    print(f"Total ratings loaded: {len(ratings_pd)}")
    
    user_encoder = LabelEncoder()
    movie_encoder = LabelEncoder()
    
    ratings_pd['user_idx'] = user_encoder.fit_transform(ratings_pd['userId'])
    ratings_pd['movie_idx'] = movie_encoder.fit_transform(ratings_pd['movieId'])
    
    train_data, test_data = train_test_split(ratings_pd, test_size=0.2, random_state=42)
    train_data, val_data = train_test_split(train_data, test_size=0.2, random_state=42)
    
    return train_data, val_data, test_data, user_encoder, movie_encoder

In [None]:
# Load data
print("Loading and preparing data...")
train_data, val_data, test_data, user_encoder, movie_encoder = prepare_data(sample_size=30000000)

# Create dataloaders
train_loader = DataLoader(
    MovieLensDataset(train_data),
    batch_size=32,
    shuffle=True,
    num_workers=0,
    pin_memory=True
)

val_loader = DataLoader(
    MovieLensDataset(val_data),
    batch_size=32,
    shuffle=False,
    num_workers=0,
    pin_memory=True
)

test_loader = DataLoader(
    MovieLensDataset(test_data),
    batch_size=32,
    shuffle=False,
    num_workers=0,
    pin_memory=True
)

# Initialize model




In [43]:
model = NCF(
    num_users=len(user_encoder.classes_),
    num_items=len(movie_encoder.classes_),
    embedding_dim=16,
    layers=[32, 24, 16],
    lr=1e-3
)
# Setup trainer
trainer = pl.Trainer(
    max_epochs=1,
    accelerator='gpu',
    devices=1,
    enable_progress_bar=True,
    callbacks=[
        ModelCheckpoint(monitor='val_loss', mode='min'),
        TQDMProgressBar(refresh_rate=1)
    ],
    enable_checkpointing=True,
    logger=False,
    num_sanity_val_steps=0
)

model = model.cuda()
torch.set_float32_matmul_precision('medium')
print(next(model.parameters()).device)  

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs


cuda:0


In [45]:
# Train model
print("Training model...")
trainer.fit(model, train_loader, val_loader)

C:\Users\User\anaconda3\Lib\site-packages\pytorch_lightning\callbacks\model_checkpoint.py:654: Checkpoint directory C:\Users\User\big_data_project\checkpoints exists and is not empty.
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name               | Type       | Params | Mode 
----------------------------------------------------------
0 | user_embedding_gmf | Embedding  | 3.2 M  | train
1 | item_embedding_gmf | Embedding  | 798 K  | train
2 | user_embedding_mlp | Embedding  | 3.2 M  | train
3 | item_embedding_mlp | Embedding  | 798 K  | train
4 | mlp_layers         | Sequential | 2.4 K  | train
5 | final_layer        | Linear     | 33     | train
----------------------------------------------------------
8.0 M     Trainable params
0         Non-trainable params
8.0 M     Total params
32.024    Total estimated model params size (MB)
18        Modules in train mode
0         Modules in eval mode


Training model...


C:\Users\User\anaconda3\Lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:425: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=11` in the `DataLoader` to improve performance.
C:\Users\User\anaconda3\Lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:425: The 'val_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=11` in the `DataLoader` to improve performance.


Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

`Trainer.fit` stopped: `max_epochs=1` reached.


In [63]:
class MetricsCallback(pl.Callback):
    def __init__(self, user_encoder, movie_encoder, k=1, batch_size=32):
        super().__init__()
        self.user_encoder = user_encoder
        self.movie_encoder = movie_encoder
        self.k = k
        self.batch_size = batch_size
        self.reset_metrics()
        
    def reset_metrics(self):
        self.total_rmse = 0.0
        self.total_mae = 0.0
        self.hit_ratio = 0.0
        self.ndcg = 0.0
        self.predictions = []
        self.actuals = []
        self.n_users = 0
        self.n_batches = 0
    
    def on_test_batch_end(self, trainer, pl_module, outputs, batch, batch_idx):
        device = next(pl_module.parameters()).device
        
        if len(batch['user_idx']) == 0:
            return
            
        user_indices = batch['user_idx'].to(device)
        item_indices = batch['movie_idx'].to(device)
        ratings = batch['rating'].to(device)
        
        try:
            pred = pl_module(user_indices, item_indices)
            self.predictions.extend(pred.cpu().numpy())
            self.actuals.extend(ratings.cpu().numpy())
            
            if len(pred) > 0:
                self.total_rmse += torch.sqrt(nn.MSELoss()(pred, ratings)).item()
                self.total_mae += torch.abs(pred - ratings).mean().item()
                self.n_batches += 1
            
            unique_users = torch.unique(user_indices)
            self.n_users += len(unique_users)
            
            # Batch recommendations
            all_items = torch.arange(len(self.movie_encoder.classes_)).to(device)
            
            for i in range(0, len(unique_users), self.batch_size):
                user_batch = unique_users[i:i + self.batch_size]
                user_tensor = user_batch.repeat_interleave(len(all_items))
                item_tensor = all_items.repeat(len(user_batch))
                
                all_predictions = pl_module(user_tensor, item_tensor).reshape(len(user_batch), -1)
                _, indices = torch.topk(all_predictions, k=self.k, dim=1)
                recommended_items = all_items[indices]
                
                for j, user_idx in enumerate(user_batch):
                    user_mask = user_indices == user_idx
                    actual_items = item_indices[user_mask]
                    actual_ratings = ratings[user_mask]
                    
                    if len(actual_items) == 0:
                        continue
                    
                    positive_items = actual_items[actual_ratings >= 4.0]
                    if len(positive_items) > 0:
                        hit = torch.any(torch.isin(recommended_items[j], positive_items)).item()
                        self.hit_ratio += hit
                    
                    relevance = torch.zeros_like(recommended_items[j], dtype=torch.float, device=device)
                    for k_idx, item in enumerate(recommended_items[j]):
                        if item in actual_items:
                            rating_idx = (actual_items == item).nonzero().item()
                            if actual_ratings[rating_idx] >= 4.0:
                                relevance[k_idx] = 1.0
                    
                    log2_range = torch.arange(2, len(relevance) + 2, dtype=torch.float, device=device)
                    dcg = torch.sum(relevance / torch.log2(log2_range))
                    ideal_relevance = torch.sort(relevance, descending=True)[0]
                    idcg = torch.sum(ideal_relevance / torch.log2(log2_range))
                    
                    if idcg > 0:
                        self.ndcg += (dcg / idcg).item()
        
        except Exception as e:
            print(f"Error processing batch: {e}")
    
    def on_test_epoch_end(self, trainer, pl_module):
        metrics = {}
        metrics['RMSE'] = self.total_rmse / self.n_batches if self.n_batches > 0 else float('nan')
        metrics['MAE'] = self.total_mae / self.n_batches if self.n_batches > 0 else float('nan')
        metrics['HR@50'] = self.hit_ratio / self.n_users if self.n_users > 0 else float('nan')
        metrics['NDCG@50'] = self.ndcg / self.n_users if self.n_users > 0 else float('nan')
        metrics['Coverage'] = len(set(self.predictions)) / len(self.movie_encoder.classes_) if len(self.movie_encoder.classes_) > 0 else float('nan')
        
        pl_module.log_dict(metrics)
        self.print_metrics(metrics)
        self.reset_metrics()
        return metrics
    
    def print_metrics(self, metrics):
        print("\nTest Metrics:")
        print(f"RMSE: {metrics['RMSE']:.4f}")
        print(f"MAE: {metrics['MAE']:.4f}")
        print(f"Hit Ratio@50: {metrics['HR@50']:.4f}")
        print(f"NDCG@50: {metrics['NDCG@50']:.4f}")
        print(f"Coverage: {metrics['Coverage']:.4f}")

def evaluate_model(model, test_loader, user_encoder, movie_encoder, k=1):
    trainer = pl.Trainer(
        accelerator='gpu' if torch.cuda.is_available() else 'cpu',
        devices=1,
        enable_progress_bar=True,
        callbacks=[MetricsCallback(user_encoder, movie_encoder, k=k)],
        logger=False
    )
    trainer.test(model, test_loader)

def get_top_k_recommendations(model, user_id, user_encoder, movie_encoder, k=1):
    model.eval()
    device = next(model.parameters()).device
    
    with torch.no_grad():
        user_idx = torch.tensor(user_encoder.transform([user_id])[0]).to(device)
        all_items = torch.arange(len(movie_encoder.classes_)).to(device)
        user_tensor = torch.full_like(all_items, user_idx)
        
        predictions = model(user_tensor, all_items)
        _, indices = torch.topk(predictions, k=k)
        recommended_items = all_items[indices].cpu().numpy()
        predicted_ratings = predictions[indices].cpu().numpy()
        
        return list(zip(movie_encoder.inverse_transform(recommended_items), predicted_ratings))

In [65]:
evaluate_model(model, test_loader, user_encoder, movie_encoder, k=1)


GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
C:\Users\User\anaconda3\Lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:425: The 'test_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=11` in the `DataLoader` to improve performance.


Testing: |          | 0/? [00:00<?, ?it/s]


Test Metrics:
RMSE: 0.9158
MAE: 0.7099
Hit Ratio@50: 0.0028
NDCG@50: 0.0028
Coverage: 18.2860


### Save and Load

In [67]:
import os
import pickle
import torch
import pytorch_lightning as pl
from pathlib import Path

def save_model(model, user_encoder, movie_encoder, save_dir="savedt_model"):
    """
    Save the model, encoders, and metadata to disk.
    
    Args:
        model (NCF): The trained model
        user_encoder (LabelEncoder): The fitted user encoder
        movie_encoder (LabelEncoder): The fitted movie encoder
        save_dir (str): Directory to save the model files
    """
    save_dir = Path(save_dir)
    os.makedirs(save_dir, exist_ok=True)
    
    # Save model state dict directly
    torch.save(model.state_dict(), save_dir / "model.pt")
    
    # Save model hyperparameters
    torch.save({
        'num_users': len(user_encoder.classes_),
        'num_items': len(movie_encoder.classes_),
        'embedding_dim': model.hparams.embedding_dim,
        'layers': model.hparams.layers,
        'lr': model.hparams.lr
    }, save_dir / "model_config.pt")
    
    # Save encoders
    with open(save_dir / "encoders.pkl", "wb") as f:
        pickle.dump({
            'user_encoder': user_encoder,
            'movie_encoder': movie_encoder
        }, f)
    
    print(f"Model and encoders saved to {save_dir}")

def load_model(load_dir="saved_model"):
    """
    Load the model, encoders, and metadata from disk.
    
    Args:
        load_dir (str): Directory containing the saved model files
    
    Returns:
        tuple: (loaded_model, user_encoder, movie_encoder)
    """
    load_dir = Path(load_dir)
    
    # Load model configuration
    config = torch.load(load_dir / "model_config.pt")
    
    # Initialize model with saved configuration
    model = NCF(
        num_users=config['num_users'],
        num_items=config['num_items'],
        embedding_dim=config['embedding_dim'],
        layers=config['layers'],
        lr=config['lr']
    )
    
    # Load model state dictionary
    state_dict = torch.load(load_dir / "model.pt")
    model.load_state_dict(state_dict)
    
    # Load encoders
    with open(load_dir / "encoders.pkl", "rb") as f:
        encoders = pickle.load(f)
        user_encoder = encoders['user_encoder']
        movie_encoder = encoders['movie_encoder']
    
    print(f"Model and encoders loaded from {load_dir}")
    return model, user_encoder, movie_encoder

def save_checkpoint(model, user_encoder, movie_encoder, epoch, metrics, optimizer, save_dir="checkpoints"):
    """
    Save a training checkpoint with model state and metrics.
    
    Args:
        model (NCF): The model being trained
        user_encoder (LabelEncoder): The user encoder
        movie_encoder (LabelEncoder): The movie encoder
        epoch (int): Current epoch number
        metrics (dict): Dictionary of current metrics
        optimizer (torch.optim.Optimizer): The optimizer
        save_dir (str): Directory to save checkpoints
    """
    save_dir = Path(save_dir)
    os.makedirs(save_dir, exist_ok=True)
    
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict() if optimizer is not None else None,
        'metrics': metrics,
        'model_hparams': {
            'num_users': len(user_encoder.classes_),
            'num_items': len(movie_encoder.classes_),
            'embedding_dim': model.hparams.embedding_dim,
            'layers': model.hparams.layers,
            'lr': model.hparams.lr
        }
    }
    
    # Save encoders separately to avoid potential pickle/torch.save conflicts
    with open(save_dir / f"checkpoint_epoch_{epoch}_encoders.pkl", "wb") as f:
        pickle.dump({
            'user_encoder': user_encoder,
            'movie_encoder': movie_encoder
        }, f)
    
    # Save model checkpoint
    checkpoint_path = save_dir / f"checkpoint_epoch_{epoch}.pt"
    torch.save(checkpoint, checkpoint_path)
    print(f"Checkpoint saved: {checkpoint_path}")

def load_checkpoint(checkpoint_path, model=None, optimizer=None):
    """
    Load a training checkpoint.
    
    Args:
        checkpoint_path (str): Path to the checkpoint file
        model (NCF, optional): The model to load the state into. If None, a new model will be created.
        optimizer (torch.optim.Optimizer, optional): The optimizer to load the state into
    
    Returns:
        tuple: (model, epoch, metrics, user_encoder, movie_encoder)
    """
    checkpoint_path = Path(checkpoint_path)
    checkpoint = torch.load(checkpoint_path)
    
    # Load or create model
    if model is None:
        model = NCF(**checkpoint['model_hparams'])
    model.load_state_dict(checkpoint['model_state_dict'])
    
    # Load optimizer state if provided
    if optimizer is not None and checkpoint['optimizer_state_dict'] is not None:
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    
    # Load encoders
    with open(checkpoint_path.parent / f"{checkpoint_path.stem}_encoders.pkl", "rb") as f:
        encoders = pickle.load(f)
        user_encoder = encoders['user_encoder']
        movie_encoder = encoders['movie_encoder']
    
    print(f"Checkpoint loaded: {checkpoint_path}")
    return model, checkpoint['epoch'], checkpoint['metrics'], user_encoder, movie_encoder


In [69]:
save_model(model, user_encoder, movie_encoder, save_dir="saved_model")

# Later, to load the model
model, user_encoder, movie_encoder = load_model(load_dir="saved_model")

Model and encoders saved to saved_model
Model and encoders loaded from saved_model


  config = torch.load(load_dir / "model_config.pt")
  state_dict = torch.load(load_dir / "model.pt")
