## 1.注意力机制


### MHA

multiheadattention(torch.nn.Module):

这表示定义一个名为 multiheadattention 的类，并且它继承自 nn.Module。nn.Module 是 PyTorch 中所有神经网络模块的基类，提供参数管理、子模块注册、前向传播接口、模型保存加载、.to(device)、.eval() 等核心功能。继承它的本质含义是：让 multiheadattention 成为一个“可训练的神经网络模块”，内部定义的层（如 nn.Linear）会自动被注册为可学习参数，从而能被优化器管理并参与反向传播。没有继承 nn.Module，这个类就只是普通 Python 类，不具备深度学习框架的功能。

super().init():

这行代码调用父类 nn.Module 的构造函数。作用是完成基类的初始化，包括建立参数容器、子模块字典、缓冲区结构等内部机制。如果不调用它，虽然类语法上可以运行，但模块中的参数不会被正确注册，model.parameters() 可能为空，优化器无法更新参数，模型状态也无法正确保存。简言之，它确保当前类真正成为一个“完整初始化的 PyTorch 模块”。

In [47]:
import torch
import math 
class multiheadattention(torch.nn.Module):
    def __init__(self, d, h):
        super().__init__()
        self.d = d
        self.h = h
        self.k = d // h

        self.wqkv = torch.nn.Linear(d, d*3)
        self.wo = torch.nn.Linear(d, d)
    
    def forward(self, x, mask = None):
        B, L, D = x.shape
        qkv = self.wqkv(x)
        q, k, v = torch.chunk(qkv, 3, -1)
        # 可以用reshap代替transpose
        q = q.view(B, L, self.h, self.k).transpose(1, 2)
        k = k.view(B, L, self.h, self.k).transpose(1, 2)
        v = v.view(B, L, self.h, self.k).transpose(1, 2)

        attention_score =torch.matmul(q, k.transpose(-1,-2)) / math.sqrt(self.k)
        if mask is not None:
            attention_score = attention_score.masked_fill(mask=mask, value=-1e9)
        attention_weight = torch.softmax(attention_score, dim=-1)
        context = torch.matmul(attention_weight, v).transpose(1, 2).contiguous().view(B, L, D)
        output = self.wo(context)
        return output, attention_weight
    
# --- 测试代码 ---
batch_size = 5
max_seq_len = 10
d_model = 64
head = 4

x = torch.randn(batch_size, max_seq_len, d_model)

attention_model = multiheadattention(d_model, head)
output, attention = attention_model(x) 

print("代码运行成功！")
print("输出张量的形状:", output.shape)
print("注意力权重的形状:", attention.shape)

代码运行成功！
输出张量的形状: torch.Size([5, 10, 64])
注意力权重的形状: torch.Size([5, 4, 10, 10])


### MQA

多查询注意力：所有查询头共用单一的键值头

与原 MultiHeadAttention 的差异：
- Q: 维持与 MHA 相同，shape -> [B, h, L, d_k]
- K/V: 仅产生 1 组共享头，shape -> [B, 1, L, d_k]
这样在推理时 KV cache 只需缓存 1 份（而非 h 份）。

In [37]:
import math
import torch
import torch.nn as nn
import torch.nn.functional as F

class MultiQueryAttention(nn.Module):

    def __init__(self, d_model, num_head):
        super(MultiQueryAttention, self).__init__()
        assert d_model % num_head == 0, "d_model 必须能被 num_head 整除"
        self.d_model = d_model
        self.num_head = num_head
        self.d_k = d_model // num_head

        # 与原实现的区别：
        #   - Q 仍然映射到 d_model（然后 reshape 成 h 个头）
        #   - K/V 只映射到 d_k（单头维度），且各 1 份
        self.wq = nn.Linear(d_model, d_model)      # 生成多头的 Q
        self.wkv = nn.Linear(d_model, 2 * self.d_k)  # 生成共享的 K、V（仅 1 头的维度）
        self.wo = nn.Linear(d_model, d_model)

    def forward(self, x, mask=None):
        B, L, _ = x.shape

        # 1) 计算 Q、K、V
        q = self.wq(x)                       # [B, L, d_model]
        kv = self.wkv(x)                     # [B, L, 2*d_k]
        k, v = torch.chunk(kv, 2, dim=-1)    # [B, L, d_k], [B, L, d_k]

        # 2) 形状整理
        # Q: [B, L, h, d_k] -> [B, h, L, d_k]
        q = q.view(B, L, self.num_head, self.d_k).transpose(1, 2)  # [B, h, L, d_k]

        # 共享 K/V：加一个“伪 head 维”=1，方便广播到 h
        # K/V: [B, L, d_k] -> [B, 1, L, d_k]，unsqueeze(dim) = 在 dim 这个位置插入一个长度为1的轴
        k = k.unsqueeze(1)  # [B, 1, L, d_k]
        v = v.unsqueeze(1)  # [B, 1, L, d_k]

        # 3) 注意力分数：Q 与共享 K
        # scores: [B, h, L, L]，这里利用了 K 在 head 维度上的广播
        scores = torch.matmul(q, k.transpose(-1, -2)) / math.sqrt(self.d_k)  # [B, h, L, L]

        if mask is not None:
            # 要求 mask 能广播到 [B, h, L, L]
            # 例如 mask 形状可为 [B, 1, 1, L]（causal/ padding），或 [B, 1, L, L]
            scores = scores.masked_fill(mask, -1e9)

        attn = torch.softmax(scores, dim=-1)         # [B, h, L, L]

        # 4) 加权求和：与共享 V 相乘（同样通过广播）
        context = torch.matmul(attn, v)              # [B, h, L, d_k]

        # 5) 还原回 [B, L, d_model] 并输出
        context = context.transpose(1, 2).contiguous()         # [B, L, h, d_k]
        context = context.view(B, L, self.d_model)             # [B, L, d_model]
        output = self.wo(context)                               # [B, L, d_model]

        return output, attn


