# NLP - 情感分类

In [13]:
import torch
from torch import nn, optim
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader
from collections import defaultdict

### 1、词表映射

In [14]:
from collections import defaultdict, Counter

class Vocab:
    """Bidirectional mapping between tokens and consecutive integer ids.

    Attributes:
        idx_to_token: list where position ``i`` holds the token with id ``i``.
        token_to_idx: dict mapping each token to its id.
        unk: id of the ``"<unk>"`` token, or ``None`` for an empty vocabulary
            (constructed with ``tokens=None``).
    """

    def __init__(self, tokens=None):
        """Create a vocabulary from an iterable of tokens.

        Args:
            tokens: iterable of tokens, or ``None`` for an empty vocabulary.
                ``"<unk>"`` is appended when it is missing. Repeated tokens
                keep the id of their first occurrence, so ``len(vocab)``
                always equals the number of distinct tokens.
        """
        self.idx_to_token = list()
        self.token_to_idx = dict()
        self.unk = None

        if tokens is not None:
            # Work on a copy: accepts any iterable and never mutates the caller's list.
            tokens = list(tokens)
            if "<unk>" not in tokens:
                tokens.append("<unk>")
            for token in tokens:
                # Skip repeats so ids and idx_to_token positions stay in one-to-one correspondence.
                if token in self.token_to_idx:
                    continue
                self.token_to_idx[token] = len(self.idx_to_token)
                self.idx_to_token.append(token)
            self.unk = self.token_to_idx['<unk>']

    @classmethod
    def build(cls, text, min_freq=1, reserved_tokens=None):
        """Build a vocabulary from tokenized sentences.

        Args:
            text: iterable of sentences, each an iterable of tokens.
            min_freq: tokens seen fewer than ``min_freq`` times are left out
                (they will map to ``"<unk>"`` on lookup).
            reserved_tokens: optional tokens placed right after ``"<unk>"``.

        Returns:
            A new vocabulary whose id 0 is ``"<unk>"``, followed by the reserved
            tokens and then the corpus tokens in order of first appearance.
        """
        token_freqs = Counter(token for sentence in text for token in sentence)
        uniq_tokens = ["<unk>"] + list(reserved_tokens or [])
        uniq_tokens += [token for token, freq in token_freqs.items()
                        if freq >= min_freq and token != "<unk>"]
        # cls(...) forwards uniq_tokens to __init__ as `tokens`.
        return cls(uniq_tokens)

    def __len__(self):
        """Return the number of distinct tokens in the vocabulary."""
        return len(self.idx_to_token)

    def __getitem__(self, token):
        """Return the id of ``token``, falling back to the ``"<unk>"`` id.

        Raises:
            KeyError: if the token is unknown and the vocabulary is empty, so
                there is no ``"<unk>"`` id to fall back to.
        """
        idx = self.token_to_idx.get(token, self.unk)
        if idx is None:
            raise KeyError(token)
        return idx

    def convert_tokens_to_ids(self, tokens):
        """Map a sequence of tokens to a list of ids (unknown tokens become ``unk``)."""
        return [self[token] for token in tokens]

    def convert_ids_to_tokens(self, indices):
        """Map a sequence of ids back to a list of tokens."""
        return [self.idx_to_token[index] for index in indices]

### 2、词向量层：embeddings 

In [51]:
# Vocabulary size 8, embedding vector dimension 3.
embedding = nn.Embedding(8, 3)

# Two sequences of four token ids each. Named `token_ids` rather than `input`
# so the builtin input() is not shadowed for the rest of the notebook.
token_ids = torch.tensor([[0, 1, 2, 1], [4, 6, 6, 7]], dtype=torch.long)
print(token_ids.shape)
# Each id is replaced by its 3-dimensional vector: (2, 4) -> (2, 4, 3).
output = embedding(token_ids)
output.shape

torch.Size([2, 4])


torch.Size([2, 4, 3])

In [16]:
# Re-display the shape of `output`, the embedding result computed in the previous cell.
output.shape

torch.Size([2, 4, 3])

### 3、融入词向量层的多层感知机

In [17]:
import torch
from torch import nn
from torch.nn import functional as F

