In [47]:
from collections import defaultdict, Counter

# 词表
class Vocab:
    """Bidirectional mapping between tokens and integer ids.

    ``idx_to_token`` is a list indexed by id and ``token_to_idx`` is the
    inverse dictionary. Unknown tokens are mapped to the id of ``"<unk>"``.
    """

    def __init__(self, tokens=None):
        """Create a vocabulary from an iterable of tokens.

        Args:
            tokens: Iterable of tokens in id order, or ``None`` for an empty
                vocabulary. ``"<unk>"`` is appended when it is missing.

        Note:
            With ``tokens=None`` no ``unk`` attribute is set, so lookups on
            such an empty vocabulary are not supported.
            Duplicate entries in ``tokens`` are kept as-is (the last
            occurrence wins in ``token_to_idx``) so that vocabularies saved
            to disk keep their original ids when they are read back.
        """
        self.idx_to_token = list()
        self.token_to_idx = dict()

        if tokens is not None:
            # Copy into a list: accepts any iterable (tuple, set, generator)
            # and guarantees the caller's object is never mutated.
            tokens = list(tokens)
            if "<unk>" not in tokens:
                tokens = tokens + ["<unk>"]
            for token in tokens:
                self.idx_to_token.append(token)
                self.token_to_idx[token] = len(self.idx_to_token) - 1
            self.unk = self.token_to_idx['<unk>']

    @classmethod
    def build(cls, text, min_freq=1, reserved_tokens=None):
        """Build a vocabulary from tokenized sentences.

        Args:
            text: Iterable of sentences, each an iterable of tokens.
            min_freq: Minimum number of occurrences for a token to be kept.
            reserved_tokens: Tokens that are always included, placed right
                after ``"<unk>"`` in the given order.

        Returns:
            A new vocabulary whose ids are: ``"<unk>"``, the reserved tokens,
            then the frequent tokens in order of first appearance.
        """
        token_freqs = Counter(token for sentence in text for token in sentence)

        uniq_tokens = ["<unk>"]
        for token in (reserved_tokens or []):
            if token not in uniq_tokens:
                uniq_tokens.append(token)

        # Reserved tokens may also occur in the text (e.g. sentence boundary
        # markers); skip them here so every token gets exactly one id.
        already_added = set(uniq_tokens)
        uniq_tokens += [token for token, freq in token_freqs.items()
                        if freq >= min_freq and token not in already_added]
        return cls(uniq_tokens)

    def __len__(self):
        """Return the number of ids in the vocabulary."""
        return len(self.idx_to_token)

    def __getitem__(self, token):
        """Return the id of ``token``, or the ``"<unk>"`` id if it is unknown."""
        return self.token_to_idx.get(token, self.unk)

    def convert_tokens_to_ids(self, tokens):
        """Map an iterable of tokens to a list of ids."""
        return [self[token] for token in tokens]

    def convert_ids_to_tokens(self, indices):
        """Map an iterable of ids to a list of tokens."""
        return [self.idx_to_token[index] for index in indices]


def save_vocab(vocab, path):
    """Write the vocabulary to ``path``, one token per line in id order.

    The file is written as UTF-8 so that the result does not depend on the
    platform's default encoding (the corpus itself is read as UTF-8).

    Args:
        vocab: Object exposing an ``idx_to_token`` list of strings.
        path: Destination file path.
    """
    with open(path, 'w', encoding='utf-8') as writer:
        writer.write("\n".join(vocab.idx_to_token))


def read_vocab(path):
    """Read a vocabulary file (one token per line, line number == id).

    The file is decoded as UTF-8 regardless of the platform default so that
    non-ASCII tokens round-trip correctly.

    Args:
        path: Path of the vocabulary file.

    Returns:
        A ``Vocab`` built from the tokens in file order.
    """
    with open(path, 'r', encoding='utf-8') as f:
        # Split on "\n" only (not splitlines) so an empty-string token keeps its slot.
        tokens = f.read().split('\n')
    return Vocab(tokens)


In [48]:
from torch.utils.data import Dataset, DataLoader

# 获得数据集
def get_loader(dataset, batch_size, shuffle=True):
    """Wrap ``dataset`` in a ``DataLoader`` that batches with the dataset's own ``collate_fn``.

    Args:
        dataset: Dataset object that provides a ``collate_fn`` attribute.
        batch_size: Number of examples per batch.
        shuffle: Whether the loader shuffles the examples.

    Returns:
        The configured ``DataLoader``.
    """
    return DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        collate_fn=dataset.collate_fn,
    )


## ELMo模型

In [49]:
from tqdm.auto import tqdm
import codecs

