# MLP

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

class MLP(nn.Module):
    """Two-layer perceptron: Linear -> ReLU -> Linear.

    Args:
        input_size: Number of features per sample once the input is flattened.
        hidden_size: Number of units in the hidden layer.
        num_classes: Number of output units (raw scores; no softmax is applied).
    """

    def __init__(self, input_size, hidden_size, num_classes):
        super(MLP, self).__init__()
        # forward() needs this to flatten incoming batches.
        self.input_size = input_size
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Flatten ``x`` to ``(-1, input_size)`` and return ``(-1, num_classes)`` scores."""
        # reshape (rather than view) also accepts non-contiguous inputs.
        x = x.reshape(-1, self.input_size)
        # Equivalent two-step form: x = self.fc1(x); x = torch.relu(x)
        x = torch.relu(self.fc1(x))
        return self.fc2(x)
    
# 1. Create an MLP instance
input_size = 576 # features per input (e.g. a flattened 24x24-pixel image: 24 * 24 = 576)
hidden_size = 512 # number of units in the hidden layer
num_classes = 10 # number of output classes
mlp = MLP(input_size, hidden_size, num_classes)
criterion = nn.MSELoss()
optimizer = optim.SGD(mlp.parameters(), lr=1e-4)

# Random inputs and random targets, used only to exercise the training loop
x_train = torch.randn(64, input_size) 
y_train = torch.randn(64, num_classes)

for epoch in range(100):
    # Run 100 training epochs
    # Forward pass
    outputs = mlp(x_train) 
    loss = criterion(outputs, y_train)

    # Backward pass and parameter update
    optimizer.zero_grad()
    loss.backward() 
    optimizer.step()

    # Report the loss every 10 epochs
    if (epoch+1) % 10 == 0:
        print(f'Epoch [{epoch+1}/100], Loss: {loss.item():.4f}')

# Run the model on one random sample with gradient tracking disabled
with torch.no_grad():
    sample_data = torch.randn(1, input_size) 
    predictions = mlp(sample_data) 
    print(predictions)
        
        


        

# Self-Attention

In [None]:
import torch
import torch.nn as nn
import math

class SelfAttention(nn.Module):
    """Single-head scaled dot-product self-attention.

    Args:
        hidden_dim: Size of the last dimension of the input; queries, keys and
            values are all projected to this same size.
    """

    def __init__(self, hidden_dim):
        super(SelfAttention, self).__init__()
        self.hidden_dim = hidden_dim
        self.q_proj = nn.Linear(hidden_dim, hidden_dim)
        self.k_proj = nn.Linear(hidden_dim, hidden_dim)
        self.v_proj = nn.Linear(hidden_dim, hidden_dim)

    def forward(self, x):
        """Return the attention output; same shape as ``x`` (``(..., seq_len, hidden_dim)``)."""
        q = self.q_proj(x)
        k = self.k_proj(x)
        v = self.v_proj(x)
        # Scale the scores by sqrt(d) before the softmax so their magnitude
        # does not grow with the feature dimension.
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.hidden_dim)
        attn_weight = torch.softmax(scores, dim=-1)
        return torch.matmul(attn_weight, v)

# MHA

In [None]:
import torch
import torch.nn as nn

class MHA(nn.Module):
    """Multi-head scaled dot-product self-attention.

    Args:
        hidden_dim: Size of the last dimension of the input and of the output.
        num_heads: Number of attention heads; must divide ``hidden_dim`` evenly.
        attention_dropout: Dropout probability applied to the attention weights.
        output_dropout: Dropout probability applied after the output projection.

    Raises:
        ValueError: If ``hidden_dim`` is not divisible by ``num_heads``.
    """

    def __init__(self, hidden_dim, num_heads, attention_dropout=0.1, output_dropout=0.1):
        super(MHA, self).__init__()
        if hidden_dim % num_heads != 0:
            raise ValueError(
                f"hidden_dim ({hidden_dim}) must be divisible by num_heads ({num_heads})"
            )
        self.hidden_dim = hidden_dim
        self.num_heads = num_heads
        # Integer division: view() only accepts integer sizes.
        self.head_dim = hidden_dim // num_heads
        self.q_proj = nn.Linear(hidden_dim, hidden_dim)
        self.k_proj = nn.Linear(hidden_dim, hidden_dim)
        self.v_proj = nn.Linear(hidden_dim, hidden_dim)
        self.out_proj = nn.Linear(hidden_dim, hidden_dim)
        self.attn_dropout = nn.Dropout(attention_dropout)
        self.output_dropout = nn.Dropout(output_dropout)

    def _split_heads(self, t, batch_size, seq_len):
        """Reshape (batch, seq_len, hidden_dim) to (batch, num_heads, seq_len, head_dim)."""
        return t.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

    def forward(self, x):
        """Apply self-attention to ``x`` of shape (batch, seq_len, hidden_dim).

        Returns a tensor of the same shape.
        """
        batch_size, seq_len, _ = x.shape
        Q = self._split_heads(self.q_proj(x), batch_size, seq_len)
        K = self._split_heads(self.k_proj(x), batch_size, seq_len)
        V = self._split_heads(self.v_proj(x), batch_size, seq_len)

        # Scores are scaled by sqrt(head_dim) before the softmax.
        scores = Q @ K.transpose(-2, -1) / (self.head_dim ** 0.5)
        attn_weight = torch.softmax(scores, dim=-1)
        attn_weight = self.attn_dropout(attn_weight)
        output = attn_weight @ V
        # transpose() leaves the tensor non-contiguous, and view() requires
        # contiguous memory, so make it contiguous before merging the heads.
        output = output.transpose(1, 2).contiguous().view(batch_size, seq_len, self.hidden_dim)
        output = self.out_proj(output)
        output = self.output_dropout(output)

        return output
        
        
        
        

# 交叉熵+Softmax

In [1]:
import numpy as np
import torch
import torch.nn as nn

def mannul_softmax(x):
    """Hand-written softmax along dim 1.

    :param x: input tensor; the softmax is taken across dim 1
    :return: tensor of the same shape as ``x``
    """
    # Subtract the per-row maximum before exponentiating to avoid overflow.
    # torch.max with a dim returns a (values, indices) pair; only the values
    # are needed here.
    row_max, _ = torch.max(x, dim=1, keepdim=True)
    exps = torch.exp(x - row_max)
    denom = exps.sum(dim=1, keepdim=True)
    return exps / denom

def cross_entropy_loss(y_true, y_pred):
    """Mean cross-entropy between target rows and predicted probability rows.

    :param y_true: target distribution per row (e.g. one-hot labels)
    :param y_pred: predicted probabilities per row (already passed through softmax)
    :return: scalar tensor, the cross-entropy averaged over rows
    """
    # Clamp probabilities into [tiny, 1 - tiny] so log() never receives 0.
    tiny = 1e-15
    clipped = y_pred.clamp(min=tiny, max=1 - tiny)

    # Per-row cross-entropy: -sum_i p_i * log(q_i)
    per_sample = -(y_true * clipped.log()).sum(dim=1)

    return per_sample.mean()


# Example
# 1. Ground-truth labels (one-hot encoded)
y_true = torch.tensor([[0, 1, 0],
                       [1, 0, 0],
                       [0, 0, 1]], dtype=torch.float32) # float dtype so it matches the arithmetic below

# 2. Simulated raw model outputs (logits, before softmax)
logits = torch.tensor([[0.5, 2.0, -1.0],  # sample 1: class 1 has the highest score
                       [3.0, 0.1, 0.1],  # sample 2: class 0 has the highest score
                       [-2.0, 0.5, 1.5]], # sample 3: class 2 has the highest score
                      dtype=torch.float32)

# 3. Turn the logits into probabilities with the hand-written softmax
y_pred_probs = mannul_softmax(logits)
print("Logits:\n", logits)
print("\nProbabilities (after manual_softmax):\n", y_pred_probs)

# 4. Compute the loss with the hand-written cross-entropy function
manual_loss = cross_entropy_loss(y_true, y_pred_probs)
print(f"\nManual Softmax + Manual Cross Entropy Loss: {manual_loss.item():.4f}")

# 5. (Optional) Verify against PyTorch's built-in loss
# Note: nn.CrossEntropyLoss combines LogSoftmax and NLLLoss; it is given raw logits and class indices here
criterion_pytorch = nn.CrossEntropyLoss()
# Convert the one-hot labels to class indices
y_true_indices = torch.argmax(y_true, dim=1)
pytorch_loss = criterion_pytorch(logits, y_true_indices)
print(f"PyTorch nn.CrossEntropyLoss (takes logits): {pytorch_loss.item():.4f}")

# Compare the two loss values; they should be very close (small differences can come from numerical precision)

Logits:
 tensor([[ 0.5000,  2.0000, -1.0000],
        [ 3.0000,  0.1000,  0.1000],
        [-2.0000,  0.5000,  1.5000]])

Probabilities (after manual_softmax):
 tensor([[0.1753, 0.7856, 0.0391],
        [0.9009, 0.0496, 0.0496],
        [0.0216, 0.2631, 0.7153]])

Manual Softmax + Manual Cross Entropy Loss: 0.2269
PyTorch nn.CrossEntropyLoss (takes logits): 0.2269


# KL散度
**真实分布 (p)**  
  例：`[1, 0, 0]`（样本明确属于第一类）  
  信息熵（最小平均编码长度）：
  $$
  H(p) = -\sum_{i=1}^C p(x_i)\log p(x_i)
  $$

**拟合分布 (q)**  
  例：`[0.7, 0.2, 0.1]`（模型预测概率）  
  交叉熵（实际编码长度期望）：
  $$
  H(p,q) = -\sum_{i=1}^C p(x_i)\log q(x_i)
  $$

**KL散度**  
  衡量分布差异（吉布斯不等式保证 $H(p,q) \geq H(p)$）：
  $$
  D_{KL}(p \| q) = H(p,q) - H(p) = \sum_{i=1}^C p(x_i)\log \frac{p(x_i)}{q(x_i)}
  $$

In [None]:
def kl_divergence(y_true, y_pred, eps=1e-15):
    """Mean KL divergence D_KL(p || q) over the rows of two distributions.

    Args:
        y_true: torch.Tensor, reference distribution p (one distribution per row).
        y_pred: torch.Tensor, approximating distribution q (one distribution per row).
        eps: float, lower clamp bound applied to both inputs so log() never sees 0.

    Returns:
        Scalar tensor: the per-row KL divergence averaged over rows.
    """
    p = y_true.clamp(min=eps, max=1)
    q = y_pred.clamp(min=eps, max=1)
    # Per row: sum_i p_i * (log p_i - log q_i)
    log_ratio = p.log() - q.log()
    per_sample = (p * log_ratio).sum(dim=1)
    return per_sample.mean()
# Example 1: reuse the existing y_true and y_pred_probs
# y_true is one-hot, so H(p) = 0 and the KL divergence equals the cross-entropy loss
kl1 = kl_divergence(y_true, y_pred_probs)
print(f"Sample 1 - KL Divergence: {kl1.item():.4f}")

# Example 2: hand-picked probability distributions
p2 = torch.tensor([[0.4, 0.4, 0.2]], dtype=torch.float32)
q2 = torch.tensor([[0.3, 0.5, 0.2]], dtype=torch.float32)
kl2 = kl_divergence(p2, q2)
print(f"Sample 2 - KL Divergence: {kl2.item():.4f}")

Sample 1 - KL Divergence: 0.2269
Sample 2 - KL Divergence: 0.0258


# LN and BN
Layer Normalization (LN) 和 Batch Normalization (BN) 的主要区别在于归一化的维度：  
- BN 在计算时使用同一 mini-batch 内所有样本的统计数据（均值和方差）进行归一化，这使其对 batch size 依赖较大，并且在 NLP 模型中难以处理变长序列；  
- LN 则对单个样本内部的所有特征进行归一化，不依赖于 batch 内其他样本，因此在处理序列数据（如大语言模型）时更加稳定。  

大语言模型（例如 Transformer）通常使用 Layer Normalization，因为它能更好地适应小批量或甚至单样本训练，并且在长序列时能够保持稳定的训练过程。

In [None]:
import torch

import torch.nn as nn

class LayerNorm(nn.Module):
    """Hand-written Layer Normalization.

    Normalizes over the trailing ``len(normalized_shape)`` dimensions of the
    input, then applies a learnable element-wise scale and shift.

    Args:
        normalized_shape: An int (size of the last dimension) or a sequence of
            ints / ``torch.Size`` giving the trailing dimensions to normalize.
        eps: Constant added to the variance for numerical stability.
    """

    def __init__(self, normalized_shape, eps=1e-5):
        super(LayerNorm, self).__init__()
        if isinstance(normalized_shape, int):
            normalized_shape = (normalized_shape,)
        self.normalized_shape = torch.Size(normalized_shape)
        self.eps = eps
        # Learnable scale (gamma) and shift (beta), registered as parameters.
        self.gamma = nn.Parameter(torch.ones(self.normalized_shape))
        self.beta = nn.Parameter(torch.zeros(self.normalized_shape))

    def forward(self, x):
        """Return ``x`` normalized over its trailing dimensions; shape is unchanged."""
        # Reduce over every dimension covered by normalized_shape, not just the
        # last one, so multi-dimensional shapes are handled too.
        dims = tuple(range(-len(self.normalized_shape), 0))
        mean = x.mean(dim=dims, keepdim=True)
        var = x.var(dim=dims, unbiased=False, keepdim=True)  # biased variance

        x_normalized = (x - mean) / torch.sqrt(var + self.eps)

        return self.gamma * x_normalized + self.beta

class BatchNorm(nn.Module):
    """Hand-written Batch Normalization for inputs whose features are the last dimension.

    Accepts ``(batch, num_features)`` or ``(batch, ..., num_features)``; any
    leading dimensions are flattened into the batch dimension.

    Args:
        num_features: Size of the last (feature) dimension.
        eps: Constant added to the variance for numerical stability.
        momentum: Weight given to the current batch when updating the running
            statistics.
    """

    def __init__(self, num_features, eps=1e-5, momentum=0.1):
        super(BatchNorm, self).__init__()
        self.num_features = num_features
        self.eps = eps
        self.momentum = momentum
        # Learnable scale (gamma) and shift (beta).
        self.gamma = nn.Parameter(torch.ones(num_features))
        self.beta = nn.Parameter(torch.zeros(num_features))
        # Running statistics used in eval mode. Buffers are saved with the
        # module state but are not parameters.
        self.register_buffer('running_mean', torch.zeros(num_features))
        self.register_buffer('running_var', torch.ones(num_features))

    def _update_running_stats(self, batch_mean, batch_var, n):
        """Fold the current batch statistics into the running buffers."""
        # no_grad + in-place updates keep the buffers detached from the
        # autograd graph of the current batch.
        with torch.no_grad():
            # Track the unbiased variance estimate (Bessel's correction), as
            # nn.BatchNorm1d does; a single sample has no unbiased estimate.
            tracked_var = batch_var * (n / (n - 1)) if n > 1 else batch_var
            self.running_mean.mul_(1 - self.momentum).add_(batch_mean, alpha=self.momentum)
            self.running_var.mul_(1 - self.momentum).add_(tracked_var, alpha=self.momentum)

    def forward(self, x):
        """Normalize ``x`` per feature and return a tensor of the same shape.

        In training mode the batch statistics are used and the running
        buffers are updated; in eval mode the running buffers are used.
        """
        if x.dim() > 2:
            # e.g. (batch, seq_len, features) -> (batch * seq_len, features)
            x_reshaped = x.reshape(-1, self.num_features)
        else:
            x_reshaped = x

        if self.training:
            batch_mean = x_reshaped.mean(dim=0)
            # The biased variance is used to normalize the current batch.
            batch_var = x_reshaped.var(dim=0, unbiased=False)
            self._update_running_stats(batch_mean, batch_var, x_reshaped.shape[0])
            mean_to_use = batch_mean
            var_to_use = batch_var
        else:
            mean_to_use = self.running_mean
            var_to_use = self.running_var

        mean_to_use = mean_to_use.view(1, self.num_features)
        var_to_use = var_to_use.view(1, self.num_features)
        x_normalized = (x_reshaped - mean_to_use) / torch.sqrt(var_to_use + self.eps)

        gamma = self.gamma.view(1, self.num_features)
        beta = self.beta.view(1, self.num_features)
        out_reshaped = gamma * x_normalized + beta

        # Restore the original shape if leading dimensions were flattened.
        if x.dim() > 2:
            return out_reshaped.view(x.shape)
        return out_reshaped

# --- Examples ---
# 1. Layer Normalization example
print("--- Layer Normalization Example ---")
batch_size, seq_len, features = 2, 3, 4
ln_input = torch.randn(batch_size, seq_len, features)

# Hand-written LN
manual_ln = LayerNorm(features) # normalize over the last dimension (features)
manual_ln_output = manual_ln(ln_input)
print("Manual LN Input:\n", ln_input)
print("Manual LN Output:\n", manual_ln_output)

# PyTorch built-in LN
pytorch_ln = nn.LayerNorm(features)
# Copy the parameters so both modules compute the same result (only needed for this check)
pytorch_ln.weight.data = manual_ln.gamma.data.clone()
pytorch_ln.bias.data = manual_ln.beta.data.clone()
pytorch_ln_output = pytorch_ln(ln_input)
print("PyTorch LN Output:\n", pytorch_ln_output)
# Prints True when the two outputs agree within atol=1e-6
print("Difference (LN):", torch.allclose(manual_ln_output, pytorch_ln_output, atol=1e-6))


# 2. Batch Normalization example
print("\n--- Batch Normalization Example ---")
bn_features = 5
bn_input = torch.randn(batch_size, bn_features) * 2 + 1 # scaled and shifted so the batch statistics are non-trivial

# Build both modules and copy parameters and running statistics BEFORE either
# one sees data. Copying after the manual module has taken a training step
# would hand PyTorch already-updated statistics, which it would then update a
# second time, so the eval-mode comparison would not be like-for-like.
manual_bn = BatchNorm(bn_features)
manual_bn.train() # make sure it is in training mode
pytorch_bn = nn.BatchNorm1d(bn_features) # BatchNorm1d takes (N, C) or (N, C, L)
pytorch_bn.train()
with torch.no_grad():
    pytorch_bn.weight.copy_(manual_bn.gamma)
    pytorch_bn.bias.copy_(manual_bn.beta)
    pytorch_bn.running_mean.copy_(manual_bn.running_mean)
    pytorch_bn.running_var.copy_(manual_bn.running_var)

# Hand-written BN (training mode)
manual_bn_output_train = manual_bn(bn_input)
print("\nManual BN Input:\n", bn_input)
print("Manual BN Output (Train):\n", manual_bn_output_train)
print("Manual BN Running Mean (after train):", manual_bn.running_mean)
print("Manual BN Running Var (after train):", manual_bn.running_var)


# PyTorch built-in BN (training mode)
pytorch_bn_output_train = pytorch_bn(bn_input)
print("\nPyTorch BN Output (Train):\n", pytorch_bn_output_train)
print("PyTorch BN Running Mean (after train):", pytorch_bn.running_mean)
print("PyTorch BN Running Var (after train):", pytorch_bn.running_var)
# Prints True when the two training-mode outputs agree within atol=1e-6
print("Difference (BN Train):", torch.allclose(manual_bn_output_train, pytorch_bn_output_train, atol=1e-6))


# Hand-written BN (evaluation mode)
manual_bn.eval() # switch to evaluation mode
bn_input_eval = torch.randn(batch_size, bn_features)
manual_bn_output_eval = manual_bn(bn_input_eval) # uses the running mean/var accumulated above
print("\nManual BN Input (Eval):\n", bn_input_eval)
print("Manual BN Output (Eval):\n", manual_bn_output_eval)

# PyTorch built-in BN (evaluation mode)
# nn.BatchNorm1d updates running_var with the unbiased batch variance, so the
# eval outputs only agree if the manual implementation tracks the same estimate.
pytorch_bn.eval()
pytorch_bn_output_eval = pytorch_bn(bn_input_eval)
print("PyTorch BN Output (Eval):\n", pytorch_bn_output_eval)
print("Difference (BN Eval):", torch.allclose(manual_bn_output_eval, pytorch_bn_output_eval, atol=1e-6))

# Flash Attention