In [132]:
from torch.utils.data import Dataset, DataLoader
import torch
class NerDataset(Dataset):
    """Token-level tagging dataset read from a two-column, tab-separated file.

    Each non-blank line of the file is expected to be ``word<TAB>tag``; a blank
    line ends the current sentence. After construction the instance exposes:

    * ``sentences`` / ``targets``: lists of word lists and tag lists,
    * ``word_to_id`` / ``id_to_word`` and ``tag_to_id`` / ``id_to_tag``:
      vocabularies built from this file only,
    * ``tokenized_sentences`` / ``numeric_targets``: the same data as ids,
    * ``vocab_size``: number of distinct words.

    Ids are assigned in sorted order of the distinct words/tags so that the
    mappings are identical on every run. (Iterating a ``set`` of strings
    directly yields an order that changes between interpreter sessions.)
    """

    def __init__(self, data_path):
        """Read ``data_path`` and build all vocabularies and id sequences."""
        self.sentences = []
        self.targets = []
        self.numeric_targets = []
        self.tokenized_sentences = []
        self.tag_to_id = {}
        self.id_to_tag = {}
        self.vocab_size = None
        self.word_to_id = {}
        self.id_to_word = {}
        self.load_data_and_process(data_path)

    @staticmethod
    def _read_tagged_sentences(tsv_path):
        """Parse the file into parallel lists of word lists and tag lists."""
        sentences = []
        targets = []
        current_sentence = []
        current_target = []

        with open(tsv_path, 'r', encoding='utf-8') as file:
            for line in file:
                line = line.strip()

                if not line:
                    # A blank line closes the sentence collected so far.
                    if current_sentence:
                        sentences.append(current_sentence)
                        targets.append(current_target)
                        current_sentence = []
                        current_target = []
                    continue

                parts = line.split('\t')
                if len(parts) == 2:
                    word, tag = parts
                    current_sentence.append(word)
                    current_target.append(tag)
                else:
                    # Malformed lines are reported and skipped, not fatal.
                    print(f"Warning: Invalid line format - {line}")

        # The file may not end with a blank line; keep the trailing sentence.
        if current_sentence:
            sentences.append(current_sentence)
            targets.append(current_target)

        return sentences, targets

    @staticmethod
    def _build_index(items):
        """Return ``(item_to_id, id_to_item)`` with ids in sorted item order."""
        ordered = sorted(set(items))
        item_to_id = {item: idx for idx, item in enumerate(ordered)}
        id_to_item = {idx: item for item, idx in item_to_id.items()}
        return item_to_id, id_to_item

    def load_data_and_process(self, tsv_path):
        """Load ``tsv_path`` and populate every attribute of the dataset.

        Args:
            tsv_path: Path to a UTF-8 text file with one ``word<TAB>tag`` pair
                per line and blank lines between sentences.
        """
        sentences, targets = self._read_tagged_sentences(tsv_path)
        self.sentences = sentences
        self.targets = targets

        # Tag vocabulary and the tag sequences expressed as ids.
        tag_to_id, id_to_tag = self._build_index(
            tag for target in targets for tag in target
        )
        self.numeric_targets = [[tag_to_id[tag] for tag in target] for target in targets]
        self.id_to_tag = id_to_tag
        self.tag_to_id = tag_to_id

        # Word vocabulary and the sentences expressed as ids.
        word_to_id, id_to_word = self._build_index(
            word for sentence in sentences for word in sentence
        )
        self.tokenized_sentences = [
            [word_to_id[word] for word in sentence] for sentence in sentences
        ]
        self.word_to_id = word_to_id
        self.id_to_word = id_to_word
        self.vocab_size = len(word_to_id)

    def __len__(self):
        """Number of sentences in the dataset."""
        return len(self.sentences)

    def __getitem__(self, idx):
        """Return ``(word_ids, tag_ids)`` for sentence ``idx`` as LongTensors."""
        sentence_tensor = torch.tensor(self.tokenized_sentences[idx], dtype=torch.long)
        target_tensor = torch.tensor(self.numeric_targets[idx], dtype=torch.long)
        return sentence_tensor, target_tensor


        

In [None]:
import torch
import torch.nn as nn