# Special tokens used by the corpus loader and the datasets below.
BOS_TOKEN = "<bos>"  # beginning of sentence
EOS_TOKEN = "<eos>"  # end of sentence
PAD_TOKEN = "<pad>"  # padding
BOW_TOKEN = "<bow>"  # beginning of word (character level)
EOW_TOKEN = "<eow>"  # end of word (character level)

def load_corpus(path, max_tok_len=None, max_seq_len=None):
    """
    Load a raw-text corpus and build the word- and character-level vocabularies.

    Each line of the file is one sentence whose tokens are separated by a
    single space. Every sentence is wrapped in BOS_TOKEN / EOS_TOKEN.

    Args:
        path: Path of the UTF-8 encoded corpus file.
        max_tok_len: Upper bound on the number of characters per word,
            counting the two word-boundary markers; longer words are truncated.
        max_seq_len: Upper bound on the number of words per sentence,
            counting the two sentence-boundary markers; longer sentences
            are truncated.

    Returns:
        (corpus_w, corpus_c, vocab_w, vocab_c) where corpus_w holds one list
        of word ids per sentence, corpus_c holds one list of character-id
        lists per sentence (each word wrapped in BOW/EOW ids), and vocab_w /
        vocab_c are the word and character vocabularies.
    """
    text = []
    # Character set; seeded with the predefined special tokens (sentence
    # start/end, padding, word start/end).
    charset = {BOS_TOKEN, EOS_TOKEN, PAD_TOKEN, BOW_TOKEN, EOW_TOKEN}
    print(f"Loading corpus from {path}")
    with codecs.open(path, "r", encoding="utf-8") as f:
        for line in tqdm(f):
            tokens = line.rstrip().split(" ")
            # Truncate long sentences. Slice the token list (not the raw
            # line) and leave room for the BOS/EOS markers.
            if max_seq_len is not None and len(tokens) + 2 > max_seq_len:
                tokens = tokens[:max(max_seq_len - 2, 0)]
            sent = [BOS_TOKEN]
            for token in tokens:
                # Truncate words with too many characters, leaving room for
                # the BOW/EOW markers.
                if max_tok_len is not None and len(token) + 2 > max_tok_len:
                    token = token[:max(max_tok_len - 2, 0)]
                sent.append(token)
                for ch in token:
                    charset.add(ch)
            sent.append(EOS_TOKEN)
            text.append(sent)

    # Build word and character vocabulary
    print("Building word-level vocabulary")
    vocab_w = Vocab.build(
        text,
        min_freq=2,
        reserved_tokens=[PAD_TOKEN, BOS_TOKEN, EOS_TOKEN]
    )
    print("Building char-level vocabulary")
    # Sort so character ids are reproducible across runs (set iteration
    # order for strings is not).
    vocab_c = Vocab(tokens=sorted(charset))

    # Construct corpus using word_vocab and char_vocab
    corpus_w = [vocab_w.convert_tokens_to_ids(sent) for sent in text]
    corpus_c = []
    bow = vocab_c[BOW_TOKEN]
    eow = vocab_c[EOW_TOKEN]
    for i, sent in enumerate(text):
        sent_c = []
        for token in sent:
            if token == BOS_TOKEN or token == EOS_TOKEN:
                # Sentence markers are a single symbol at character level.
                token_c = [bow, vocab_c[token], eow]
            else:
                token_c = [bow] + vocab_c.convert_tokens_to_ids(token) + [eow]
            sent_c.append(token_c)
        assert len(sent_c) == len(corpus_w[i])
        corpus_c.append(sent_c)

    assert len(corpus_w) == len(corpus_c)
    return corpus_w, corpus_c, vocab_w, vocab_c

In [50]:
import torch
from torch.nn.utils.rnn import pad_sequence