# --- 简单测试（与原 MHA 测试保持一致） ---
if __name__ == "__main__":
    batch_size = 2
    d_model = 10
    head = 2
    max_seq_len = 5
    x = torch.randn(batch_size, max_seq_len, d_model)

    attention_model = MultiQueryAttention(d_model, head)
    output, attention = attention_model(x)

    print("代码运行成功！（MQA）")
    print("输出张量的形状:", output.shape)     # 期望: [B, L, d_model]
    print("注意力权重的形状:", attention.shape)  # 期望: [B, h, L, L]


代码运行成功！（MQA）
输出张量的形状: torch.Size([2, 5, 10])
注意力权重的形状: torch.Size([2, 2, 5, 5])


### GQA

分组查询注意力：采用折中策略，将h个查询头分为g组

GQA: 把 h 个 Query 头分成 g 组；组内共享 1 套 K/V
- num_head = h
- num_kv_head = g (1 < g <= h, 且 h % g == 0)
- 
形状约定：
    Q: [B, h, L, d_k]
    K,V(分组): [B, g, L, d_k]

计算时把 Q reshape 成 [B, g, h_per_group, L, d_k]，

与同组的 K,V 做注意力；最后再还原回 [B, h, L, d_k]。

In [48]:
import math
import torch
import torch.nn as nn
import torch.nn.functional as F

class GroupedQueryAttention(nn.Module):

    def __init__(self, d_model, num_head, num_kv_head):
        super(GroupedQueryAttention, self).__init__()
        assert d_model % num_head == 0, "d_model 必须能被 num_head 整除"
        assert 1 <= num_kv_head <= num_head, "num_kv_head 必须在 [1, num_head] 范围内"
        assert num_head % num_kv_head == 0, "num_head 必须能被 num_kv_head 整除（每组等量分配）"

        self.d_model = d_model
        self.num_head = num_head            # h
        self.num_kv_head = num_kv_head      # g
        self.d_k = d_model // num_head
        self.h_per_group = self.num_head // self.num_kv_head  # h/g

        # Q 仍映射到 d_model（随后 reshape 为 h 个头）
        self.wq = nn.Linear(d_model, d_model)

        # K/V 只映射到 g * d_k（随后 reshape 为 g 个“KV 头”）
        self.wkv = nn.Linear(d_model, 2 * self.num_kv_head * self.d_k)

        self.wo = nn.Linear(d_model, d_model)

    def forward(self, x, mask=None):
        B, L, _ = x.shape

        # 1) 投影
        # Q: [B, L, d_model]
        # KV: [B, L, 2 * g * d_k] -> split -> [B, L, g * d_k] 各自
        q = self.wq(x)
        kv = self.wkv(x)
        k, v = torch.chunk(kv, 2, dim=-1)

        # 2) 形状整理
        # Q -> [B, h, L, d_k]
        q = q.view(B, L, self.num_head, self.d_k).transpose(1, 2)

        # K,V -> [B, g, L, d_k]
        k = k.view(B, L, self.num_kv_head, self.d_k).transpose(1, 2)
        v = v.view(B, L, self.num_kv_head, self.d_k).transpose(1, 2)

        # 把 Q 分组： [B, h, L, d_k] -> [B, g, h_per_group, L, d_k]
        qg = q.view(B, self.num_kv_head, self.h_per_group, L, self.d_k)

        # 为了与组内 K,V 做 batched matmul，给 K,V 加一个组内头维度=1，方便广播
        # Kg, Vg: [B, g, 1, L, d_k]
        Kg = k.unsqueeze(2)
        Vg = v.unsqueeze(2)

        # 3) 组内注意力分数： [B, g, h_per_group, L, L]
        # 等价于：scores_g[b,g,hg] = qg[b,g,hg] @ Kg[b,g,0]^T / sqrt(d_k)
        scores_g = torch.matmul(qg, Kg.transpose(-1, -2)) / math.sqrt(self.d_k)

        if mask is not None:
            # 要求 mask 能广播到 [B, 1 或 g, 1 或 h_per_group, L, L] 或最终 [B, h, L, L]
            # 最常见做法：提供 [B, 1, 1, L, L]（或 [B, 1, 1, 1, L] 的causal/pad组合）
            scores_g = scores_g.masked_fill(mask, -1e9)

        attn_g = torch.softmax(scores_g, dim=-1)               # [B, g, h_per_group, L, L]

        # 4) 组内加权求和：context_g: [B, g, h_per_group, L, d_k]
        context_g = torch.matmul(attn_g, Vg)

        # 5) 还原回所有头：先合并 g 与 h_per_group -> h
        context = context_g.reshape(B, self.num_head, L, self.d_k).transpose(1, 2).reshape(B, L, self.d_model)
        # context = context_g.reshape(B, self.num_head, L, self.d_k)  # [B, h, L, d_k]
        # context = context.transpose(1, 2).contiguous()              # [B, L, h, d_k]
        # context = context.view(B, L, self.d_model)                  # [B, L, d_model]
        output = self.wo(context)

        # 同样给出注意力权重（按头展平回 [B, h, L, L]，便于对齐可视化）
        attn = attn_g.reshape(B, self.num_head, L, L)

        return output, attn


