In [1]:
from d2l import torch as d2l
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
class RNN(nn.Module):
    """Single-layer recurrent network over one-hot encoded token indices.

    The recurrence is ``H_t = act(x_t @ W_xh + H_{t-1} @ W_hh + b_h)`` and each
    step emits ``H_t @ W_ho + b_o``.

    Args:
        num_feature: Size of the one-hot encoding (also the output width).
        num_hidden: Width of the hidden state.
        device: Device on which the parameters are created.

    Attributes:
        params: The five trainable tensors, in the order
            ``[W_xh, W_hh, b_h, W_ho, b_o]``.
    """

    def __init__(self, num_feature, num_hidden, device):
        super().__init__()
        self.device = device
        self.num_hidden = num_hidden
        self.num_feature = num_feature

        # Wrapped in nn.Parameter so the weights are registered with the
        # module: parameters(), state_dict() and .to() now see them. The draws
        # happen in the same order as before, so seeded runs are unchanged.
        self.W_xh = nn.Parameter(torch.normal(0, 0.01, (num_feature, num_hidden), device=device))
        self.W_hh = nn.Parameter(torch.normal(0, 0.01, (num_hidden, num_hidden), device=device))
        self.b_h = nn.Parameter(torch.zeros((1, num_hidden), device=device))
        self.W_ho = nn.Parameter(torch.normal(0, 0.01, (num_hidden, num_feature), device=device))
        self.b_o = nn.Parameter(torch.zeros((1, num_feature), device=device))
        # NOTE(review): the attribute is called `tanh` but the activation is
        # ReLU. Name and behaviour are both kept for backward compatibility;
        # confirm which one was intended.
        self.tanh = torch.nn.ReLU()
        self.params = [self.W_xh, self.W_hh, self.b_h, self.W_ho, self.b_o]

    def forward(self, X, H=None):
        """Run the recurrence over a batch of index sequences.

        Args:
            X: Integer (long) tensor of shape (batch_size, time_step).
            H: Hidden state of shape (batch_size, num_hidden). When ``None`` a
                zero state is created.

        Returns:
            Tuple ``(outputs, H)`` where ``outputs`` has shape
            (batch_size, time_step, num_feature) and ``H`` is the hidden state
            after the last step.
        """
        # batch_size * time_step => batch_size * time_step * features
        #                        => time_step * batch_size * features
        X = F.one_hot(X, self.num_feature).permute(1, 0, 2).type(torch.float32)
        if H is None:
            H = self.begin_state(X.shape[1])

        Y = []
        for t in range(X.shape[0]):
            H = self.tanh(torch.matmul(X[t], self.W_xh) + torch.matmul(H, self.W_hh) + self.b_h)
            Y.append(torch.matmul(H, self.W_ho) + self.b_o)

        if Y:
            outputs = torch.stack(Y, dim=0).transpose(1, 0)
        else:
            # torch.stack rejects an empty list; a zero-length sequence yields
            # an empty output and leaves H untouched.
            outputs = X.new_zeros((X.shape[1], 0, self.num_feature))
        return outputs, H

    def begin_state(self, batch_size, device=None):
        """Return a zero hidden state of shape (batch_size, num_hidden).

        Args:
            batch_size: Number of sequences in the batch.
            device: Target device; defaults to the device of the parameters.
        """
        if device is None:
            device = self.W_xh.device
        return torch.zeros((batch_size, self.num_hidden), device=device)

    def warmup_h(self, prefix):
        """Feed ``prefix`` (batch_size, time_step) from a zero state and return the final hidden state."""
        H = self.begin_state(prefix.shape[0])
        _, H = self.forward(prefix, H)
        return H

    def generate(self, prefix, max_token):
        """Greedily generate ``max_token`` indices continuing ``prefix``.

        Args:
            prefix: 1-D integer tensor holding at least one token index.
            max_token: Number of tokens to generate.

        Returns:
            List of 0-dim tensors: the last prefix token followed by the
            ``max_token`` generated indices.

        Raises:
            ValueError: If ``prefix`` is empty.
        """
        if prefix.numel() == 0:
            raise ValueError("prefix must contain at least one token")

        # Sampling needs no autograd graph.
        with torch.no_grad():
            if prefix.shape[0] > 1:
                # Warm up on everything but the last token; that token is fed
                # as the first input of the generation loop.
                H = self.warmup_h(prefix[:-1].unsqueeze(dim=0))
            else:
                H = self.begin_state(1)
            output = [prefix[-1]]
            for _ in range(max_token):
                out, H = self.forward(output[-1].reshape(1, -1), H)
                output.append(torch.argmax(out))
        return output

def grad_clipping(net, theta):  #@save
    """Clip gradients in place so that their global L2 norm is at most ``theta``.

    Args:
        net: Either an object exposing a ``params`` list of tensors, or an
            ``nn.Module`` (its trainable ``parameters()`` are used).
        theta: Maximum allowed L2 norm over all gradients combined.

    Parameters whose ``grad`` is ``None`` (not yet part of a backward pass)
    are skipped instead of raising.
    """
    params = getattr(net, 'params', None)
    if params is None:
        params = [p for p in net.parameters() if p.requires_grad]

    grads = [p.grad for p in params if p.grad is not None]
    if not grads:
        return

    norm = torch.sqrt(sum(torch.sum(g ** 2) for g in grads))
    if norm > theta:
        scale = theta / norm
        for g in grads:
            g.mul_(scale)