## 数据类：这个函数写的太工整了！
class BiLMDataset(Dataset):
    """Dataset of (word ids, character ids) sentence pairs for bidirectional LM training."""

    def __init__(self, corpus_w, corpus_c, vocab_w, vocab_c):
        """Pair up the word-level and character-level corpora.

        Args:
            corpus_w: List of sentences, each a list of word ids.
            corpus_c: List of sentences, each a list of per-word character-id lists.
            vocab_w: Word vocabulary (used for the padding id).
            vocab_c: Character vocabulary (used for the padding id).
        """
        super(BiLMDataset, self).__init__()
        self.pad_w = vocab_w[PAD_TOKEN]
        self.pad_c = vocab_c[PAD_TOKEN]

        self.data = [(sent_w, sent_c) for sent_w, sent_c in tqdm(zip(corpus_w, corpus_c))]

    def __len__(self):
        """Number of sentences."""
        return len(self.data)

    def __getitem__(self, i):
        """Return the ``(word_ids, char_ids)`` pair of sentence ``i``."""
        return self.data[i]

    def collate_fn(self, examples):
        """Pad a list of examples into batch tensors.

        Returns:
            inputs_w:   (batch, max_seq_len) word ids, padded with ``pad_w``.
            inputs_c:   (batch, max_seq_len, max_tok_len) char ids, padded with ``pad_c``.
            seq_lens:   (batch,) sentence lengths.
            targets_fw: (batch, max_seq_len) next-word targets (forward LM).
            targets_bw: (batch, max_seq_len) previous-word targets (backward LM).
        """
        word_seqs = [ex[0] for ex in examples]

        # Length of every sentence in this batch.
        seq_lens = torch.LongTensor([len(words) for words in word_seqs])

        # Word-level input, padded to the longest sentence in the batch.
        inputs_w = pad_sequence(
            [torch.tensor(words) for words in word_seqs],
            batch_first=True,
            padding_value=self.pad_w,
        )

        # Longest sentence and longest word (in characters) of this batch.
        batch_size, max_seq_len = inputs_w.shape
        max_tok_len = max(max(len(tok) for tok in ex[1]) for ex in examples)

        # Start from all-padding tensors and copy the real ids in.
        inputs_c = torch.full(
            (batch_size, max_seq_len, max_tok_len), self.pad_c, dtype=torch.long
        )
        targets_fw = torch.full(tuple(inputs_w.shape), self.pad_w, dtype=torch.long)
        targets_bw = torch.full(tuple(inputs_w.shape), self.pad_w, dtype=torch.long)

        for i, (sent_w, sent_c) in enumerate(examples):
            for j, tok in enumerate(sent_c):
                inputs_c[i, j, :len(tok)] = torch.LongTensor(tok)

            n_words = len(sent_w)
            # Forward LM predicts the next word; backward LM the previous one.
            targets_fw[i, :n_words - 1] = torch.LongTensor(sent_w[1:])
            targets_bw[i, 1:n_words] = torch.LongTensor(sent_w[:n_words - 1])

        return inputs_w, inputs_c, seq_lens, targets_fw, targets_bw

In [51]:
from torch import nn
import torch.nn.functional as F

## 输入表示层最后的Highway层
class Highway(nn.Module):
    """Stack of highway layers: ``y = gate * x + (1 - gate) * activation(W x)``.

    Each layer is a single ``Linear(input_dim, 2 * input_dim)`` whose output
    is split on the last dimension into the transformed part (first half)
    and the gate pre-activation (second half).
    """

    def __init__(self, input_dim, num_layers, activation=F.relu):
        """
        Args:
            input_dim: Size of the last dimension of the input (and output).
            num_layers: Number of stacked highway layers.
            activation: Non-linearity applied to the transformed part.
        """
        super(Highway, self).__init__()
        self.input_dim = input_dim
        self.layers = torch.nn.ModuleList(
            [nn.Linear(input_dim, input_dim * 2) for _ in range(num_layers)]
        )
        self.activation = activation
        for layer in self.layers:
            # Set the gate bias to 1 so that sigmoid(gate) starts above 0.5
            # and the layer initially favors carrying its input through.
            with torch.no_grad():
                layer.bias[input_dim:].fill_(1)

    def forward(self, inputs):
        """Apply all highway layers.

        Args:
            inputs: Tensor whose last dimension is ``input_dim``; any number
                of leading dimensions is accepted.

        Returns:
            Tensor with the same shape as ``inputs``.
        """
        curr_inputs = inputs
        for layer in self.layers:
            projected_inputs = layer(curr_inputs)
            # Slice on the last dimension so inputs of any rank work.
            hidden = self.activation(projected_inputs[..., :self.input_dim])
            gate = torch.sigmoid(projected_inputs[..., self.input_dim:])
            # Mix the carried input and the transformed signal; the result
            # feeds the next layer.
            curr_inputs = gate * curr_inputs + (1 - gate) * hidden
        return curr_inputs

