In [None]:
# Transformer编码器层核心代码（PyTorch示例）
class TransformerEncoderLayer(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward=2048):
        super().__init__()
        self.self_attn = MultiheadAttention(d_model, nhead)
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, src, src_mask=None):
        # 自注意力计算
        src2 = self.self_attn(src, src, src, attn_mask=src_mask)[0]
        src = src + self.norm1(src2)  # 残差连接+归一化
        # 前馈网络
        src2 = self.linear2(F.relu(self.linear1(src)))
        src = src + self.norm2(src2)
        return 
def scaled_dot_product_attention(q, k, v, mask=None):
    matmul_qk = torch.matmul(q, k.transpose(-2, -1))
    dk = k.size(-1)
    scaled_attention_logits = matmul_qk / math.sqrt(dk)
    if mask: scaled_attention_logits += mask * -1e9
    attention_weights = F.softmax(scaled_attention_logits, dim=-1)
    return torch.matmul(attention_weights, v)

In [None]:
#self_attention
import torch
import torch.nn as nn
import torch.nn.functional as F

class SelfAttention(nn.Module):
    def __init__(self, embed_size, heads=1):
        super(SelfAttention, self).__init__()
        self.embed_size = embed_size #输入向量的维度
        self.heads = heads
        self.head_dim = embed_size // heads

        self.to_qkv = nn.Linear(embed_size, 3 * embed_size)

    def forward(self, x):
        batch_size, seq_len, embed_size = x.shape
        #生成Q,K,V 通过一次线性变换分割成三部分
        qkv = self.to_qkv(x)
        q, k, v = torch.chunk(qkv, 3, dim=-1)  #分割为Q/K/V

        #计算缩放点积注意力
        scale = (self.head_dim) ** -0.5  #缩放因子根号d， 防止点积过大
        attention_scores =  torch.matmul(q, k.transpose(-2, -1)) * scale

        #应用softmax得到注意力权重
        attention_weights = F.softmax(attention_scores, dim=-1)

        out = torch.matmul(attention_weights, v)

        return out, attention_weights



In [None]:
class multiattention(nn.Module):
    def __init__(self, embed_size,head=8):
        self.head = head
        self.embed_size = embed_size
        self.head_dim = embed_size // head

        self.to_qkv = nn.Linear(embed_size, 3*embed_size)
        self.to_out = nn.Linear(embed_size, embed_size)
    def forward(self,x):
        batch_size, seq_len, embed_size = x.shape()
        qkv = self.to_qkv(x).view(batch_size, seq_len, 3, self.head, self.head_dim)
        qkv = qkv.permute(2,0,3,1,4)
        q,k,v = qkv[0], qkv[1], qkv[2]

        attention_scores = torch.matmul(q,k.transpose(-2,-1)) * self.head_dim ** -0.5

        attention_weights = F.softmax(attention_scores, dim=-1)

        out = torch.matmul(attention_weights, v)

        out = out.permute(0,2,1,3).contiguous.view(batch_size, seq_len, embed_size)

        out = self.to_out(out)

    

SyntaxError: incomplete input (2631102471.py, line 10)

In [None]:
#MultiAttention
import torch
import torch.nn as nn
import torch.nn.functional as F

