In [1]:
import numpy as np
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split

from transformers import BertTokenizer

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def show_var(var_name: str):
    """Print the shape of a tensor variable, looked up by name in the caller's scope.

    The name is resolved against the calling frame's local variables first and
    its global variables second.

    Args:
        var_name: Name of the variable to inspect, e.g. ``'out'``.

    Returns:
        None. Prints ``"Variable '<name>' not found."`` when the name cannot be
        resolved, prints the shape when the value is a ``torch.Tensor``, and
        prints nothing for any other kind of value.
    """
    import inspect

    # Frame of whoever called show_var; its namespaces are searched for the name.
    frame = inspect.currentframe().f_back
    try:
        local_vars = frame.f_locals
        global_vars = frame.f_globals

        if var_name in local_vars:
            var_value = local_vars[var_name]
        elif var_name in global_vars:
            var_value = global_vars[var_name]
        else:
            print(f"Variable '{var_name}' not found.")
            return
    finally:
        # Drop the frame reference promptly so no reference cycle keeps it alive.
        del frame

    if isinstance(var_value, torch.Tensor):
        print(f"{var_name}'s shape: {var_value.shape}")

In [3]:
# Read the whole poetry corpus into memory as one string.
# The encoding is passed explicitly so the result does not depend on the
# platform's default text encoding.
# NOTE(review): assumes poetryFromTang.txt is UTF-8 encoded — confirm.
with open("poetryFromTang.txt", 'r', encoding='utf-8') as f:
    raw_text = f.read()

In [4]:
# Load the pretrained tokenizer registered under the name 'bert-base-chinese'.
tokenizer = BertTokenizer.from_pretrained('bert-base-chinese')

In [5]:
def custom_tokenize(doc: str, base_tokenizer) -> list:
    """Tokenize a document line by line, marking empty lines with '[SEP]'.

    Args:
        doc: Text to tokenize; it is split on the newline character.
        base_tokenizer: Object exposing ``tokenize(str)`` that returns the
            tokens for one line.

    Returns:
        A flat list of tokens in document order. Every empty line contributes a
        single '[SEP]' token; every other line contributes whatever
        ``base_tokenizer.tokenize`` returns for it.
    """
    def _line_tokens(row):
        # An empty line becomes one separator token.
        return ['[SEP]'] if row == '' else base_tokenizer.tokenize(row)

    return [piece for row in doc.split('\n') for piece in _line_tokens(row)]

In [6]:
# Tokenize the corpus; empty lines in the text become '[SEP]' tokens.
tokens = custom_tokenize(raw_text, tokenizer)
# Map the whole token list to vocabulary ids in one call rather than invoking
# the tokenizer once per token; the result is still a flat list of ids.
data = tokenizer.convert_tokens_to_ids(tokens)

In [7]:
class SimpleTextDataset(Dataset):
    """Sliding-window next-token dataset over a flat sequence of token ids.

    Item ``i`` is the pair ``(data[i:i+seq_length], data[i+1:i+seq_length+1])``:
    the target is the input window shifted one position to the right.

    Args:
        data: Sliceable sequence of token ids that ``torch.tensor`` accepts.
        seq_length: Number of tokens in each input/target window.
    """

    def __init__(self, data, seq_length):
        self.data = data
        self.seq_length = seq_length

    def __len__(self):
        # Every window needs seq_length + 1 tokens. Clamp at zero so that data
        # shorter than one window yields an empty dataset instead of a negative
        # length, which len() rejects.
        return max(0, len(self.data) - self.seq_length)

    def __getitem__(self, index):
        """Return the ``(input, target)`` tensors for window ``index``.

        Negative indices count from the end.

        Raises:
            IndexError: If ``index`` is outside ``[-len(self), len(self))``.
                Plain iteration over the dataset relies on this to stop.
        """
        size = len(self)
        if index < 0:
            index += size
        if not 0 <= index < size:
            raise IndexError(f"index out of range for dataset of length {size}")

        # Teacher forcing: the target at each position is the next token.
        stop = index + self.seq_length
        X = self.data[index:stop]
        y = self.data[index + 1:stop + 1]
        return torch.tensor(X), torch.tensor(y)

In [8]:
# Build sliding windows of 128 tokens over the id sequence.
simple_dataset = SimpleTextDataset(data, 128)