class MLP(nn.Module):
    """Multi-layer perceptron classifier on top of a word-embedding layer.

    The embeddings of all tokens in a sequence are averaged into a single
    vector, passed through one ReLU hidden layer, and projected to per-class
    log-probabilities.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_class):
        super().__init__()
        # Embedding lookup table, then embedding -> hidden, then hidden -> output.
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.linear1 = nn.Linear(embedding_dim, hidden_dim)
        self.activate = F.relu
        self.linear2 = nn.Linear(hidden_dim, num_class)

    def forward(self, inputs):
        """Return log-probabilities over classes for each sequence in `inputs`."""
        # Aggregate the per-token embeddings of each sequence by averaging over dim 1.
        pooled = torch.mean(self.embedding(inputs), dim=1)
        hidden = self.activate(self.linear1(pooled))
        logits = self.linear2(hidden)
        return F.log_softmax(logits, dim=1)

# Instantiate a small model: vocabulary of 8 tokens, 3-dim embeddings, 5 hidden units, 2 classes.
mlp = MLP(vocab_size=8, embedding_dim=3, hidden_dim=5, num_class=2)
# The input is two integer sequences of length 4.
inputs = torch.tensor([[0, 1, 2, 1], [4, 6, 6, 7]], dtype=torch.long)
# One row of class log-probabilities per input sequence.
outputs = mlp(inputs)
print(outputs)

tensor([[-0.9529, -0.4871],
        [-0.8838, -0.5331]], grad_fn=<LogSoftmaxBackward>)


In [18]:
# Display the module structure of the model built above.
mlp

MLP(
  (embedding): Embedding(8, 3)
  (linear1): Linear(in_features=3, out_features=5, bias=True)
  (linear2): Linear(in_features=5, out_features=2, bias=True)
)

### 4、数据处理：

数据处理第一步为加载数据，此时读入的数据是原始数据，还需要进行分句、标记解析等预处理过程转化为标记序列，然后再使用词表映射工具将每个标记映射到相应的索引值。

在此使用 NLTK 提供的句子倾向性分析数据作为示例：

#### 4.1、sentence_polarity提供了基本的数据访问方法：

`sentence_polarity.categories()` 返回褒贬类别列表，即['neg','pos']；

`sentence_polarity.words()` 返回语料库中全部单词的列表，如果调用时提供类别参数categories="pos" or "neg"，则会返回相应类别的全部单词列表；

`sentence_polarity.sents()` 返回语料库中全部句子的列表，调用时同样可以提供类别参数。


In [19]:
import torch

def load_sentence_polarity(num_train=4000):
    """Load the NLTK sentence_polarity corpus as id-encoded train/test splits.

    The vocabulary is built from every sentence in the corpus. Within each
    category the first `num_train` sentences go to the training split and the
    remainder to the test split. Positive sentences are labelled 0 and
    negative sentences 1.

    Args:
        num_train: number of sentences per category used for training.
            Defaults to 4000.

    Returns:
        (train_data, test_data, vocab), where each data split is a list of
        (token_id_list, label) tuples and `vocab` is the Vocab used for encoding.
    """
    from nltk.corpus import sentence_polarity

    vocab = Vocab.build(sentence_polarity.sents())

    # Read each category once and slice it, instead of re-reading per split.
    pos_sents = sentence_polarity.sents(categories='pos')
    neg_sents = sentence_polarity.sents(categories='neg')

    def encode(sentences, label):
        # Turn every sentence into (list of token ids, label).
        return [(vocab.convert_tokens_to_ids(sentence), label) for sentence in sentences]

    train_data = encode(pos_sents[:num_train], 0) + encode(neg_sents[:num_train], 1)
    test_data = encode(pos_sents[num_train:], 0) + encode(neg_sents[num_train:], 1)

    return train_data, test_data, vocab

In [20]:
# Load the encoded train/test splits together with the vocabulary used to encode them.
train_data, test_data, vocab = load_sentence_polarity()

In [21]:
# vocab.idx_to_token

In [22]:
# vocab.token_to_idx

In [23]:
# train_data

#### 不过以上数据不适合给 PyTorch 直接用，可以通过 PyTorch 提供的 DataLoader 类读取数据

In [24]:
from torch.utils.data import Dataset, DataLoader

# Illustrative DataLoader call kept as a plain string literal: it is not executed,
# and the notebook only echoes the string back as this cell's output.
# NOTE(review): as written, the snippet lacks a comma after `collate_fn=collate_fn`,
# so it would need one before being pasted as real code.
'''
data_loader = DataLoader(
                dataset,                  ## 
                batch_size=64,            ## 加载批大小
                collate_fn=collate_fn     ## 
                shuffle=True              ## 是否随机采样
)
'''

'\ndata_loader = DataLoader(\n                dataset,                  ## \n                batch_size=64,            ## 加载批大小\n                collate_fn=collate_fn     ## \n                shuffle=True              ## 是否随机采样\n)\n'

## MLP 情感分类

In [25]:
class BowDataset(Dataset):
    """Dataset wrapper around an in-memory list of examples.

    `data` is the raw example list, e.g. the training or test split returned
    by load_sentence_polarity.
    """

    def __init__(self, data):
        self.data = data

    def __len__(self):
        """Number of examples in the dataset."""
        size = len(self.data)
        return size

    def __getitem__(self, i):
        """Example stored at index `i`."""
        example = self.data[i]
        return example

collate_fn 参数指向一个函数，用于对一个批次的样本进行整理，如将其转换为张量，具体代码如下：

In [26]:
def collate_fn(examples):
    """Collate (token_ids, label) examples into the flat layout EmbeddingBag expects.

    Args:
        examples: list of (token_id_list, label) pairs.

    Returns:
        inputs: 1-D long tensor with all token ids of the batch concatenated.
        offsets: 1-D long tensor with the start position of each example in `inputs`.
        targets: 1-D long tensor of labels.
    """
    # dtype is forced to long: torch.tensor([]) would otherwise be float32 for an
    # empty sentence and torch.cat would promote the whole id tensor to float.
    inputs = [torch.tensor(ex[0], dtype=torch.long) for ex in examples]
    targets = torch.tensor([ex[1] for ex in examples], dtype=torch.long)

    # Exclusive prefix sum of the lengths: each example starts where the previous ones end.
    lengths = torch.tensor([seq.shape[0] for seq in inputs], dtype=torch.long)
    offsets = lengths.cumsum(dim=0) - lengths
    inputs = torch.cat(inputs)
    return inputs, offsets, targets

In [27]:
from torch import nn, optim
from torch.nn import functional as F
class MLP(nn.Module):
    """MLP classifier whose first layer is an EmbeddingBag.

    The EmbeddingBag pools the token embeddings of each example (delimited by
    `offsets`) into one vector, which then goes through a ReLU hidden layer
    and a linear output layer.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_class):
        super().__init__()
        # Pooled embedding layer, then embedding -> hidden, then hidden -> output.
        self.embedding = nn.EmbeddingBag(vocab_size, embedding_dim)
        self.linear1 = nn.Linear(embedding_dim, hidden_dim)
        self.activate = F.relu
        self.linear2 = nn.Linear(hidden_dim, num_class)

    def forward(self, inputs, offsets):
        """Return per-class log-probabilities, one row per example in the batch."""
        pooled = self.embedding(inputs, offsets)
        logits = self.linear2(self.activate(self.linear1(pooled)))
        return F.log_softmax(logits, dim=1)