class MultiHeadAttention(nn.Module):
    def __init__(self, embed_size, heads=8):
        super(MultiHeadAttention, self).__init__()
        self.embed_size = embed_size
        self.heads = heads
        assert embed_size % heads == 0, #embed_size 必须能被 heads 整除
        self.heads_dim = embed_size // heads

        self.to_qkv = nn.Linear(embed_size, 3 * embed_size)
        self.to_out = nn.Linear(embed_size, embed_size)
    def forward(self, x):
        batch_size, seq_len, embed_size = x.shape

        qkv = self.to_qkv(x).view(batch_size, seq_len, 3, self.heads, self.heads_dim)
        qkv = qkv.permute(2, 0, 3, 1, 4)  # 调整维度顺序为 (3, batch_size, heads, seq_len, heads_dim)  #方便之后K矩阵的转置
        q, k, v = qkv[0], qkv[1], qkv[2]  # 分离为查询 q、键 k、值 v，每个形状为 (batch_size, heads, seq_len, heads_dim)
        #q：batch_size, heads, seq_len, heads_dim
        scale = self.heads_dim ** -0.5  # 计算缩放因子，防止点积过大，等于 1/sqrt(heads_dim)
        attention_scores = torch.matmul(q,k.transpose(-2, -1)) * scale
        # 计算注意力分数：q 和 k 的转置点积后乘以缩放因子
        # k.transpose(-2, -1) 将 k 的最后两个维度 (seq_len, heads_dim) 转置为 (heads_dim, seq_len)
        # 结果形状为 (batch_size, heads, seq_len, seq_len)

        attention_wights = F.softmax(attention_scores, dim=-1)  # 对注意力分数应用 softmax，得到权重
        # dim=-1 表示沿最后一个维度（序列长度）归一化，形状保持 (batch_size, heads, seq_len, seq_len)
        out = torch.matmul(attention_wights, v)
        # attention_weights 和 v 的矩阵乘法，结果形状为 (batch_size, heads, seq_len, heads_dim)
        print(out)
        #用于合并最后两个维度heads*heads_dim
        out = out.permute(0, 2, 1, 3).contiguous().view(batch_size, seq_len, embed_size)
        #permute后，batch_size, seq_len, heads, heads_dim
        # contiguous() 确保内存连续，view 重塑为 (batch_size, seq_len, embed_size)
        print(out)
        #batch_size, seq_len, embed_size
        out = self.to_out(out)
        print(out)
        return out   
    

class GQA(nn.Module):
    def __init__(self, num_head, groups, k_v_heads, embed_size):
        super().__init__()
        self.head_dim = embed_size // num_head
        self.num_head = num_head
        self.groups = groups
        self.k_v_heads = k_v_heads
        self.embed_size = embed_size

        self.q_proj = nn.Linear(embed_size, embed_size)
        self.k_proj = nn.Linear(embed_size, k_v_heads * self.head_dim)
        self.v_proj = nn.Linear(embed_size, k_v_heads * self.head_dim)
        self.out_proj = nn.Linear(embed_size, embed_size)
    def forward(self, x):
        batch_size, seq_len, head_dim = x.shape
        q = self.q_proj(x)
        k = self.k_proj(x)
        v = self.v_proj(x)

        q = q.view(batch_size, seq_len, self.groups, self.num_head, head_dim)
        q = q.permute(0,2,3,1,4)
        k = k.view(batch_size, seq_len, self.num_head, head_dim)
        k = k.permute(0,2,3,1)
        v = v.view(batch_size, seq_len, self.num_head, head_dim)
        v = v.permute(0,2,1,3)

        scale = self.head_dim ** -0.5
        score = torch.matmul(q,k) * scale

        weight = F.softmax(score, dim=-1)

        out = torch.matmul(weight, v)

        out = out.permute(0, 3, 1, 2, 4)
        
        out = out.contiguous().view(batch_size, seq_len, -1)


        return self.out_proj(out)



# 输入形状: (batch_size, seq_len, embed_size)
# BN统计量在 (batch_size, seq_len) 上计算，对每个 embed_size 通道独立归一化
bn = nn.BatchNorm1d(embed_size)
x_bn = bn(x)  # 输出形状仍为 (batch_size, seq_len, embed_size)

# 输入形状: (batch_size, seq_len, embed_size)
# LN统计量在 embed_size 上计算，每个样本的每个位置独立归一化
ln = nn.LayerNorm(embed_size)
x_ln = ln(x)  # 输出形状仍为 (batch_size, seq_len, embed_size)





SyntaxError: invalid syntax (3180739669.py, line 11)

In [None]:
class LR(nn.Module):
    def __init__(self, input_dim = 1):
        super(LR, self).__init__()
        self.linear = nn.Linear(input_dim, input_dim)
    def forward(self, x):
        return torch.sigmoid(self.linear(x))

In [None]:
import torch
import torch.nn as nn

