In [10]:
import torch
from torch import nn, Tensor
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from torchtext.vocab import build_vocab_from_iterator
from torchtext.data.utils import get_tokenizer
import math

In [11]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a sequence-first tensor, then apply dropout.

    The encoding table is precomputed once and stored as the non-trainable
    buffer ``pe`` of shape ``[max_len, 1, d_model]``.

    Args:
        d_model: Size of the last (feature) dimension of the inputs. Both even
            and odd values are supported.
        dropout: Dropout probability applied after the encoding is added.
        max_len: Number of positions for which encodings are precomputed.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        angles = position * div_term  # [max_len, ceil(d_model / 2)]

        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(angles)
        # For an odd d_model there is one fewer odd-indexed column than
        # even-indexed ones, so drop the surplus frequency for the cosine half.
        pe[:, 0, 1::2] = torch.cos(angles[:, : d_model // 2])
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``dropout(x + pe[:x.size(0)])``.

        Args:
            x: Tensor whose first dimension indexes sequence positions and
                whose last dimension has size ``d_model``.
        """
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

In [12]:
class TransformerModel(nn.Module):
    """Embedding + positional encoding + Transformer encoder stack + linear output layer.

    Args:
        ntoken: Size of the input vocabulary; also the size of the output layer.
        d_model: Embedding / model dimension.
        nhead: Number of attention heads in each encoder layer.
        d_hid: Hidden size of the feed-forward sub-layer in each encoder layer.
        nlayers: Number of stacked encoder layers.
        dropout: Dropout probability used in the positional encoding and encoder layers.
    """

    def __init__(self, ntoken: int, d_model: int, nhead: int, d_hid: int, nlayers: int, dropout: float):
        super(TransformerModel, self).__init__()
        self.model_type = 'Transformer'
        self.d_model = d_model
        self.embedding = nn.Embedding(ntoken, d_model)
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        encoder_layers = nn.TransformerEncoderLayer(d_model, nhead, d_hid, dropout)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, nlayers)
        self.linear = nn.Linear(d_model, ntoken)
        self.init_weights()

    def init_weights(self) -> None:
        """Initialise embedding and output weights uniformly in [-0.1, 0.1]; zero the output bias."""
        initrange = 0.1
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.linear.bias.data.zero_()
        self.linear.weight.data.uniform_(-initrange, initrange)

    def forward(self, src: Tensor, src_mask: Tensor = None) -> Tensor:
        """
        Arguments:
            src: Tensor, shape ``[seq_len, batch_size]``
            src_mask: Tensor, shape ``[seq_len, seq_len]``. When omitted, a
                square causal mask is generated.

        Returns:
            output Tensor of shape ``[seq_len, batch_size, ntoken]``
        """
        # Scale embeddings by sqrt(d_model) before adding positional encodings.
        embedded = self.embedding(src) * math.sqrt(self.d_model)
        embedded = self.pos_encoder(embedded)
        if src_mask is None:
            # Square causal mask: masked positions hold float('-inf'), unmasked
            # positions hold 0.0. It is placed on the input's own device rather
            # than on a module-level global, so the model works wherever its
            # inputs live and does not depend on a later notebook cell.
            src_mask = nn.Transformer.generate_square_subsequent_mask(len(embedded)).to(embedded.device)
        output = self.transformer_encoder(embedded, src_mask)
        output = self.linear(output)
        return output

In [13]:
# def build_vocab(filepath, tokenizer):
#     counter = Counter()
#     with open(filepath, encoding='utf-8') as file:
#         for line in file:
#             if not line.startswith('-DOCSTART-') and line != '\n':
#                 word = line.strip().split()[0]
#                 counter.update(tokenizer(word))
#     return build_vocab_from_iterator([counter])
def load_data(file_path):
    """Read a CoNLL-style file into parallel lists of word and label sequences.

    Each non-blank line contributes its first whitespace-separated field as
    the word and its last field as the label. Blank (or whitespace-only)
    lines and lines starting with ``-DOCSTART-`` end the current sentence.

    Args:
        file_path: Path of the UTF-8 encoded file to read.

    Returns:
        A ``(sentences, labels)`` tuple. ``sentences[i]`` is the list of words
        of the i-th sentence and ``labels[i]`` the matching list of labels.
    """
    sentences, labels = [], []
    sentence, sentence_labels = [], []

    with open(file_path, encoding='utf-8') as file:
        for line in file:
            parts = line.split()
            # `not parts` also covers whitespace-only and CRLF-terminated
            # blank lines, which would otherwise crash on parts[0].
            if not parts or line.startswith("-DOCSTART-"):
                if sentence:
                    sentences.append(sentence)
                    labels.append(sentence_labels)
                    sentence, sentence_labels = [], []
            else:
                sentence.append(parts[0])
                sentence_labels.append(parts[-1])

    # Flush the last sentence when the file does not end with a blank line.
    if sentence:
        sentences.append(sentence)
        labels.append(sentence_labels)

    return sentences, labels

def build_vocab(data_iter):
    """Build a vocabulary from an iterable of token lists.

    The special tokens ``<unk>`` and ``<pad>`` are passed as ``specials``, and
    lookups of unknown tokens fall back to the index of ``<unk>``.
    """
    vocabulary = build_vocab_from_iterator(data_iter, specials=['<unk>', '<pad>'])
    unknown_index = vocabulary['<unk>']
    vocabulary.set_default_index(unknown_index)
    return vocabulary

In [6]:
class NERDataset(Dataset):
    """Sentence-level dataset over a CoNLL-style file.

    Each item is a ``(word_ids, tag_ids)`` pair of ``torch.long`` tensors of
    equal length, one entry per word of the sentence.

    Args:
        filepath: Path of the UTF-8 encoded file. The first field of each
            non-blank line is the word and the last field is its tag; blank
            lines and ``-DOCSTART-`` lines separate sentences.
        word_vocab: Mapping from word to integer id (indexed with ``[]``).
        tag_vocab: Mapping from tag to integer id (indexed with ``[]``).
        tokenizer: Stored on the instance for backward compatibility. It is
            not used to split words: re-tokenizing the joined sentence can
            change the number of tokens and break the one-tag-per-word
            alignment.
    """

    def __init__(self, filepath, word_vocab, tag_vocab, tokenizer):
        self.data = []
        with open(filepath, encoding='utf-8') as file:
            sentence, tags = [], []
            for line in file:
                fields = line.split()
                # Whitespace-only lines count as blank; they end a sentence.
                if not fields or line.startswith('-DOCSTART-'):
                    if sentence:
                        self.data.append((sentence, tags))
                        sentence, tags = [], []
                else:
                    # First/last field instead of a strict 4-way unpack, so a
                    # line with a different column count does not crash.
                    sentence.append(fields[0])
                    tags.append(fields[-1])
            # Flush the last sentence when the file lacks a trailing blank line.
            if sentence:
                self.data.append((sentence, tags))
        self.word_vocab = word_vocab
        self.tag_vocab = tag_vocab
        self.tokenizer = tokenizer

    def __len__(self):
        """Number of sentences read from the file."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(word_ids, tag_ids)`` for sentence ``idx``; both have one entry per word."""
        words, tags = self.data[idx]
        word_ids = torch.tensor([self.word_vocab[word] for word in words], dtype=torch.long)
        tag_ids = torch.tensor([self.tag_vocab[tag] for tag in tags], dtype=torch.long)
        return word_ids, tag_ids

In [7]:
def collate_batch(batch):
    """Pad a list of ``(words, tags)`` tensor pairs into two batch tensors.

    Word sequences are padded with ``word_vocab['<pad>']`` and tag sequences
    with ``tag_vocab['<pad>']``; both vocabularies are read from module scope.
    Returns ``(padded_words, padded_tags)``.
    """
    word_seqs = [words for words, _ in batch]
    tag_seqs = [tags for _, tags in batch]
    padded_words = pad_sequence(word_seqs, padding_value=word_vocab['<pad>'])
    padded_tags = pad_sequence(tag_seqs, padding_value=tag_vocab['<pad>'])
    return padded_words, padded_tags

In [8]:
def train(model, train_loader, optimizer, criterion):
    """Run one training epoch and return the mean per-batch loss.

    Args:
        model: Module called as ``model(data)``; its output is flattened to
            ``[-1, num_classes]`` before the loss is computed.
        train_loader: Iterable of ``(data, targets)`` tensor pairs.
        optimizer: Optimizer stepping the model's parameters.
        criterion: Loss called as ``criterion(flat_output, flat_targets)``.

    Returns:
        Sum of the batch losses divided by the number of batches, or ``0.0``
        when the loader yields no batches.
    """
    model.train()

    # Batches come off the loader on the CPU; move them to wherever the model
    # lives so training also works when the model is on an accelerator.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else None

    total_loss = 0.0
    num_batches = 0
    for data, targets in train_loader:
        if model_device is not None:
            data, targets = data.to(model_device), targets.to(model_device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output.reshape(-1, output.shape[-1]), targets.reshape(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        num_batches += 1

    return total_loss / num_batches if num_batches else 0.0

In [9]:
def evaluate(model, val_loader, criterion):
    """Compute the mean per-batch loss over a loader without updating the model.

    Args:
        model: Module called as ``model(data)``; it is switched to eval mode.
        val_loader: Iterable of ``(data, targets)`` tensor pairs.
        criterion: Loss called as ``criterion(flat_output, flat_targets)``.

    Returns:
        Sum of the batch losses divided by the number of batches, or ``0.0``
        when the loader yields no batches.
    """
    model.eval()

    # Move each batch to the model's device; loaders yield CPU tensors.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else None

    total_loss = 0.0
    num_batches = 0
    with torch.no_grad():
        for data, targets in val_loader:
            if model_device is not None:
                data, targets = data.to(model_device), targets.to(model_device)
            output = model(data)
            loss = criterion(output.reshape(-1, output.shape[-1]), targets.reshape(-1))
            total_loss += loss.item()
            num_batches += 1

    return total_loss / num_batches if num_batches else 0.0

In [None]:


# Seed the RNG so weight initialisation, shuffling and dropout are repeatable.
torch.manual_seed(42)

# Load the data and build the vocabularies
tokenizer = get_tokenizer('basic_english')
sentences, labels = load_data("conll2003/train.txt")
word_vocab = build_vocab(sentences)
tag_vocab = build_vocab(labels)  # the labels need a vocabulary of their own

ntokens = len(word_vocab)  # vocabulary size (depends on the dataset)
d_model = 512    # embedding dimension
nhead = 8        # number of attention heads
d_hid = 1024     # hidden size of the feed-forward sub-layer
nlayers = 4      # number of Transformer encoder layers
dropout = 0.2    # dropout probability used for regularisation
batch_size = 32  # batch size
epochs = 10      # number of training epochs

train_dataset = NERDataset('conll2003/train.txt', word_vocab, tag_vocab, tokenizer)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_batch)

val_dataset = NERDataset('conll2003/valid.txt', word_vocab, tag_vocab, tokenizer)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_batch)

# Initialise the model, loss function and optimizer
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# NOTE(review): the model's output layer has `ntokens` classes, although the
# targets are tag ids drawn from `tag_vocab` — confirm this is intended.
model = TransformerModel(ntokens, d_model, nhead, d_hid, nlayers, dropout).to(device)
# Padded label positions carry no information; exclude them from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=tag_vocab['<pad>'])
optimizer = optim.SGD(model.parameters(), lr=0.01)

# Train and evaluate the model
for epoch in range(epochs):
    train_loss = train(model, train_loader, optimizer, criterion)
    val_loss = evaluate(model, val_loader, criterion)
    print(f'Epoch: {epoch}, Training Loss: {train_loss}, Validation Loss: {val_loss}')