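#  Assignment: Text Sentiment Analysis with an LSTM

## [Assignment Goals]

*   Use the LSTM module provided by PyTorch to perform sentiment analysis
*   Aim for good accuracy: 84% or higher

## [Key Points]

*   Learn to use the torchtext datasets to load the IMDB dataset
*   Build an LSTM network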

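## Preparing the Data

torchtext consists of the following components:

Field: holds the preprocessing configuration, such as the tokenization method, whether to lowercase, the start-of-sequence token, and the vocabulary.

Dataset: downloads the data and also provides a `splits` method that can fetch the training, validation, and test data at the same time.

Iterator: iterates over the data and supports batching.

We define three variables: SEED, TEXT, and LABEL. SEED makes the random train/valid/test split reproducible, while TEXT and LABEL describe how the review text and its label are processed.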
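
To make the role of a Field more concrete, here is a minimal pure-Python sketch of the kind of configuration it carries. It is not torchtext's implementation: `ToyField`, `whitespace_tokenize`, and `drop_short` are hypothetical names, and a whitespace split stands in for the spaCy tokenizer.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


def whitespace_tokenize(text: str) -> List[str]:
    """Stand-in for a real tokenizer such as spaCy (assumption)."""
    return text.split()


@dataclass
class ToyField:
    """Hypothetical field object: it only stores preprocessing options."""
    tokenize: Callable[[str], List[str]] = whitespace_tokenize
    lower: bool = False
    preprocessing: Optional[Callable[[List[str]], List[str]]] = None

    def preprocess(self, text: str) -> List[str]:
        tokens = self.tokenize(text)
        if self.lower:
            tokens = [t.lower() for t in tokens]
        if self.preprocessing is not None:
            # User-supplied hook, applied after tokenizing and lowercasing
            tokens = self.preprocessing(tokens)
        return tokens


def drop_short(tokens: List[str]) -> List[str]:
    """Example hook: discard tokens of two characters or fewer."""
    return [t for t in tokens if len(t) > 2]


field = ToyField(lower=True, preprocessing=drop_short)
tokens = field.preprocess("This film is GOOD")
print(tokens, len(tokens))
```

The length printed next to the tokens mirrors what `include_lengths = True` asks for: the model later needs each sequence's true length in addition to the tokens.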

In [1]:
# Mount Google drive to Colab
from google.colab import drive

drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchtext.legacy import data
from torchtext import datasets
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords, wordnet
import nltk
import re
from tqdm.notebook import tqdm
import json
import time
import random

SEED = 1234

torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

In [3]:
# Helper functions for the custom preprocessing step
nltk.download('stopwords')
nltk.download('averaged_perceptron_tagger')
nltk.download('wordnet')

lemmatizer = WordNetLemmatizer()
stop = stopwords.words('english')

def get_pos(word):
    tag = nltk.pos_tag([word])[0][1][0].upper()
    tag_dict = { 'J': wordnet.ADJ,
                 'V': wordnet.VERB,
                 'N': wordnet.NOUN,
                 'R': wordnet.ADV}
    return tag_dict.get(tag, wordnet.NOUN)

def preProcess(words):
    
    newWords = []
    for w in words:
        if w not in stop and len(w)>2:
            w = lemmatizer.lemmatize(w, pos=get_pos(w))
            newWords.append(w)
            
    
    sentence = ' '.join(newWords)
    sentence = re.sub('[\'-]', '', sentence)
    sentence = re.sub('[^a-zA-Z]', ' ', sentence)
    
    return sentence.split()


TEXT = data.Field(tokenize = 'spacy', lower=True, include_lengths = True, preprocessing=preProcess)
LABEL = data.LabelField(sequential=False, dtype = torch.float)

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


## Downloading and Loading the Data

The torchtext `datasets` collection already includes IMDB, so the training and test data can be loaded directly.

In [4]:
train_data, test_data = datasets.IMDB()

train_examples = []
test_examples = []

for label, text in tqdm(train_data):
    train_examples.append(data.Example.fromlist(data=[text, label], fields=[('text', TEXT), ('label', LABEL)]))

for label, text in tqdm(test_data):
    test_examples.append(data.Example.fromlist(data=[text, label], fields=[('text', TEXT), ('label', LABEL)]))

## Saving the Examples

Preprocessing takes time, so we save the Examples to JSON files for reuse.

In [5]:
def save_examples(examples, savepath):
    with open(savepath, 'w') as f:
        # Save the number of examples (load_examples reads it back as the line count)
        
        f.write(json.dumps(len(examples)))  # Write examples length
        f.write("\n")

        # Save elements
        for pair in examples:
            data = [pair.text, pair.label]
            f.write(json.dumps(data))  # Write samples
            f.write("\n")

def load_examples(filename):
    examples = []
    start = time.time()
    with open(filename, 'r') as f:
        # Read the number of examples written by save_examples
        total = json.loads(f.readline())

        # Read the examples
        for i in range(total):
            line = f.readline()
            example = json.loads(line)
            #example = data.Example.fromlist(data=example, fields=[('text', TEXT), ('label', LABEL)])
            #example = data.Example().fromlist(example, fields)  # Create Example obj. (you can do it here or later)
            examples.append(example)

    end = time.time()
    print(end - start)
    return examples

In [6]:
# save examples
save_examples(train_examples, './gdrive/My Drive/AI/NLP_Part2/train_examples.json')
save_examples(test_examples, './gdrive/My Drive/AI/NLP_Part2/test_examples.json')

In [7]:
# load examples and create datasets
# Redefine the Field without the preprocessing step so we don't spend time preprocessing again

TEXT = data.Field(tokenize = 'spacy', include_lengths = True)
LABEL = data.LabelField(sequential=False, dtype = torch.float)

train_examples = load_examples('./gdrive/My Drive/AI/NLP_Part2/train_examples.json')

train_examples = [data.Example.fromlist(data=d, fields=[('text', TEXT),('label', LABEL)]) for d in tqdm(train_examples)]
test_examples = load_examples('./gdrive/My Drive/AI/NLP_Part2/test_examples.json')
test_examples = [data.Example.fromlist(data=d, fields=[('text', TEXT),('label', LABEL)]) for d in tqdm(test_examples)]

train_data = data.Dataset(examples=train_examples, fields={'text':TEXT, 'label':LABEL})
test_data = data.Dataset(examples=test_examples, fields={'text':TEXT, 'label':LABEL})

0.4720315933227539



0.4167478084564209



In [8]:
# Check content
train_examples[0].text

['rent',
 'curious',
 'yellow',
 'video',
 'store',
 'controversy',
 'surround',
 'first',
 'release',
 'also',
 'heard',
 'first',
 'seize',
 'u',
 's',
 'custom',
 'ever',
 'try',
 'enter',
 'country',
 'therefore',
 'fan',
 'film',
 'consider',
 'controversial',
 'really',
 'see',
 'myself',
 'br',
 'br',
 'the',
 'plot',
 'center',
 'around',
 'young',
 'swedish',
 'drama',
 'student',
 'name',
 'lena',
 'want',
 'learn',
 'everything',
 'life',
 'particular',
 'want',
 'focus',
 'attention',
 'make',
 'sort',
 'documentary',
 'average',
 'swede',
 'thought',
 'certain',
 'political',
 'issue',
 'vietnam',
 'war',
 'race',
 'issue',
 'united',
 'state',
 'ask',
 'politician',
 'ordinary',
 'denizen',
 'stockholm',
 'opinion',
 'politics',
 'sex',
 'drama',
 'teacher',
 'classmate',
 'married',
 'men',
 'br',
 'br',
 'what',
 'kill',
 'curious',
 'yellow',
 'year',
 'ago',
 'consider',
 'pornographic',
 'really',
 'sex',
 'nudity',
 'scene',
 'far',
 'even',
 'shot',
 'like',
 'chea

## Splitting a Validation Set from the Training Data

We hold out part of the training data as a validation set.

In [9]:
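random.seed(SEED)
# random.seed() returns None, so pass the seeded generator state to make the split reproducible
train_data, valid_data = train_data.split(random_state = random.getstate())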
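
As a rough illustration of what a seeded split does, the sketch below shuffles with a fixed seed and cuts the list at a ratio. `split_examples` is a hypothetical helper, not the torchtext method, and the 0.7 ratio is used on the assumption that it matches the default of the legacy `split`.

```python
import random


def split_examples(examples, split_ratio=0.7, seed=1234):
    """Shuffle a copy of the examples with a fixed seed, then cut it in two."""
    rng = random.Random(seed)          # private generator, so global state is untouched
    shuffled = list(examples)
    rng.shuffle(shuffled)
    cut = int(round(split_ratio * len(shuffled)))
    return shuffled[:cut], shuffled[cut:]


train_part, valid_part = split_examples(range(25000))
print(len(train_part), len(valid_part))
```

Because the generator is seeded, calling the helper twice returns the same partition, which is what keeps validation numbers comparable between runs.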

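# Building the Vocabulary

Next we use pre-trained word embeddings. Calling TorchText's `build_vocab` builds the vocabulary and attaches a pre-trained vector to each word in it. We use the "glove.6B.100d" vectors: GloVe is an algorithm for computing word vectors, 6B means the vectors were trained on 6 billion tokens, and 100d means each vector has 100 dimensions.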

In [10]:
MAX_VOCAB_SIZE = 25000

TEXT.build_vocab(train_data, 
                 max_size = MAX_VOCAB_SIZE, 
                 vectors = "glove.6B.100d",
                 unk_init = torch.Tensor.normal_)

LABEL.build_vocab(train_data)
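
The effect of `max_size` can be sketched in plain Python: keep only the most frequent tokens and reserve two extra slots for the special tokens. `build_capped_vocab` is a hypothetical helper written for illustration, and the toy corpus is made up.

```python
from collections import Counter


def build_capped_vocab(token_lists, max_size, specials=("<unk>", "<pad>")):
    """Keep the max_size most frequent tokens, placed after the special tokens."""
    counts = Counter(tok for toks in token_lists for tok in toks)
    kept = [tok for tok, _ in counts.most_common(max_size)]
    itos = list(specials) + kept                  # index -> token
    stoi = {tok: i for i, tok in enumerate(itos)}  # token -> index
    return itos, stoi


docs = [["good", "film"], ["bad", "film"], ["good", "plot", "film"]]
itos, stoi = build_capped_vocab(docs, max_size=2)
print(itos)

# Tokens outside the capped vocabulary fall back to the <unk> index
print(stoi.get("plot", stoi["<unk>"]))
```

The same accounting explains why a cap of 25,000 words gives an embedding matrix with 25,002 rows: the two special tokens are added on top of the cap.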

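# Building the Iterators

The iterator is torchtext's output to the model; it provides shuffling, sorting, and similar operations on the data, and the batch size can be set dynamically. Here we use the `splits` method to create the iterators for the training, validation, and test sets at the same time.

If a GPU is available, computation runs on CUDA.

`sort_within_batch = True` means the examples inside each batch are sorted by length, which is the order `pack_padded_sequence` expects by default.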

In [11]:
BATCH_SIZE = 64

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, valid_data, test_data), 
    batch_size = BATCH_SIZE,
    sort_within_batch = True,
    sort_key=lambda ex:len(ex.text),
    device = device)
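
The idea behind bucketing can be sketched without torchtext: group examples of similar length, sort each batch from longest to shortest, and pad to the longest example in the batch. `make_sorted_batches` is a hypothetical helper; the real iterator also shuffles, which this sketch leaves out.

```python
def make_sorted_batches(examples, batch_size, pad_token="<pad>"):
    """Group examples of similar length, then pad each batch to its longest example."""
    ordered = sorted(examples, key=len)
    batches = []
    for start in range(0, len(ordered), batch_size):
        # Longest first inside the batch, mirroring sort_within_batch
        chunk = sorted(ordered[start:start + batch_size], key=len, reverse=True)
        longest = len(chunk[0])
        padded = [ex + [pad_token] * (longest - len(ex)) for ex in chunk]
        lengths = [len(ex) for ex in chunk]   # true lengths, before padding
        batches.append((padded, lengths))
    return batches


examples = [["a"], ["a", "b", "c", "d"], ["a", "b"], ["a", "b", "c"]]
batches = make_sorted_batches(examples, batch_size=2)
for padded, lengths in batches:
    print(lengths, padded)
```

Grouping by length keeps the amount of padding per batch small, so less computation is spent on `<pad>` positions.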


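# Building the LSTM Model
The RNN architecture we use is PyTorch's built-in LSTM (Long Short-Term Memory) model. Its update can be written as:

$(h_t, c_t) = \text{LSTM}(x_t, h_{t-1}, c_{t-1})$


Steps:

1. Each word first passes through the embedding layer to obtain its feature vector.
2. The LSTM then encodes the sequence of feature vectors to capture sequence information.
3. The encoded sequence information is passed through a fully connected layer to produce the output.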
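
For reference, the LSTM update can be expanded into its gates. The following is the standard formulation, written in the notation of the PyTorch `nn.LSTM` documentation; the weight and bias symbols $W$ and $b$ are not defined in this notebook and are introduced here only for illustration.

$$
\begin{aligned}
i_t &= \sigma(W_{ii} x_t + b_{ii} + W_{hi} h_{t-1} + b_{hi}) \\
f_t &= \sigma(W_{if} x_t + b_{if} + W_{hf} h_{t-1} + b_{hf}) \\
g_t &= \tanh(W_{ig} x_t + b_{ig} + W_{hg} h_{t-1} + b_{hg}) \\
o_t &= \sigma(W_{io} x_t + b_{io} + W_{ho} h_{t-1} + b_{ho}) \\
c_t &= f_t \odot c_{t-1} + i_t \odot g_t \\
h_t &= o_t \odot \tanh(c_t)
\end{aligned}
$$

Here $i_t$, $f_t$, and $o_t$ are the input, forget, and output gates, $g_t$ is the candidate cell update, $\sigma$ is the sigmoid function, and $\odot$ is element-wise multiplication.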

In [12]:
class LSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, 
                 bidirectional, dropout, pad_idx):
        
        super().__init__()
        
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx = pad_idx)
        
        self.rnn = nn.LSTM(embedding_dim, 
                           hidden_dim, 
                           num_layers=n_layers, 
                           bidirectional=bidirectional, 
                           dropout=dropout)
        
        self.fc = nn.Linear(hidden_dim * 2, output_dim)
        
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, text, text_lengths):
        
        #text = [sent len, batch size]
        
        embedded = self.dropout(self.embedding(text))
        
        #embedded = [sent len, batch size, emb dim]
        
        #pack sequence
        packed_embedded = nn.utils.rnn.pack_padded_sequence(embedded, text_lengths.cpu())
        
        packed_output, (hidden, cell) = self.rnn(packed_embedded)
        
        #unpack sequence
        output, output_lengths = nn.utils.rnn.pad_packed_sequence(packed_output)

        #output = [sent len, batch size, hid dim * num directions]
        #output over padding tokens are zero tensors
        
        #hidden = [num layers * num directions, batch size, hid dim]
        #cell = [num layers * num directions, batch size, hid dim]
        
        #concat hidden[-2,:,:] and hidden[-1,:,:] and apply dropout
        #bidirectional: these are the top layer's final forward and backward states
        #unidirectional with 2+ layers (as configured below): the final states of the top two layers
        
        hidden = self.dropout(torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim = 1))
                
        #hidden = [batch size, hid dim * 2]
            
        return self.fc(hidden)

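# LSTM Model Parameters

1. vocab_size: size of the vocabulary, i.e. the input dimension of the embedding layer
2. embedding_dim: dimension of the word vectors; we use glove.6B.100d, so embedding_dim is 100
3. hidden_dim: the size of the hidden states
4. output_dim: dimension of the output layer
5. n_layers: number of stacked LSTM layers
6. bidirectional: whether to use a bidirectional RNN
7. dropout: dropout rate
8. pad_idx: index of the `<pad>` token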

In [13]:
INPUT_DIM = len(TEXT.vocab)
EMBEDDING_DIM = 100
HIDDEN_DIM = 256
OUTPUT_DIM = 1
N_LAYERS = 2
BIDIRECTIONAL = False
DROPOUT = 0.5
PAD_IDX = TEXT.vocab.stoi[TEXT.pad_token]

model = LSTM(INPUT_DIM, 
            EMBEDDING_DIM, 
            HIDDEN_DIM, 
            OUTPUT_DIM, 
            N_LAYERS, 
            BIDIRECTIONAL, 
            DROPOUT,
            PAD_IDX)

In [14]:
# Print the number of trainable parameters in the model
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'The model has {count_parameters(model):,} trainable parameters')

The model has 3,393,641 trainable parameters
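
The reported count can be reproduced by hand, which is a useful check on the configuration. The sketch below assumes the hyperparameters from the cell above (two unidirectional layers, hidden size 256) and the vocabulary size of 25,002 shown in the embedding shape; `lstm_layer_params` is a hypothetical helper.

```python
def lstm_layer_params(input_size, hidden_size):
    """Four gates, each with input weights, recurrent weights, and two bias vectors."""
    return 4 * (hidden_size * input_size + hidden_size * hidden_size + 2 * hidden_size)


vocab_size, embedding_dim, hidden_dim, output_dim = 25002, 100, 256, 1

embedding = vocab_size * embedding_dim
# Layer 1 reads the embeddings; layer 2 reads layer 1's hidden states
lstm = lstm_layer_params(embedding_dim, hidden_dim) + lstm_layer_params(hidden_dim, hidden_dim)
# The linear layer takes hidden_dim * 2 inputs, plus one bias per output
fc = (hidden_dim * 2) * output_dim + output_dim

total = embedding + lstm + fc
print(f"{total:,}")
```

Most of the parameters sit in the embedding matrix (2,500,200 of them), so the vocabulary cap has a larger effect on model size than the LSTM width.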


# Checking the Vocabulary Size and Embedding Dimension

_**[vocab size, embedding dim]**_ 

In [15]:
pretrained_embeddings = TEXT.vocab.vectors

print(pretrained_embeddings.shape)

torch.Size([25002, 100])


Use the pre-trained embeddings to initialize the weights of the `embedding` layer.

In [16]:
model.embedding.weight.data.copy_(pretrained_embeddings)

tensor([[-0.1117, -0.4966,  0.1631,  ...,  1.2647, -0.2753, -0.1325],
        [-0.8555, -0.7208,  1.3755,  ...,  0.0825, -1.1314,  0.3997],
        [ 0.1979,  0.2526, -0.2831,  ..., -0.4406, -0.7426, -0.3215],
        ...,
        [-0.7814,  0.2898, -0.0829,  ..., -0.5352,  0.8679, -1.1157],
        [ 0.4722, -0.0494,  0.2172,  ..., -0.9187,  0.3444, -0.0422],
        [ 0.2328,  0.8056,  0.4982,  ..., -0.1569,  0.3015,  0.2311]])

Because `<unk>` and `<pad>` are not among the pre-trained words, we set the initial embedding weights of both `<unk>` and `<pad>` to 0.

In [17]:
UNK_IDX = TEXT.vocab.stoi[TEXT.unk_token]

model.embedding.weight.data[UNK_IDX] = torch.zeros(EMBEDDING_DIM)
model.embedding.weight.data[PAD_IDX] = torch.zeros(EMBEDDING_DIM)

print(model.embedding.weight.data)

tensor([[ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
        [ 0.1979,  0.2526, -0.2831,  ..., -0.4406, -0.7426, -0.3215],
        ...,
        [-0.7814,  0.2898, -0.0829,  ..., -0.5352,  0.8679, -1.1157],
        [ 0.4722, -0.0494,  0.2172,  ..., -0.9187,  0.3444, -0.0422],
        [ 0.2328,  0.8056,  0.4982,  ..., -0.1569,  0.3015,  0.2311]])


We can now see the first two rows of the embedding weights matrix have been set to zeros. As we passed the index of the pad token to the `padding_idx` of the embedding layer it will remain zeros throughout training, however the `<unk>` token embedding will be learned.

# Training the Model

Use the Adam optimizer.

Use BCEWithLogitsLoss as the loss function.

Implement a function that computes accuracy.

In [18]:
optimizer = optim.Adam(model.parameters())

criterion = nn.BCEWithLogitsLoss()

model = model.to(device)
criterion = criterion.to(device)

def binary_accuracy(preds, y):
    """
    Returns accuracy per batch, i.e. if you get 8/10 right, this returns 0.8, NOT 8
    """

    #round predictions to the closest integer
    rounded_preds = torch.round(torch.sigmoid(preds))
    correct = (rounded_preds == y).float() #convert into float for division 
    acc = correct.sum() / len(correct)
    return acc
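
To show what the loss and the accuracy function compute, here is a pure-Python sketch on a handful of made-up logits. It is an illustration, not PyTorch's implementation: `bce_with_logits` uses a common numerically stable rewrite of the binary cross-entropy, and `naive_bce` is the textbook form for comparison.

```python
import math


def sigmoid(x):
    return 1.0 / (1.0 + math.exp(-x))


def naive_bce(logits, targets):
    """Textbook binary cross-entropy: apply the sigmoid first, then take logs."""
    losses = [-(y * math.log(sigmoid(x)) + (1 - y) * math.log(1 - sigmoid(x)))
              for x, y in zip(logits, targets)]
    return sum(losses) / len(losses)


def bce_with_logits(logits, targets):
    """Same quantity computed from raw logits without an explicit sigmoid."""
    losses = [max(x, 0.0) - x * y + math.log1p(math.exp(-abs(x)))
              for x, y in zip(logits, targets)]
    return sum(losses) / len(losses)


def accuracy(logits, targets):
    # sigmoid(x) rounds to 1 exactly when x > 0
    preds = [1.0 if x > 0 else 0.0 for x in logits]
    return sum(p == y for p, y in zip(preds, targets)) / len(targets)


logits = [2.0, -1.0, 0.5, -3.0]    # illustrative values only
targets = [1.0, 0.0, 0.0, 0.0]
print(bce_with_logits(logits, targets), naive_bce(logits, targets))
print(accuracy(logits, targets))
```

Working from logits is why the model's last layer has no sigmoid: the loss applies it internally, and the sigmoid only appears explicitly when predictions are rounded.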

# Training Function

In [19]:
def train(model, iterator, optimizer, criterion):
    
    epoch_loss = 0
    epoch_acc = 0
    
    model.train()
    
    for batch in iterator:
        
        optimizer.zero_grad()
        
        text, text_lengths = batch.text
        
        predictions = model(text, text_lengths).squeeze(1)
        
        loss = criterion(predictions, batch.label)
        
        acc = binary_accuracy(predictions, batch.label)
        
        loss.backward()
        
        optimizer.step()
        
        epoch_loss += loss.item()
        epoch_acc += acc.item()
        
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

# Evaluation Function

In [20]:
def evaluate(model, iterator, criterion):
    
    epoch_loss = 0
    epoch_acc = 0
    
    model.eval()
    
    with torch.no_grad():
    
        for batch in iterator:

            text, text_lengths = batch.text
            
            predictions = model(text, text_lengths).squeeze(1)
            
            loss = criterion(predictions, batch.label)
            
            acc = binary_accuracy(predictions, batch.label)

            epoch_loss += loss.item()
            epoch_acc += acc.item()
        
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [21]:
# Helper to report how long each epoch takes
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

In [23]:
# Start training

N_EPOCHS = 10


best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):

    start_time = time.time()
    
    train_loss, train_acc = train(model, train_iterator, optimizer, criterion)
    valid_loss, valid_acc = evaluate(model, valid_iterator, criterion)
    
    end_time = time.time()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'tut2-model.pt')
    
    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

Epoch: 01 | Epoch Time: 0m 9s
	Train Loss: 0.625 | Train Acc: 64.66%
	 Val. Loss: 0.682 |  Val. Acc: 59.07%
Epoch: 02 | Epoch Time: 0m 9s
	Train Loss: 0.666 | Train Acc: 59.99%
	 Val. Loss: 0.664 |  Val. Acc: 59.34%
Epoch: 03 | Epoch Time: 0m 9s
	Train Loss: 0.495 | Train Acc: 76.07%
	 Val. Loss: 0.421 |  Val. Acc: 81.05%
Epoch: 04 | Epoch Time: 0m 9s
	Train Loss: 0.318 | Train Acc: 86.82%
	 Val. Loss: 0.300 |  Val. Acc: 87.35%
Epoch: 05 | Epoch Time: 0m 9s
	Train Loss: 0.243 | Train Acc: 90.51%
	 Val. Loss: 0.294 |  Val. Acc: 87.96%
Epoch: 06 | Epoch Time: 0m 9s
	Train Loss: 0.200 | Train Acc: 92.48%
	 Val. Loss: 0.278 |  Val. Acc: 89.25%
Epoch: 07 | Epoch Time: 0m 9s
	Train Loss: 0.161 | Train Acc: 94.14%
	 Val. Loss: 0.289 |  Val. Acc: 88.97%
Epoch: 08 | Epoch Time: 0m 9s
	Train Loss: 0.135 | Train Acc: 95.08%
	 Val. Loss: 0.323 |  Val. Acc: 88.74%
Epoch: 09 | Epoch Time: 0m 8s
	Train Loss: 0.119 | Train Acc: 95.76%
	 Val. Loss: 0.329 |  Val. Acc: 89.13%
Epoch: 10 | Epoch Time: 0m 9
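
The checkpointing rule in the loop can be traced with the validation losses from the log. The sketch below uses only the nine epochs whose values survive in the output; it replays the comparison and does not train anything.

```python
# Validation losses for epochs 1-9, copied from the training log
valid_losses = [0.682, 0.664, 0.421, 0.300, 0.294, 0.278, 0.289, 0.323, 0.329]

best_valid_loss = float("inf")
best_epoch = None
for epoch, valid_loss in enumerate(valid_losses, start=1):
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        best_epoch = epoch   # the notebook calls torch.save(...) at this point

print(best_epoch, best_valid_loss)
```

Among these epochs the lowest validation loss occurs at epoch 6, while training accuracy keeps climbing afterwards, which is the usual sign of overfitting and the reason the checkpoint is chosen by validation loss.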

In [24]:
# Evaluate the model's accuracy on the test set
model.load_state_dict(torch.load('tut2-model.pt'))

test_loss, test_acc = evaluate(model, test_iterator, criterion)

print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

Test Loss: 0.330 | Test Acc: 86.78%


# Demo Function

Finally, we build a demo function that lets users enter any sentence and see whether the model classifies its sentiment correctly.

In [25]:
import spacy
nlp = spacy.load('en')

def predict_sentiment(model, sentence):
    model.eval()
    tokenized = [tok.text for tok in nlp.tokenizer(sentence)]
    indexed = [TEXT.vocab.stoi[t] for t in tokenized]
    length = [len(indexed)]
    tensor = torch.LongTensor(indexed).to(device)
    tensor = tensor.unsqueeze(1)
    length_tensor = torch.LongTensor(length)
    prediction = torch.sigmoid(model(tensor, length_tensor))
    return prediction.item()

# Demo

In [36]:
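sentences = ["This film is bad", "This film is good"]
for sentence in sentences:
    # This cell treats a score below 0.5 as positive, which assumes the positive
    # label maps to index 0; check LABEL.vocab.stoi to confirm the mapping in your run
    if predict_sentiment(model, sentence) < 0.5:
        print("{} --> Positive".format(sentence))
    else:
        print("{} --> Negative".format(sentence))

This film is bad --> Negative
This film is good --> Positive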