In [28]:
# tqdm是一个Python模块，能以进度条的方式显示迭代的进度
from tqdm.auto import tqdm

# Hyperparameter settings
embedding_dim = 128  # size of each token embedding vector
hidden_dim = 256     # width of the hidden layer
num_class = 2        # number of output classes
batch_size = 32      # training mini-batch size
num_epoch = 5        # number of passes over the training data

In [29]:
# Load the data and wrap each split in a Dataset / DataLoader.
train_data, test_data, vocab = load_sentence_polarity()
train_dataset = BowDataset(train_data)
test_dataset = BowDataset(test_data)
# Training batches are shuffled; the test loader yields one example at a time, in order.
train_data_loader = DataLoader(train_dataset, batch_size=batch_size, collate_fn=collate_fn, shuffle=True)
test_data_loader = DataLoader(test_dataset, batch_size=1, collate_fn=collate_fn, shuffle=False)

In [30]:
# Build the model; use CUDA when it is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = MLP(len(vocab), embedding_dim, hidden_dim, num_class)
model.to(device) # move the model to the CPU or GPU device

MLP(
  (embedding): EmbeddingBag(21402, 128, mode=mean)
  (linear1): Linear(in_features=128, out_features=256, bias=True)
  (linear2): Linear(in_features=256, out_features=2, bias=True)
)

In [31]:
# Training: minimise the negative log-likelihood of the model's log-probabilities.
nll_loss = nn.NLLLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001) # Adam optimizer

model.train()
for epoch in range(num_epoch):
    total_loss = 0
    for batch in tqdm(train_data_loader, desc=f"Training Epoch {epoch}"):
        inputs, offsets, targets = [x.to(device) for x in batch]
        log_probs = model(inputs, offsets)
        loss = nll_loss(log_probs, targets)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    # Sum of the per-batch losses over the epoch.
    print(f"Loss: {total_loss:.2f}")

# Evaluation: switch to inference mode and disable gradient tracking.
model.eval()
acc = 0
num_examples = 0
with torch.no_grad():
    for batch in tqdm(test_data_loader, desc=f"Testing"):
        inputs, offsets, targets = [x.to(device) for x in batch]
        output = model(inputs, offsets)
        acc += (output.argmax(dim=1) == targets).sum().item()
        # Count examples, not batches, so the accuracy stays correct for any batch size.
        num_examples += targets.size(0)

# Report accuracy on the test set.
print(f"Acc: {acc / num_examples:.2f}")


Loss: 165.52



Loss: 137.04