# 80/20 train/test split. The sizes are computed once, and the generator is
# seeded so the split is identical on every Restart & Run All.
# NOTE(review): consecutive windows overlap by all but one token, so a random
# split places near-duplicate windows in both subsets; consider splitting the
# token sequence itself before windowing.
n_train = int(len(simple_dataset) * 0.8)
n_test = len(simple_dataset) - n_train
train_dataset, test_dataset = random_split(
    simple_dataset,
    [n_train, n_test],
    generator=torch.Generator().manual_seed(42),
)

train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
# Evaluation averages over all batches, so shuffling the test set adds nothing.
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [9]:
class LSTMModel(nn.Module):
    """Token-level language model: embedding -> stacked LSTM -> linear projection.

    Args:
        vocab_size: Number of distinct token ids; sets the embedding table size
            and the width of the output logits.
        hidden_size: Width of both the embeddings and the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, vocab_size, hidden_size, num_layers) -> None:
        super().__init__()
        self.hidden_size = hidden_size
        self.embed = nn.Embedding(vocab_size, hidden_size)
        self.num_layers = num_layers
        self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden=None):
        """Compute logits for every position of every sequence in the batch.

        Args:
            x: Integer token ids of shape ``(batch, seq)``.
            hidden: Optional ``(h_0, c_0)`` tuple such as ``init_hidden``
                returns. When omitted, ``nn.LSTM`` starts from a zero state.

        Returns:
            ``(logits, hidden)``. ``logits`` has shape
            ``(batch * seq, vocab_size)``, flattened so it can be fed straight
            to a cross-entropy loss. ``hidden`` is the final ``(h_n, c_n)``.
        """
        x = self.embed(x)
        out, hidden = self.lstm(x, hidden)
        # Collapse the batch and time axes into a single axis of positions.
        out = self.fc(out.reshape(out.size(0) * out.size(1), out.size(2)))
        return out, hidden

    def init_hidden(self, batch_size):
        """Return a zero ``(h_0, c_0)`` pair sized for ``batch_size`` sequences.

        The tensors take the device and dtype of the model's parameters, so the
        state stays usable after ``model.to(device)`` or ``model.double()``.
        """
        ref = next(self.parameters())
        shape = (self.num_layers, batch_size, self.hidden_size)
        return (torch.zeros(shape, dtype=ref.dtype, device=ref.device),
                torch.zeros(shape, dtype=ref.dtype, device=ref.device))

In [None]:
class Seq2Seq(nn.Module):
    """Container for an encoder LSTM and a decoder LSTM.

    Both LSTMs are batch-first and use ``hidden_dim`` as input size and hidden
    size. ``vocab_size`` and ``output_dim`` are accepted but not used by the
    constructor, and no ``forward`` method is defined in this class.
    """

    def __init__(self, vocab_size, hidden_dim, output_dim) -> None:
        super().__init__()
        # Encoder and decoder share the same configuration.
        lstm_config = dict(input_size=hidden_dim, hidden_size=hidden_dim, batch_first=True)
        self.encoder = nn.LSTM(**lstm_config)
        self.decoder = nn.LSTM(**lstm_config)

In [10]:
# Model and training hyperparameters.
vocab_size = len(tokenizer)  # size reported by the tokenizer; sets the embedding and output widths
hidden_size = 128  # used for both the embedding width and the LSTM hidden width
num_layers = 2  # number of stacked LSTM layers
learning_rate = 0.002  # learning rate handed to the Adam optimizer
num_epochs = 20  # NOTE(review): the training call in this notebook passes num_epochs=1 instead — confirm

In [11]:
# Use the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Build the language model and place it on the selected device.
lstm_model = LSTMModel(vocab_size, hidden_size, num_layers)
lstm_model = lstm_model.to(device)

# Cross-entropy loss over the flattened logits, optimized with Adam.
criterion = nn.CrossEntropyLoss()
lstm_optimizer = optim.Adam(lstm_model.parameters(), lr=learning_rate)

# Train the model
def train_model(model, optimizer, dataloader, num_epochs):
    """Train ``model`` on ``dataloader`` for ``num_epochs`` epochs.

    Relies on the module-level ``device`` (where batches are placed) and
    ``criterion`` (the loss function). The loss is printed every 100 steps.

    Args:
        model: Module exposing ``init_hidden(batch_size)`` and a forward pass
            that returns ``(logits, hidden)``, with logits flattened over
            batch and time.
        optimizer: Optimizer that updates ``model``'s parameters.
        dataloader: Iterable yielding ``(inputs, targets)`` batches of token ids.
        num_epochs: Number of full passes over ``dataloader``.
    """
    model.train()
    for epoch in range(num_epochs):
        for i, (inputs, targets) in enumerate(tqdm(dataloader)):
            inputs, targets = inputs.to(device), targets.to(device)

            # Each batch starts from a fresh zero state sized to the actual
            # batch, since the last batch may be smaller than the rest. The
            # state is moved to the inputs' device so the LSTM never sees
            # tensors on mixed devices (e.g. when training on CUDA).
            hidden = tuple(
                state.to(inputs.device)
                for state in model.init_hidden(inputs.size(0))
            )

            optimizer.zero_grad()
            outputs, _ = model(inputs, hidden)
            loss = criterion(outputs, targets.reshape(-1))
            loss.backward()
            optimizer.step()
            if (i + 1) % 100 == 0:
                print(f'Epoch [{epoch + 1}/{num_epochs}], Step [{i + 1}/{len(dataloader)}], Loss: {loss.item():.4f}')

In [12]:
# Number of (input, target) windows in the full dataset, before the train/test split.
len(simple_dataset)

15787

In [13]:
# Train for a single epoch; num_epochs=1 is passed explicitly here.
train_model(lstm_model, lstm_optimizer, train_dataloader, num_epochs=1)

 25%|██▌       | 100/395 [00:30<01:35,  3.08it/s]

Epoch [1/1], Step [100/395], Loss: 6.4501


 51%|█████     | 200/395 [01:01<00:58,  3.31it/s]

Epoch [1/1], Step [200/395], Loss: 6.5279


 76%|███████▌  | 300/395 [01:32<00:29,  3.26it/s]

Epoch [1/1], Step [300/395], Loss: 6.5516


100%|██████████| 395/395 [02:01<00:00,  3.25it/s]


In [18]:
# Module-level batch size. calculate_perplexity does not read it: the function
# sizes the hidden state from each batch it receives.
batch_size = 32
def calculate_perplexity(model, dataloader):
    """Compute the token-level perplexity of ``model`` over ``dataloader``.

    Perplexity is ``exp`` of the mean per-token loss. Each batch's mean loss is
    weighted by its token count, so a smaller final batch is averaged correctly.
    Relies on the module-level ``criterion`` for the loss.

    Args:
        model: Module exposing ``init_hidden(batch_size)`` and a forward pass
            that returns ``(logits, hidden)``, with logits flattened over
            batch and time.
        dataloader: Iterable yielding ``(inputs, targets)`` batches of token ids.

    Returns:
        The perplexity as a NumPy float.
    """
    model.eval()
    # Evaluate on whatever device the model lives on, so batches and hidden
    # state never end up on a different device than the weights.
    model_device = next(model.parameters()).device
    total_loss = 0
    total_count = 0
    with torch.no_grad():
        for inputs, targets in tqdm(dataloader):
            inputs, targets = inputs.to(model_device), targets.to(model_device)
            hidden = tuple(
                state.to(model_device)
                for state in model.init_hidden(inputs.shape[0])
            )
            outputs, _ = model(inputs, hidden)
            loss = criterion(outputs, targets.reshape(-1))
            # criterion returns a mean; convert it back to a sum over tokens.
            n_tokens = targets.numel()
            total_loss += loss.item() * n_tokens
            total_count += n_tokens
    perplexity = np.exp(total_loss / total_count)
    return perplexity

# Evaluate the trained LSTM on the held-out test loader and report its perplexity.
lstm_perplexity = calculate_perplexity(lstm_model, test_dataloader)
print(f'LSTM Perplexity: {lstm_perplexity:.4f}')

 99%|█████████▉| 98/99 [00:12<00:00,  7.58it/s]

LSTM Perplexity: 635.7454





In [21]:
# Natural logarithm of a hard-coded value; presumably converts a perplexity
# back to its mean per-token loss.
# NOTE(review): the literal 2635.7454 differs from the 635.7454 printed by the
# perplexity cell — confirm which value was intended.
np.log(2635.7454)

# greedy Search

7.876921305221732