In [1]:
# Standard library imports
import datetime
import os
from glob import glob
from typing import Any, Callable, Dict, List, Optional, Set, Tuple

# Third party imports
import numpy as np
import pandas as pd
import pytorch_lightning as pl
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn import functional as F

from scipy.sparse import coo_matrix, csr_matrix, vstack
from torch.utils.data import DataLoader, Dataset
from pytorch_lightning.tuner.tuning import Tuner

In [2]:
# Load data
customers = pd.read_pickle("Data/customers.pkl")
articles = pd.read_pickle("Data/articles.pkl")
transactions = pd.read_pickle("Data/transactions.pkl")

In [3]:
print("All Transactions Date Range: {} to {}".format(transactions['t_dat'].min(), transactions['t_dat'].max()))

transactions["t_dat"] = pd.to_datetime(transactions["t_dat"])

train = transactions.loc[(transactions["t_dat"] >= transactions['t_dat'].min()) & (transactions['t_dat'] < datetime.datetime(2020,9,16))]
val = transactions.loc[transactions["t_dat"] >= datetime.datetime(2020,9,16)]

All Transactions Date Range: 2018-09-20 00:00:00 to 2020-09-22 00:00:00


In [4]:
def create_sparse_matrix_coo(
        transactions_df: pd.DataFrame,
        customers_df: pd.DataFrame,
        articles_df: pd.DataFrame
    ) -> Tuple[csr_matrix, Dict[str, int], Dict[str, int]]:
        """
        Create a sparse purchase matrix using COO format.
        Repeated purchases added up.
        """
        customer_index = {id_: i for i, id_ in enumerate(customers_df['customer_id'])}
        article_index = {id_: i for i, id_ in enumerate(articles_df['article_id'])}
        
        # Vectorized operations for index lookup
        customer_indices = np.array([customer_index[cid] for cid in transactions_df['customer_id']])
        article_indices = np.array([article_index[aid] for aid in transactions_df['article_id']])
        
        sparse_matrix = coo_matrix(
            (np.ones(len(transactions_df)),
            (customer_indices, article_indices)),
            shape=(len(customers_df), len(articles_df))
        ).tocsr()
        
        return sparse_matrix, customer_index, article_index
    
sparse_matrix_train, _, _ = create_sparse_matrix_coo(train, customers, articles)
sparse_matrix_val, _, _ = create_sparse_matrix_coo(val, customers, articles)


In [5]:
sparse_matrix_train.shape

(1371980, 105542)

In [6]:
class NCFDataset(Dataset):
    def __init__(self, 
                 sparse_matrix: csr_matrix, 
                 num_negatives: int = 4,
                 transform: Optional[Callable] = None):
        self.sparse_matrix = sparse_matrix
        self.num_negatives = num_negatives
        self.users, self.items = sparse_matrix.nonzero()
        self.num_items = sparse_matrix.shape[1]
        self.transform = transform
        
    def __len__(self):
        return len(self.users) * (1 + self.num_negatives)
    
    def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
        if idx < len(self.users):
            sample = {
                'user': self.users[idx],
                'item': self.items[idx],
                'label': 1.0
            }
        else:
            pos_idx = idx % len(self.users)
            user = self.users[pos_idx]
            while True:
                neg_item = np.random.randint(self.num_items)
                if self.sparse_matrix[user, neg_item] == 0:
                    sample = {
                        'user': user,
                        'item': neg_item,
                        'label': 0.0
                    }
                    break
                    
        if self.transform:
            sample = self.transform(sample)
            
        return sample

class ToTensor:
    def __call__(self, sample: Dict[str, Any]) -> Dict[str, torch.Tensor]:
        return {
            'user': torch.tensor(sample['user'], dtype=torch.long),
            'item': torch.tensor(sample['item'], dtype=torch.long),
            'label': torch.tensor(sample['label'], dtype=torch.float)
        }

In [12]:
train_df = NCFDataset(sparse_matrix_train, num_negatives=0, transform=ToTensor())
val_df = NCFDataset(sparse_matrix_val, num_negatives=0, transform=ToTensor())
train_loader = DataLoader(train_df, batch_size=256, shuffle=True)
val_loader = DataLoader(val_df, batch_size=256, shuffle=False)

In [13]:
len(train_df)

27101148

In [31]:
sample_batch = next(iter(val_loader))


In [32]:
print('User shape:', sample_batch['user'].shape)
print('Item shape:', sample_batch['item'].shape)
print('Label shape:', sample_batch['label'].shape)

