In [2]:
import torch
import torch.nn as nn
import math

In [6]:
class AUGRU(nn.Module):
    """GRU-style recurrent layer whose hidden state is rescaled by a learned attention gate.

    For every time step, with ``x_t`` the current input and ``h`` the previous
    hidden state, the layer computes::

        update    = sigmoid(W_u [x_t, h])
        reset     = sigmoid(W_r [x_t, h])
        attention = sigmoid(W_a [x_t, h])
        candidate = tanh(W_c [x_t, reset * h])
        h         = attention * ((1 - update) * h + update * candidate)

    NOTE(review): the attention gate is derived from ``[x_t, h]`` itself rather
    than from an externally supplied attention score -- confirm that this is the
    intended variant.

    Args:
        input_size: Feature width of each input step.
        hidden_size: Width of the hidden state and of every output step.
    """

    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size

        # Update gate.
        self.update_gate = nn.Linear(input_size + hidden_size, hidden_size)
        # Reset gate.
        self.reset_gate = nn.Linear(input_size + hidden_size, hidden_size)
        # Candidate hidden state.
        self.candidate = nn.Linear(input_size + hidden_size, hidden_size)
        # Attention gate.
        self.attention_gate = nn.Linear(input_size + hidden_size, hidden_size)

    def forward(self, x, h=None):
        """Run the recurrence over a batch-first sequence.

        Args:
            x: Input tensor of shape ``[batch_size, seq_len, input_size]``.
            h: Optional initial hidden state ``[batch_size, hidden_size]``;
                defaults to zeros with the dtype and device of ``x``.

        Returns:
            Tensor ``[batch_size, seq_len, hidden_size]`` holding the hidden state
            after every step (empty along the time axis when ``seq_len`` is 0).
        """
        batch_size, seq_len, _ = x.size()

        if h is None:
            # new_zeros follows x's dtype as well as its device, so the layer also
            # works after .double() / .half() without a dtype mismatch.
            h = x.new_zeros(batch_size, self.hidden_size)

        if seq_len == 0:
            # torch.stack cannot stack an empty list; return an empty sequence instead.
            return x.new_zeros(batch_size, 0, self.hidden_size)

        output = []

        for t in range(seq_len):
            xt = x[:, t, :]  # [batch_size, input_size]

            # Current input joined with the previous hidden state.
            combined = torch.cat([xt, h], dim=1)

            # Gate activations.
            update = torch.sigmoid(self.update_gate(combined))
            reset = torch.sigmoid(self.reset_gate(combined))
            attention = torch.sigmoid(self.attention_gate(combined))

            # Candidate state sees the reset-scaled hidden state.
            candidate_combined = torch.cat([xt, reset * h], dim=1)
            candidate_hidden = torch.tanh(self.candidate(candidate_combined))

            # Standard GRU interpolation, then scale by the attention gate.
            h = (1 - update) * h + update * candidate_hidden
            h = attention * h

            output.append(h)

        return torch.stack(output, dim=1)  # [batch_size, seq_len, hidden_size]


In [7]:
class SequenceRecommender(nn.Module):
    """Per-position item scorer.

    Pipeline: item embedding -> GRU -> multi-head self-attention with a residual
    connection and LayerNorm -> AUGRU -> linear projection onto the item vocabulary.

    Args:
        num_items: Number of rows in the embedding table and width of the output logits.
        embedding_dim: Width of the item embeddings.
        hidden_dim: Width of the GRU, attention and AUGRU states.
        num_heads: Number of heads of the self-attention layer.
        dropout: Dropout probability used inside the attention layer and on its output.
    """

    def __init__(self, num_items, embedding_dim, hidden_dim, num_heads=4, dropout=0.1):
        super().__init__()
        self.embedding_dim, self.hidden_dim = embedding_dim, hidden_dim

        # Sub-modules are registered in pipeline order.
        self.item_embedding = nn.Embedding(num_items, embedding_dim)
        self.gru = nn.GRU(embedding_dim, hidden_dim, batch_first=True)
        self.attention = nn.MultiheadAttention(
            hidden_dim, num_heads, dropout=dropout, batch_first=True
        )
        self.augru = AUGRU(hidden_dim, hidden_dim)
        self.output_layer = nn.Linear(hidden_dim, num_items)

        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(hidden_dim)

    def forward(self, seq):
        """Map item ids ``[batch_size, seq_len]`` to logits ``[batch_size, seq_len, num_items]``."""
        # Embed the ids and encode them with the GRU: [batch_size, seq_len, hidden_dim].
        states, _ = self.gru(self.item_embedding(seq))

        # Self-attention over the GRU states (query = key = value).
        attended, _ = self.attention(states, states, states)

        # Residual connection around the (dropped-out) attention output, then LayerNorm.
        fused = self.layer_norm(states + self.dropout(attended))

        # AUGRU pass followed by the projection onto the item vocabulary.
        return self.output_layer(self.augru(fused))

