In [1]:
words = open('karpathy/makemore/names.txt', 'r').read().splitlines()

In [2]:
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt
%matplotlib inline

In [3]:
chars = sorted(list(set(''.join(words))))   #先去重再变为列表，再排序
stoi = {s:i+1 for i,s in enumerate(chars)}   #i+1是因为需要将.设置为0号
stoi['.'] = 0 
itos = {i:s for s,i in stoi.items()}
vocab_size = len(itos)

In [4]:
block_size = 8 #context length: how many characters do we take to predict the next one
def build_dataset(words):
    X,Y = [],[]  #x:INPUTS;y:LABLES
    for w in words:
        #print(w)
        context = [0]*block_size  #初始化一个长度为block_size的全零列表作为初始上下文
        for ch in w + '.':
            ix = stoi[ch]
            X.append(context)
            Y.append(ix)
            #print(''.join(itos[i] for i in context), '--->', itos[ix])
            context = context[1:] + [ix]  #crop and append
            #这行代码更新上下文：它去掉最旧的字符（即列表的第一个元素）并添加当前字符的新索引。这使得上下文始终包含最新的3个字符。
    X = torch.tensor(X)
    Y = torch.tensor(Y)
    print(X.shape, Y.shape)
    return X,Y
    
    
import random
random.seed(42)
random.shuffle(words)
n1 = int(0.8*len(words))
n2 = int(0.9*len(words))

Xtr,Ytr = build_dataset(words[:n1])
Xdev, Ydev = build_dataset(words[n1:n2])
Xte,Yte = build_dataset(words[n2:])

torch.Size([182625, 3]) torch.Size([182625])
torch.Size([22655, 3]) torch.Size([22655])
torch.Size([22866, 3]) torch.Size([22866])


In [5]:
#let's train a deeper network
#模仿pytorch的方法构建模型：
class linear:
    def __init__(self,fan_in,fan_out,bias = True):
        self.weight = torch.randn((fan_in, fan_out), generator=g) / fan_in ** 0.5
        self.bias = torch.zeros(fan_out) if bias else None
    def __call__(self, x):
        self.out = x @ self.weight
        if self.bias is not None:
            self.out += self.bias
        return self.out
    def parameters(self):
        return [self.weight] + ([] if self.bias is None else [self.bias])
    
class BatchNorm1d:
    def __init__(self,dim,eps = 1e-5,momentum = 0.1):
        self.eps = eps
        self.momentum = momentum
        self.training = True
#self.training 控制着 BatchNorm1d 层的工作模式。当 self.training 为 True 时，表示处于训练模式，
#会根据当前输入 x 计算均值和方差，并更新 running_mean 和 running_var。
#而当 self.training 为 False 时，表示处于推理或测试模式，使用之前计算好的 running_mean 和 running_var 来进行标准化。
        #parameters
        self.gamma = torch.zeros(dim)
        self.beta = torch.ones(dim)
        #buffers
        self.running_mean = torch.zeros(dim)
        self.running_var = torch.ones(dim)
        
    def __call__(self,x):
        #calculate the forward pass
        if self.training:
            xmean = x.mean(0,keepdim = True)
            xvar = x.var(0,keepdim = True)
        else:
            xmean = self.running_mean
            xvar = self.running_var
        xhat = (x - xmean) / torch.sqrt(xvar + self.eps)
#xhat 是标准化后的输入数据，通过将 x 减去均值 xmean 并除以标准差的平方根（加上一个很小的值 eps 防止除以零）得到。
        self.out = self.gamma *xhat + self.beta
        if self.training:
            with torch.no_grad():
                self.running_mean = (1- self.momentum) * self.running_mean + self.momentum * xmean
                self.running_var = (1 - self.momentum) * self.running_var + self.momentum * xvar
        return self.out
    def parameters(self):
        return [self.gamma, self.beta]
    
class Tanh:
    def __call__(self, x):
        self.out = torch.tanh(x)
        return self.out
    def parameters(self):
        return []
#优化：对embedding和flatten的操作进行优化
class Embedding:
    def __init__(self,num_embeddings,embedding_dim):
        self.weight = torch.randn(num_embeddings,embedding_dim)
    def __call__(self,IX):
        self.out = self.weight[IX]
        return self.out
    def parameters(self):
        return self.weight
class Flatten:
    def __call__(self,x):
        self.out = x.view(x.shape[0],-1)
        return self.out
    def parameters(self):
        return []
#改进：对于所有的layer都做一个sequential也包装起来类似于torch.nn.sequential
class sequential:
    def __init__(self,layers):
        self.layers = layers
    def __call__(self,x):
        for layer in self.layers:
            x = layer(x)
        self.out = x
        return self.out
    def parameters(self):
        #get parameters of all layers and stretch them out into one list
        return [p for layer in self.layers for p in layer.parameters()]
        