In [52]:
# 基于字符卷积的词表示层
class ConvTokenEmbedder(nn.Module):
    """Word representation built from character convolutions.

    Characters are embedded, passed through several 1-D convolutions of
    different widths with max-pooling over the character axis, concatenated,
    fed through highway layers and finally projected to ``output_dim``.
    """

    def __init__(
        self,
        vocab_c,
        char_embedding_dim,
        char_conv_filters,
        num_highways,
        output_dim,
        pad="<pad>"
    ):
        """
        Args:
            vocab_c: Character vocabulary; must support ``len()`` and ``vocab_c[pad]``.
            char_embedding_dim: Size of each character embedding.
            char_conv_filters: Sequence of ``(kernel_size, out_channels)`` pairs.
            num_highways: Number of highway layers.
            output_dim: Size of the produced word representation.
            pad: Padding token whose id is used as ``padding_idx``.
        """
        super(ConvTokenEmbedder, self).__init__()
        self.vocab_c = vocab_c

        self.char_embeddings = nn.Embedding(
            len(vocab_c),
            char_embedding_dim,
            padding_idx=vocab_c[pad]
        )
        # NOTE: this re-initializes the whole weight matrix, including the
        # padding row.
        self.char_embeddings.weight.data.uniform_(-0.25, 0.25)

        self.convolutions = nn.ModuleList()
        for kernel_size, out_channels in char_conv_filters:
            conv = torch.nn.Conv1d(
                in_channels=char_embedding_dim,
                out_channels=out_channels,
                kernel_size=kernel_size,
                bias=True
            )
            self.convolutions.append(conv)

        self.num_filters = sum(f[1] for f in char_conv_filters)
        self.num_highways = num_highways
        self.highways = Highway(self.num_filters, self.num_highways, activation=F.relu)

        self.projection = nn.Linear(self.num_filters, output_dim, bias=True)

    def forward(self, inputs):
        """Embed a batch of character-id words.

        Args:
            inputs: Integer tensor of shape (batch_size, seq_len, token_len).

        Returns:
            Float tensor of shape (batch_size, seq_len, output_dim).
        """
        batch_size, seq_len, token_len = inputs.shape

        # Conv1d raises if the input is shorter than its kernel. Batches are
        # padded only to their own longest word, so right-pad short batches
        # with the padding id up to the widest kernel.
        widest_kernel = max(
            (conv.kernel_size[0] for conv in self.convolutions), default=0
        )
        if token_len < widest_kernel:
            pad_idx = self.char_embeddings.padding_idx
            inputs = F.pad(
                inputs,
                (0, widest_kernel - token_len),
                value=0 if pad_idx is None else pad_idx,
            )

        # Treat every word of every sentence as an independent sample.
        inputs = inputs.reshape(batch_size * seq_len, -1)
        char_embeds = self.char_embeddings(inputs)
        # Conv1d expects (N, channels, length).
        char_embeds = char_embeds.transpose(1, 2)

        conv_hiddens = []
        for conv in self.convolutions:
            conv_hidden = conv(char_embeds)
            # Max-pool over the character axis.
            conv_hidden, _ = torch.max(conv_hidden, dim=-1)
            conv_hidden = F.relu(conv_hidden)
            conv_hiddens.append(conv_hidden)

        token_embeds = torch.cat(conv_hiddens, dim=-1)
        token_embeds = self.highways(token_embeds)
        token_embeds = self.projection(token_embeds)
        token_embeds = token_embeds.view(batch_size, seq_len, -1)

        return token_embeds

In [53]:
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