User shape: torch.Size([256])
Item shape: torch.Size([256])
Label shape: torch.Size([256])


In [35]:
class NeuMF(pl.LightningModule):
    def __init__(self, n_users, n_items, learning_rate=1e-3):
        super().__init__()
        self.save_hyperparameters()
        
        # Model layers
        self.embedding_user = nn.Embedding(n_users, 64)
        self.embedding_item = nn.Embedding(n_items, 64)
        self.output = nn.Linear(64, 1)
        self.logistic = nn.Sigmoid()
        
        # Store learning rate as hparam
        self.learning_rate = learning_rate
        
    def forward(self, user_indices, item_indices):
        user_embedding = self.embedding_user(user_indices)
        item_embedding = self.embedding_item(item_indices)
        element_product = torch.mul(user_embedding, item_embedding)
        logits = self.output(element_product)
        rating = self.logistic(logits)
        return rating
    
    def _common_step(self, batch):
        user = batch['user']
        item = batch['item']
        label = batch['label']
        
        # Move computation to GPU as a batch
        prediction = self(user, item)
        # Use binary_cross_entropy instead of BCELoss for better performance
        loss = F.binary_cross_entropy(prediction.squeeze(), label)
        return loss, prediction, label
    
    def training_step(self, batch, batch_idx):
        loss, prediction, label = self._common_step(batch)
        # Reduce logging frequency for better performance
        if batch_idx % 100 == 0:  # Log every 100 batches
            self.log('train_loss', loss, on_step=False, on_epoch=True, prog_bar=True)
        return loss
    
    def validation_step(self, batch, batch_idx):
        loss, prediction, label = self._common_step(batch)
        self.log('val_loss', loss, on_step=False, on_epoch=True, prog_bar=True)
        return loss
    
    def configure_optimizers(self):
        optimizer = optim.AdamW(
            self.parameters(),
            lr=self.learning_rate,
            weight_decay=0.01,
            amsgrad=True
        )
        
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, 
            mode='min',
            factor=0.1,
            patience=3,
            verbose=True
        )
        return {
            "optimizer": optimizer,
            "lr_scheduler": {
                "scheduler": scheduler,
                "monitor": "train_loss",
                "frequency": 1
            }
        }
        
    @torch.no_grad()  # Disable gradient computation for predictions
    def predict(self, user, item):
        user = torch.tensor(user, dtype=torch.long, device=self.device)
        item = torch.tensor(item, dtype=torch.long, device=self.device)
        return self(user, item)

# Initialize model
model = NeuMF(sparse_matrix_train.shape[0], sparse_matrix_train.shape[1])

# Callbacks
early_stopping = pl.callbacks.EarlyStopping(
    monitor='val_loss',  # Changed to monitor validation loss instead
    patience=5,
    mode='min',
    verbose=True
)

# Initialize trainer
trainer = pl.Trainer(
    max_epochs=50,
    callbacks=[early_stopping],
    precision='16-mixed',
    accelerator='mps',
    devices=1,
    strategy='auto',
    fast_dev_run=False
)

# Learning rate finder
tuner = Tuner(trainer)
lr_finder = tuner.lr_find(
    model,
    train_dataloaders=train_loader,
    val_dataloaders=val_loader,
    min_lr=1e-4,
    max_lr=1e-2,
    num_training=100
)

# Update model with found learning rate
new_lr = lr_finder.suggestion()
model.learning_rate = new_lr

# Optional: Plot the results
print(f"Suggested learning rate: {new_lr}")

# Train the model
trainer.fit(model, train_loader, val_loader)

Using 16bit Automatic Mixed Precision (AMP)
GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
/opt/miniconda3/envs/eda/lib/python3.11/site-packages/pytorch_lightning/trainer/connectors/data_connector.py:424: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=10` in the `DataLoader` to improve performance.
/opt/miniconda3/envs/eda/lib/python3.11/site-packages/pytorch_lightning/trainer/connectors/data_connector.py:424: The 'val_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=10` in the `DataLoader` to improve performance.
Finding best initial lr: 100%|██████████| 100/100 [00:08<00:00, 12.59it/s]`Trainer.fit` stopped: `max_steps=100` reached.
Finding best initial lr: 100%|██████████| 100/100 [00:08<00:00, 12.27it/s]
Learning rate 

Suggested learning rate: 0.009120108393559099
Epoch 0:  41%|████▏     | 3729/9031 [04:40<06:38, 13.31it/s, v_num=15]      


Detected KeyboardInterrupt, attempting graceful shutdown ...


NameError: name 'exit' is not defined