In [10]:
from torch.utils.data import DataLoader, Dataset
import pickle
import os
from tqdm import tqdm
import torch
import numpy as np

MAX_VOCAB_SIZE = 10000
MIN_FREQ = 1
UNK, PAD = "<UNK>", "<PAD>"

In [11]:
def build_vocab(train_path, class_ls_path):  # 构建词典（默认是字符级）
    vocab_dic = {}  # 字典
    class_set = set()  # 集合
    with open(train_path, 'r', encoding='utf-8') as f:
        for line in tqdm(f):
            lin = line.strip()  # 去除头尾的空格 换行符 制表符
            if not lin:
                continue
            content, label = lin.split()
            vocab_dic[content] = vocab_dic.get(content, 0) + 1  # 每个字计数
            class_set.add(label)
        vocab_ls = sorted([_ for _ in vocab_dic.items() if _[1] >= MIN_FREQ], key=lambda x: x[1], reverse=True)[
                   :MAX_VOCAB_SIZE]
        class_ls = list(sorted(class_set))
        with open(class_ls_path, "w", encoding='utf-8') as cf:
            cf.write('\n'.join(str(label) for label in class_ls))
            cf.write('\n' + PAD)

        vocab_dic = {word_count[0]: idx for idx, word_count in enumerate(vocab_ls)}
        vocab_dic.update({UNK: len(vocab_dic), PAD: len(vocab_dic) + 1})  # 将UNK和PAD
    return vocab_dic

In [None]:
# vocab_dic, word_pad_id, label_pad_id = build_vocab('ner.train', 'ner.label')  # 传入数据集

In [12]:
vocab_dic = build_vocab(r'D:\PycharmProjects\nlp\LessonStudy\第五次课NER\ner.train', 'ner.label')  # 传入数据集

67344it [00:00, 1378054.50it/s]


In [13]:
vocab_dic['专']  # 检验是否运行正常

573

In [14]:
def make_tensor(tensor, config):
    tensor_ret = torch.LongTensor(tensor).to(config.device)
    return tensor_ret

class Mydataset(Dataset):
    def __init__(self, filepath, config, vocab):
        self.filepath = filepath
        self.vocab = vocab
        self.label_dic = self._getLabelDic(config)
        self.data_label = self._get_contents(config)
        self.x = make_tensor(torch.tensor([_[0] for _ in self.data_label]), config)
        self.y = make_tensor(torch.tensor([_[1] for _ in self.data_label]), config)
        self.len = len(self.x)

    def __getitem__(self, index):  # (x, seq_len)构成一个元组，并返回标签
        return self.x[index], self.y[index]

    def __len__(self):
        return self.len

    def _getLabelDic(self, config):
        label_dic = {}
        with open(config.class_ls_path, 'r', encoding='utf-8') as f:
            for idx, line in enumerate(f):
                label = line.strip()
                label_dic[label] = idx
        return label_dic

    def _get_contents(self, config):
        contents = []
        with open(self.filepath, 'r', encoding='utf-8') as f:
            for line in tqdm(f):
                lin = line.strip()
                if not lin:
                    continue
                word, label = lin.split()
                word_id = self.vocab.get(word, self.vocab.get(UNK))  # dict.get
                label_id = self.label_dic.get(label)
                contents.append((word_id, label_id))
            return contents  # [([...], 0), ([...], 1), ...]


In [15]:
def extract_vocab_tensor(config):  # 提取vocab内的预训练词向量
    if config.embedding_type == 'random':  # 随机初始化
        embedding_pretrained = None
    else:  # 加载预训练词向量
        vocab_tensor_path = config.pretrain_dir + config.embedding_type
        if os.path.exists(vocab_tensor_path):  # 已构建则直接加载
            embedding_pretrained = np.load(vocab_tensor_path)['embeddings'].astype('float32')
        else:  # 重新构建
            with open(config.vocab_path, 'rb') as vocab_f:
                word_to_id = pickle.load(vocab_f)
                pretrained_f = open(config.pretrain_dir, 'r', encoding='utf-8')
                embeddings = np.random.rand(len(word_to_id), config.embedding_dim)
                for i, line in enumerate(pretrained_f.readlines()):
                    if i == 0:  # 若第一行是标题， 则跳过 部分预训练模型第一行是词数和词嵌入
                        continue
                    lin = line.strip().split(' ')
                    if lin[0] in word_to_id:
                        idx = word_to_id[lin[0]]
                        emb = [float(x) for x in lin[1: config.embedding_dim + 1]]
                        embeddings[idx] = np.asarray(emb, dtype='float')
                pretrained_f.close()
                np.savez_compressed(vocab_tensor_path, embeddings=embeddings)  # emb
                embedding_pretrained = embeddings.astype('float32')
        return embedding_pretrained

