In [None]:
import torch

In [None]:
import numpy as np
import torch
import torch.nn.functional as F
from torch import nn
import math
from transformers.modeling_outputs import CausalLMOutputWithPast


In [None]:
import numpy as np
import torch
import torch.nn.functional as F
from torch import nn
import math
from transformers.modeling_outputs import CausalLMOutputWithPast


class LMconfig():
    def __init__(self)-> None:
        dim: int = 512,  # 模型维度，默认为 512
        n_layers: int = 5,  # Transformer 层数，默认为 8
        n_heads: int = 16,  # 注意力头数，默认为 16
        
        vocab_size: int = 7000,  # 词汇表大小，默认为 6400
        eps: float = 1e-5,  # 归一化层的 epsilon 值，默认为 1e-5
        max_seq_len: int = 512,  # 最大序列长度，默认为 512
        dropout: float = 0.1,  # Dropout 概率，默认为 0.0




# 定义 precompute_pos_cis 函数，用于预计算位置编码的复数形式
def precompute_pos_cis(dim: int, end: int, theta: float = 10000.0):
    freqs = 1.0 / (theta ** (torch.arange(0, dim, 2)[: (dim // 2)].float() / dim))  # 计算频率
    t = torch.arange(end, device=freqs.device)  # 生成时间序列
    freqs = torch.outer(t, freqs).float()  # 计算外积
    pos_cis = torch.polar(torch.ones_like(freqs), freqs)  # 计算复数形式的位置编码
    return pos_cis

# 定义 apply_rotary_emb 函数，用于应用旋转位置编码
def apply_rotary_emb(xq, xk, pos_cis):
    def unite_shape(pos_cis, x):
        ndim = x.ndim
        assert 0 <= 1 < ndim
        assert pos_cis.shape == (x.shape[1], x.shape[-1])
        shape = [d if i == 1 or i == ndim - 1 else 1 for i, d in enumerate(x.shape)]
        return pos_cis.view(*shape)

    xq_ = torch.view_as_complex(xq.float().reshape(*xq.shape[:-1], -1, 2))  # 将 xq 转换为复数形式
    xk_ = torch.view_as_complex(xk.float().reshape(*xk.shape[:-1], -1, 2))  # 将 xk 转换为复数形式
    pos_cis = unite_shape(pos_cis, xq_)  # 调整 pos_cis 的形状
    xq_out = torch.view_as_real(xq_ * pos_cis).flatten(3)  # 应用旋转位置编码
    xk_out = torch.view_as_real(xk_ * pos_cis).flatten(3)  # 应用旋转位置编码
    return xq_out.type_as(xq), xk_out.type_as(xk)  # 返回结果




class RMS(torch.nn.Module):
    def __init__(self, dim: int, eps: float):
        super().__init__()
        self.eps = eps  # 设置 epsilon，防止除零错误
        self.weight = nn.Parameter(torch.ones(dim))  # 初始化权重参数

    def _norm(self, x):
        return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)  # 计算 RMSNorm

    def forward(self, x):
        output = self._norm(x.float()).type_as(x)  # 应用 RMSNorm
        return output * self.weight  # 乘以权重参数

class MutiHeadattention(nn.Module):
    def __init__(self)->None:
        super().__init__()
        params=LMconfig()
        self.hidden_dim=params.dim
        self.head_num=params.head_num
        self.head_dim=self.hidden_dim//self.head_num
        self.max_seq_len=params.max_seq_len

        self.q_proj=nn.Linear(self.head_dim,self.hidden_dim)
        self.k_proj=nn.Linear(self.head_dim,self.hidden_dim)
        self.v_proj=nn.Linear(self.head_dim,self.hidden_dim)
        
        self.att_dropout=nn.Dropout(0.1)
        self.out_proj=nn.Linear(hidden_dim,self.head_dim)
        
        mask = torch.full((self.max_seq_len, self.max_seq_len), float("-inf"))  
        mask = torch.triu(mask, diagonal=1)  
        self.k_cache=None
        self.v_cache=None
        self.use_kvcache=None
        self.register_buffer("mask", mask)  

    def forward(self,x:torch.Tensor,pos_cis:torch.Tensor):
        batch_size,seq_len,_ =x.shape

        if self.use_kvcache and self.eval():
            if self.k_cache and self.v_cache is not None:
                token=x[:,-1,:]
                q=torch.cat(torch.zeros_like(x[:, :-1, :]),self.q_proj(token),dim=1)
                k=torch.cat(self.k_cache,self.k_proj(token),dim=1)
                v=torch.cat(self.v_cache,self.v_proj(token),dim=1)
            else:
                q=self.q_proj(x)
                k=self.k_proj(x)
                v=self.v_proj(x)
            self.k_cache=k
            self.v_cache=v
        else:
            q=self.q_proj(x)
            k=self.k_proj(x)
            v=self.v_proj(x)


        print("1")
        # q: [batch_size,seq_len,hidden_dim]
        # k: [batch_size,seq_len,hidden_dim]
        # v: [batch_size,seq_len,hidden_dim]
        
        q=q.view(batch_size,seq_len,self.head_num,self.head_dim)
        k=k.view(batch_size,seq_len,self.head_num,self.head_dim)
        v=v.view(batch_size,seq_len,self.head_num,self.head_dim)
        print("2")
        
        q, k = apply_rotary_emb(q, k, pos_cis)  # 应用旋转位置编码

        q = q.transpose(1, 2)  # 调整 Q 的形状
        k = k.transpose(1, 2)  # 调整 K 的形状
        v = v.transpose(1, 2)  # 调整 V 的形状



        # q: [batch_size,elf.head_num,seq_len,s,self.head_dim]
        # k: [batch_size,elf.head_num,seq_len,s,self.head_dim]
        # v: [batch_size,elf.head_num,seq_len,s,self.head_dim]

        atten_weight=q@k.transpose(-2,-1)/math.sqrt(self.head_dim)
        atten_weight+=self.mask[:seq_len,:seq_len]
        
        atten_weight=torch.softmax(atten_weight,dim=-1)
        print(atten_weight)
        atten_weight=self.att_dropout(atten_weight)
        attention_output=atten_weight@v

        attention_output=attention_output.transpose(1,2).contiguous()

        attention_output=attention_output.view(batch_size,seq_len,self.hidden_dim)
        return self.out_proj(attention_output)



class FFN(nn.Module):
    def __init__(self)-> None:
     super().__init__()
     params=LMconfig()
     self.Linear=nn.Linear(params.dim,params.dim,bias=False)
     self.RMS=RMS(params.dim,eps=params.eps)
     self.dropout=nn.Dropout(0.1)


    def forward(self, x):
        start=x
        x=self.RMS(x)
        x1 = self.Linear(x)
        x2 = self.Linear(x)
        x2=F.silu(x2)
        x3=torch.matmul(x1,x2)
        x3=self.Linear(x3)
        x3=self.dropout(x3)
        x3+=start

        return x3


class TransformerBlock(nn.Module):
    def __init__(self)-> None:
     super().__init__()
     params=LMconfig()
     self.MutiHeadattention=MutiHeadattention()
     self.FFN=FFN()
     self.attention_norm=RMS(params.dim,eps=params.eps)
     self.FFN_norm=RMS(params.dim,eps=params.eps)


    def forward(self, x:torch.Tensor,pos_cis)->torch.Tensor:
       x1= self.MutiHeadattention(self.attention_norm(x),pos_cis)+x
       x2= self.FFN(self.FFN_norm(x1))+x1
       return x2
        



class Transformer(nn.Module):
    def __init__(self)-> None:
        super().__init__()
        params=LMconfig()
        self.TransformerBlock=TransformerBlock()
        self.decoder_layers=nn.ModuleList([self.TransformerBlock for i in range(params.n_layers)])
        self.rmsnorm=RMS(params.dim,params.eps)##1
        self.dropout=nn.Dropout(params.dropout)
        self.embedding=nn.Embedding(params.vocab_size,params.dim)


        
        pos_cis = precompute_pos_cis(self.params.dim // self.params.head_num, self.params.max_seq_len)  # 预计算位置编码
        self.register_buffer("pos_cis", pos_cis, persistent=False)  # 注册位置编码缓冲区


        self.Linear=nn.Linear(params.dim,params.vocab_size,bias=False)
        self.OUT = CausalLMOutputWithPast()  

    def forward(self, tokens: torch.Tensor, targets: torch.Tensor,**keyargs) ->torch.Tensor:
        # if 'input_ids' in keyargs:
        #     token=keyargs['input_ids']
        print("tokens:",tokens)
        print("targets:",targets)
        try:
         _bszize, seqlen,_ = tokens.shape
        except ValueError as e:
          print("训练时，输入的Size有错,训练中断")
        
            
        token=self.embedding(token)
        token=self.dropout(token)

        pos_cis = self.pos_cis[:seqlen]


        for _,decoder_layers in enumerate(self.decoder_layers):
            token=decoder_layers(token,pos_cis)
        token=self.rmsnorm(token)
        logits=self.Linear(token)
        # token=nn.Softmax(token,dim=1)

        print("合并batch 一次性计算交叉熵损失，的矩阵大小为：",logits.view(-1, logits.size(-1)).shape)
        self.last_loss=F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1)  # 计算交叉熵损失

        self.OUT.__setitem__('logits', logits)  # 设置输出对象的 logits
        self.OUT.__setitem__('last_loss', self.last_loss)  # 设置输出对象的 last_loss
        

        return self.OUT
    
    
    @torch.inference_mode()
    def generate(self,token):

        token=nn.Embedding(token)
        for item,decoder_layers in enumerate(self.decoder_layers):
            token=decoder_layers(token)
        token=self.rmsnorm(token)
        logits=self.Linear(token)


        return token




        

In [12]:
# 定义 precompute_pos_cis 函数，用于预计算位置编码的复数形式
def precompute_pos_cis(dim: int, end: int, theta: float = 10000.0):
    freqs = 1.0 / (theta ** (torch.arange(0, dim, 2)[: (dim // 2)].float() / dim))  # 计算频率
    t = torch.arange(end, device=freqs.device)  # 生成时间序列
    freqs = torch.outer(t, freqs).float()  # 计算外积
    pos_cis = torch.polar(torch.ones_like(freqs), freqs)  # 计算复数形式的位置编码
    return pos_cis

# 定义 apply_rotary_emb 函数，用于应用旋转位置编码
def apply_rotary_emb(xq, xk, pos_cis):
    def unite_shape(pos_cis, x):
        ndim = x.ndim
        assert 0 <= 1 < ndim
        assert pos_cis.shape == (x.shape[1], x.shape[-1])
        shape = [d if i == 1 or i == ndim - 1 else 1 for i, d in enumerate(x.shape)]
        return pos_cis.view(*shape)

    xq_ = torch.view_as_complex(xq.float().reshape(*xq.shape[:-1], -1, 2))  # 将 xq 转换为复数形式
    xk_ = torch.view_as_complex(xk.float().reshape(*xk.shape[:-1], -1, 2))  # 将 xk 转换为复数形式
    pos_cis = unite_shape(pos_cis, xq_)  # 调整 pos_cis 的形状
    xq_out = torch.view_as_real(xq_ * pos_cis).flatten(3)  # 应用旋转位置编码
    xk_out = torch.view_as_real(xk_ * pos_cis).flatten(3)  # 应用旋转位置编码
    return xq_out.type_as(xq), xk_out.type_as(xk)  # 返回结果

In [14]:
import numpy as np
import torch
import torch.nn.functional as F
from torch import nn
import math
from transformers.modeling_outputs import CausalLMOutputWithPast
from dataclasses import dataclass

@dataclass
class LMconfig():
        dim: int = 512  # 模型维度，默认为 512
        n_layers: int = 5  # Transformer 层数，默认为 8
        head_num: int = 16  # 注意力头数，默认为 16
        
        vocab_size: int = 7000 # 词汇表大小，默认为 6400
        eps: float = 1e-5 # 归一化层的 epsilon 值，默认为 1e-5
        max_seq_len: int = 512# 最大序列长度，默认为 512
        dropout: float = 0.1 # Dropout 概率，默认为 0.0


In [40]:
class MutiHeadattention(nn.Module):
    def __init__(self)->None:
        super().__init__()
        params=LMconfig()
        self.hidden_dim=params.dim
        self.head_num=params.head_num
        self.head_dim=self.hidden_dim//self.head_num
        self.max_seq_len=params.max_seq_len

        self.q_proj=nn.Linear(self.hidden_dim,self.hidden_dim)
        self.k_proj=nn.Linear(self.hidden_dim,self.hidden_dim)
        self.v_proj=nn.Linear(self.hidden_dim,self.hidden_dim)
        
        self.att_dropout=nn.Dropout(0.1)
        self.out_proj=nn.Linear(self.hidden_dim,self.hidden_dim)
        
        mask = torch.full((self.max_seq_len, self.max_seq_len), float("-inf"))  
        mask = torch.triu(mask, diagonal=1)  
        self.k_cache=None
        self.v_cache=None
        self.use_kvcache=None
        self.register_buffer("mask", mask)  

    def forward(self,x:torch.Tensor,pos_cis:torch.Tensor):
        batch_size,seq_len,_ =x.shape

        if self.use_kvcache and self.eval():
            if self.k_cache and self.v_cache is not None:
                token=x[:,-1,:]
                q=torch.cat(torch.zeros_like(x[:, :-1, :]),self.q_proj(token),dim=1)
                k=torch.cat(self.k_cache,self.k_proj(token),dim=1)
                v=torch.cat(self.v_cache,self.v_proj(token),dim=1)
            else:
                q=self.q_proj(x)
                k=self.k_proj(x)
                v=self.v_proj(x)
            self.k_cache=k
            self.v_cache=v
        else:
            q=self.q_proj(x)
            k=self.k_proj(x)
            v=self.v_proj(x)

        print(q.shape)
        print("1")
        # q: [batch_size,seq_len,hidden_dim]
        # k: [batch_size,seq_len,hidden_dim]
        # v: [batch_size,seq_len,hidden_dim]
        
        q=q.view(batch_size,seq_len,self.head_num,self.head_dim)
        k=k.view(batch_size,seq_len,self.head_num,self.head_dim)
        v=v.view(batch_size,seq_len,self.head_num,self.head_dim)
        print("2")
        # print(q.shape)
        q, k = apply_rotary_emb(q, k, pos_cis)  # 应用旋转位置编码

        q = q.transpose(1, 2)  # 调整 Q 的形状
        k = k.transpose(1, 2)  # 调整 K 的形状
        v = v.transpose(1, 2)  # 调整 V 的形状
        # print(q.shape)


        # q: [batch_size,elf.head_num,seq_len,s,self.head_dim]
        # k: [batch_size,elf.head_num,seq_len,s,self.head_dim]
        # v: [batch_size,elf.head_num,seq_len,s,self.head_dim]

        atten_weight=q@k.transpose(-2,-1)/math.sqrt(self.head_dim)
        atten_weight+=self.mask[:seq_len,:seq_len]
        
        atten_weight=torch.softmax(atten_weight,dim=-1)
        atten_weight=self.att_dropout(atten_weight)
        attention_output=atten_weight@v

        attention_output=attention_output.transpose(1,2).contiguous()

        attention_output=attention_output.view(batch_size,seq_len,self.hidden_dim)
        return self.out_proj(attention_output)


In [41]:
import torch
token = torch.randn(1,2,512)
model=MutiHeadattention()
params=LMconfig()
_bszize, seqlen,_ = token.shape
pos_cis=precompute_pos_cis(params.dim // params.head_num, params.max_seq_len)  # 预计算位置编码
pos_cis = pos_cis[:seqlen]
print(model(token,pos_cis).shape)

torch.Size([1, 2, 512])
1
2
torch.Size([1, 2, 512])
