In [1]:
import pandas as pd
import numpy as np
import time
import cudf
import os
import matplotlib.pyplot as plt
import seaborn as sns

import cupy as cp

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F

import mlflow
import mlflow.pytorch
from torch.optim import Adam

### Constants

In [2]:
# Root directory of the locally stored datasets; the YooChoose click files are
# read from the "yoochoose" sub-folder underneath it.
file_path = '/home/nguyenvietduc-22520273/.rs_datasets'


### Load dataset

In [3]:
def _load_clicks(csv_path, label):
    """Read one YooChoose click log into a cuDF DataFrame and print the load time.

    Args:
        csv_path: Path to the click file. It is read with explicit column
            names (session_id, time, item_id, category).
        label: Prefix of the timing message printed once loading finishes.

    Returns:
        The loaded cudf.DataFrame, with ``time`` parsed as a date column.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # wall-clock adjustments the way time.time() differences can.
    started = time.perf_counter()
    clicks = cudf.read_csv(
        csv_path,
        names=['session_id', 'time', 'item_id', 'category'],
        dtype={'session_id': 'int32', 'item_id': 'int32', 'category': 'str'},
        parse_dates=['time']
    )
    print(f"{label}: {time.perf_counter() - started:.4f} seconds")
    return clicks


# Define data file paths
train_file = os.path.join(file_path, "yoochoose/yoochoose-clicks.dat")
test_file = os.path.join(file_path, "yoochoose/yoochoose-test.dat")

# Load both splits through the same reader so their schemas cannot drift apart
train_gdf = _load_clicks(train_file, "cuDF Load Time")
test_gdf = _load_clicks(test_file, "cuDF Test Load Time")


cuDF Load Time: 8.3651 seconds
cuDF Test Load Time: 0.5362 seconds


### Data processing

In [4]:
# Sort by session_id and time so that the clicks of each session are in chronological order
train_gdf = train_gdf.sort_values(['session_id', 'time'])

test_gdf = test_gdf.sort_values(['session_id', 'time'])

# Encode item_id using cudf and cupy.
# Indices start at 1; index 0 is used as the padding value by SessionDataset.
unique_items = train_gdf['item_id'].unique()  # stays on the GPU
item_encoder = cudf.Series(cp.arange(1, len(unique_items) + 1), index=unique_items)  # build the item_id -> index mapping on the GPU
train_gdf['item_idx'] = train_gdf['item_id'].map(item_encoder)  # map directly on the GPU

test_gdf['item_idx'] = test_gdf['item_id'].map(item_encoder) # item_encoder was built from the training data as a cudf.Series
# Drop test rows whose item_idx is null after the mapping
# (presumably items that never occur in the training data - TODO confirm).
test_gdf = test_gdf.dropna(subset=['item_idx'])
test_gdf['item_idx'] = test_gdf['item_idx'].astype('int32')

# Group by session_id to create sequences
sessions_gdf = train_gdf.groupby('session_id')['item_idx'].agg('collect')  # 'collect' is used in place of 'list' in cudf
sessions = sessions_gdf.to_arrow().to_pylist()  # convert to Python lists, but only at this final step

test_sessions_gdf = test_gdf.groupby('session_id')['item_idx'].agg('collect')
test_sessions = test_sessions_gdf.to_arrow().to_pylist()

# Number of unique items and sessions
print(f"Number of unique items: {len(unique_items)}")
print(f"Number of sessions: {len(sessions)}")
print(f"Number of test sessions: {len(test_sessions)}")


Number of unique items: 52739
Number of sessions: 9249729
Number of test sessions: 2311994


In [5]:
# Dataset class
class SessionDataset(Dataset):
    """Next-item prediction samples built from per-session item-index lists.

    For every session the last item (after truncation to ``max_len`` items) is
    the target, and the items before it, left-padded with 0, are the input.
    """

    def __init__(self, sessions, max_len=5):
        self.max_len = max_len
        self.sessions = sessions

    def __len__(self):
        return len(self.sessions)

    def __getitem__(self, idx):
        # Keep only the most recent max_len items of the session.
        window = self.sessions[idx][-self.max_len:]
        pad_count = self.max_len - len(window)
        # All items but the last form the context; zeros fill the front, so
        # the input always holds max_len - 1 entries.
        context = [0] * pad_count + window[:-1]
        label = window[-1]
        inputs = torch.tensor(context, dtype=torch.long)
        target = torch.tensor(label, dtype=torch.long)
        return inputs, target

# Maximum number of most-recent items kept per session by SessionDataset
# (max_len - 1 input positions plus one target).
max_len = 20  # Adjust based on your needs or dataset characteristics


In [6]:
# Create the training dataset and its shuffled dataloader
dataset = SessionDataset(sessions, max_len=max_len)
dataloader = DataLoader(dataset, batch_size=128, shuffle=True)  # Larger batch size for bigger dataset
print(f"Train dataset created with {len(dataset)} sessions")

# Create the test dataset; its dataloader keeps the session order (shuffle=False)
test_dataset = SessionDataset(test_sessions, max_len=max_len)
test_dataloader = DataLoader(test_dataset, batch_size=128, shuffle=False)
print(f"Test dataset created with {len(test_dataset)} sessions")

Train dataset created with 9249729 sessions
Test dataset created with 2311994 sessions


In [7]:
import pickle

# Output directory
save_dir = '/mnt/c/Users/HP/mlops_cs317/data/processed/'
# Create the directory up front so a missing folder cannot abort the export
# after the expensive preprocessing has already run.
os.makedirs(save_dir, exist_ok=True)

# 1. Save item_encoder (cudf.Series) as a pickle
with open(os.path.join(save_dir, 'item_encoder.pkl'), 'wb') as f:
    pickle.dump(item_encoder.to_pandas(), f)  # convert to pandas so it can be pickled

# 2. Save train_gdf and test_gdf as .parquet
train_gdf.to_parquet(os.path.join(save_dir, 'clicks_train.parquet'), index=False)
test_gdf.to_parquet(os.path.join(save_dir, 'clicks_test.parquet'), index=False)

# 3. Save sessions and test_sessions as pickle (.pkl)
with open(os.path.join(save_dir, 'train_sessions.pkl'), 'wb') as f:
    pickle.dump(sessions, f)

with open(os.path.join(save_dir, 'test_sessions.pkl'), 'wb') as f:
    pickle.dump(test_sessions, f)

print("Saved all successfully.")


Saved all successfully.


### SASRec Model

In [7]:
# SASRec model
class SASRec(nn.Module):
    """Self-attentive next-item model.

    Item embeddings plus fixed sinusoidal position encodings are passed through
    a Transformer encoder; the hidden state at the last position is projected
    onto a score for every item index (index 0 is the padding index).

    Args:
        num_items: Number of real items; indices 1..num_items are valid items.
        embedding_dim: Size of the item embeddings (Transformer ``d_model``).
        hidden_size: Feed-forward width inside each encoder layer.
        num_heads: Number of attention heads.
        num_layers: Number of stacked encoder layers.
        dropout: Dropout used inside the encoder layers.
        max_positions: Longest input sequence the position table supports.
    """

    def __init__(self, num_items, embedding_dim, hidden_size, num_heads, num_layers, dropout=0.1,
                 max_positions=100):
        super().__init__()
        self.embedding = nn.Embedding(num_items + 1, embedding_dim, padding_idx=0)
        # Registered as a buffer so it follows .to()/.cuda() with the module
        # instead of being pinned to CUDA at construction time. It is
        # non-persistent so the state_dict keys stay the same as before.
        self.register_buffer(
            "pos_encoding",
            self._generate_pos_encoding(embedding_dim, max_len=max_positions),
            persistent=False,
        )
        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=embedding_dim, nhead=num_heads, dim_feedforward=hidden_size,
                dropout=dropout, batch_first=True
            ),
            num_layers=num_layers,
            # Inputs are left-padded; the nested-tensor inference fast path is
            # meant for padding at the end, so keep the regular padded path.
            enable_nested_tensor=False,
        )
        self.fc = nn.Linear(embedding_dim, num_items + 1)

    def _generate_pos_encoding(self, embedding_dim, max_len):
        """Return a (1, max_len, embedding_dim) sinusoidal position table (on CPU)."""
        pos = torch.arange(max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, embedding_dim, 2).float() * (-np.log(10000.0) / embedding_dim))
        pos_encoding = torch.zeros(max_len, embedding_dim)
        pos_encoding[:, 0::2] = torch.sin(pos * div_term)
        # For an odd embedding_dim there is one fewer cosine column than sine columns.
        pos_encoding[:, 1::2] = torch.cos(pos * div_term[: embedding_dim // 2])
        return pos_encoding.unsqueeze(0)

    def forward(self, x):
        """Score all items for the next click.

        Args:
            x: Long tensor of item indices with shape (batch, seq_len); 0 marks padding.

        Returns:
            Tensor of shape (batch, num_items + 1) with one logit per item index.

        Raises:
            ValueError: If seq_len exceeds the size of the position table.
        """
        _, seq_len = x.size()
        if seq_len > self.pos_encoding.size(1):
            raise ValueError(
                f"sequence length {seq_len} exceeds the supported maximum {self.pos_encoding.size(1)}"
            )
        # The padding mask must come from the item ids: once the position
        # encoding is added, padded positions no longer sum to zero.
        pad_mask = x.eq(0)
        # A row that is padding only would mask every key and yield NaN
        # attention weights, so such rows are left unmasked.
        pad_mask = pad_mask & ~pad_mask.all(dim=1, keepdim=True)
        hidden = self.embedding(x) + self.pos_encoding[:, :seq_len, :]
        output = self.transformer(hidden, src_key_padding_mask=pad_mask)
        output = self.fc(output[:, -1, :])  # Predict next item
        return output


In [8]:
# Loss function
def negative_sampling_loss(outputs, targets, num_negatives=100, num_items=52739):
    """Sampled binary log-loss over one positive and ``num_negatives`` random negatives.

    Args:
        outputs: Logits of shape (batch, num_items + 1).
        targets: Long tensor of shape (batch,) with the index of the true next item.
        num_negatives: Number of negatives drawn uniformly from 1..num_items per sample.
        num_items: Largest valid item index.

    Returns:
        Scalar tensor: mean over the batch of
        -(logsigmoid(positive) + sum(logsigmoid(-negatives))).
    """
    batch_size = targets.size(0)
    device = outputs.device

    # Logits of the correct (positive) item
    batch_index = torch.arange(batch_size, device=device)
    positive_logits = outputs[batch_index, targets]  # [batch_size]

    # Draw random negative indices directly on the device of the logits
    # (no hard-coded CUDA transfer).
    negative_indices = torch.randint(
        1, num_items + 1, (batch_size, num_negatives), device=device
    )  # [batch_size, num_negatives]

    # Logits of the negative samples
    negative_logits = outputs.gather(1, negative_indices)  # [batch_size, num_negatives]

    # A sampled negative may coincide with the positive target; counting it
    # would push the positive logit down, so those entries contribute nothing.
    collisions = negative_indices.eq(targets.unsqueeze(1))
    negative_term = F.logsigmoid(-negative_logits).masked_fill(collisions, 0.0).sum(dim=1)

    # Loss: log-sigmoid of the positive + sum of log-sigmoid of the negated negatives
    loss = -torch.mean(F.logsigmoid(positive_logits) + negative_term)
    return loss


In [9]:
# Initialize model and move it to the GPU
model = SASRec(
    num_items=52739,  # matches the "Number of unique items" printed by the data-processing cell
    embedding_dim=64,  # Increased embedding size for larger dataset
    hidden_size=128,   # Increased hidden size
    num_heads=4, 
    num_layers=2
).cuda()


### Evaluation Functions

In [10]:
# Evaluate on the test set
def evaluate(model, test_dataloader, num_items):
    """Return the average negative-sampling loss of ``model`` over ``test_dataloader``.

    The loss resamples its negatives on every call, so the value is stochastic.

    Args:
        model: Module mapping an input batch to logits.
        test_dataloader: Iterable yielding (inputs, targets) tensor pairs.
        num_items: Largest valid item index, forwarded to the loss.

    Returns:
        Mean of the per-batch losses as a float, or NaN when the loader yields
        no batches (NaN never counts as an improvement in a ``<`` comparison).
    """
    model.eval()
    # Run on whatever device the model lives on instead of assuming CUDA.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")
    total_loss = 0.0
    num_batches = 0
    with torch.no_grad():
        for inputs, targets in test_dataloader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            loss = negative_sampling_loss(outputs, targets, num_negatives=100, num_items=num_items)
            total_loss += loss.item()
            num_batches += 1
    if num_batches == 0:
        return float("nan")
    return total_loss / num_batches
    
def recall_at_20(model, dataloader, num_items, k=20):
    """Fraction of samples whose target is among the ``k`` highest-scoring items.

    Args:
        model: Module mapping an input batch to logits of shape (batch, n_scores).
        dataloader: Iterable yielding (inputs, targets) tensor pairs.
        num_items: Unused; kept for a signature parallel to the other metrics.
        k: Cut-off; clamped to the number of scores the model returns.

    Returns:
        Recall@k as a float in [0, 1]; 0.0 when the loader yields no samples.
    """
    model.eval()
    # Run on whatever device the model lives on instead of assuming CUDA.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")
    total_hits = 0
    num_samples = 0
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            # topk raises when k exceeds the number of candidates, so clamp it.
            top_k = torch.topk(outputs, min(k, outputs.size(-1)), dim=-1).indices
            matches = (top_k == targets.unsqueeze(-1)).sum(dim=-1)
            total_hits += matches.sum().item()
            num_samples += targets.numel()
    if num_samples == 0:
        return 0.0
    return total_hits / num_samples
    
def mrr_at_k(model, dataloader, num_items, k=20):
    """Mean reciprocal rank of the target within the top-``k`` scored items.

    A sample contributes 1 / rank when its target is in the top k (rank is
    1-based) and 0 otherwise.

    Args:
        model: Module mapping an input batch to logits of shape (batch, n_scores).
        dataloader: Iterable yielding (inputs, targets) tensor pairs.
        num_items: Unused; kept for a signature parallel to the other metrics.
        k: Cut-off; clamped to the number of scores the model returns.

    Returns:
        MRR@k as a float; 0.0 when the loader yields no samples.
    """
    model.eval()
    # Run on whatever device the model lives on instead of assuming CUDA.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")
    total_mrr = 0.0
    num_samples = 0
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            # topk raises when k exceeds the number of candidates, so clamp it.
            top_k = torch.topk(outputs, min(k, outputs.size(-1)), dim=-1).indices
            # topk indices are unique per row, so each row has at most one hit.
            # Dividing the hit mask by the 1-based rank gives the reciprocal rank
            # for the whole batch at once, with a single host sync per batch.
            hits = top_k.eq(targets.unsqueeze(-1))
            ranks = torch.arange(1, top_k.size(-1) + 1, device=top_k.device, dtype=torch.float32)
            total_mrr += (hits.float() / ranks).sum().item()
            num_samples += targets.size(0)
    if num_samples == 0:
        return 0.0
    return total_mrr / num_samples


### Training

In [11]:
# Set up MLflow
mlflow.set_tracking_uri("http://192.168.28.39:5000")
mlflow.set_experiment("YooChoose_SASRec_Tracking")

# Training loop with MLflow tracking and early stopping
num_epochs = 2
num_items = 52739
learning_rate = 0.001
num_negatives = 100
patience = 2
best_loss = float('inf')
patience_counter = 0
checkpoint_path = "best_model.pth"

optimizer = Adam(model.parameters(), lr=learning_rate)

# Train on whatever device the model already lives on.
device = next(model.parameters()).device

with mlflow.start_run(run_name="SASRec_NegativeSampling"):
    mlflow.log_param("num_epochs", num_epochs)
    mlflow.log_param("num_items", num_items)
    mlflow.log_param("learning_rate", learning_rate)
    mlflow.log_param("num_negatives", num_negatives)
    mlflow.log_param("patience", patience)
    mlflow.log_param("batch_size", dataloader.batch_size)
    # Log the max_len the training dataset was actually built with rather than
    # a literal that can silently disagree with it.
    mlflow.log_param("max_len", dataloader.dataset.max_len)

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        start_time = time.time()
        for batch_idx, (inputs, targets) in enumerate(dataloader):
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = negative_sampling_loss(outputs, targets, num_negatives=num_negatives, num_items=num_items)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

            if (batch_idx + 1) % 100 == 0:
                print(f"Epoch {epoch+1}, Batch {batch_idx+1}/{len(dataloader)}, Loss: {loss.item():.4f}")

        avg_train_loss = total_loss / len(dataloader)
        epoch_time = time.time() - start_time
        print(f"Epoch {epoch+1}/{num_epochs}, Avg Train Loss: {avg_train_loss:.4f}, Time: {epoch_time:.2f} seconds")

        # Evaluate
        avg_test_loss = evaluate(model, test_dataloader, num_items)
        print(f"Epoch {epoch+1}/{num_epochs}, Avg Test Loss: {avg_test_loss:.4f}")

        # Log metrics
        mlflow.log_metric("train_loss", avg_train_loss, step=epoch)
        mlflow.log_metric("test_loss", avg_test_loss, step=epoch)
        # mlflow.log_metric("recall@20", recall_20, step=epoch)
        # mlflow.log_metric("mrr@20", mrr_20, step=epoch)
        mlflow.log_metric("epoch_time", epoch_time, step=epoch)

        # Early stopping: checkpoint whenever the test loss improves
        if avg_test_loss < best_loss:
            best_loss = avg_test_loss
            patience_counter = 0
            torch.save({'epoch': epoch + 1, 'model_state_dict': model.state_dict(),
                        'optimizer_state_dict': optimizer.state_dict(), 'loss': best_loss},
                       checkpoint_path)
            mlflow.log_artifact(checkpoint_path)
            print(f"Saved best model at Epoch {epoch+1} with Test Loss: {best_loss:.4f}")
        else:
            patience_counter += 1
            print(f"No improvement. Patience counter: {patience_counter}/{patience}")

        if patience_counter >= patience:
            print(f"Early stopping triggered after Epoch {epoch+1}. Best Test Loss: {best_loss:.4f}")
            break

    # NOTE(review): this logs the weights of the last epoch that ran; the best
    # checkpoint is only restored afterwards. Confirm that this order is intended.
    mlflow.pytorch.log_model(model, "final_model")

    if os.path.exists(checkpoint_path):
        # map_location keeps the restore working regardless of the device the
        # checkpoint was written from.
        checkpoint = torch.load(checkpoint_path, map_location=device)
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        print(f"Loaded best model from {checkpoint_path} with Test Loss: {checkpoint['loss']:.4f}")

2025/04/12 09:34:33 INFO mlflow.tracking.fluent: Experiment with name 'YooChoose_SASRec_Tracking' does not exist. Creating a new experiment.


Epoch 1, Batch 100/72264, Loss: 13.5384
Epoch 1, Batch 200/72264, Loss: 5.6604
Epoch 1, Batch 300/72264, Loss: 4.2298
Epoch 1, Batch 400/72264, Loss: 4.1321
Epoch 1, Batch 500/72264, Loss: 3.7262
Epoch 1, Batch 600/72264, Loss: 3.9042
Epoch 1, Batch 700/72264, Loss: 3.7726
Epoch 1, Batch 800/72264, Loss: 3.5049
Epoch 1, Batch 900/72264, Loss: 3.5673
Epoch 1, Batch 1000/72264, Loss: 3.6415
Epoch 1, Batch 1100/72264, Loss: 3.7526
Epoch 1, Batch 1200/72264, Loss: 3.6199
Epoch 1, Batch 1300/72264, Loss: 3.5575
Epoch 1, Batch 1400/72264, Loss: 3.9104
Epoch 1, Batch 1500/72264, Loss: 3.5077
Epoch 1, Batch 1600/72264, Loss: 3.5615
Epoch 1, Batch 1700/72264, Loss: 3.4799
Epoch 1, Batch 1800/72264, Loss: 3.6982
Epoch 1, Batch 1900/72264, Loss: 3.6894
Epoch 1, Batch 2000/72264, Loss: 3.6175
Epoch 1, Batch 2100/72264, Loss: 3.5000
Epoch 1, Batch 2200/72264, Loss: 3.4162
Epoch 1, Batch 2300/72264, Loss: 3.2168
Epoch 1, Batch 2400/72264, Loss: 3.5162
Epoch 1, Batch 2500/72264, Loss: 3.2582
Epoch 1,

KeyboardInterrupt: 