In [1]:
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import random

# 简单分词器（按字符）
class CharTokenizer:
    """Character-level tokenizer whose vocabulary is the set of characters in ``text``.

    Attributes:
        chars: Sorted list of the distinct characters found in ``text``.
        vocab_size: Number of distinct characters.
        stoi: Mapping from character to integer id (position in ``chars``).
        itos: Mapping from integer id back to character.
    """

    def __init__(self, text):
        # Sorting fixes the id assigned to each character for a given text.
        vocabulary = sorted(set(text))
        self.chars = vocabulary
        self.vocab_size = len(vocabulary)
        self.stoi = dict(zip(vocabulary, range(len(vocabulary))))
        self.itos = dict(enumerate(vocabulary))

    def encode(self, s):
        """Return the list of ids for the characters of ``s``.

        Raises:
            KeyError: If ``s`` contains a character that is not in the vocabulary.
        """
        char_to_id = self.stoi
        return [char_to_id[ch] for ch in s]

    def decode(self, ids):
        """Return the string obtained by mapping each id in ``ids`` to its character.

        Raises:
            KeyError: If an id is not in the vocabulary.
        """
        return ''.join(map(self.itos.__getitem__, ids))

# 自定义Dataset
class TextDataset(Dataset):
    """Sliding-window next-token dataset over an encoded text.

    Item ``i`` is a pair ``(x, y)``: ``x`` holds ``block_size`` consecutive token
    ids starting at position ``i`` and ``y`` is the same window shifted one
    position to the right, so ``y[t]`` is the token that follows ``x[t]``.

    Args:
        text: Raw text to encode.
        tokenizer: Object exposing ``encode(text)`` that returns a sequence of
            integer token ids.
        block_size: Number of tokens in each input window.
    """

    def __init__(self, text, tokenizer, block_size=128):
        self.tokenizer = tokenizer
        self.data = tokenizer.encode(text)
        self.block_size = block_size

    def __len__(self):
        # A window needs block_size + 1 tokens. Clamp at zero because a negative
        # value would make the built-in len() raise ValueError for short texts.
        return max(0, len(self.data) - self.block_size)

    def __getitem__(self, idx):
        """Return the ``(input, target)`` tensors for window ``idx``.

        Negative indices count from the end.

        Raises:
            IndexError: If ``idx`` is outside ``[-len(self), len(self))``. This
                also lets plain ``for item in dataset`` iteration terminate.
        """
        size = len(self)
        if idx < 0:
            # Not `+=`: that would modify a tensor index in place for the caller.
            idx = idx + size
        if not 0 <= idx < size:
            raise IndexError(f"index out of range for dataset of length {size}")
        chunk = self.data[idx:idx + self.block_size + 1]
        return torch.tensor(chunk[:-1]), torch.tensor(chunk[1:])


In [2]:
class LSTMModel(nn.Module):
    """Language model: token embedding -> stacked LSTM -> linear layer over the vocabulary."""

    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers):
        """Build the three sub-modules.

        Args:
            vocab_size: Number of distinct token ids (embedding rows and output logits).
            embed_dim: Size of each token embedding vector.
            hidden_dim: Size of the LSTM hidden state.
            num_layers: Number of stacked LSTM layers.
        """
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_dim)
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_dim, out_features=vocab_size)

    def forward(self, x, hidden=None):
        """Compute per-position vocabulary logits.

        Args:
            x: Integer token ids; the LSTM is batch-first, so the expected
                layout is (batch, seq_len).
            hidden: Optional LSTM state to continue from; ``None`` starts from
                the LSTM's default initial state.

        Returns:
            ``(logits, hidden)`` where ``logits`` has one vocabulary-sized
            vector per input position and ``hidden`` is the state returned by
            the LSTM.
        """
        token_vectors = self.embedding(x)
        lstm_out, next_state = self.lstm(token_vectors, hidden)
        return self.fc(lstm_out), next_state


In [3]:
def train(model, dataloader, optimizer, epochs=5):
    """Train ``model`` for next-token prediction with cross-entropy loss.

    Batches are moved to whatever device the model's parameters live on, so the
    same function works for CPU and GPU models.

    Args:
        model: Module whose call returns ``(logits, state)``; the last dimension
            of ``logits`` is the number of classes.
        dataloader: Iterable yielding ``(inputs, targets)`` tensor pairs.
        optimizer: Optimizer over ``model``'s parameters.
        epochs: Number of passes over ``dataloader``.

    Returns:
        List with the mean batch loss of every epoch that processed at least
        one batch.
    """
    # Follow the model's device instead of assuming CUDA is present.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    model.train()
    criterion = nn.CrossEntropyLoss()
    epoch_losses = []
    for epoch in range(epochs):
        # Accumulate on-device; converting to a Python float once per epoch
        # avoids a host synchronisation on every step.
        running_loss = torch.zeros((), device=device)
        num_batches = 0
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            logits, _ = model(inputs)
            # Flatten all leading dimensions so every position is one sample.
            # reshape() also accepts non-contiguous tensors, unlike view().
            loss = criterion(logits.reshape(-1, logits.size(-1)), targets.reshape(-1))
            loss.backward()
            optimizer.step()
            running_loss += loss.detach()
            num_batches += 1
        if num_batches == 0:
            # Nothing was trained, so there is no loss to report for this epoch.
            print(f"Epoch {epoch}: no batches to train on")
            continue
        mean_loss = running_loss.item() / num_batches
        epoch_losses.append(mean_loss)
        print(f"Epoch {epoch}: Loss {mean_loss}")
    return epoch_losses