# --- 简单测试（与原 MHA 测试风格一致） ---
if __name__ == "__main__":
    batch_size = 2
    d_model = 12
    num_head = 6     # h
    num_kv_head = 3  # g（每组 2 个 Q 头）
    max_seq_len = 5

    x = torch.randn(batch_size, max_seq_len, d_model)
    gqa = GroupedQueryAttention(d_model, num_head, num_kv_head)
    out, attn = gqa(x)

    print("代码运行成功！（GQA）")
    print("输出张量形状:", out.shape)      # 期望: [B, L, d_model]
    print("注意力形状  :", attn.shape)     # 期望: [B, h, L, L]


代码运行成功！（GQA）
输出张量形状: torch.Size([2, 5, 12])
注意力形状  : torch.Size([2, 6, 5, 5])


练习

In [51]:
import torch
import torch.nn as nn
import math

class GQA(nn.Module):
    def __init__(self, dim, q_head, kv_head):
        super().__init__()
        self.d = dim
        self.q_head = q_head
        self.kv_head = kv_head
        self.d_k = dim // q_head
        self.h_per_kv = q_head // kv_head

        self.wq = nn.Linear(dim, dim)
        self.wkv = nn.Linear(dim, 2*kv_head*self.d_k)
        self.wo = nn.Linear(dim, dim)

    def forward(self, x, mask = None):
        B, L, D = x.shape
        q = self.wq(x)
        kv = self.wkv(x)
        k, v = torch.chunk(kv, 2, -1)
        q = q.view(B, L, self.q_head, self.d_k).transpose(1, 2)
        k = k.view(B, L, self.kv_head, self.d_k).transpose(1, 2)
        v = v.view(B, L, self.kv_head, self.d_k).transpose(1, 2)
        qg = q.view(B, self.kv_head, self.h_per_kv, L, self.d_k)
        kg = k.unsqueeze(2)
        vg = v.unsqueeze(2)
        attention_scores = torch.matmul(qg, kg.transpose(-1, -2)) / math.sqrt(self.d_k)
        if mask is not None:
            attention_scores = attention_scores.masked_fill(mask, -1e9)
        attention_weight = nn.functional.softmax(attention_scores, dim = -1)

        context = torch.matmul(attention_weight, vg).reshape(B, self.q_head, L, self.d_k).transpose(1, 2).reshape(B, L, self.d)
        output = self.wo(context)
        return output, attention_weight

B = 10
L = 25
D = 64
q_head = 8
kv_head = 4
x = torch.randn(B, L, D)
GQA_model = GQA(D, q_head, kv_head)
output, attn = GQA_model(x)
print(output.shape)
print(attn.shape)


torch.Size([10, 25, 64])
torch.Size([10, 4, 2, 25, 25])


## 2.AUC

### AUC
基于排序（Rank / Mann–Whitney U）的 AUC 计算方法。

输入：
- labels: List[int] 或 1D numpy array
    样本真实标签，取值为 {0, 1}