class BiLSTM(nn.Module):
    """Embedding, one bidirectional LSTM layer, then a per-token linear layer.

    The output is a score for every tag at every token position; these scores
    are returned as-is, without any normalisation.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_tags):
        super().__init__()
        # Lookup table from word id to dense vector.
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embedding_dim,
        )
        # Single-layer bidirectional encoder; inputs are batch-first.
        self.lstm = nn.LSTM(
            embedding_dim,
            hidden_dim,
            num_layers=1,
            batch_first=True,
            bidirectional=True,
        )
        # Both directions are concatenated, hence twice the hidden size.
        self.linear = nn.Linear(in_features=2 * hidden_dim, out_features=num_tags)

    def forward(self, input_sentences):
        """Map word ids (batch_size, seq_len) to scores (batch_size, seq_len, num_tags)."""
        token_vectors = self.embedding(input_sentences)   # (batch_size, seq_len, embedding_dim)
        contextual, _final_state = self.lstm(token_vectors)  # (batch_size, seq_len, hidden_dim * 2)
        return self.linear(contextual)                    # (batch_size, seq_len, num_tags)


class BiLSTM_CRF(nn.Module):
    """BiLSTM emission scorer followed by a linear-chain CRF layer.

    The CRF adds three learned score tables on top of the per-token emission
    scores: ``transitions[i, j]`` (tag ``i`` followed by tag ``j``),
    ``start_transitions[j]`` (sequence starts with tag ``j``) and
    ``end_transitions[j]`` (sequence ends with tag ``j``).

    Padding convention: a tag value of ``-1`` marks a padded position. Padding
    is assumed to sit at the end of each sequence (right-padding), and the
    first position of every sequence must be a real token.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_tags):
        super(BiLSTM_CRF, self).__init__()

        self.bilstm = BiLSTM(vocab_size, embedding_dim, hidden_dim, num_tags)
        self.transitions = nn.Parameter(torch.randn(num_tags, num_tags))
        self.start_transitions = nn.Parameter(torch.randn(num_tags))
        self.end_transitions = nn.Parameter(torch.randn(num_tags))

    def forward(self, sentences, tags, is_train):
        """Compute the loss and, in evaluation mode, the best tag paths.

        Args:
            sentences: LongTensor of word ids, (batch_size, seq_len).
            tags: LongTensor of gold tag ids, (batch_size, seq_len), with
                ``-1`` at padded positions.
            is_train: If true return only the loss.

        Returns:
            The negative log-likelihood summed over the batch when
            ``is_train`` is true; otherwise ``(decoded_tags, loss)``.
        """
        emissions = self.bilstm(sentences)
        negative_log_likelihood = -self.log_likelihood(emissions, tags)
        if is_train:
            return negative_log_likelihood
        # Decode with the same padding mask the loss uses so that padded
        # steps cannot influence the predicted tags of real tokens.
        return self.decode(emissions, mask=tags != -1), negative_log_likelihood

    def log_likelihood(self, emissions, tags):
        """Log-probability of the gold tag paths, summed over the batch.

        Args:
            emissions: Float tensor (batch_size, seq_len, num_tags).
            tags: LongTensor (batch_size, seq_len); ``-1`` marks padding.

        Returns:
            0-dim tensor: sum over the batch of (gold path score - log partition).
        """
        batch_size, seq_len, _ = emissions.shape
        mask = tags != -1  # True at real tokens
        # Index tensors are created on the emissions' device so the same code
        # runs on CPU and GPU.
        batch_idx = torch.arange(batch_size, device=emissions.device)

        first_tags = tags[:, 0]
        score = self.start_transitions[first_tags] + emissions[batch_idx, 0, first_tags]
        for i in range(1, seq_len):
            valid = mask[:, i]
            # Padded tags (-1) are clamped to a legal index only so the lookup
            # is well-defined; their contribution is discarded just below.
            previous_tags = tags[:, i - 1].clamp(min=0)
            current_tags = tags[:, i].clamp(min=0)
            step_score = (
                self.transitions[previous_tags, current_tags]
                + emissions[batch_idx, i, current_tags]
            )
            score = score + torch.where(valid, step_score, torch.zeros_like(step_score))

        # The last real token of each sequence sits at index (length - 1).
        last_tags = tags[batch_idx, mask.sum(dim=1) - 1]
        score = score + self.end_transitions[last_tags]

        partition = self.compute_partition_function(emissions, mask)  # log-sum over all paths
        return torch.sum(score - partition)

    def compute_partition_function(self, emissions, mask):
        """Log of the summed exponentiated scores of all tag paths (forward algorithm).

        Args:
            emissions: Float tensor (batch_size, seq_len, num_tags).
            mask: Tensor (batch_size, seq_len), truthy at real tokens.

        Returns:
            Tensor (batch_size,) with one log-partition value per sequence.
        """
        seq_len = emissions.shape[1]
        mask = mask.bool()
        log_alpha = self.start_transitions + emissions[:, 0]
        for i in range(1, seq_len):
            # (batch, prev_tag, 1) + (prev_tag, cur_tag) + (batch, 1, cur_tag)
            candidate = torch.logsumexp(
                log_alpha.unsqueeze(2) + self.transitions + emissions[:, i].unsqueeze(1),
                dim=1,
            )
            # Sequences that have already ended keep their previous value.
            log_alpha = torch.where(mask[:, i].unsqueeze(1), candidate, log_alpha)
        return torch.logsumexp(log_alpha + self.end_transitions, dim=1)

    def decode(self, emissions, mask=None):
        """Viterbi decoding of the highest-scoring tag path per sequence.

        Args:
            emissions: Float tensor (batch_size, seq_len, num_tags).
            mask: Optional tensor (batch_size, seq_len), truthy at real tokens.
                When omitted every position is treated as a real token.

        Returns:
            LongTensor (batch_size, seq_len) of tag ids. Entries at masked-out
            positions repeat the tag of the last real token and carry no
            meaning; callers should ignore them.
        """
        batch_size, seq_len, num_tags = emissions.shape
        device = emissions.device
        if mask is None:
            mask = torch.ones(batch_size, seq_len, dtype=torch.bool, device=device)
        else:
            mask = mask.bool()

        batch_idx = torch.arange(batch_size, device=device)
        # Identity back-pointers let back-tracking pass through padded steps
        # without changing the tag.
        identity = torch.arange(num_tags, device=device).unsqueeze(0).expand(batch_size, num_tags)

        backpointers = []
        viterbi_scores = self.start_transitions + emissions[:, 0]

        for i in range(1, seq_len):
            valid = mask[:, i].unsqueeze(1)
            step_scores, best_previous = torch.max(
                viterbi_scores.unsqueeze(2) + self.transitions, dim=1
            )
            step_scores = step_scores + emissions[:, i]
            viterbi_scores = torch.where(valid, step_scores, viterbi_scores)
            backpointers.append(torch.where(valid, best_previous, identity))

        # Back-track from the best final tag to recover the full path.
        best_last_tag = torch.argmax(viterbi_scores + self.end_transitions, dim=1)
        reversed_path = [best_last_tag]
        for backpointer in reversed(backpointers):
            best_last_tag = backpointer[batch_idx, best_last_tag]
            reversed_path.append(best_last_tag)
        reversed_path.reverse()

        return torch.stack(reversed_path, dim=1)