In [4]:
def generate(model, tokenizer, start_text, length=100):
    """Sample ``length`` tokens from ``model`` as a continuation of ``start_text``.

    The whole prompt is fed on the first step so the recurrent state reflects
    all of it. After that, only the newly sampled token is fed together with
    the carried state.

    Args:
        model: Module whose call ``model(ids, hidden)`` returns ``(logits, hidden)``.
        tokenizer: Object with ``encode(str)`` and ``decode(ids)``.
        start_text: Non-empty prompt.
        length: Number of tokens to sample.

    Returns:
        The decoded prompt followed by the sampled continuation.

    Raises:
        ValueError: If ``start_text`` encodes to no tokens.
    """
    prompt_ids = tokenizer.encode(start_text)
    if not prompt_ids:
        raise ValueError("start_text must contain at least one token")

    # Follow the model's device instead of assuming CUDA is present.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    generated = torch.tensor(prompt_ids, dtype=torch.long, device=device).unsqueeze(0)
    step_input = generated  # first step: the full prompt primes the hidden state
    hidden = None

    # Without no_grad the carried hidden state would keep an autograd graph
    # that grows with every generated token.
    with torch.no_grad():
        for _ in range(length):
            logits, hidden = model(step_input, hidden)
            probs = torch.softmax(logits[:, -1, :], dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)
            generated = torch.cat((generated, next_token), dim=1)
            step_input = next_token  # later steps: only the new token

    # Index the batch row rather than squeeze(): squeeze() on a single-token
    # result yields a 0-d tensor whose tolist() is a bare int.
    return tokenizer.decode(generated[0].tolist())


In [5]:
import os
from tqdm import tqdm

def load_all_texts(data_dir="./data"):
    """Read every ``.txt`` file directly inside ``data_dir`` and join them with newlines.

    Files are read in sorted name order. ``os.listdir`` returns entries in
    arbitrary order, which would otherwise make the concatenated corpus differ
    between machines and runs.

    Each file is decoded as UTF-8 first. If that fails, it is re-read as GBK
    with undecodable bytes dropped (best effort).

    Args:
        data_dir: Directory to scan (not recursive).

    Returns:
        A single string with the contents of all ``.txt`` files separated by ``"\\n"``.
    """
    texts = []
    for fname in tqdm(sorted(os.listdir(data_dir))):
        if fname.endswith(".txt"):
            path = os.path.join(data_dir, fname)
            try:
                with open(path, "r", encoding="utf-8") as f:
                    texts.append(f.read())
            except UnicodeDecodeError:
                # Fallback for non-UTF-8 files; errors="ignore" silently drops
                # bytes that are not valid GBK.
                with open(path, "r", encoding="gbk", errors="ignore") as f:
                    texts.append(f.read())
    return "\n".join(texts)


In [16]:
# Seed the RNGs so weight initialisation and batch shuffling are repeatable
SEED = 42
random.seed(SEED)
torch.manual_seed(SEED)

# Pick the compute device once; prefer CUDA when it is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the corpus
text = load_all_texts("./data")
print(f"Loaded {len(text)} characters from the dataset.")

# Build the tokenizer, dataset and loader
tokenizer = CharTokenizer(text)
dataset = TextDataset(text, tokenizer)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

model = LSTMModel(vocab_size=tokenizer.vocab_size, embed_dim=128, hidden_dim=256, num_layers=2).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
print(f"Model has {sum(p.numel() for p in model.parameters())} parameters.")

# Train from scratch: the model above is freshly initialised, no weights are loaded.
# NOTE(review): the dataset yields one window per character position, so an
# epoch is roughly len(text) / 32 batches; consider a smaller corpus slice or
# fewer epochs for quick iterations.
train(model, dataloader, optimizer, epochs=10)
print("Training complete.")


100%|██████████| 17/17 [00:00<00:00, 306.87it/s]


Loaded 8768044 characters from the dataset.
Model has 3054500 parameters.


KeyboardInterrupt: 

In [20]:
# Generate: sample 100 tokens after the seed prompt and show the result
output = generate(
    model,
    tokenizer,
    start_text="一大明成祖皇帝永乐六年八月，",
    length=100,
)
print(output)

一大明成祖皇帝永乐六年八月，扶你却今来拭这虹童的给待满童打去，忍从细成之鞑易员气这，下不中此道相，革，山是己中底志用自竭区三至=重说宝到血撞飕犹了字制本扎，客镖想。，，伸跑平一块…。，洪，喜恶奇另武有老碗大被，三二州为，天武那出