- scores: List[float] 或 1D numpy array
    模型预测分数，分数越大表示越可能为正样本

输出：
- auc: float
    AUC 值，取值范围 [0, 1]

核心思想：对标签进行排序
1. 按预测分数从小到大排序
2. 扫描排序后的样本序列
3. 每遇到一个正样本，统计其前面已有多少负样本
    这些负样本都被该正样本“正确地排在后面”

In [None]:
import numpy as np
 
def auc_rank(labels, scores):
 
    # 转为 numpy array，便于排序和向量化操作
    labels = np.asarray(labels)
    scores = np.asarray(scores)
    print(f"labels:{labels}")
    print(f"scores:{scores}")

 
    # 获取按照 score 从小到大排序后的索引
    order = np.argsort(scores)
    print(f"order:{order}")
 
    # 按排序后的顺序重排标签
    labels_sorted = labels[order]
    print(f"labels_sorted:{labels_sorted}")
 
 
    # 正样本数量 |P|
    n_pos = np.sum(labels_sorted == 1)
    print(f"n_pos:{n_pos}")
 
    # 负样本数量 |N|
    n_neg = np.sum(labels_sorted == 0)
    print(f"n_neg:{n_neg}")
    # 已扫描到的负样本数量（前缀负样本计数）
    neg_count = 0
 
    # 排序正确的正负样本对数量
    correct = 0.0
 
    # 从低分到高分扫描
    for l in labels_sorted:
        if l == 1:
            # 当前是正样本：
            # 它前面的所有负样本都满足 score_neg < score_pos
            correct += neg_count
        else:
            # 当前是负样本，增加负样本计数
            neg_count += 1
 
    # AUC = 排序正确的正负样本对 / 总正负样本对
    return correct / (n_pos * n_neg)

label = [0, 1, 0, 0, 1, 0, 0, 1]
q = [0.1, 0.9, 0.2, 0.8, 1, 0.2, 0.3, 0.8]
auc  = auc_rank(label, q)
print(f"auc:{auc}")

labels:[0 1 0 0 1 0 0 1]
scores:[0.1 0.9 0.2 0.8 1.  0.2 0.3 0.8]
order:[0 2 5 6 3 7 1 4]
labels_sorted:[0 0 0 0 0 1 1 1]
n_pos:3
n_neg:5
auc:1.0


练习

In [2]:
import numpy as np
def auc_rank(labels, scores):
    labels = np.array(labels)
    scores = np.array(scores)
    order = np.argsort(scores)
    labels_ordered = labels[order]
    pos_sample = np.sum(labels_ordered == 1)
    neg_sample = np.sum(labels_ordered == 0)
    negative = 0
    positive = 0
    for i in labels_ordered:
        if i == 0:
            negative += 1
        else:
            positive += negative
    auc = positive / (pos_sample * neg_sample)
    return auc
label = [0, 1, 0, 0, 1, 0, 0, 1]
q = [0.1, 0.9, 0.2, 0.8, 1, 0.2, 0.3, 0.8]
auc  = auc_rank(label, q)
print(f"auc:{auc}")

auc:1.0


### GAUC
基于排序的 GAUC 计算方法(Group AUC)。

GAUC 通过在每个用户内部分别计算 AUC,然后进行加权平均,
从而消除用户间基线差异的影响,更准确地评估模型的用户内排序能力。

输入:
- user_ids: List[str] 或 1D numpy array
    每个样本对应的用户标识
- labels: List[int] 或 1D numpy array  
    样本真实标签,取值为 {0, 1}
- scores: List[float] 或 1D numpy array
    模型预测分数,分数越大表示越可能为正样本
- weight_type: str, 默认 'impression'
    权重类型,可选值:
    - 'impression': 按用户曝光次数加权(工业界常用)
    - 'uniform': 等权重,每个用户权重为 1
    
输出:
- gauc: float
    加权后的 GAUC 值,取值范围 [0, 1]
- user_auc_dict: dict
    每个用户的 AUC 值字典,用于分析不同用户的排序质量
    
核心思想:
1. 将样本按 user_id 分组
2. 在每个用户内部,使用 Rank-based 方法计算 AUC
3. 根据指定的权重类型对所有用户的 AUC 进行加权平均

In [40]:
import numpy as np
from collections import defaultdict
 