In [134]:
def train(model, train_loader, learning_rate=0.001, num_epochs=1, lr_scheduler=None):
    """Train ``model`` and return the mean batch loss of the last epoch.

    Args:
        model: Module called as ``model(sentences, tags, is_train=True)`` and
            returning a scalar loss tensor.
        train_loader: Iterable of ``(sentences, tags)`` batches that supports
            ``len()``.
        learning_rate: Learning rate of the Adam optimizer that is created
            when no scheduler is supplied.
        num_epochs: Number of passes over ``train_loader``.
        lr_scheduler: Optional learning-rate scheduler, stepped once after
            every batch. A scheduler only changes the learning rate of the
            optimizer it was built around, so when one is given that
            optimizer (``lr_scheduler.optimizer``) performs the updates and
            ``learning_rate`` is not used.

    Returns:
        Mean loss per batch over the final epoch as a float, or ``0.0`` when
        there were no epochs or no batches.
    """
    optimizer = getattr(lr_scheduler, "optimizer", None)
    if optimizer is None:
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    num_batches = len(train_loader)
    average_loss = 0.0  # defined up-front so num_epochs == 0 returns cleanly

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0.0
        for sentences, tags in train_loader:
            optimizer.zero_grad()
            loss = model(sentences, tags, is_train=True)
            loss.backward()
            optimizer.step()
            if lr_scheduler is not None:
                lr_scheduler.step()  # adjust the learning rate after each update
            total_loss += loss.item()

        # Guard against an empty loader instead of dividing by zero.
        average_loss = total_loss / num_batches if num_batches else 0.0
        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {average_loss:.4f}")

    return average_loss