Loss: 103.05



Loss: 71.66



Loss: 47.36



Acc: 0.72


## LSTM 情感分类

In [32]:
class LstmDataset(Dataset):
    """Dataset wrapper around an in-memory list of examples for the LSTM model."""

    def __init__(self, data):
        self.data = data

    def __len__(self):
        """Number of examples in the dataset."""
        size = len(self.data)
        return size

    def __getitem__(self, i):
        """Example stored at index `i`."""
        example = self.data[i]
        return example

In [33]:
def collate_fn(examples):
    """Collate (token_ids, label) examples into a padded batch.

    Returns a tuple (inputs, lengths, targets): the padded id matrix with one
    row per example, the original length of each example, and the labels as a
    long tensor.
    """
    sequences = [torch.tensor(example[0]) for example in examples]
    lengths = torch.tensor([len(example[0]) for example in examples])
    targets = torch.tensor([example[1] for example in examples], dtype=torch.long)
    # Pad every sequence in the batch to the length of the longest one.
    padded = pad_sequence(sequences, batch_first=True)
    return padded, lengths, targets

In [34]:
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence

class LSTM(nn.Module):
    """LSTM sentence classifier.

    Token ids are embedded, the padded batch is packed so the LSTM only sees
    the real (non-padding) steps of each sequence, and the final hidden state
    of the last LSTM layer is projected to per-class log-probabilities.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_class):
        super(LSTM, self).__init__()
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.output = nn.Linear(hidden_dim, num_class)

    def forward(self, inputs, lengths):
        """Return per-class log-probabilities for a padded batch.

        Args:
            inputs: padded batch of token ids, batch dimension first.
            lengths: true length of each sequence, as a tensor or a list.
        """
        embeddings = self.embeddings(inputs)
        # pack_padded_sequence requires the lengths on the CPU; callers that move
        # the whole batch to a CUDA device would otherwise trigger a runtime error.
        if isinstance(lengths, torch.Tensor):
            lengths = lengths.cpu()
        x_pack = pack_padded_sequence(embeddings, lengths, batch_first=True, enforce_sorted=False)
        hidden, (hn, cn) = self.lstm(x_pack)
        # hn[-1] is the final hidden state of the last LSTM layer for every sequence.
        outputs = self.output(hn[-1])
        log_probs = F.log_softmax(outputs, dim=-1)
        return log_probs

In [35]:
# Hyperparameters for the LSTM experiment.
embedding_dim = 128
hidden_dim = 256
num_class = 2
batch_size = 32
num_epoch = 5

# Load the data and wrap each split in a Dataset / DataLoader.
train_data, test_data, vocab = load_sentence_polarity()
train_dataset = LstmDataset(train_data)
test_dataset = LstmDataset(test_data)
# Training batches are shuffled; the test loader yields one example at a time, in order.
train_data_loader = DataLoader(train_dataset, batch_size=batch_size, collate_fn=collate_fn, shuffle=True)
test_data_loader = DataLoader(test_dataset, batch_size=1, collate_fn=collate_fn, shuffle=False)

In [36]:
# Build the model; use CUDA when it is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = LSTM(len(vocab), embedding_dim, hidden_dim, num_class)
model.to(device) # move the model to the selected device (GPU when CUDA is available)

LSTM(
  (embeddings): Embedding(21402, 128)
  (lstm): LSTM(128, 256, batch_first=True)
  (output): Linear(in_features=256, out_features=2, bias=True)
)

In [37]:
# Training: minimise the negative log-likelihood of the model's log-probabilities.
nll_loss = nn.NLLLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001) # Adam optimizer

model.train()
for epoch in range(num_epoch):
    total_loss = 0
    for batch in tqdm(train_data_loader, desc=f"Training Epoch {epoch}"):
        inputs, lengths, targets = [x.to(device) for x in batch]
        log_probs = model(inputs, lengths)
        loss = nll_loss(log_probs, targets)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    # Sum of the per-batch losses over the epoch.
    print(f"Loss: {total_loss:.2f}")

# Evaluation: switch to inference mode and disable gradient tracking.
model.eval()
acc = 0
num_examples = 0
with torch.no_grad():
    for batch in tqdm(test_data_loader, desc=f"Testing"):
        inputs, lengths, targets = [x.to(device) for x in batch]
        output = model(inputs, lengths)
        acc += (output.argmax(dim=1) == targets).sum().item()
        # Count examples, not batches, so the accuracy stays correct for any batch size.
        num_examples += targets.size(0)

# Report accuracy on the test set.
print(f"Acc: {acc / num_examples:.2f}")


Loss: 167.16



Loss: 139.24



Loss: 100.18



Loss: 62.00



Loss: 28.26



Acc: 0.71