## 双向lstm编码器
class ELMoLstmEncoder(nn.Module):
    """Multi-layer bidirectional LSTM encoder with independent directions.

    Every layer consists of one forward and one backward unidirectional
    LSTM, each followed by a linear projection back to ``input_dim`` so that
    all layers share the dimensionality of the input representation.
    """

    def __init__(
        self,
        input_dim,
        hidden_dim,
        num_layers,
        dropout_prob=0.0
    ):
        """
        Args:
            input_dim: Size of the input (and of every projected layer output).
            hidden_dim: Hidden size of each LSTM.
            num_layers: Number of stacked layers per direction.
            dropout_prob: Accepted for interface compatibility; not used by
                this implementation.
        """
        super(ELMoLstmEncoder, self).__init__()

        # set projection_dim==input_dim for ELMo usage
        self.projection_dim = input_dim
        self.num_layers = num_layers

        # Forward / backward LSTM stacks and their projection layers.
        self.forward_layers = nn.ModuleList()
        self.backward_layers = nn.ModuleList()
        self.forward_projections = nn.ModuleList()
        self.backward_projections = nn.ModuleList()

        lstm_input_dim = input_dim
        for _ in range(num_layers):
            # Unidirectional forward LSTM and its projection.
            forward_layer = nn.LSTM(
                lstm_input_dim,
                hidden_dim,
                num_layers=1,
                batch_first=True
            )
            forward_projection = nn.Linear(hidden_dim, self.projection_dim, bias=True)

            # Unidirectional backward LSTM and its projection.
            backward_layer = nn.LSTM(
                lstm_input_dim,
                hidden_dim,
                num_layers=1,
                batch_first=True
            )
            backward_projection = nn.Linear(hidden_dim, self.projection_dim, bias=True)

            lstm_input_dim = self.projection_dim

            self.forward_layers.append(forward_layer)
            self.forward_projections.append(forward_projection)
            self.backward_layers.append(backward_layer)
            self.backward_projections.append(backward_projection)

    def forward(self, inputs, lengths):
        """Encode a padded batch in both directions.

        Args:
            inputs: Float tensor of shape (batch_size, seq_len, input_dim).
            lengths: Integer tensor of shape (batch_size,) with the true
                length of every sequence.

        Returns:
            (stacked_forward_states, stacked_backward_states): two lists with
            one (batch_size, seq_len, projection_dim) tensor per layer. The
            backward states are returned in the original token order.
        """
        batch_size, seq_len, input_dim = inputs.shape

        # Build the index that reverses the first `length` positions of each
        # sequence and leaves the padded positions where they are.
        positions = torch.arange(seq_len, device=inputs.device)
        positions = positions.unsqueeze(0).expand(batch_size, seq_len)
        lens = lengths.to(device=inputs.device, dtype=torch.long).unsqueeze(1)
        rev_idx = torch.where(positions < lens, lens - 1 - positions, positions)
        rev_idx = rev_idx.unsqueeze(2).expand_as(inputs)
        rev_inputs = inputs.gather(1, rev_idx)

        # Inputs of the forward and backward LSTM stacks.
        forward_inputs, backward_inputs = inputs, rev_inputs
        # Per-layer hidden states of both directions.
        stacked_forward_states, stacked_backward_states = [], []

        # `pack_padded_sequence` requires the lengths on the CPU.
        cpu_lengths = lengths.cpu()

        for layer_index in range(self.num_layers):
            packed_forward_inputs = pack_padded_sequence(
                forward_inputs, cpu_lengths, batch_first=True, enforce_sorted=False)
            packed_backward_inputs = pack_padded_sequence(
                backward_inputs, cpu_lengths, batch_first=True, enforce_sorted=False)

            # forward
            forward_layer = self.forward_layers[layer_index]
            packed_forward, _ = forward_layer(packed_forward_inputs)
            # total_length keeps the time dimension equal to seq_len even
            # when the batch is padded beyond its longest sequence.
            forward = pad_packed_sequence(
                packed_forward, batch_first=True, total_length=seq_len)[0]
            forward = self.forward_projections[layer_index](forward)
            stacked_forward_states.append(forward)

            # backward
            backward_layer = self.backward_layers[layer_index]
            packed_backward, _ = backward_layer(packed_backward_inputs)
            backward = pad_packed_sequence(
                packed_backward, batch_first=True, total_length=seq_len)[0]
            backward = self.backward_projections[layer_index](backward)
            # convert back to original sequence order using rev_idx
            stacked_backward_states.append(backward.gather(1, rev_idx))

            # The next backward layer keeps consuming the reversed order.
            forward_inputs, backward_inputs = forward, backward

        # stacked_forward_states: [batch_size, seq_len, projection_dim] * num_layers
        # stacked_backward_states: [batch_size, seq_len, projection_dim] * num_layers
        return stacked_forward_states, stacked_backward_states

In [56]:
import os

class BiLM(nn.Module):
    """
    Multi-layer bidirectional language model.

    A character-convolution token embedder feeds a stacked forward/backward
    LSTM encoder; a single linear classifier shared by both directions maps
    the top-layer states to word-vocabulary logits.
    """
    def __init__(self, configs, vocab_w, vocab_c):
        """
        Args:
            configs: Dict with the keys 'dropout_prob', 'char_embedding_dim',
                'char_conv_filters', 'num_highways', 'projection_dim',
                'hidden_dim' and 'num_layers'.
            vocab_w: Word vocabulary; its size is the number of output classes.
            vocab_c: Character vocabulary for the token embedder.
        """
        super(BiLM, self).__init__()
        self.dropout_prob = configs['dropout_prob']
        self.num_classes = len(vocab_w)

        self.token_embedder = ConvTokenEmbedder(
            vocab_c,
            configs['char_embedding_dim'],
            configs['char_conv_filters'],
            configs['num_highways'],
            configs['projection_dim']
        )

        self.encoder = ELMoLstmEncoder(
            configs['projection_dim'],
            configs['hidden_dim'],
            configs['num_layers']
        )

        self.classifier = nn.Linear(configs['projection_dim'], self.num_classes)

    def forward(self, inputs, lengths):
        """Return ``(forward_logits, backward_logits)`` for a batch.

        Args:
            inputs: Character-id tensor passed to the token embedder.
            lengths: Sequence lengths passed to the encoder.
        """
        token_embeds = self.token_embedder(inputs)
        # F.dropout defaults to training=True; tie it to the module mode so
        # that dropout is switched off by model.eval().
        token_embeds = F.dropout(token_embeds, self.dropout_prob, training=self.training)
        forward, backward = self.encoder(token_embeds, lengths)

        # Only the top-layer states are used for prediction.
        return self.classifier(forward[-1]), self.classifier(backward[-1])

    def save_pretrained(self, path):
        """Save the three sub-modules' state dicts into directory ``path``."""
        os.makedirs(path, exist_ok=True)
        torch.save(self.token_embedder.state_dict(), os.path.join(path, 'token_embedder.pth'))
        torch.save(self.encoder.state_dict(), os.path.join(path, 'encoder.pth'))
        torch.save(self.classifier.state_dict(), os.path.join(path, 'classifier.pth'))

    def load_pretrained(self, path):
        """Load the three sub-modules' state dicts from directory ``path``.

        Checkpoints are mapped to the CPU first so that weights saved on a
        GPU can be loaded on a machine without one; ``load_state_dict`` then
        copies them into the parameters on their current device.
        """
        self.token_embedder.load_state_dict(
            torch.load(os.path.join(path, 'token_embedder.pth'), map_location='cpu'))
        self.encoder.load_state_dict(
            torch.load(os.path.join(path, 'encoder.pth'), map_location='cpu'))
        self.classifier.load_state_dict(
            torch.load(os.path.join(path, 'classifier.pth'), map_location='cpu'))

