# 手搓nanoGPT


In [4]:
import numpy as np 
import torch
import torch.nn as nn
from dataclasses import dataclass
import torch.nn.functional as F
import math

# 位置嵌入模块

In [2]:
class PositionEmbedding(nn.Module):
    def __init__(self, d_model: int, max_len: int):   # d_model: 模型维度，词嵌入维度   max_len: 句子长度，序列长度
        super.__init__()  # 初始化nn.Module
        position_sentence = torch.arange(max_len).unsqueeze(1) 
        position_vec = torch.exp(torch.arange(0, d_model, 2)*(-torch.log(torch.tensor(10000.0))/d_model))  
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(position_sentence*position_vec)
        pe[:, 1::2] = torch.cos(position_sentence*position_vec)
        self.register_buffer('pe', pe)  # 注册成缓存，不更新参数

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        seq_len = x.size[1]
        x = x + self.pe[:seq_len]
        return x


# 自注意力模块

In [5]:
@dataclass
class GPTConfig:
    block_size: int = 1024
    vocab_size: int = 50304
    n_layer: int = 12
    n_head: int = 12
    n_embd: int = 768
    dropout: float = 0.0
    bias: bool = True

class SelfAttention(nn.Module):
    def __init__(self, config):
        super.__init__()
        assert config.n_embd % config.n_head == 0 
        self.c_attn = nn.Linear(config.n_embd, 3*config.n_embd, bias = config.bias)
        self.c_proj = nn.Linear(config.n_embd, config.n_embd, bias= config.bias)
        self.attn_dropout = nn.Dropout(config.dropout)
        self.resid_dropout = nn.Dropout(config.dropout)
        self.n_head = config.n_head
        self.n_embd = config.n_embd
        self.dropout = config.dropout
        self.flash = hasattr(nn.functional, 'scaled_dot_product_attention')
        if not self.flash:
            self.register_buffer("bias", torch.tril(torch.ones(config.block_size, config.block_size)).view(1,1,config.block_size,config.block_size))
    
    def forward(self, x:torch.Tensor) -> torch.Tensor:
        B, T, C = x.size()
        q, k, v = self.c_attn(x).split(self.n_embd, dim=2)
        k = k.view(B, T, self.n_head, C // self.n_head).transpose(1,2)  # (B,T,nh, hs) -> (B, nh, T, hs)
        q = q.view(B, T, self.n_head, C // self.n_head).transpose(1,2)
        v = v.view(B, T, self.n_head, C // self.n_head).transpose(1,2)

        if self.flash:
            y = F.scale_dot_product_attention(q,k,v, attn_mask = None, dropout_p = self.dropout if self.training else 0.0, is_causal=True)
        else:
            att = (q @ k.transpose(-2,-1)) * (1/math.sqrt(k.size(-1)))
            att = att.masked_fill(self.bias[:,:,:T,:T], float('-inf'))
            att = F.softmax(att)
            att = self.attn_dropout(att)
            y = att @ v
        
        y = y.transpose(1,2).contiguous().view(B,T,C)
        y = self.resid_dropout(self.c_proj(y))
        return y


# Feed Forward

In [6]:
class MLP(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.c_fc = nn.Linear(config.n_embd, 4*config.n_embd, bias=config.bias)
        self.gelu = nn.GELU()
        self.c_proj = nn.Linear(4*config.n_embd, config.n_embd, bias=config.bias)
        self.dropout = nn.Dropout(config.dropout)
    
    def forward(self, x: torch.Tensor):
        x = self.gelu(self.c_fc(x))
        x = self.dropout(self.c_proj(x))
        return x

# 组装成block

In [7]:
class Block(nn.Module):
    def __init__(self, config):
        super.__init__()
        self.ln1  = nn.LayerNorm(config.n_embd, bias = config.bias)
        self.attn =  SelfAttention(config)
        self.ln2  = nn.LayerNorm(config.n_embd, bias = config.bias)
        self.mlp  = MLP(config)

    def forward(self, x: torch.Tensor):
        x = x + self.attn(self.ln1(x))  # 残差连接
        x = x + self.mlp(self.ln2(x))
        return x
    

# 组装成GPT

In [None]:
class GPT(nn.Module):
    def __init__(self, config):
        assert config.vocab_size is not None
        assert config.block_size is not None
        self.config = config
        self.transformer = nn.ModuleDict(
            dict(
                wte = nn.Embedding(config.vocab_size, config.n_embd),
                wpe = nn.Embedding(config.block_size, config.n_embd),
                drop = nn.Dropout(config.dropout)
                h = nn.Modulelist([Block(config) for _ in range(config.n_layer)]),
                ln_f = nn.LayerNorm(config.n_embd, bias=config.bias),
            )
        )
        self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
        self.transformer.wte.weight = self.lm_head.weight

        self.apply(self._init_weights)  # 对内部所有子module使用_init_weights函数
        for pn, p in self.named_parameters():
            if pn.endswith('c_proj.weight'):
                torch.nn.init.normal_(p, mean=0.0, std = 0.02/math.sqrt(2 * config.n_layer))

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean = 0.0, std = 0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean = 0.0, std = 0.02)
        
    def forward(self, idx: torch.Tensor, targets: torch.Tensor = None) :
        device = idx.device
        b, t = idx.size()
        assert t <= self.config.block_size
        pos = nn.arange(0, t, dtype=torch.long, device=device)

        tok_emb = self.transformer.wte(idx)
        pos_emb = self.transformer.ptd(pos)
        x = self.transformer.drop(tok_emb+pos_emb)
        for block in self.transformer.h:
            x = block(x)
        x = self.transformer.ln_f(x)

        if targets is not None:
            logits = self.lm_head(x)
            loss = F.cros_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1)
        else:
            logits = self.lm_head(x[:,[-1],:])
            loss = None
            
        return logits, loss

        



        





In [11]:
real = torch.Tensor(1)
pred_1 = torch.Tensor([1,0,0])
pred_2 = torch.Tensor([0,1,0])
pred_3 = torch.Tensor([0.3,0.6,0.1])
print(F.cross_entropy(real, pred_1))
print(F.cross_entropy(real, pred_2))
print(F.cross_entropy(real, pred_3))

RuntimeError: size mismatch (got input: [1], target: [3])

In [12]:
from collections import Counter
corpus='''low
lower
newest
widest
newest
widest
widest
widest
nice'''

import regex as re
# corpus=corpus.split('\n')
VOVAB_LENGTH=10
# corpus_char_counter=Counter(''.join((corpus)))
# print(dict(corpus_char_counter))

def get_status(corpus):
    # 统计相邻元素 XY出现的频率
    #  找出最大者
    merge_chars=[]
    for item in corpus:
        char_list=item.split(' ')
        for i in range(len(char_list)-1):
            
            merge_chars.append(''.join(char_list[i:i+2]))
            
    chars_count=Counter(merge_chars)
    most_common=chars_count.most_common(1)
    return most_common[0][0]
def merge_chars(corpus,chars_most_common):
    # 和并上一步得到的出现频率最大元素
    for idx,item in enumerate(corpus):
        _=re.sub('\s*'.join(chars_most_common),chars_most_common,item)
        corpus[idx]=_
    return corpus    
def init(words):
    for idx,word in enumerate((words)):
        words[idx]=' '.join(list(word))+' </w>'
    return words
words=corpus.split('\n')
corpus=init((words))


while len(set(' '.join(corpus).split(' ')))>VOVAB_LENGTH:
    print(corpus)
    most_common=get_status(corpus)
    print(most_common)

    corpus=merge_chars(corpus,most_common)
    print(corpus)

['l o w </w>', 'l o w e r </w>', 'n e w e s t </w>', 'w i d e s t </w>', 'n e w e s t </w>', 'w i d e s t </w>', 'w i d e s t </w>', 'w i d e s t </w>', 'n i c e </w>']
es
['l o w </w>', 'l o w e r </w>', 'n e w es t </w>', 'w i d es t </w>', 'n e w es t </w>', 'w i d es t </w>', 'w i d es t </w>', 'w i d es t </w>', 'n i c e </w>']
['l o w </w>', 'l o w e r </w>', 'n e w es t </w>', 'w i d es t </w>', 'n e w es t </w>', 'w i d es t </w>', 'w i d es t </w>', 'w i d es t </w>', 'n i c e </w>']
est
['l o w </w>', 'l o w e r </w>', 'n e w est </w>', 'w i d est </w>', 'n e w est </w>', 'w i d est </w>', 'w i d est </w>', 'w i d est </w>', 'n i c e </w>']
['l o w </w>', 'l o w e r </w>', 'n e w est </w>', 'w i d est </w>', 'n e w est </w>', 'w i d est </w>', 'w i d est </w>', 'w i d est </w>', 'n i c e </w>']
est</w>
['l o w </w>', 'l o w e r </w>', 'n e w est</w>', 'w i d est</w>', 'n e w est</w>', 'w i d est</w>', 'w i d est</w>', 'w i d est</w>', 'n i c e </w>']
['l o w </w>', 'l o w e r

In [21]:
corpus='''low
lower
newest
widest
newest
widest
widest
widest
nice'''

words=corpus.split('\n')
words

['low',
 'lower',
 'newest',
 'widest',
 'newest',
 'widest',
 'widest',
 'widest',
 'nice']

In [22]:
'ab c'.split()

['ab', 'c']