# 02_Model Architecture and Implementation Report

**Project**: SASRec.pytorch - Transformer-based sequential recommendation system  
**Version**: v1.0  
**Created**: 2024-01-10  

---

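## Contents

1. [SASRec Model Architecture](#1-SASRec-Model-Architecture)  
2. [TiSASRec Time-Aware Mechanism](#2-TiSASRec-Time-Aware-Mechanism)  
3. [mHC Manifold-Constrained Hyper-Connections](#3-mHC-Manifold-Constrained-Hyper-Connections)  
4. [Core Code Implementation](#4-Core-Code-Implementation)  
5. [Model Comparison](#5-Model-Comparison)  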

---

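## 1. SASRec Model Architecture

### 1.1 Theoretical Basis of Self-Attention

The core of SASRec is the **self-attention mechanism**, which scores how strongly each position in a sequence relates to every other position and thereby captures long-range dependencies within the sequence. It is defined as follows.

**Scaled Dot-Product Attention**: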

$$\text{Attention}(Q, K, V) = \text{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)V$$

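where:
- $Q \in \mathbb{R}^{n \times d_k}$: the query matrix
- $K \in \mathbb{R}^{n \times d_k}$: the key matrix
- $V \in \mathbb{R}^{n \times d_v}$: the value matrix
- $d_k$: the dimension of each attention head
- $\sqrt{d_k}$: the scaling factor; dividing by it keeps the dot products from growing with $d_k$, which would saturate the softmax and shrink its gradients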
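
As a worked illustration of the formula above, the following minimal sketch evaluates scaled dot-product attention on a tiny example in plain Python, so it runs without PyTorch. The helper names `softmax` and `scaled_dot_product_attention` and the sample matrices are hypothetical and are not part of the project code.

```python
import math

def softmax(row):
    """Numerically stable softmax over a list of floats."""
    m = max(row)
    exps = [math.exp(x - m) for x in row]
    total = sum(exps)
    return [e / total for e in exps]

def scaled_dot_product_attention(Q, K, V):
    """Attention(Q, K, V) for matrices given as lists of rows."""
    d_k = len(K[0])
    out = []
    for q in Q:
        # One score per key: dot(q, k) / sqrt(d_k)
        scores = [sum(qi * ki for qi, ki in zip(q, k)) / math.sqrt(d_k) for k in K]
        weights = softmax(scores)
        # Each output row is a weighted sum of the value rows
        out.append([sum(w * v[j] for w, v in zip(weights, V)) for j in range(len(V[0]))])
    return out

# Illustrative inputs: two positions, d_k = d_v = 2
Q = [[1.0, 0.0], [0.0, 1.0]]
K = [[1.0, 0.0], [0.0, 1.0]]
V = [[1.0, 2.0], [3.0, 4.0]]
result = scaled_dot_product_attention(Q, K, V)
```

Each query here matches one key more closely than the other, so its output row is pulled toward the corresponding row of `V` while remaining a convex combination of both rows.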

**Multi-Head Attention**:

$$\text{MultiHead}(Q, K, V) = \text{Concat}\left(\text{head}_1, \ldots, \text{head}_h\right)W^O$$

where each attention head is computed as:

$$\text{head}_i = \text{Attention}\left(QW_i^Q, KW_i^K, VW_i^V\right)$$

Here $W_i^Q, W_i^K \in \mathbb{R}^{d_{model} \times d_k}$, $W_i^V \in \mathbb{R}^{d_{model} \times d_v}$, and $W^O \in \mathbb{R}^{hd_v \times d_{model}}$ are learnable projection matrices.

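**Positional Encoding**:

Because the Transformer has no recurrent structure, position information must be injected explicitly. The original Transformer uses fixed sinusoidal encodings:

$$PE_{(pos,2i)} = \sin\left(\frac{pos}{10000^{2i/d_{model}}}\right)$$
$$PE_{(pos,2i+1)} = \cos\left(\frac{pos}{10000^{2i/d_{model}}}\right)$$

where $pos$ is the position index and $i$ is the dimension index. The SASRec implementation in this report uses a learnable position embedding (`pos_embeddings`) instead.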
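
The two sinusoidal formulas can be evaluated directly. The sketch below is a plain-Python illustration under assumed sizes (`max_len=4`, `d_model=6`); the function name `sinusoidal_position_encoding` is hypothetical and does not appear in the project code.

```python
import math

def sinusoidal_position_encoding(max_len, d_model):
    """Return a max_len x d_model table of sinusoidal position encodings."""
    pe = [[0.0] * d_model for _ in range(max_len)]
    for pos in range(max_len):
        for k in range(0, d_model, 2):
            # k plays the role of 2i, so the exponent 2i/d_model equals k/d_model
            angle = pos / (10000 ** (k / d_model))
            pe[pos][k] = math.sin(angle)          # even dimensions use sine
            if k + 1 < d_model:
                pe[pos][k + 1] = math.cos(angle)  # odd dimensions use cosine
    return pe

pe = sinusoidal_position_encoding(max_len=4, d_model=6)
```

At position 0 every sine entry is 0 and every cosine entry is 1; later positions rotate each sine/cosine pair at a frequency that decreases with the dimension index.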

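### 1.2 Model Overview

SASRec (Self-Attentive Sequential Recommendation) is a Transformer-based self-attention model for sequential recommendation. It uses multi-head attention to capture long-range dependencies in a user's behavior sequence.

**Key features**:
- Learnable positional embeddings encode the order of the sequence
- Multi-head attention learns dependencies between items
- Predictions are scored against item embeddings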

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class MultiHeadAttention(nn.Module):
    def __init__(self, hidden_units, num_heads, dropout_rate):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.head_dim = hidden_units // num_heads
        self.W_Q = nn.Linear(hidden_units, hidden_units, bias=False)
        self.W_K = nn.Linear(hidden_units, hidden_units, bias=False)
        self.W_V = nn.Linear(hidden_units, hidden_units, bias=False)
        self.W_O = nn.Linear(hidden_units, hidden_units, bias=False)
        self.dropout = nn.Dropout(dropout_rate)
        # Plain Python float, so the module works on any device (no hard-coded 'cuda')
        self.scale = self.head_dim ** 0.5

    def forward(self, query, key, value, mask=None):
        batch_size = query.size(0)
        
        # Linear projections
        Q = self.W_Q(query).view(batch_size, -1, self.num_heads, self.head_dim).permute(0, 2, 1, 3)
        K = self.W_K(key).view(batch_size, -1, self.num_heads, self.head_dim).permute(0, 2, 1, 3)
        V = self.W_V(value).view(batch_size, -1, self.num_heads, self.head_dim).permute(0, 2, 1, 3)
        
        # Scaled dot-product attention
        attention = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale
        if mask is not None:
            # mask must broadcast to (batch, heads, len_q, len_k); positions where mask == 0 are blocked
            attention = attention.masked_fill(mask == 0, -1e9)
        attention = self.dropout(F.softmax(attention, dim=-1))
        
        # Output
        output = torch.matmul(attention, V).permute(0, 2, 1, 3).contiguous()
        output = output.view(batch_size, -1, self.num_heads * self.head_dim)
        output = self.W_O(output)
        return output
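
The attention module above blocks every position where `mask == 0`, but the report does not show how the mask is built. The following plain-Python sketch shows one plausible construction for a single sequence, combining a causal constraint with a padding constraint. It assumes that item id 0 marks padding and that sequences are left-padded; the helper name `build_attention_mask` is hypothetical.

```python
def build_attention_mask(seq, pad_id=0):
    """Return an L x L 0/1 matrix; entry [i][j] is 1 if position i may attend to position j."""
    length = len(seq)
    mask = []
    for i in range(length):
        row = []
        for j in range(length):
            causal = j <= i              # never look at future positions
            not_pad = seq[j] != pad_id   # never attend to padding keys
            row.append(1 if (causal and not_pad) else 0)
        mask.append(row)
    return mask

# A left-padded sequence of length 4 with two real items
mask = build_attention_mask([0, 0, 5, 9])
```

In PyTorch, stacking one such matrix per sequence and adding a head axis gives a `(batch, 1, L, L)` tensor that broadcasts against the `(batch, heads, L, L)` score tensor. Rows that belong to padding queries are fully masked; with the `-1e9` fill they receive uniform attention weights rather than NaN, so their outputs should simply be ignored downstream.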

In [None]:
class PointWiseFeedForward(nn.Module):
    def __init__(self, hidden_units, dropout_rate):
        super(PointWiseFeedForward, self).__init__()
        self.W_1 = nn.Linear(hidden_units, hidden_units, bias=True)
        self.W_2 = nn.Linear(hidden_units, hidden_units, bias=True)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, inputs):
        output = self.W_2(F.relu(self.W_1(inputs)))
        output = self.dropout(output)
        return output

In [None]:
class SASRecBlock(nn.Module):
    def __init__(self, hidden_units, num_heads, dropout_rate):
        super(SASRecBlock, self).__init__()
        self.mha = MultiHeadAttention(hidden_units, num_heads, dropout_rate)
        self.ffn = PointWiseFeedForward(hidden_units, dropout_rate)
        self.layernorm1 = nn.LayerNorm(hidden_units, eps=1e-8)
        self.layernorm2 = nn.LayerNorm(hidden_units, eps=1e-8)
        self.dropout1 = nn.Dropout(dropout_rate)
        self.dropout2 = nn.Dropout(dropout_rate)

    def forward(self, input_emb, mask):
        attn_output = self.mha(input_emb, input_emb, input_emb, mask)
        attn_output = self.dropout1(attn_output)
        out1 = self.layernorm1(input_emb + attn_output)
        
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output)
        out2 = self.layernorm2(out1 + ffn_output)
        return out2

In [None]:
class SASRec(nn.Module):
    def __init__(self, item_num, hidden_units=50, num_blocks=2, num_heads=1, dropout_rate=0.2, maxlen=100):
        super(SASRec, self).__init__()
        self.item_num = item_num
        self.hidden_units = hidden_units
        self.maxlen = maxlen
        
        self.item_embeddings = nn.Embedding(item_num + 1, hidden_units, padding_idx=0)
        self.pos_embeddings = nn.Embedding(maxlen + 1, hidden_units)
        
        self.blocks = nn.ModuleList([
            SASRecBlock(hidden_units, num_heads, dropout_rate) 
            for _ in range(num_blocks)
        ])
        
        self.LayerNorm = nn.LayerNorm(hidden_units, eps=1e-8)
        self.dropout = nn.Dropout(dropout_rate)
        
        self.apply(self._init_weights)
        
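    def _init_weights(self, module):
        if isinstance(module, nn.Embedding):
            nn.init.normal_(module.weight, mean=0, std=0.01)
            if module.padding_idx is not None:
                # normal_ overwrites the padding row, so reset it to zero
                with torch.no_grad():
                    module.weight[module.padding_idx].zero_()
        elif isinstance(module, nn.Linear):
            nn.init.xavier_uniform_(module.weight)
            # Check the bias only for nn.Linear; nn.Embedding and nn.Dropout have no bias attribute
            if module.bias is not None:
                nn.init.zeros_(module.bias)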
    
    def forward(self, input_seq, mask):
        seq_emb = self.item_embeddings(input_seq)
        positions = torch.arange(input_seq.size(1), dtype=torch.long, device=input_seq.device)
        pos_emb = self.pos_embeddings(positions)
        
        emb = seq_emb + pos_emb
        emb = self.dropout(self.LayerNorm(emb))
        
        for block in self.blocks:
            emb = block(emb, mask)
        
        return emb
    
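    def predict(self, input_seq, target_items, mask):
        """Score candidates; target_items is a 1-D tensor of item ids shared by the whole batch."""
        seq_emb = self.forward(input_seq, mask)
        # Representation at the last position (sequences are assumed to be left-padded)
        last_emb = seq_emb[:, -1, :]
        target_emb = self.item_embeddings(target_items)
        # (batch, hidden) x (hidden, num_candidates) -> (batch, num_candidates)
        scores = torch.matmul(last_emb, target_emb.transpose(0, 1))
        return scores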

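## 2. TiSASRec Time-Aware Mechanism

### 2.1 Theoretical Basis of Time-Interval Modeling

TiSASRec extends SASRec with **time-interval information**: the time interval between each pair of interactions in a sequence is encoded and fed into the attention computation. The interval is encoded as follows.

**Time-interval encoding**: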

$$t_{ij} = \log\left(1 + \Delta_{ij}\right) / \log\left(1 + T_{max}\right)$$

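where:
- $\Delta_{ij} = |t_i - t_j|$: the time difference between interactions $i$ and $j$
- $t_i, t_j$: the timestamps of the $i$-th and $j$-th interactions
- $T_{max}$: the preset maximum time-interval threshold (default 30 days = 86400×30 seconds)
- $\log(1 + \cdot)$: a logarithmic transform that compresses the range of time spans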
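
To make the encoding concrete, the sketch below evaluates it for a few intervals in plain Python. The function name `normalize_interval` is hypothetical, and capping $\Delta_{ij}$ at $T_{max}$ is an assumption made here so that the result stays in $[0, 1]$; the formula itself does not state how intervals above the threshold are handled.

```python
import math

def normalize_interval(t_i, t_j, t_max=86400 * 30):
    """Return log(1 + delta) / log(1 + t_max) for two timestamps given in seconds."""
    # Assumption: intervals longer than t_max are capped at t_max
    delta = min(abs(t_i - t_j), t_max)
    return math.log1p(delta) / math.log1p(t_max)

same_time = normalize_interval(5, 5)
one_hour = normalize_interval(0, 3600)
one_day = normalize_interval(0, 86400)
capped = normalize_interval(0, 86400 * 90)  # 90 days, above the 30-day threshold
```

The logarithm compresses long gaps: one day is 24 times longer than one hour, yet the two encoded values differ far less than that, which keeps very old interactions from dominating the scale.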

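**Attention with time intervals**:

$$\text{Attention}_{time}(Q, K, V, T) = \text{softmax}\left(\frac{QK^T + \alpha \cdot T}{\sqrt{d_k}}\right)V$$

where $T$ is the time-interval matrix with entries $t_{ij}$ and $\alpha$ is the weight coefficient of the time-interval term.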
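
The effect of the $\alpha \cdot T$ term can be seen on a single query row. The sketch below is a plain-Python illustration with made-up scores and intervals; the function name `time_biased_weights` and the value `alpha=-2.0` are hypothetical, since the report does not say whether $\alpha$ is learned or fixed.

```python
import math

def time_biased_weights(scores, time_row, alpha, d_k):
    """softmax((scores + alpha * time_row) / sqrt(d_k)) for one query position."""
    logits = [(s + alpha * t) / math.sqrt(d_k) for s, t in zip(scores, time_row)]
    m = max(logits)
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

scores = [1.0, 1.0, 1.0]    # identical content scores q.k for three keys
time_row = [0.9, 0.5, 0.0]  # normalized intervals from the query to each key
neutral = time_biased_weights(scores, time_row, alpha=0.0, d_k=4)
recent = time_biased_weights(scores, time_row, alpha=-2.0, d_k=4)
```

With `alpha=0.0` the three keys receive equal weight. With a negative `alpha`, keys separated from the query by a longer interval are down-weighted, so the most recent interaction receives the largest weight.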

### 2.2 Time-Interval Modeling

The function below computes the normalized time-interval matrix from a batch of timestamp sequences.

In [None]:
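import math

def compute_time_matrix(timestamps, max_time_gap=86400 * 30):
    """Compute the normalized time-interval matrix for every pair of interactions."""
    # |t_i - t_j| in the original dtype (avoids float32 rounding of large Unix timestamps)
    time_diffs = (timestamps.unsqueeze(2) - timestamps.unsqueeze(1)).abs()
    # Cap at the threshold T_max so the normalized values stay in [0, 1]
    time_diffs = torch.clamp(time_diffs, max=max_time_gap).float()
    # Log-compress, then divide by log(1 + T_max)
    return torch.log1p(time_diffs) / math.log1p(max_time_gap)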

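## 3. mHC Manifold-Constrained Hyper-Connections

### 3.1 Theoretical Basis of the Sinkhorn-Knopp Algorithm

mHC (Manifold Hyperconnection) uses the **Sinkhorn-Knopp algorithm** to project weight matrices onto the **manifold of doubly stochastic matrices**, which acts as a manifold-constrained regularizer.

**Definition of a doubly stochastic matrix**:

A matrix $P \in \mathbb{R}^{n \times n}$ is called doubly stochastic if and only if: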

$$P \mathbf{1} = \mathbf{1} \quad \text{且} \quad P^T \mathbf{1} = \mathbf{1}$$

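That is, every row and every column of the matrix sums to 1, and all entries are non-negative.

**Sinkhorn-Knopp iteration**:

Alternately rescaling rows and columns drives a non-negative matrix $M$ (made strictly positive by adding $\epsilon$) toward a doubly stochastic matrix $P$: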

$$
\begin{aligned}
&\text{Initialize: } P^{(0)} = M + \epsilon \\
&\text{Iterate } k = 1, 2, \ldots, K: \\
&\quad P^{(k)}_{\text{row}} = \frac{P^{(k-1)}}{P^{(k-1)} \mathbf{1} \mathbf{1}^T} \quad \text{(row normalization)} \\
&\quad P^{(k)} = \frac{P^{(k)}_{\text{row}}}{\mathbf{1} \mathbf{1}^T P^{(k)}_{\text{row}}} \quad \text{(column normalization)}
\end{aligned}
$$

Here the fractions denote element-wise division and $\mathbf{1}$ is the all-ones vector.
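
How quickly the iteration approaches a doubly stochastic matrix can be checked numerically. The sketch below is a plain-Python illustration on a small hand-picked positive matrix; the names `sinkhorn_knopp_lists` and `max_row_error` and the sample matrix are hypothetical, and the result says nothing about matrices of the size used in the model.

```python
def sinkhorn_knopp_lists(M, num_iterations, eps=1e-6):
    """Alternate row and column normalization on a square matrix given as nested lists."""
    P = [[x + eps for x in row] for row in M]
    n = len(P)
    for _ in range(num_iterations):
        # Row normalization: divide every entry by its row sum
        P = [[x / sum(row) for x in row] for row in P]
        # Column normalization: divide every entry by its column sum
        col_sums = [sum(P[i][j] for i in range(n)) for j in range(n)]
        P = [[P[i][j] / col_sums[j] for j in range(n)] for i in range(n)]
    return P

def max_row_error(P):
    """Largest deviation of any row sum from 1 (columns are exact after the last step)."""
    return max(abs(sum(row) - 1.0) for row in P)

M = [[1.0, 2.0, 3.0], [4.0, 1.0, 1.0], [2.0, 2.0, 5.0]]
err_1 = max_row_error(sinkhorn_knopp_lists(M, 1))
err_20 = max_row_error(sinkhorn_knopp_lists(M, 20))
```

Because each iteration ends with a column normalization, the column sums are exact and the remaining error sits in the row sums. For this example the row error after 20 iterations is far smaller than after a single iteration.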

**Forward pass of the mHC layer**:

$$
\begin{aligned}
&H^{(h)} = \text{Sinkhorn}\left(W^{(h)}\right) \quad \forall h \in \{1, \ldots, h_{num}\} \\
&\text{Head}^{(h)} = X \cdot H^{(h)} \\
&Y = \text{Projection}\left(\frac{1}{h_{num}} \sum_{h=1}^{h_{num}} \text{Head}^{(h)}\right) \\
&\text{Output} = X + Y
\end{aligned}
$$

### 3.2 Sinkhorn-Knopp Algorithm

The function below implements the alternating row and column scaling for a square, non-negative input matrix.

In [None]:
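def sinkhorn_knopp(M, num_iterations=100, eps=1e-6):
    """Sinkhorn-Knopp: scale a non-negative square matrix toward a doubly stochastic one."""
    # Adding eps makes every entry strictly positive, which guarantees convergence
    P = M + eps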
    
    for _ in range(num_iterations):
        row_sums = P.sum(dim=1, keepdim=True)
        P = P / (row_sums + 1e-10)
        col_sums = P.sum(dim=0, keepdim=True)
        P = P / (col_sums + 1e-10)
    
    return P

In [None]:
class MHCLayer(nn.Module):
    def __init__(self, hidden_units, num_heads=4, dropout_rate=0.1):
        super(MHCLayer, self).__init__()
        self.hidden_units = hidden_units
        self.num_heads = num_heads
        self.hyper_weights = nn.Parameter(torch.randn(num_heads, hidden_units, hidden_units))
        self.out_proj = nn.Linear(hidden_units, hidden_units)
        self.dropout = nn.Dropout(dropout_rate)
        self.apply(self._init_weights)
    
    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            nn.init.xavier_uniform_(module.weight)
            if module.bias is not None:
                nn.init.zeros_(module.bias)
    
    def forward(self, inputs, mask=None):
        batch_size, seq_len, hidden_units = inputs.shape
        
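        hyper_proj = []
        for h in range(self.num_heads):
            # Sinkhorn-Knopp needs non-negative input, but the raw weights can be negative,
            # so map them to positive values with exp before projecting
            W = torch.exp(self.hyper_weights[h])
            W_proj = sinkhorn_knopp(W, num_iterations=20)
            hyper_proj.append(W_proj)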
        
        head_outputs = []
        for h in range(self.num_heads):
            output = torch.matmul(inputs, hyper_proj[h])
            if mask is not None:
                output = output.masked_fill(mask.unsqueeze(-1) == 0, 0)
            head_outputs.append(output)
        
        combined = torch.stack(head_outputs, dim=-1)
        combined = combined.mean(dim=-1)
        output = self.out_proj(combined)
        output = self.dropout(output)
        
        return inputs + output

## 4. Core Code Implementation

### 4.1 Component Comparison

| Component | SASRec | TiSASRec | mHC |
|------|--------|----------|-----|
| Positional encoding | Learnable | Learnable | Learnable |
| Time encoding | None | TimeInterval | None |
| Attention | MultiHead | MultiHead+Time | MultiHead+mHC |

### 4.2 Training Procedure

In [None]:
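def train_epoch(model, train_loader, optimizer, criterion, device):
    """Run one training epoch and return the mean batch loss.

    criterion is a pairwise loss called as criterion(pos_scores, neg_scores).
    """
    model.train()
    total_loss = 0
    
    for batch in train_loader:
        # user_ids is not needed to compute the loss
        _, seq, pos_items, neg_items, mask = batch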
        seq = seq.to(device)
        pos_items = pos_items.to(device)
        neg_items = neg_items.to(device)
        mask = mask.to(device)
        
        optimizer.zero_grad()
        seq_emb = model(seq, mask)
        
        pos_emb = model.item_embeddings(pos_items)
        pos_scores = (seq_emb[:, -1, :] * pos_emb).sum(dim=-1)
        
        neg_emb = model.item_embeddings(neg_items)
        neg_scores = (seq_emb[:, -1, :] * neg_emb).sum(dim=-1)
        
        loss = criterion(pos_scores, neg_scores)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
    
    return total_loss / len(train_loader)
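
The training loop calls `criterion(pos_scores, neg_scores)` without showing which loss is used. One common pairwise choice is the BPR (Bayesian Personalized Ranking) loss, $-\log \sigma(s_{pos} - s_{neg})$ averaged over the batch. The plain-Python sketch below illustrates that formula on made-up scores; treating BPR as the criterion is an assumption, and the names `softplus` and `bpr_loss` are hypothetical.

```python
import math

def softplus(x):
    """Numerically stable log(1 + exp(x))."""
    return max(x, 0.0) + math.log1p(math.exp(-abs(x)))

def bpr_loss(pos_scores, neg_scores):
    """Mean of -log(sigmoid(pos - neg)) over a batch of score pairs."""
    # -log(sigmoid(p - n)) equals softplus(n - p)
    losses = [softplus(n - p) for p, n in zip(pos_scores, neg_scores)]
    return sum(losses) / len(losses)

good = bpr_loss([3.0, 2.5], [-1.0, 0.0])  # positives ranked above negatives
bad = bpr_loss([-1.0, 0.0], [3.0, 2.5])   # positives ranked below negatives
tie = bpr_loss([1.0], [1.0])              # equal scores
```

The loss is small when positive items outscore negatives, equals $\log 2$ when the scores tie, and grows roughly linearly when the ranking is inverted. On tensors, the same quantity can be written in PyTorch as `F.softplus(neg_scores - pos_scores).mean()`.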

## 5. Model Comparison

### 5.1 Model Structure Comparison

| Aspect | SASRec | TiSASRec | SASRec+mHC | TiSASRec+mHC |
|------|--------|----------|------------|--------------|
| Embedding dimension | 50 | 50 | 50 | 50 |
| Transformer layers | 2 | 2 | 2 | 2 |
| Attention heads | 2 | 2 | 2 | 2 |

---

**Previous chapter**: [01_数据与实验设计报告.ipynb](./01_数据与实验设计报告.ipynb)  
**Next chapter**: [03_训练与评估报告.ipynb](./03_训练与评估报告.ipynb)