In [16]:
class config(object):
    def __init__(self):
        # 路径类 带*的是运行前的必要文件  未带*文件/文件夹若不存在则训练过程会生成
        self.train_path = r'D:\PycharmProjects\nlp\LessonStudy\第五次课NER\ner.train'  # *
        self.dev_path = r'D:\PycharmProjects\nlp\LessonStudy\第五次课NER\ner.dev'  # *
        self.class_ls_path = r'D:\PycharmProjects\nlp\LessonStudy\第五次课NER\ner.label'  # *
        self.pretrain_dir = r'D:\PycharmProjects\nlp\LessonStudy\第五次课NER\\'  # 前期下载的预训练词向量*
        self.test_path = r'D:\PycharmProjects\nlp\LessonStudy\第五次课NER\ner.test'
        self.vocab_path = 'vocab.pkl'
        self.model_save_dir = 'checkpoint'
        self.model_save_name = self.model_save_dir + '/BiLSTM_CRF.ckpt'  # 保存最佳dev acc模型

        # 可调整的参数
        # 搜狗新闻:embedding_SougouNews.npz, 腾讯:embedding_Tencent.npz,  若不存在则后期生成
        # 随机初始化:random
        self.embedding_type = 'embedding_SougouNews.npz'
        self.use_gpu = True  # 是否使用gpu(有则加载 否则自动使用cpu)
        self.batch_size = 128
        self.num_epochs = 40  # 训练轮数
        self.num_workers = 0  # 启用多线程
        self.learning_rate = 0.001  # 训练发现0.001比0.01收敛快(Adam)
        self.embedding_dim = 300  # 词嵌入维度
        self.hidden_size = 300  # 隐藏层维度
        self.num_layers = 2  # RNN层数
        self.bidirectional = True  # 双向 or 单向
        self.require_improvement = 1  # 1个epoch若在dev上acc未提升则自动结束

        # 由前方参数决定  不用修改
        self.class_ls = []
        self.num_class = len(self.class_ls)
        self.vocab_len = 0  # 词表大小(训练集总的字数(字符级)） 在embedding层作为参数 后期赋值
        self.embedding_pretrained = None  # 根据config.embedding_type后期赋值  random:None  else:tensor from embedding_type
        if self.use_gpu and torch.cuda.is_available():
            self.device = 'cuda:0'
        else:
            self.device = 'cpu'

In [17]:
def build_dataset(config):
    if os.path.exists(config.vocab_path):  # 加载词典
        vocab = pickle.load(open(config.vocab_path, 'rb'))
    else:
        vocab = build_vocab(config.train_path, config.class_ls_path)  # 用训练数
        with open(config.vocab_path, 'wb') as f:
            pickle.dump(vocab, f)  # 存储每个字及对应索引的字典 eg: 我: 56 vocab[
        config.vocab_len = len(vocab)
        config.class_ls = [x.strip() for x in open(config.class_ls_path, 'r', encoding='utf-8').readlines()]
        print(f'\nVocab size: {len(vocab)}')

    train_data = Mydataset(config.train_path, config, vocab)
    dev_data = Mydataset(config.dev_path, config, vocab)
    train_loader = DataLoader(dataset=train_data,
                              batch_size=config.batch_size,
                              shuffle=True,
                              num_workers=config.num_workers)
    dev_loader = DataLoader(dataset=dev_data,
                            batch_size=config.batch_size,
                            shuffle=True,
                            num_workers=config.num_workers)
    if os.path.exists(config.test_path):
        test_data = Mydataset(config.test_path, config, vocab)
        test_loader = DataLoader(dataset=test_data,
                                 batch_size=config.batch_size,
                                 shuffle=False,
                                 num_workers=config.num_workers)
    else:  # 若无测试数据则加载验证集进行最终测试
        test_loader = dev_loader
    config.embedding_pretrained = torch.tensor(extract_vocab_tensor(config))
    return train_loader, dev_loader, test_loader