class LogisticRegression:
    def __init__(self, input_dim, lr=0.01):
        # 手动初始化权重和偏置（对应线性层参数）
        self.W = torch.randn(input_dim, 1, requires_grad=False) * 0.01
        self.b = torch.zeros(1, requires_grad=False)
        self.lr = lr  # 学习率

    def forward(self, X):
        """前向传播：计算线性输出并应用Sigmoid"""
        linear = torch.mm(X, self.W) + self.b  # X.shape: (n_samples, input_dim)
        return torch.sigmoid(linear)  # 输出概率 P(y=1|x)

    def loss(self, y_pred, y_true):
        """计算二元交叉熵损失"""
        epsilon = 1e-8  # 防止log(0)
        return -torch.mean(y_true * torch.log(y_pred + epsilon) + 
                          (1 - y_true) * torch.log(1 - y_pred + epsilon))

    def manual_update(self, X, y):
        """手动计算梯度并更新参数（不依赖optimizer）"""
        # 前向传播
        y_pred = self.forward(X)
        loss = self.loss(y_pred, y)
        
        # 反向传播手动求导（链式法则）
        n_samples = X.size(0)
        
        # 计算损失对输出的梯度 dL/dy_pred
        dL_dy_pred = (y_pred - y) / n_samples  # shape: (n_samples, 1)
        
        # 计算梯度 dL/dW 和 dL/db
        dL_dW = torch.mm(X.t(), dL_dy_pred)    # X.T @ dL_dy_pred
        dL_db = torch.sum(dL_dy_pred, dim=0)   # 沿样本维度求和
        
        # 参数更新（梯度下降）
        self.W -= self.lr * dL_dW
        self.b -= self.lr * dL_db
        
        return loss.item()

    def predict(self, X, threshold=0.5):
        """预测类别（0或1）"""
        with torch.no_grad():  # 禁用梯度计算
            proba = self.forward(X)
            return (proba >= threshold).float()

In [None]:
def auc_count(self,y_true,y_prob):
    sorted_nums = torch.argsort(y_prob, descending=False)
    y_prob = y_prob[sorted_nums]
    y_true = y_true[sorted_nums]

    pos_num = (y_true == 1).sum().item()
    neg_num = (y_true == 0).sum().item()
    
    cnt_cum = 0
    same_prob = None
    same_prob_cnt = 0

    for i, prob in enumerate(y_prob, start=1):
        if prob != same_prob:
            if same_prob_cnt > 0:
                avg_prob = same_prob_cnt / same_prob_cnt
                cnt_cum += avg_prob * same_prob_cnt

                same_prob_cnt = 0
                prob_sum = 0
            same_prob = prob
        if y_true[i-1] == 1:
            same_prob_cnt += 1 
            prob_sum += prob
        
    if same_prob_cnt>0:
        avg_prob = prob_sum / same_prob_cnt
        cnt_cum += avg_prob * same_prob_cnt

    auc = (cnt_cum - pos_num*(pos_num+1)/2)/(pos_num * neg_num)

    return auc 

    

SyntaxError: incomplete input (2445662913.py, line 9)

pos_probs = y_prob[y_true == 1]
neg_probs = y_prob[y_true == 0]
comparison = pos_probs[:, None] > neg_probs[None, :]
auc = (comparison.sum() + 0.5 * (pos_probs[:, None] == neg_probs[None, :]).sum()) / (pos_num * neg_num)

In [None]:
#AUC计算代码 表示正样本预测概率大于负样本预测概率的概率，评估二分类模型对正负样本的排序能力，反映区分两类样本的效果