In [8]:
# 使用示例
def test_model():
    # 模型参数
    num_items = 1000
    embedding_dim = 64
    hidden_dim = 128
    batch_size = 32
    seq_len = 50
    
    # 创建模型
    model = SequenceRecommender(
        num_items=num_items,
        embedding_dim=embedding_dim,
        hidden_dim=hidden_dim
    )
    
    # 创建示例输入
    x = torch.randint(0, num_items, (batch_size, seq_len))
    
    # 前向传播
    output = model(x)
    print(f"输入形状: {x.shape}")
    print(f"输出形状: {output.shape}")

In [9]:
# Run the smoke test; it prints the input and output tensor shapes.
test_model()

输入形状: torch.Size([32, 50])
输出形状: torch.Size([32, 50, 1000])


In [12]:
import torch
import torch.nn as nn
import math

class SequenceRecommender(nn.Module):
    """Target-aware sequence scorer: one score per (behaviour sequence, target item) pair.

    Pipeline: item embedding -> linear projection to ``hidden_dim`` -> GRU cell
    unrolled over time -> attention with the target item as query over the GRU
    states seen so far -> second GRU cell fed with ``[GRU state, attention
    context]`` -> linear layer on the final state.

    Args:
        num_items: Number of rows in the item embedding table.
        embedding_dim: Width of the item embeddings.
        hidden_dim: Width of the projected inputs and of all recurrent states.
        num_heads: Forwarded to ``AttentionLayer``.
        dropout: Dropout probability forwarded to ``AttentionLayer`` and ``nn.Dropout``.
    """

    def __init__(self, num_items, embedding_dim, hidden_dim, num_heads=4, dropout=0.1):
        super().__init__()
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim

        # Item embedding table, shared by the sequence items and the target item.
        self.item_embedding = nn.Embedding(num_items, embedding_dim)

        # Projection from embedding_dim to hidden_dim.
        self.input_projection = nn.Linear(embedding_dim, hidden_dim)

        # A single GRU cell, applied once per time step.
        self.gru_cell = nn.GRUCell(hidden_dim, hidden_dim)

        # A single attention layer, applied once per time step.
        self.attention = AttentionLayer(hidden_dim, num_heads, dropout)

        # Second recurrent cell; its input is [GRU state, attention context], hence 2 * hidden_dim.
        self.augru_cell = nn.GRUCell(hidden_dim * 2, hidden_dim)

        # Not used by forward(); kept so the module's parameters / state_dict stay unchanged.
        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(hidden_dim)

        # Scoring head: one scalar per sequence.
        self.output_layer = nn.Linear(hidden_dim, 1)

    def forward(self, seq, target_items):
        """Score each target item against its behaviour sequence.

        Args:
            seq: Item ids of the user behaviour sequence, ``[batch_size, seq_len]``.
            target_items: Target item ids, ``[batch_size]``.

        Returns:
            Tensor ``[batch_size]`` with one unnormalised score per sequence.

        Raises:
            RuntimeError: If ``seq_len`` is 0 (``torch.stack`` receives an empty list).
        """
        batch_size, seq_len = seq.size()

        # 1. Embed and project the sequence items and the target item.
        seq_hidden = self.input_projection(self.item_embedding(seq))  # [batch_size, seq_len, hidden_dim]
        target_hidden = self.input_projection(self.item_embedding(target_items))  # [batch_size, hidden_dim]

        # 2. Unroll the GRU cell over the whole sequence.
        #    new_zeros follows the dtype/device of the projected embeddings.
        h_gru = seq_hidden.new_zeros(batch_size, self.hidden_dim)
        gru_steps = []
        for t in range(seq_len):
            h_gru = self.gru_cell(seq_hidden[:, t, :], h_gru)
            gru_steps.append(h_gru)
        gru_states = torch.stack(gru_steps, dim=1)  # [batch_size, seq_len, hidden_dim]

        # 3. Target-aware pass. The target queries every GRU state up to and
        #    including step t. Attending over the current state alone would make
        #    the softmax run over a single key, i.e. always yield weight 1, and
        #    the target item would have no influence on the score.
        h_augru = seq_hidden.new_zeros(batch_size, self.hidden_dim)
        for t in range(seq_len):
            history = gru_states[:, : t + 1, :]  # [batch_size, t + 1, hidden_dim]
            attn_output = self.attention(
                query=target_hidden,
                key=history,
                value=history,
            )  # [batch_size, hidden_dim]

            # Current GRU state joined with its target-aware context.
            augru_input = torch.cat([gru_states[:, t, :], attn_output], dim=1)  # [batch_size, hidden_dim * 2]
            h_augru = self.augru_cell(augru_input, h_augru)

        # 4. Score from the state after the last time step.
        score = self.output_layer(h_augru)  # [batch_size, 1]

        return score.squeeze(-1)  # [batch_size]