def gauc_rank(user_ids, labels, scores, weight_type='impression'):
    
    # 转为 numpy array
    user_ids = np.asarray(user_ids)
    labels = np.asarray(labels)
    scores = np.asarray(scores)
    
    # 按用户分组:构建 user_id -> 样本索引列表 的映射
    user_sample_dict = defaultdict(list)
    for idx, user_id in enumerate(user_ids):
        user_sample_dict[user_id].append(idx)
    
    # 存储每个用户的 AUC 和权重
    user_auc_dict = {}
    total_weighted_auc = 0.0
    total_weight = 0.0
    
    # 遍历每个用户,计算其 AUC
    for user_id, sample_indices in user_sample_dict.items():
        # 提取该用户的所有样本,user_id=1,sample_indices= [0, 1, 2, 3]
        user_labels = labels[sample_indices]
        user_scores = scores[sample_indices]
        
        # 计算该用户内的正负样本数
        n_pos = np.sum(user_labels == 1)
        n_neg = np.sum(user_labels == 0)
        
        # 如果该用户只有正样本或只有负样本,无法计算 AUC
        # 跳过该用户(也可以选择赋值为 0.5)
        if n_pos == 0 or n_neg == 0:
            continue
            
        # 使用 Rank-based 方法计算该用户的 AUC
        # 按 score 从小到大排序
        order = np.argsort(user_scores)
        sorted_labels = user_labels[order]
        
        # 统计排序正确的正负样本对数
        neg_count = 0
        correct_pairs = 0.0
        
        for label in sorted_labels:
            if label == 1:
                # 正样本:其前面的所有负样本都被正确排序
                correct_pairs += neg_count
            else:
                # 负样本:计数增加
                neg_count += 1
        
        # 该用户的 AUC
        user_auc = correct_pairs / (n_pos * n_neg)
        user_auc_dict[user_id] = user_auc
        
        # 确定该用户的权重
        if weight_type == 'impression':
            # 按曝光次数加权
            weight = len(sample_indices)
        elif weight_type == 'uniform':
            # 等权重
            weight = 1.0
        else:
            raise ValueError(f"Unknown weight_type: {weight_type}")
        
        # 累加加权 AUC
        total_weighted_auc += user_auc * weight
        total_weight += weight
    
    # 计算 GAUC
    if total_weight == 0:
        return 0.5, user_auc_dict
    
    gauc = total_weighted_auc / total_weight
    print(f"gauc:{gauc}")
    print(f"user_auc_dict:{user_auc_dict}")
    return gauc, user_auc_dict

label = [0, 1, 0, 1, 1, 0, 0, 0, 1, 1, 0]
q = [0.1, 0.9, 0.2, 0.8, 1, 0.2, 0.3, 0.9, 0.7, 0.9, 0.7]
user_id = [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3]
_, _ = gauc_rank(user_id, label, q, weight_type='impression')


gauc:0.8636363636363636
user_auc_dict:{1: 1.0, 2: 1.0, 3: 0.5}


练习

In [None]:
import numpy as np
from collections import defaultdict
 
def gauc_rank(user_ids, labels, scores, weight_type='impression'):
    
    user_ids = np.asarray(user_ids)
    labels = np.asarray(labels)
    scores = np.asarray(scores)
    user_sample_dict = defaultdict(list)
    for idx, user_id in enumerate(user_ids):
        user_sample_dict[user_id].append(idx)
    user_auc_dict = {}
    total_weighted_auc = 0.0
    total_weight = 0.0
    for user_id, sample_indices in user_sample_dict.items():
        user_labels = labels[sample_indices]
        user_scores = scores[sample_indices]
        n_pos = np.sum(user_labels == 1)
        n_neg = np.sum(user_labels == 0)
        if n_pos == 0 or n_neg == 0:
            continue
        order = np.argsort(user_scores)
        sorted_labels = user_labels[order]
        neg_count = 0
        correct_pairs = 0.0
        
        for label in sorted_labels:
            if label == 1:
                correct_pairs += neg_count
            else:
                neg_count += 1
        user_auc = correct_pairs / (n_pos * n_neg)
        user_auc_dict[user_id] = user_auc
        if weight_type == 'impression':
            weight = len(sample_indices)
        elif weight_type == 'uniform':
            weight = 1.0
        else:
            raise ValueError(f"Unknown weight_type: {weight_type}")
        total_weighted_auc += user_auc * weight
        total_weight += weight
    if total_weight == 0:
        return 0.5, user_auc_dict
    gauc = total_weighted_auc / total_weight
    print(f"gauc:{gauc}")
    print(f"user_auc_dict:{user_auc_dict}")
    return gauc, user_auc_dict

label = [0, 1, 0, 1, 1, 0, 0, 0, 1, 1, 0]
q = [0.1, 0.9, 0.2, 0.8, 1, 0.2, 0.3, 0.9, 0.7, 0.9, 0.7]
user_id = [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3]
_, _ = gauc_rank(user_id, label, q, weight_type='impression')


## 3.损失函数

### BCE二类交叉熵实现

1\. 基础 BCE 公式