# 按预测概率升序排列，并同步调整真实标签顺序
def auc_count(self, y_prob, y_true):
    sorted_nums = torch.argsort(y_prob, descending=False)
    y_true_sorted = y_true[sorted_nums]
    y_prob_sorted = y_prob[sorted_nums]

    pos_num = (y_true == 1).sum().item()
    neg_num = (y_true == 0).sum().item()

    if pos_num == 0 or neg_num == 0:
        return 0.0
    # 计算每个正样本的累计排名（处理相同概率的情况）
    cum_rank = 0.0
    cur_prob = None
    same_prob_count = 0
    same_rank_sum = 0

    for idx, prob in enumerate(y_prob_sorted, start = 1):
        if prob != cur_prob:
            if same_prob_count > 0:
                avg_rank = same_rank_sum / same_prob_count
                cum_rank += avg_rank * same_prob_count
                
                same_prob_count = 0
                same_rank_sum = 0.0
            cur_prob = prob
        #只找正样本的排名
        if y_true_sorted[idx - 1] == 1:  
            same_prob_count += 1
            same_rank_sum += idx
    # 处理最后一组相同概率的情况
    if same_prob_count > 0:
        avg_rank = same_rank_sum/ same_prob_count
        cum_rank += avg_rank * same_prob_count
    # 应用AUC计算公式
    auc = (cum_rank - pos_num*(pos_num+1)/2)/(pos_num * neg_num)  
    return auc      


In [None]:
def kl(self, P, Q, epsilon = 1e-8):
    P += epsilon
    Q += epsilon

    P = F.softmax(P,dim=-1)
    Q = F.softmax(Q,dim=-1)

    logP = torch.log(P)
    logQ = torch.log(Q)

    KL_elements = P *(logP - logQ)
    P/Q - logp - logQ - 1
    KL = KL_elements.sum(dim=-1)

    return KL.mean()

In [None]:
def kl(P,Q,epsilon = 1e-8):
    P += epsilon
    Q += epsilon

    P = F.softmax(P,dim=-1)
    Q = F.softmax(Q, dim=-1)

    logP = torch.log(P)
    logQ = torch.log(Q)

    KL = P * (logP - logQ)
    KL = KL.sum(dim=-1)

    return KL.mean()


P: tensor([[0.0469, 0.0489, 0.2302, 0.0418, 0.0568, 0.0148, 0.0487, 0.2712, 0.2014,
         0.0392],
        [0.0436, 0.3497, 0.0228, 0.0051, 0.0119, 0.0270, 0.0196, 0.0320, 0.4057,
         0.0826],
        [0.1846, 0.1609, 0.1473, 0.0572, 0.2800, 0.0317, 0.0273, 0.0310, 0.0408,
         0.0393],
        [0.0768, 0.0925, 0.2772, 0.1169, 0.0447, 0.0567, 0.1478, 0.0679, 0.1017,
         0.0178]])
P1: tensor([[0.0469, 0.0489, 0.2302, 0.0418, 0.0568, 0.0148, 0.0487, 0.2712, 0.2014,
         0.0392],
        [0.0436, 0.3497, 0.0228, 0.0051, 0.0119, 0.0270, 0.0196, 0.0320, 0.4057,
         0.0826],
        [0.1846, 0.1609, 0.1473, 0.0572, 0.2800, 0.0317, 0.0273, 0.0310, 0.0408,
         0.0393],
        [0.0768, 0.0925, 0.2772, 0.1169, 0.0447, 0.0567, 0.1478, 0.0679, 0.1017,
         0.0178]])
tensor(1.0280)


In [None]:
#KL散度
import torch
import torch.nn as nn
import torch.nn.functional as F

def kl_divergence_manual(P, Q, epsilon=1e-8):
    """手动实现KL散度计算"""
    # 添加极小值避免log(0)导致数值不稳定
    P = P + epsilon
    Q = Q + epsilon
    
    # 归一化处理（若输入不是概率分布）
    # 沿最后一个维度求和并保持维度，将张量转化为概率分布
    P = P / P.sum(dim=-1, keepdim=True)
    Q = Q / Q.sum(dim=-1, keepdim=True)
    
    # 计算对数概率（KL散度核心操作）
    logP = torch.log(P)
    logQ = torch.log(Q)
    
    # 逐元素计算KL散度：P * (logP - logQ)
    kl_elements = P * (logP - logQ)
    
    # 沿最后一个维度求和（对每个样本的概率分布求和）
    kl = kl_elements.sum(dim=-1)
    
    # 返回批次平均值
    return kl.mean()