In [None]:
import torch.optim as optim
import json
import numpy as np

## 训练

configs = {
    'max_tok_len': 50,
    'train_file': './train.txt', # path to your training file, line-by-line and tokenized
    'model_path': './elmo_bilm',
    'char_embedding_dim': 50,
    'char_conv_filters': [[1, 32], [2, 32], [3, 64], [4, 128], [5, 256], [6, 512], [7, 1024]],
    'num_highways': 2,
    'projection_dim': 512,
    'hidden_dim': 4096,
    'num_layers': 2,
    'batch_size': 32,
    'dropout_prob': 0.1,
    'learning_rate': 0.0004,
    'clip_grad': 5,
    'num_epoch': 10
}

# Build the corpus, dataset and loader. The configured word-length limit is
# passed through so that 'max_tok_len' actually takes effect.
corpus_w, corpus_c, vocab_w, vocab_c = load_corpus(
    configs['train_file'],
    max_tok_len=configs['max_tok_len']
)
train_data = BiLMDataset(corpus_w, corpus_c, vocab_w, vocab_c)
train_loader = get_loader(train_data, configs['batch_size'])

# Cross-entropy loss, summed over the batch; positions whose target is the
# padding id are ignored.
pad_idx = vocab_w[PAD_TOKEN]
criterion = nn.CrossEntropyLoss(
    ignore_index=pad_idx,
    reduction="sum"
)
print("Building BiLM model")
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = BiLM(configs, vocab_w, vocab_c)
print(model)
model.to(device)

optimizer = optim.Adam(
    filter(lambda x: x.requires_grad, model.parameters()),
    lr=configs['learning_rate']
)

model.train()
for epoch in range(configs['num_epoch']):
    total_loss = 0
    total_tags = 0 # number of valid predictions
    for batch in tqdm(train_loader, desc=f"Training Epoch {epoch}"):
        batch = [x.to(device) for x in batch]
        inputs_w, inputs_c, seq_lens, targets_fw, targets_bw = batch

        optimizer.zero_grad()
        outputs_fw, outputs_bw = model(inputs_c, seq_lens)
        loss_fw = criterion(
            outputs_fw.view(-1, outputs_fw.shape[-1]),
            targets_fw.view(-1)
        )
        loss_bw = criterion(
            outputs_bw.view(-1, outputs_bw.shape[-1]),
            targets_bw.view(-1)
        )
        loss = (loss_fw + loss_bw) / 2.0
        loss.backward()

        # Gradient clipping: rescale the gradients when their global norm
        # exceeds the threshold, to guard against exploding gradients.
        torch.nn.utils.clip_grad_norm_(model.parameters(), configs['clip_grad'])
        optimizer.step()

        total_loss += loss_fw.item()
        # Count exactly the positions the forward loss was summed over
        # (non-padding targets), not the raw sequence lengths.
        total_tags += (targets_fw != pad_idx).sum().item()

    # The forward LM perplexity (PPL) is used as the training metric.
    train_ppl = np.exp(total_loss / total_tags)
    print(f"Train PPL: {train_ppl:.2f}")

# save BiLM encoders
model.save_pretrained(configs['model_path'])
# save configs (context manager so the file is flushed and closed)
with open(os.path.join(configs['model_path'], 'configs.json'), "w") as config_file:
    json.dump(configs, config_file)
# save vocabularies
save_vocab(vocab_w, os.path.join(configs['model_path'], 'word.dic'))
save_vocab(vocab_c, os.path.join(configs['model_path'], 'char.dic'))