BCE 的标准形式是基于概率定义的。假设 $y$ 为真实标签 (0 或 1)，$p$ 为模型预测为 1 的概率（即 Sigmoid 的输出），则损失 $L$ 为：

$$L = -[y \cdot \log(p) + (1-y) \cdot \log(1-p)]$$

这个公式非常直观，但存在一个致命缺陷：当 $p$ 趋近于 0 或 1 时，$\\log(p)$ 或 $\\log(1-p)$ 会导致 `log(0)`，产生负无穷大的结果，这在计算上是灾难性的。

2\. 数值稳定的 BCE 公式

为了解决这个问题，我们不应该使用模型输出的概率 $p$，而应直接使用未经 Sigmoid 激活的原始输出——**Logits**，我们记为 $x$。

通过将 $p = \\text{sigmoid}(x)$ 代入原始公式并进行数学推导与化简，我们可以得到一个等价且高度数值稳定的形式：

$$L = \max(x, 0) - x \cdot y + \log(1 + e^{-|x|})$$

这个公式的核心优势在于，指数项中的 $-|x|$ 永远是非正数，这使得 $e^{-|x|}$ 的结果被限制在 $(0, 1]$ 区间内，从根本上避免了浮点数上溢的风险。它不依赖任何 `epsilon` 裁剪之类的技巧，是数值计算上的最优解。

In [16]:
import numpy as np
def BCE_logits_Loss(lables, y_predict):
    per_sample_loss = np.maximum(y_predict, 0) - y_predict * lables + np.log(1 + np.exp(-np.abs(y_predict)))
    return np.mean(per_sample_loss)

x = np.array([5,-4,5,-6])
y = np.array([1,0,1,0])

bce_loss = BCE_logits_Loss(y,x)

print(f"bce损失为{bce_loss}")

bce损失为0.008514077508444018


In [18]:
import numpy as np
def BCE_logits_Loss(lables, y_predict):
    p = 1 / (1 + np.exp(-y_predict))
    per_sample_loss = -(lables * np.log(p) + (1 - lables) * np.log(1 - p))
    return np.mean(per_sample_loss)

x = np.array([5,-4,5,-6])
y = np.array([1,0,1,0])

bce_loss = BCE_logits_Loss(y,x)

print(f"bce损失为{bce_loss}")

bce损失为0.008514077508444016


### MSE

In [20]:
import numpy as np
def MSE_Loss(lables, y_predict):
    per_sample_loss = (lables-y_predict)**2
    return np.mean(per_sample_loss)

x = np.array([1,2,3,4])
y = np.array([1,2,4,4])

mes_loss = MSE_Loss(y,x)

print(f"bce损失为{mes_loss}")

bce损失为0.25


### focal_Loss


当面临类别极度不平衡的数据时，标准的交叉熵损失会因大量易分样本的主导而失效。Focal Loss 通过引入动态调制因子，强制模型聚焦于训练过程中的“硬核”样本，是解决此类问题的关键技术。

核心公式回顾

Focal Loss 在标准交叉熵的基础上，增加了权重因子 $\\alpha$ 和调制因子 $(1-p\_t)^\gamma$。其统一形式简洁而强大：

$$L_{\text{focal}} = -\alpha_t (1-p_t)^\gamma \log(p_t)$$

其中：

  * $p\_t$ 定义为：当真实标签 $y=1$ 时，$p\_t=p$；当 $y=0$ 时，$p\_t=1-p$。这里的 $p$ 是模型预测为正类的概率。
  * $\\alpha\_t$ 是类别平衡参数，$\\gamma$ 是聚焦参数，用于抑制易分样本的损失贡献。

类别平衡参数 α 用于调节正负类别的整体权重比例：当正样本稀少时应增大 α（如 0.5→0.75 甚至更高），提高正样本在总损失中的占比；若负样本更重要或误报代价更高，则减小 α。本质是对“类别频率不平衡”进行线性重加权。聚焦参数 γ 用于控制对易分样本的抑制强度：γ=0 时退化为普通交叉熵；γ 越大（常用 1~3，默认 2），对高置信度样本的损失衰减越强，模型越专注于难样本；但过大可能导致训练不稳定或收敛变慢。实践中通常先根据类别比例确定 α，再在 γ∈[1,3] 内调节，以验证集表现为准微调。

该实现接收模型输出的概率 (经过 Sigmoid 激活后) 作为输入。

In [24]:
import numpy as np

def focal_loss_prob(y_pred_prob, y_true, alpha=0.25, gamma=2.0, reduction="mean", eps=1e-9):
    p = np.clip(y_pred_prob, eps, 1 - eps)

    pt = y_true * p + (1 - y_true) * (1 - p)
    alpha_t = y_true * alpha + (1 - y_true) * (1 - alpha)

    loss = -alpha_t * (1 - pt) ** gamma * np.log(pt)

    if reduction == "mean":
        return np.mean(loss)
    elif reduction == "sum":
        return np.sum(loss)
    else:
        return loss