def kl_divergence_pytorch(P_logits, Q_logits):
    """使用PyTorch内置函数计算KL散度"""
    # 对Q的logits应用log_softmax得到对数概率（符合KLDivLoss输入要求）
    log_Q = F.log_softmax(Q_logits, dim=-1)
    
    # 对P的logits应用softmax得到概率分布（符合KLDivLoss目标要求）
    P = F.softmax(P_logits, dim=-1)
    
    # 初始化KL散度损失函数，reduction='batchmean'表示按批次均值返回
    kl_loss = nn.KLDivLoss(reduction='batchmean')
    
    # 计算KL散度（输入log_Q需在前，目标P在后）
    loss = kl_loss(log_Q, P)
    
    return loss

In [None]:
# BERT模型核心结构（简化版）
class BERT(nn.Module):
    def __init__(self, vocab_size, hidden_size, num_layers, num_heads):
        super().__init__()
        self.embeddings = nn.Embedding(vocab_size, hidden_size)
        self.segment_emb = nn.Embedding(2, hidden_size)
        self.pos_emb = nn.Parameter(torch.randn(1, 512, hidden_size))
        self.encoder_layers = nn.ModuleList([
            TransformerEncoderLayer(hidden_size, num_heads) 
            for _ in range(num_layers)
        ])
        
    def forward(self, input_ids, segment_ids):
        # 嵌入融合
        token_emb = self.embeddings(input_ids)
        seg_emb = self.segment_emb(segment_ids)
        pos_emb = self.pos_emb[:, :input_ids.size(1), :]
        x = token_emb + seg_emb + pos_emb
        # 编码器堆叠
        for layer in self.encoder_layers:
            x = layer(x)
        return x

In [None]:
#MLM（Masked Language Model）：随机遮蔽15%的token（80%替换为[MASK]，10%随机替换，10%保留原词）
class MaskedLM(nn.Module):
    def __init__(self, hidden_size, vocab_size):
        super().__init__()
        self.dense = nn.Linear(hidden_size, hidden_size)
        self.layer_norm = nn.LayerNorm(hidden_size)
        self.decoder = nn.Linear(hidden_size, vocab_size)

    def forward(self, hidden_states, masked_positions):
        # 提取被遮蔽位置的隐藏状态
        batch_size, seq_len, dim = hidden_states.shape
        flat_positions = masked_positions.view(-1)
        selected = hidden_states.view(-1, dim)[flat_positions]
        # 解码预测
        x = F.gelu(self.dense(selected))
        x = self.layer_norm(x)
        logits = self.decoder(x)
        return logits

In [None]:
#交叉熵损失  -P(x)logQ(x) P为真实分布，Q为预测分布
import torch

def cross_entropy_loss(logits, labels):
    """
    logits: 模型原始输出 (未归一化), shape: (N, C)
    labels: 真实标签（类别索引）, shape: (N,)
    """
    # 数值稳定性处理: 减去最大值防止指数爆炸
    logits = logits - torch.max(logits, dim=1, keepdim=True)[0]
    
    # 计算LogSoftmax (等价于 log(softmax(logits)))
    exp_logits = torch.exp(logits)
    log_probs = logits - torch.log(torch.sum(exp_logits, dim=1, keepdim=True))
    
    # 提取真实类别对应的对数概率
    n_samples = logits.shape[0]
    true_class_logprobs = log_probs[range(n_samples), labels]
    
    # 计算损失（平均损失）
    loss = -torch.mean(true_class_logprobs)
    return loss

In [None]:
class Lora(nn.modules):
    def __init__(self, in_dim, out_dim, rank=8, alpha = 16):
        super().__init__
        self.rank = rank
        self.alpha = alpha

        self.A = nn.Parameter(torch.randn(in_dim, rank) * 0.02)
        self.B = nn.Parameter(torch.zeros(rank, out_dim))

        self.scalling = alpha / rank

    def forward(self, x):
        delta_W = torch.matmul(self.A, self.B) * self.scalling

        return x @ (self.original_weight + delta_W)

SyntaxError: incomplete input (984388660.py, line 1)