class SeqDataLoader:
    """Re-iterable wrapper around one of d2l's sequence minibatch generators.

    Each call to ``iter()`` invokes the selected generator function again with
    the stored corpus, batch size and number of steps.
    """

    def __init__(self, corpus, batch_size, num_steps, use_random_iter):
        """Store the sampling configuration and pick the generator function.

        ``use_random_iter`` selects ``d2l.seq_data_iter_random``; otherwise
        ``d2l.seq_data_iter_sequential`` is used.
        """
        self.corpus = corpus
        self.batch_size = batch_size
        self.num_steps = num_steps
        self.data_iter_fn = (
            d2l.seq_data_iter_random
            if use_random_iter
            else d2l.seq_data_iter_sequential
        )

    def __iter__(self):
        """Return a fresh iterator produced by the selected generator function."""
        sampler = self.data_iter_fn
        return sampler(self.corpus, self.batch_size, self.num_steps)

def read_timemachine(path='./shaqiu.txt', encoding=None):
    """Read a text file and return its lines stripped and lower-cased.

    Args:
        path: File to read. Defaults to ``'./shaqiu.txt'``, the path this
            function previously had hard-coded.
        encoding: Text encoding passed to ``open``. ``None`` keeps the
            platform default (the previous behaviour); pass e.g. ``'utf-8'``
            to make reading independent of the locale.

    Returns:
        List with one string per line of the file (blank lines become ``''``).
    """
    with open(path, 'r', encoding=encoding) as f:
        return [line.strip().lower() for line in f]

def tokenize(lines):
    """Split lines into a flat list of single-character tokens.

    Every line is stripped of surrounding whitespace; lines that are empty
    after stripping contribute nothing. Inner whitespace is kept as tokens.
    """
    chars = []
    for raw in lines:
        text = raw.strip()
        if text != '':
            chars.extend(text)
    return chars

def load_txt(batch_size, num_steps, max_token=-1, use_random_iter=True):
    """Build a character-level vocabulary, corpus and minibatch loader.

    Args:
        batch_size: Batch size handed to ``SeqDataLoader``.
        num_steps: Number of steps per sequence handed to ``SeqDataLoader``.
        max_token: When positive, the corpus is truncated to its first
            ``max_token`` indices. The default ``-1`` keeps the whole corpus.
        use_random_iter: Forwarded to ``SeqDataLoader``; defaults to ``True``,
            the value that was previously hard-coded.

    Returns:
        Tuple ``(train_iter, vocab, corpus)`` where ``corpus`` is the list of
        vocabulary indices for the (possibly truncated) text.
    """
    lines = read_timemachine()
    tokens = tokenize(lines)
    vocab = d2l.Vocab(tokens, min_freq=0)
    # Index the same token stream the vocabulary was built from, rather than
    # re-deriving characters from the raw lines a second time.
    corpus = [vocab[token] for token in tokens]
    if max_token > 0:
        corpus = corpus[:max_token]

    train_iter = SeqDataLoader(corpus, batch_size=batch_size, num_steps=num_steps,
                               use_random_iter=use_random_iter)
    return train_iter, vocab, corpus

In [4]:
from tqdm import tqdm
from torch.utils.tensorboard import SummaryWriter

writer = SummaryWriter()

batch_size, num_steps, use_random_iter = 32, 35, True
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps, use_random_iter)
epochs, lr , num_hidden, num_feature = 500, 0.001, 500, len(vocab)

device = 'cpu'
net = RNN(num_feature, num_hidden, device=device)
updater = optim.Adam(net.params, lr=lr)
criteria = nn.CrossEntropyLoss()

step = 0
bar = tqdm(range(epochs))
try:
    for epoch in bar:
        H = None
        for x, y in train_iter:
            if H is None or use_random_iter:
                # First batch of the epoch, or random sampling: start from a
                # fresh zero hidden state.
                H = net.begin_state(x.shape[0], device)
            else:
                # Sequential sampling: carry the state over but cut the graph
                # so backprop does not reach into previous batches.
                H.detach_()

            y_hat, H = net(x.to(device), H)
            loss = criteria(y_hat.reshape(-1, num_feature), y.reshape(-1).to(device))
            updater.zero_grad()
            loss.backward()

            grad_clipping(net, 1)
            updater.step()

            step += 1
            # The value logged under the "loss" tag is exp(loss), i.e. the
            # perplexity of this batch.
            perplexity = torch.exp(loss.detach())
            writer.add_scalar("loss", perplexity.to('cpu').item(), step)
            bar.set_postfix_str(str(perplexity))
finally:
    # Flush and close the event file even if training is interrupted.
    writer.close()

100%|██████████| 500/500 [02:31<00:00,  3.29it/s, tensor(1.2949)]


In [8]:
# Seed text for generation; each character is looked up in the vocabulary.
prefix = 'the time traveller'
prefix_token = torch.tensor(vocab[list(prefix)]).to(device)
with torch.no_grad():
    # Y holds the last prefix token followed by the 80 generated indices.
    Y = net.generate(prefix_token, 80)
    y = [token.to('cpu') for token in Y]
    # Drop the first element: it is already the last character of `prefix`.
    print(prefix + ''.join(vocab.to_tokens(y[1:])))

the time traveller smiled round at us then still smiling faintlyand with his hands deep in his tro