x = np.array([0.9,0.1,0.9,0.7])
y = np.array([1,0,1,0])
focal_loss损失为 = focal_loss_prob(x,y)

print(f"focal_loss损失为{focal_loss损失为}")

focal_loss损失为0.11094425300887605


## 手写MLP

In [None]:
import torch
import torch.nn as nn

class MLP(nn.Module):
    """
    多层感知机（MLP）的完整实现
    Args:
        input_dim (int): 输入特征维度
        hidden_dims (list): 隐藏层维度列表，每个元素代表对应隐藏层的神经元数量
        output_dim (int): 输出维度
        activation (str): 激活函数类型，支持 'relu', 'tanh', 'sigmoid'
        dropout_rate (float): Dropout概率，用于正则化
        batch_norm (bool): 是否使用批归一化
    """
    
    def __init__(self, input_dim, hidden_dims, output_dim, activation='relu', dropout_rate=0.0, batch_norm=False):
        super(MLP, self).__init__()
        
        self.input_dim = input_dim
        self.hidden_dims = hidden_dims
        self.output_dim = output_dim
        self.activation_name = activation
        self.dropout_rate = dropout_rate
        self.batch_norm = batch_norm
        
        # 构建网络层
        layers = []
        layer_dims = [input_dim] + hidden_dims + [output_dim]
        
        # 逐层构建网络
        for i in range(len(layer_dims) - 1):
            # 线性变换层
            linear_layer = nn.Linear(layer_dims[i], layer_dims[i + 1])
            layers.append(linear_layer)
            
            # 如果不是最后一层，添加激活函数、批归一化和dropout
            if i < len(layer_dims) - 2:
                # 批归一化
                if self.batch_norm:
                    layers.append(nn.LayerNorm(layer_dims[i + 1]))
                
                # 激活函数
                if activation == 'relu':
                    layers.append(nn.ReLU())
                elif activation == 'tanh':
                    layers.append(nn.Tanh())
                elif activation == 'sigmoid':
                    layers.append(nn.Sigmoid())
                else:
                    raise ValueError(f"不支持的激活函数: {activation}")
                
                # Dropout正则化
                if dropout_rate > 0:
                    layers.append(nn.Dropout(dropout_rate))
        
        # 将所有层组合成Sequential模块
        self.network = nn.Sequential(*layers)
        
        # 初始化网络参数
        self._initialize_weights()
    
    def _initialize_weights(self):
        """
        网络参数初始化
        使用Xavier/Glorot初始化方法，这对于深层网络的训练非常重要
        """
        for module in self.modules():
            if isinstance(module, nn.Linear):
                # Xavier均匀分布初始化
                nn.init.xavier_uniform_(module.weight)
                # 偏置项初始化为0
                if module.bias is not None:
                    nn.init.constant_(module.bias, 0)
    
    def forward(self, x):
        return self.network(x)

In [None]:
import torch
import torch.nn as nn
# 常见顺序是：Linear -> LayerNorm -> Activation -> Dropout
class MLP(nn.Module):
    def __init__(self, input_dim, hidden_dims, output_dim, activation='relu', dropout=0.1, layernorm=True):
        super().__init__()

        layer_dims = [input_dim] + hidden_dims + [output_dim]
        layers = []

        for i in range(len(layer_dims) - 1):
            layers.append(nn.Linear(layer_dims[i], layer_dims[i+1]))

            if i < len(layer_dims) - 2:  # 最后一层不要激活/Dropout/Norm
                if layernorm:
                    layers.append(nn.LayerNorm(layer_dims[i+1]))      # nn.LayerNorm 必须指定 特征维度大小（即输入最后一维的大小）

                if activation == 'relu':
                    layers.append(nn.ReLU())
                elif activation == 'sigmoid':
                    layers.append(nn.Sigmoid())
                elif activation == 'tanh':
                    layers.append(nn.Tanh())

                # Dropout接收的输入是经过ReLU激活后的向量，然后对这些激活值进行随机屏蔽，直接影响的是传递给下一层的信息
                if dropout > 0:
                    layers.append(nn.Dropout(dropout))

        # *被称为"解包操作符"（unpacking operator），它的作用是将一个可迭代对象（如列表、元组）中的元素逐个取出，作为独立的参数传递给函数。
        self.network = nn.Sequential(*layers)
        self.initialize_weight()

    def initialize_weight(self):
        for module in self.modules():       # 调用self.modules()会依次返回整个MLP模型、第一个Linear层、LayerNorm层、ReLU层、第二个Linear层等等
            if isinstance(module, nn.Linear):       # Linear层有权重矩阵和偏置向量需要初始化
                nn.init.xavier_uniform_(module.weight)  # PyTorch的设计体系中，函数名末尾的下划线表示这是一个"就地操作"（in-place operation），意思是它会直接修改传入的张量，而不是返回一个新的张量。
                if module.bias is not None:
                    nn.init.constant_(module.bias, 0)

    def forward(self, x):
        return self.network(x)