n_embd = 10
n_hidden = 200 # the number of neurons in the hidden layer of MLP
g = torch.Generator().manual_seed(42)
C = torch.randn((vocab_size,n_embd),generator = g)
model = sequential([  #使用六层多层感知机
    # linear(n_embd * block_size, n_hidden),Tanh(),
    # linear(n_hidden, n_hidden),BatchNorm1d(n_hidden), Tanh(),
    # linear(n_hidden, n_hidden),BatchNorm1d(n_hidden), Tanh(),
    # linear(n_hidden, n_hidden),BatchNorm1d(n_hidden), Tanh(),
    # linear(n_hidden, n_hidden),BatchNorm1d(n_hidden), Tanh(),
    # linear(n_hidden, vocab_size),BatchNorm1d(vocab_size),
    Embedding(vocab_size,n_embd),
    Flatten(),
    Linear(n_embd * block_size, n_hidden, bias = False),BatchNorm1d(n_hidden),Tanh(),
    Linear(n_hidden,vocab_size),
])  #如果线性层之间不用tanh()层连接的话，那么许多线性层叠加之后就会产生一个线性层的效果


with torch.no_grad():  #这段代码的含义见下图
    #last layer: make less confident
    layers[-1]. weight *= 0.1     #对最后一行要单独处理的原因:根据参数的显示，发现最后一行的训练速度太快了
    #all other layers : apply again
            
parameters = model.parameters() 
print(sum(p.nelement() for p in parameters))
for p in parameters:
    p.requires_grad = True    

174351


In [None]:
max_steps = 200000
batch_size = 32
lossi = []
ud = []
for i in range(max_steps):
    ix = torch.randint(0, Xtr.shape[0], (batch_size,), generator=g)
    Xb, Yb = Xtr[ix], Ytr[ix]  # batch

    # forward pass
    logits = model(Xb)
    loss = F.cross_entropy(logits,Yb)

    # backward pass
    for p in parameters:
        p.grad = None
    loss.backward()

    # update
    lr = 0.1 if i < 150000 else 0.01
    for p in parameters:
        p.data -= lr * p.grad

    # track stats
    if i % 10000 == 0:
        print(f'{i:7d}/{max_steps:7d}:{loss.item():.4f}')
        lossi.append(loss.log10().item())
    with torch.no_grad():
        ud.append([(lr * p.grad.std() / p.data.std()).log().item() for p in parameters])

      0/ 200000:3.2958
  10000/ 200000:2.7433
  20000/ 200000:2.7429
  30000/ 200000:2.9005
  40000/ 200000:2.9321
  50000/ 200000:2.7490
  60000/ 200000:3.0351
  70000/ 200000:2.7285
  80000/ 200000:3.0432
  90000/ 200000:2.7173
 100000/ 200000:2.9239
 110000/ 200000:2.9227
 120000/ 200000:2.8421
 130000/ 200000:2.9378
 140000/ 200000:3.2310


In [None]:
# put the model in evaluation mode
for layer in model.layers:
    layer.training = False

In [None]:
#evaluate the loss
@torch.no_grad()
def split_loss(split):
    x,y = {
        'train':(Xtr,Ytr),
        'val':(Xdev,Ydev),
        'test':(Xte,Yte),
    }[split]
    logits = model(x)
    loss = F.cross_entropy(logits, y)
    print(split,loss.item())

split_loss('train')
split_loss('val')

In [None]:
#ensure that you put the model into the evaluation mode previously
plt.figure(figsize = (10,4))
legends = []
for i,p in enumerate(parameters):
    if p.ndim == 2:
        plt.plot([ud[j][i] for j in range (len(ud))])
plt.plot([0,len(ud)],[-3, -3],'k')  #these ratios should be ~1e-3,indicate on plot
#如果在对数图上小于负三，那么证明训练结果非常慢
#可以通过改变gain或者是batch_size来改变速度大小  gain越大,batch_size越小
#如果过大那么有可能是learning rate过大了
plt.legend(legends)

In [None]:
plt.plot(torch.tensor(lossi).view(-1,1000).mean(1))

In [None]:
#sample from the model
for _ in range(20):
    out = []
    context = [0] * block_size
    while True:
        #forward pass 
        logits = model(torch.tensor([context]))
        probs = F.softmax(logits,dim = 1)
        #sample from the distribution
        ix = torch.multinomial(probs, num_samples = 1).item()
        #shift the context window and track the samples
        context = context[1:] + [ix]
        out.append(ix)
        #if we sample the special token '.' ,break
        if ix == 0:
            break

    print(''.join(itos[i] for i in out))