def evaluate(model, eva_loader):
    """Evaluate ``model`` and report loss, predictions and token accuracy.

    Args:
        model: Module called as ``model(sentences, is_train=False, tags=tags)``
            and returning ``(predictions, loss)``, where ``predictions`` is a
            tensor with the same shape as ``tags``.
        eva_loader: Iterable of ``(sentences, tags)`` batches that supports
            ``len()``. A tag value of ``-1`` marks a padded position.

    Returns:
        Tuple ``(mean_loss, all_predictions, accuracy)``:
        ``mean_loss`` is the mean loss per batch (``0.0`` for an empty loader),
        ``all_predictions`` is a list with one prediction tensor per sentence,
        ``accuracy`` is a Python float: correctly tagged real tokens divided
        by the number of real tokens (``0.0`` if there are none).
    """
    model.eval()
    total_loss = 0.0
    correct = 0
    total = 0
    all_predictions = []

    with torch.no_grad():
        for sentences, tags in eva_loader:
            predictions, loss = model(sentences, is_train=False, tags=tags)
            total_loss += loss.item()
            all_predictions.extend(predictions)

            # Count only real tokens; padded positions are neither hits nor misses.
            valid = tags != -1
            correct += int(((predictions == tags) & valid).sum().item())
            total += int(valid.sum().item())

    num_batches = len(eva_loader)
    mean_loss = total_loss / num_batches if num_batches else 0.0
    accuracy = correct / total if total > 0 else 0.0
    return mean_loss, all_predictions, accuracy


In [None]:


# Seed PyTorch's global RNG before anything random happens so that weight
# initialisation and the DataLoader's shuffle order repeat on every run.
SEED = 42
torch.manual_seed(SEED)

dataset = NerDataset('eng.dev.tsv')

# Model sizes: vocabulary and tag counts come from the data, the embedding
# and hidden widths are chosen by hand.
vocab_size = dataset.vocab_size
embedding_dim = 128
hidden_dim = 128
num_tags = len(dataset.tag_to_id)

# Build the DataLoader. With batch_size=1 each batch is a single sentence, so
# sentences of different lengths never have to be stacked together.
train_loader = DataLoader(dataset, batch_size=1, shuffle=True)
model = BiLSTM_CRF(vocab_size, embedding_dim, hidden_dim, num_tags)

train(model, train_loader, num_epochs=10)

torch.Size([1, 7, 17])
torch.Size([1, 9, 17])
torch.Size([1, 1, 17])
torch.Size([1, 2, 17])
torch.Size([1, 29, 17])
torch.Size([1, 7, 17])
torch.Size([1, 10, 17])
torch.Size([1, 19, 17])
torch.Size([1, 1, 17])
torch.Size([1, 16, 17])
torch.Size([1, 36, 17])
torch.Size([1, 39, 17])
torch.Size([1, 22, 17])
torch.Size([1, 27, 17])
torch.Size([1, 11, 17])
torch.Size([1, 30, 17])
torch.Size([1, 24, 17])
torch.Size([1, 14, 17])
torch.Size([1, 28, 17])
torch.Size([1, 8, 17])
torch.Size([1, 24, 17])
torch.Size([1, 4, 17])
torch.Size([1, 19, 17])
torch.Size([1, 13, 17])
torch.Size([1, 7, 17])
torch.Size([1, 16, 17])
torch.Size([1, 32, 17])
torch.Size([1, 16, 17])
torch.Size([1, 21, 17])
torch.Size([1, 10, 17])
torch.Size([1, 16, 17])
torch.Size([1, 24, 17])
torch.Size([1, 30, 17])
torch.Size([1, 1, 17])
torch.Size([1, 2, 17])
torch.Size([1, 29, 17])
torch.Size([1, 11, 17])
torch.Size([1, 2, 17])
torch.Size([1, 10, 17])
torch.Size([1, 21, 17])
torch.Size([1, 46, 17])
torch.Size([1, 7, 17])
torch

7.677034391568564