class AttentionLayer(nn.Module):
    """Multi-head scaled dot-product attention of one query vector over a key/value sequence.

    The projected query, keys and values are split into ``num_heads`` chunks of
    ``head_dim = hidden_dim // num_heads`` features. Each head computes its own
    softmax weights over the sequence (scores scaled by ``sqrt(head_dim)``), and
    the per-head contexts are concatenated. There is no output projection.

    Args:
        hidden_dim: Feature width of query, keys, values and of the result.
        num_heads: Number of attention heads; must divide ``hidden_dim``.
        dropout: Dropout probability applied to the attention weights.
    """

    def __init__(self, hidden_dim, num_heads, dropout=0.1):
        super().__init__()
        self.num_heads = num_heads
        self.hidden_dim = hidden_dim
        self.head_dim = hidden_dim // num_heads
        assert self.head_dim * num_heads == hidden_dim, "hidden_dim must be divisible by num_heads"

        self.q_proj = nn.Linear(hidden_dim, hidden_dim)
        self.k_proj = nn.Linear(hidden_dim, hidden_dim)
        self.v_proj = nn.Linear(hidden_dim, hidden_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, query, key, value):
        """Attend from ``query`` over ``key`` / ``value``.

        Args:
            query: ``[batch_size, hidden_dim]``.
            key: ``[batch_size, seq_len, hidden_dim]``.
            value: ``[batch_size, seq_len, hidden_dim]``.

        Returns:
            Context tensor ``[batch_size, hidden_dim]``.
        """
        batch_size = query.size(0)
        seq_len = key.size(1)

        # 1. Linear projections, then split the feature axis into heads.
        q = self.q_proj(query).reshape(batch_size, self.num_heads, 1, self.head_dim)
        k = self.k_proj(key).reshape(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        v = self.v_proj(value).reshape(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        # q: [batch_size, num_heads, 1, head_dim]; k, v: [batch_size, num_heads, seq_len, head_dim]

        # 2. Per-head attention scores, scaled by the per-head feature width.
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim)  # [batch_size, num_heads, 1, seq_len]
        attn_weights = torch.softmax(scores, dim=-1)
        attn_weights = self.dropout(attn_weights)

        # 3. Weighted sum of the values per head, then concatenate the heads.
        context = torch.matmul(attn_weights, v)  # [batch_size, num_heads, 1, head_dim]

        return context.reshape(batch_size, self.hidden_dim)  # [batch_size, hidden_dim]

# 使用示例
def test_model():
    num_items = 1000
    embedding_dim = 64
    hidden_dim = 128
    batch_size = 32
    seq_len = 50
    
    model = SequenceRecommender(
        num_items=num_items,
        embedding_dim=embedding_dim,
        hidden_dim=hidden_dim
    )
    
    seq = torch.randint(0, num_items, (batch_size, seq_len))
    target_items = torch.randint(0, num_items, (batch_size,))
    
    output = model(seq, target_items)
    print(f"输入序列形状: {seq.shape}")
    print(f"目标物品形状: {target_items.shape}")
    print(f"输出形状: {output.shape}")

# Run the smoke test when this code executes as the main module.
if __name__ == "__main__":
    test_model()

输入序列形状: torch.Size([32, 50])
目标物品形状: torch.Size([32])
输出形状: torch.Size([32])
