In [2]:
!pip install torch_geometric

Collecting torch_geometric
  Downloading torch_geometric-2.6.1-py3-none-any.whl.metadata (63 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/63.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m63.1/63.1 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
Downloading torch_geometric-2.6.1-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m16.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: torch_geometric
Successfully installed torch_geometric-2.6.1


In [3]:
from transformers import BertTokenizer, BertModel
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv

class PopularityAwareRecommender(nn.Module):
    def __init__(
        self,
        num_users,
        num_items,
        user_texts,
        item_texts,
        embedding_dim=768,  # Set to 768 to match BERT output
        lstm_hidden_dim=32,
        gcn_hidden_dim=32,
        num_gcn_layers=2,
        temperature=0.07,
    ):
        super().__init__()
        self.embedding_dim = embedding_dim
        self.temperature = temperature

        # Initialize BERT model and tokenizer
        self.bert_model = BertModel.from_pretrained('bert-base-uncased')
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

        # Use BERT to encode user and item embeddings
        self.user_embeddings = self.encode_texts(user_texts)
        self.item_embeddings = self.encode_texts(item_texts)

        # LSTM for Sequential Modeling of Browsing History
        self.user_lstm = nn.LSTM(
            embedding_dim,
            lstm_hidden_dim,
            batch_first=True,
            bidirectional=True
        )

        # LSTM for Item Popularity Time Series
        self.popularity_lstm = nn.LSTM(
            1,  # Input is the popularity score (time series)
            lstm_hidden_dim,
            batch_first=True
        )

        # GCN layers for Item Embedding
        self.gcn_layers = nn.ModuleList([
            GCNConv(embedding_dim if i == 0 else gcn_hidden_dim, gcn_hidden_dim)
            for i in range(num_gcn_layers)
        ])

        # Final projection layers
        self.item_projection = nn.Linear(
            gcn_hidden_dim + lstm_hidden_dim * 2,  # GCN output + popularity context
            embedding_dim
        )
        self.user_projection = nn.Linear(
            embedding_dim + lstm_hidden_dim * 2,  # Text embedding + interaction context
            embedding_dim
        )

    def encode_texts(self, texts):
        # Encode texts using BERT
        inputs = self.tokenizer(texts, return_tensors='pt', padding=True, truncation=True, max_length=128)
        with torch.no_grad():
            outputs = self.bert_model(**inputs)
            embeddings = outputs.last_hidden_state[:, 0, :]  # Use [CLS] token embeddings
        return embeddings

    def forward(
        self,
        user_ids,
        interaction_histories,
        item_popularity_seq,
        edge_index
    ):
        # Get user embeddings from precomputed BERT embeddings
        user_emb = self.user_embeddings[user_ids]  # Shape: [num_users, embedding_dim]

        # Process user interaction history to get sequential representation
        item_seq_emb = self.item_embeddings[interaction_histories]  # Shape: [batch_size, seq_len, embedding_dim]
        seq_output, _ = self.user_lstm(item_seq_emb)
        interaction_context = seq_output[:, -1, :]  # Use the last hidden state as user interaction context

        # Process popularity time series for each item
        popularity_scores = item_popularity_seq.unsqueeze(-1)  # Shape: [batch_size, seq_len, 1]
        pop_output, _ = self.popularity_lstm(popularity_scores)
        pop_context = pop_output[:, -1, :]  # Last hidden state for popularity context

        # Apply GCN layers to item embeddings
        x = self.item_embeddings  # Initial node features
        for gcn_layer in self.gcn_layers:
            x = gcn_layer(x, edge_index)
            x = F.relu(x)
            x = F.dropout(x, p=0.1, training=self.training)

        # Combine item embeddings with popularity context
        item_final = self.item_projection(
            torch.cat([x, pop_context], dim=1)
        )
        user_final = self.user_projection(
            torch.cat([user_emb, interaction_context], dim=1)
        )

        return user_final, item_final

    def compute_loss(
        self,
        user_emb,
        item_emb,
        positive_items,
        negative_items,
        popularity_weights
    ):
        # Compute similarity scores
        pos_scores = torch.sum(user_emb * item_emb[positive_items], dim=1)
        neg_scores = torch.sum(user_emb.unsqueeze(1) * item_emb[negative_items], dim=2)

        # Apply popularity adjustment
        pop_weights = popularity_weights[positive_items]
        adjusted_pos_scores = pos_scores / (pop_weights + 1e-6)

        # Compute InfoNCE loss with popularity adjustment
        pos_exp = torch.exp(adjusted_pos_scores / self.temperature)
        neg_exp = torch.exp(neg_scores / self.temperature).sum(dim=1)
        loss = -torch.log(pos_exp / (pos_exp + neg_exp)).mean()

        return loss

    def get_recommendations(self, user_emb, item_emb, k=10):
        # Compute similarity scores
        scores = torch.matmul(user_emb, item_emb.t())

        # Get top-k recommendations
        _, top_items = torch.topk(scores, k=k, dim=1)
        return top_items

## dataset

In [4]:
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np

# Sample user profiles and movie descriptions
user_profiles = [
    "User interested in action and adventure movies.",
    "User prefers romantic comedies and dramas.",
    "User enjoys science fiction and fantasy films."
]

movie_descriptions = [
    "An action-packed adventure in the mountains.",
    "A heartwarming romantic comedy set in Paris.",
    "A sci-fi epic exploring distant galaxies.",
    "A fantasy tale of magic and dragons.",
    "A drama about family and relationships."
]

# Simulated user interaction histories (indices of movies)
user_interactions = [
    [0, 1, 2],  # User 0 watched movies 0, 1, 2
    [1, 3, 4],  # User 1 watched movies 1, 3, 4
    [2, 3, 0]   # User 2 watched movies 2, 3, 0
]

# Simulated item popularity sequences (random values)
item_popularity_seq = np.random.rand(len(movie_descriptions), 10)  # 10 time steps

# Number of users and items
num_users = len(user_profiles)
num_items = len(movie_descriptions)

In [5]:
class ToyDataset(Dataset):
    def __init__(self, user_interactions, item_popularity_seq, num_users, num_items):
        self.user_interactions = user_interactions
        self.item_popularity_seq = item_popularity_seq
        self.num_users = num_users
        self.num_items = num_items
        self.edge_index = self.build_edge_index()  # Automatically build edge index

    def build_edge_index(self):
        edges = []
        for user_id, interactions in enumerate(self.user_interactions):
            for item_id in interactions:
                edges.append([user_id, self.num_users + item_id])  # user-to-item
                edges.append([self.num_users + item_id, user_id])  # item-to-user (reverse)
        return torch.tensor(edges, dtype=torch.long).t().contiguous()

    def __len__(self):
        return len(self.user_interactions)

    def __getitem__(self, idx):
        user_interaction = self.user_interactions[idx]
        popularity_seq = self.item_popularity_seq[user_interaction]
        return {
            'user_id': idx,
            'interaction_history': torch.tensor(user_interaction, dtype=torch.long),
            'popularity_seq': torch.tensor(popularity_seq, dtype=torch.float),
            'edge_index': self.edge_index  # Include edge_index
        }
# Create dataset and dataloader
dataset = ToyDataset(user_interactions, item_popularity_seq, num_users, num_items)
dataloader = DataLoader(dataset, batch_size=2, shuffle=True)


## training

In [6]:
def train_model(model, dataloader, num_epochs=5, learning_rate=0.001):
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    model.train()

    for epoch in range(num_epochs):
        total_loss = 0
        for batch in dataloader:
            user_ids = batch['user_id']
            interaction_histories = batch['interaction_history']
            popularity_seq = batch['popularity_seq']
            edge_index = batch['edge_index']  # Get edge_index from the batch

            # Forward pass
            user_emb, item_emb = model(
                user_ids=user_ids,
                interaction_histories=interaction_histories,
                item_popularity_seq=popularity_seq,
                edge_index=edge_index
            )

            # Simulate positive and negative items for loss computation
            positive_items = interaction_histories[:, -1]  # Last item in history as positive
            negative_items = torch.randint(0, num_items, (len(user_ids), 5))  # 5 negative samples

            # Compute popularity weights (for simplicity, using random values)
            popularity_weights = torch.rand(num_items)

            # Compute loss
            loss = model.compute_loss(
                user_emb=user_emb,
                item_emb=item_emb,
                positive_items=positive_items,
                negative_items=negative_items,
                popularity_weights=popularity_weights
            )

            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(dataloader)
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}')



In [7]:

# Initialize the model
model = PopularityAwareRecommender(
    num_users=num_users,
    num_items=num_items,
    user_texts=user_profiles,
    item_texts=movie_descriptions
)

# Train the model
train_model(model, dataloader)