In [59]:
## 封装
class ELMo(nn.Module):
    """Wrapper that restores a pretrained token embedder and LSTM encoder.

    The model directory is expected to contain ``configs.json``,
    ``char.dic``, ``token_embedder.pth`` and ``encoder.pth`` (the files
    written after BiLM training in this notebook).
    """

    def __init__(self, model_dir):
        """
        Args:
            model_dir: Directory holding the saved configs, character
                vocabulary and weights.
        """
        super(ELMo, self).__init__()

        # Load the configuration saved at training time ('configs.json').
        with open(os.path.join(model_dir, 'configs.json')) as config_file:
            self.configs = json.load(config_file)

        # Only the character-level vocabulary is needed here.
        self.vocab_c = read_vocab(os.path.join(model_dir, 'char.dic'))

        # Token representation encoder. Built from the loaded vocabulary and
        # configs, not from notebook-level globals.
        self.token_embedder = ConvTokenEmbedder(
            self.vocab_c,
            self.configs['char_embedding_dim'],
            self.configs['char_conv_filters'],
            self.configs['num_highways'],
            self.configs['projection_dim']
        )

        # ELMo LSTM encoder
        self.encoder = ELMoLstmEncoder(
            self.configs['projection_dim'],
            self.configs['hidden_dim'],
            self.configs['num_layers']
        )

        self.output_dim = self.configs['projection_dim']

        self.load_pretrained(model_dir)

    def load_pretrained(self, path):
        """Load the token embedder and encoder weights from directory ``path``.

        Checkpoints are mapped to the CPU first so GPU-saved weights load on
        CPU-only machines.
        """
        # Token representation encoder
        self.token_embedder.load_state_dict(
            torch.load(os.path.join(path, "token_embedder.pth"), map_location='cpu'))

        # LSTM encoder
        self.encoder.load_state_dict(
            torch.load(os.path.join(path, "encoder.pth"), map_location='cpu'))

## 调用 AllenNLP 的 ELMo 模型

In [None]:
from allennlp.modules.elmo import Elmo, batch_to_ids

# URLs of the pretrained options/weights files (no whitespace inside the file names).
options_file = "https://allennlp.s3.amazonaws.com/models/elmo/2x4096_512_2048cnn_2xhighway/elmo_2x4096_512_2048cnn_2xhighway_options.json"
weights_file = "https://allennlp.s3.amazonaws.com/models/elmo/2x4096_512_2048cnn_2xhighway/elmo_2x4096_512_2048cnn_2xhighway_weights.hdf5"
elmo = Elmo(options_file, weights_file, num_output_representations=1, dropout=0)

sentences = [['i', 'love', 'elmo'], ['hello', 'elmo']]
# Character ids for the batch: 2 sentences * 3 tokens * 50 characters.
character_ids = batch_to_ids(sentences)
embeddings = elmo(character_ids)
embeddings

In [None]:
## 作为下游任务的特征

class BowDataset(Dataset):
    """Thin ``Dataset`` wrapper around an in-memory sequence of examples."""

    def __init__(self, data):
        """Keep a reference to ``data`` (any indexable, sized collection)."""
        self.data = data

    def __len__(self):
        """Number of examples."""
        return len(self.data)

    def __getitem__(self, i):
        """Return example ``i`` unchanged."""
        return self.data[i]

def collate_fn(examples):
    """Collate ``(token_ids, label)`` examples into flat inputs plus offsets.

    Args:
        examples: Sequence of ``(token_ids, label)`` pairs.

    Returns:
        inputs:  1-D tensor with all token ids concatenated.
        offsets: 1-D tensor with the start position of every example in ``inputs``.
        targets: 1-D long tensor with the labels.

    NOTE(review): the output is a flat id tensor with start offsets, whereas
    ``ELMoMLP.forward`` in this notebook takes ``(inputs, lengths)`` and
    passes ``inputs`` to an ELMo module -- confirm this collate function is
    the intended one for that model.
    """
    token_tensors = []
    labels = []
    for ex in examples:
        token_tensors.append(torch.tensor(ex[0]))
        labels.append(ex[1])

    targets = torch.tensor(labels, dtype=torch.long)

    # Start of example k = total length of examples 0..k-1.
    sizes = [0] + [t.shape[0] for t in token_tensors]
    offsets = torch.tensor(sizes[:-1]).cumsum(dim=0)

    inputs = torch.cat(token_tensors)
    return inputs, offsets, targets

def load_sentence_polarity():
    """Load the NLTK ``sentence_polarity`` corpus as id sequences.

    The first 4000 sentences of each category form the training set and the
    remaining ones the test set. Positive sentences get label 0, negative
    sentences label 1.

    Returns:
        (train_data, test_data, vocab): two lists of ``(token_ids, label)``
        pairs and the vocabulary built from all sentences.
    """
    from nltk.corpus import sentence_polarity

    vocab = Vocab.build(sentence_polarity.sents())

    def encode(category, label, span):
        # Convert the selected slice of one category into (ids, label) pairs.
        selected = sentence_polarity.sents(categories=category)[span]
        return [(vocab.convert_tokens_to_ids(sentence), label) for sentence in selected]

    train_span = slice(None, 4000)
    test_span = slice(4000, None)

    train_data = encode('pos', 0, train_span) + encode('neg', 1, train_span)
    test_data = encode('pos', 0, test_span) + encode('neg', 1, test_span)

    return train_data, test_data, vocab