In [None]:
#LoRA
import torch
import torch.nn as nn
import torch.nn.functional as F

class LoRALayer(nn.Module):
    def __init__(self, original_layer, rank=8, alpha=16):
        super().__init__()
        self.rank = rank
        self.alpha = alpha
        
        self.in_dim = original_layer.in_features
        self.out_dim = original_layer.out_features

        self.original_weight = original_layer.weight.clone().detach()  # 克隆原始权重并禁止梯度更新
        self.original_weight.requires_grad_(False)
        # 缩放因子：alpha / rank，用于平衡低秩更新的强度

        self.A = nn.Parameter(torch.randn(self.in_dim, rank) * 0.02)
        self.B = nn.Parameter(torch.zeros(rank, self.out_dim))

        self.scaling = alpha / rank

    def forward(self, x):
        # 计算低秩更新 ΔW = A @ B * scaling
        delta_W = torch.matmul(self.A, self.B) * self.scaling
        # 将更新量叠加到原始权重（需从外部传入原始权重）
        combined_weight = self.original_weight + delta_W
        return F.linear(x, combined_weight, self.original_layer.bias)
    
original_linear = nn.Linear(768, 512)

    # 创建LoRA适配层
lora_layer = LoRALayer(original_linear, rank=8, alpha=16)

    # 前向传播时直接替换使用
x = torch.randn(32, 768)  # 输入维度
output = lora_layer(x)    # 输出维度 (32, 512)

In [3]:
# RMSNorm代码
import torch
import torch.nn as nn

class RMSNorm(nn.Module):
    def __init__(self, hidden_size, eps=1e-6):
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(hidden_size))  # 可学习参数γ

    def _norm(self, x):
        return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)

    def forward(self, x):
        return self.weight * self._norm(x.float()).type_as(x)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Expert(nn.Module):
    """专家网络模块"""
    def __init__(self, d_model, d_ff):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.GELU(),
            nn.Linear(d_ff, d_model)
        )
    
    def forward(self, x):
        return self.net(x)

class MoELayer(nn.Module):
    """MoE核心层"""
    def __init__(self, d_model=768, num_experts=8, top_k=2):
        super().__init__()
        self.num_experts = num_experts
        self.top_k = top_k
        
        # 专家池初始化
        self.experts = nn.ModuleList([Expert(d_model, d_model*4) for _ in range(num_experts)])
        
        # 门控网络
        self.gate = nn.Linear(d_model, num_experts)
        
        # 负载均衡辅助损失系数
        self.aux_loss_coef = 0.01

    def forward(self, x):
        batch_size, seq_len, d_model = x.shape
        
        # 门控网络计算
        logits = self.gate(x)  # [B, S, E]
        probs = F.softmax(logits, dim=-1)
        
        # Top-K专家选择
        topk_probs, topk_indices = probs.topk(self.top_k, dim=-1)  # [B, S, K]
        topk_probs = topk_probs / topk_probs.sum(dim=-1, keepdim=True)
        
        # 专家计算与组合
        outputs = torch.zeros_like(x)   #创建形式相同的全0张量
        for i in range(self.num_experts):
            # 创建当前专家的mask
            expert_mask = (topk_indices == i)
            if expert_mask.any():
                # 当前专家处理的token索引
                batch_idx, seq_idx = torch.where(expert_mask)
                
                # 获取对应输入并计算专家输出
                expert_input = x[batch_idx, seq_idx]
                expert_output = self.experts[i](expert_input)
                
                # 加权累加结果
                weights = topk_probs[batch_idx, seq_idx, expert_mask[expert_mask]]
                outputs[batch_idx, seq_idx] += expert_output * weights.unsqueeze(-1)
        
        # 负载均衡损失计算
        expert_mask = F.one_hot(topk_indices, self.num_experts).float()
        expert_usage = expert_mask.mean(dim=0).mean(dim=0)
        aux_loss = (expert_usage.std() + 1e-6) * self.aux_loss_coef
        
        return outputs, aux_loss
    












SyntaxError: incomplete input (1838695233.py, line 112)