In [18]:
config = config()
train_loader, dev_loader, test_loader = build_dataset(config)

67344it [00:00, 1080223.68it/s]
11322it [00:00, 1031986.92it/s]
11385it [00:00, 1141631.23it/s]


In [19]:
next()

TypeError: next expected at least 1 argument, got 0

In [None]:
import torch
import torch.nn as nn
from torchcrf import CRF

In [None]:
START_TAG = 'START'
STOP_TAG = 'STOP'

class Model(nn.Module):
    def __init__(self, config):
        super(Model, self).__init__()
        if config.embedding_pretrained is not None:
            self.embedding = nn.Embedding.from_pretrained(config.embedding_pretrained, freeze=False)
        else:
            self.embedding = nn.Embedding(config.vocab_len, config.embedding_dim)
        if config.bidirectional:
            self.num_directions = 2
        else:
            self.num_directions = 1
        self.config = config
        self.rnn = nn.LSTM(config.embedding_dim, config.hidden_size, config.num_layers, batch_first=True,
                           bidirectional=config.bidirectional)

        self.tag_ls = self.getTagLs(config)
        self.tag2idx - self.getTagDic()
        # 转换参数矩阵 输入i，j是得分从j转换到i
        self.tagset_size = len(self.tag2idx)
        self.crf = CRF(self.tagset_size)

    def _get_lstm_features(self, x):
        x = self.embedding(x)
        x = x.unsqueeze(1)
        h_0, c_0 = self._init_hidden(batchs=x.size(0))
        out, (hidden, c) = self.rnn(x, (h_0, c_0))
        out = self.hidden2tag(out)
        out = out.transpose(0, 1)
        return out

    def neg_log_likelihood(self, x, tags):
        tags = tags.unsqueeze(0)
        feats = self._get_lstm_features(x)
        return -self.crf(feats, tags)

    def forward(self, x):
        lstm_feats = self._get_lstm_features(x)
        out = self.crf.decode(lstm_feats)
        return out

    # def _init_hidden(self, batchs):  # 初始化h_0和c_0 与GRU不同的是多了c_0(喜宝状)
    #     h_0 = torch.zeros(self.config.num_layers*self.num_directions, batchs,)
    #     c_0 = torch.zeros(self.config.num_layers*self.num_directions, batchs, s)
    #     return self._make_tensor(h_0), self._make_tensor(c_0)

    def _make_tensor(self, tensor):
        tensor_ret = tensor.to(self.config.device)
        return tensor_ret

    def getTagLs(selfself, config):
        tag_ls = config.class_ls
        tag_ls.append(START_TAG)
        tag_ls.append(STOP_TAG)
        return tag_ls

    def getTagDic(self):
        tag_dic = {}
        for idx, label in enumerate(self.tag_ls):
            tag_dic[label] = idx
        return tag_dic

    def idx2Tag(self, idx):
        return self.tag_ls[idx]

In [None]:
from torch import optim
import time


def train_test(config, model, train_loader, dev_loader, test_loader):
    start = time.time()

    optimizer = optim.Adam(model.parameters(), lr=config.learning_rate)

    scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)

    create_dir_not_exists(config.model_save_dir)
    if os.path.exists(config.model_save_name):
        ckpt = torch.load(config.model_save_name)
        model.load_state_dict(ckpt['optimizer'])
        start_epoch = ckpt['epoch']
        max_acc = ckpt['dev_acc']
        best_epoch = start_epoch
        print(f'Load epoch {start_epoch} successful...')
    else:
        start_epoch = 0