class ELMoMLP(nn.Module):
    """MLP classifier on top of mean-pooled ELMo representations."""

    def __init__(self, elmo, hidden_dim, num_class):
        """
        Args:
            elmo: ELMo module (AllenNLP) providing ``get_output_dim()`` and
                returning a dict with 'elmo_representations' and 'mask'.
            hidden_dim: Size of the hidden layer.
            num_class: Number of output classes.
        """
        super(ELMoMLP, self).__init__()

        # ELMo encoder (AllenNLP)
        self.elmo = elmo
        # Hidden layer
        self.fc1 = nn.Linear(self.elmo.get_output_dim(), hidden_dim)
        # Output layer
        self.fc2 = nn.Linear(hidden_dim, num_class)
        self.activate = F.relu

    def forward(self, inputs, lengths=None):
        """Return per-class log-probabilities of shape (batch, num_class).

        Args:
            inputs: Input passed unchanged to the ELMo module.
            lengths: Optional (batch,) tensor with the number of tokens per
                sequence. When omitted it is derived from the ELMo mask.
        """
        elmo_output = self.elmo(inputs)
        embeds = elmo_output['elmo_representations'][0]
        # Cast the mask to the embedding dtype so the product is well defined
        # whatever integer/bool type the mask has.
        mask = elmo_output['mask'].to(embeds.dtype)

        if lengths is None:
            lengths = mask.sum(dim=1)
        # Clamp to 1 so an empty sequence yields a zero vector, not NaN.
        lengths = lengths.to(device=embeds.device, dtype=embeds.dtype).clamp(min=1)

        # The mean of the word vectors of each sequence is the MLP input.
        embeds = torch.sum(embeds * mask.unsqueeze(2), dim=1) / lengths.unsqueeze(1)
        hidden = self.activate(self.fc1(embeds))
        output = self.fc2(hidden)
        log_probs = F.log_softmax(output, dim=1)
        return log_probs

# tqdm是一个Python模块，能以进度条的方式显示迭代的进度
from tqdm.auto import tqdm

# Hyper-parameters
embedding_dim = 128
hidden_dim = 256
num_class = 2
batch_size = 32
num_epoch = 10

# Load data
train_data, test_data, vocab = load_sentence_polarity()
train_dataset = BowDataset(train_data)
test_dataset = BowDataset(test_data)


def elmo_collate_fn(examples):
    """Collate ``(token_ids, label)`` examples into ELMo character ids.

    The ELMo module is fed the output of ``batch_to_ids`` (as in the demo
    cell above), so the word ids are mapped back to tokens first.

    Returns:
        character_ids: output of ``batch_to_ids`` for the batch.
        lengths:       (batch,) number of tokens per sentence.
        targets:       (batch,) long tensor of labels.
    """
    sentences = [vocab.convert_ids_to_tokens(token_ids) for token_ids, _ in examples]
    character_ids = batch_to_ids(sentences)
    lengths = torch.tensor([len(sentence) for sentence in sentences], dtype=torch.long)
    targets = torch.tensor([label for _, label in examples], dtype=torch.long)
    return character_ids, lengths, targets


train_data_loader = DataLoader(train_dataset, batch_size=batch_size, collate_fn=elmo_collate_fn, shuffle=True)
test_data_loader = DataLoader(test_dataset, batch_size=1, collate_fn=elmo_collate_fn, shuffle=False)

# Build the model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = ELMoMLP(elmo, hidden_dim, num_class)
model.to(device) # move the model to the CPU or GPU device

# Training
nll_loss = nn.NLLLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001) # Adam optimizer

model.train()
for epoch in range(num_epoch):
    total_loss = 0
    for batch in tqdm(train_data_loader, desc=f"Training Epoch {epoch}"):
        inputs, lengths, targets = [x.to(device) for x in batch]
        log_probs = model(inputs, lengths)
        loss = nll_loss(log_probs, targets)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Loss: {total_loss:.2f}")

# Evaluation: switch to eval mode so train-only behavior is disabled.
model.eval()
acc = 0
for batch in tqdm(test_data_loader, desc=f"Testing"):
    inputs, lengths, targets = [x.to(device) for x in batch]
    with torch.no_grad():
        output = model(inputs, lengths)
        acc += (output.argmax(dim=1) == targets).sum().item()

# Accuracy on the test set: correct predictions / number of test examples
# (independent of the test batch size).
print(f"Acc: {acc / len(test_dataset):.2f}")