mlp = MLP(32, [128, 64, 32], 1)
x = torch.randn(1, 2, 32)
print(x)
y = mlp(x)
print(y)


## 混合精度训练AMP

In [46]:
import torch 
from torch import nn, optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

# 1.基础配置
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
dtype = torch.bfloat16

# 2.数据准备
# transforms.ToTensor:()第一，数据类型转换(numpy或者image数据转为tensor)；第二，数值归一化。将0-255范围的像素值缩放到0-1范围；第三，维度重排[通道数，高度，宽度]
# Compose将多个transform串联成一个处理链，数据依次通过每个环节。
transform = transforms.Compose([transforms.ToTensor()]) 
train_dataset = datasets.MNIST(root = r'D:\科研\搜广推\04_手撕算法题\大模型_推荐算法_手撕题\data', train=True, download=False, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True, pin_memory=True, num_workers=4)

# 3.模型
class simplemodel(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Flatten(),
            nn.Linear(28*28, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),  
            nn.Linear(512, 256),
            nn.ReLU(),  
            nn.Linear(256, 10)
        )
    
    def forward(self, x):
        return self.net(x)

# 损失函数、优化器等设置
"""
需要移动到 GPU 的是参与大规模计算的张量数据，包括模型参数和输入数据
损失函数和优化器是操作这些张量的“工具”，它们会自动在张量所在的设备上执行操作，所以不需要手动移动
"""
model = simplemodel().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# 4.训练设置
"""
训练模式
Dropout 层会按照设定的概率 p 随机地将一部分神经元的输出置为零。这是一种有效的正则化手段，可以防止模型过拟合
BN 层会计算当前批次数据的均值和方差，并用它们来归一化数据。同时，它还会维护一个全局的均值和方差，这个全局值是根据训练过程中所有批次的统计数据通过滑动平均更新的

推理模式
Dropout 层会“关闭”，不再随机丢弃任何神经元，而是让所有神经元的输出都通过。这样可以保证在预测时模型的输出是确定和稳定的
BN 层会“冻结”，不再计算当前批次的均值和方差，而是直接使用在整个训练集上学习到的全局均值和方差来进行归一化。这保证了在推理时，即使输入只有一个样本，也能得到一致和稳定的结果
"""

model.train()
def main():
    for batch_idx, (inputs, targets) in enumerate(train_loader):
        inputs, targets = inputs.to(device, non_blocking=True), targets.to(device, non_blocking=True)
        optimizer.zero_grad()
        with torch.autocast("cuda", dtype=dtype):
            outputs = model(inputs)
            loss = criterion(outputs, targets)
        
        loss.backward()     # PyTorch 会计算出模型中每个参数的梯度,默认情况下，这些新计算出的梯度会加到该参数已有的梯度上（如果已有梯度存在的话），而不是覆盖它们
        optimizer.step()    # 更新梯度
        if batch_idx % 10 == 0:
            print(f"epoch[{1}], step[{batch_idx}], loss:{loss.item()}")

if __name__ == '__main__':
    main()


epoch[1], step[0], loss:2.30487060546875
epoch[1], step[10], loss:2.2474365234375
epoch[1], step[20], loss:2.1429443359375
epoch[1], step[30], loss:1.953338623046875
epoch[1], step[40], loss:1.589691162109375
epoch[1], step[50], loss:1.1607837677001953
epoch[1], step[60], loss:0.8270969390869141
epoch[1], step[70], loss:0.7493977546691895
epoch[1], step[80], loss:0.6877131462097168
epoch[1], step[90], loss:0.5060857534408569
epoch[1], step[100], loss:0.49372947216033936
epoch[1], step[110], loss:0.503061830997467
epoch[1], step[120], loss:0.4905708432197571
epoch[1], step[130], loss:0.32233864068984985
epoch[1], step[140], loss:0.44921380281448364
epoch[1], step[150], loss:0.46268683671951294
epoch[1], step[160], loss:0.36531856656074524
epoch[1], step[170], loss:0.29948681592941284
epoch[1], step[180], loss:0.33051609992980957
epoch[1], step[190], loss:0.34873807430267334
epoch[1], step[200], loss:0.36760321259498596
epoch[1], step[210], loss:0.31475210189819336
epoch[1], step[220], l