In [1]:
import collections
import os
import random
import time
from tqdm import tqdm
import torch
from torch import nn
import torch.utils.data as Data
import torch.nn.functional as F
import numpy as np

In [2]:
def decode_file(file_ID_name, file_result_name):
    file_ID = open(file_ID_name, "r")
    file_result = open(file_result_name, "r")
    data = []
    
    for line1, line2 in zip(file_ID, file_result):
        feature = line1.replace('\n','').lower().split(' ')
        label = line2.replace('\n','').lower().split(',')
        label = [int(x) for x in label]
        data.append([feature, label[1:]])
    
    random.shuffle(data)
    file_ID.close()
    file_result.close()
    return data

folder_name = "/.cached/data/"
train_ID_name = folder_name + "ID_train"
train_result_name = folder_name + "ISEAR_train"
train_data = decode_file(train_ID_name, train_result_name)

test_ID_name = folder_name + "ID_test"
test_result_name = folder_name + "ISEAR_test"
test_data = decode_file(train_ID_name, test_result_name)

In [3]:
class MyVocab:
    def __init__(self):
        self._vocab = {}
        self._size = 1
        
    def insert(self, word):
        if word not in self._vocab.keys():
            self._vocab[word] = self._size
            self._size = self._size + 1
    
    def locate(self, word):
        if word not in self._vocab.keys():
            return 0
        return self._vocab[word]
    
    def size(self):
        return self._size

def build_vocab(data):
    vocab = MyVocab()
    for sentence in data:
        for word in sentence[0]:
            vocab.insert(word)
    return vocab

def resize_sentence(data, normal_len):
    def pad(sentence, size):
        return sentence[:size] if len(sentence) > size else sentence+[0]*(size-len(sentence))
    return [[pad(sentence[0], normal_len), sentence[1]] for sentence in data]

def build_features_and_labels(vocab, data):
    data = resize_sentence(data, 180)
    features = []
    labels = []
    for sentence in data:
        features.append([vocab.locate(x) for x in sentence[0]])
        labels.append(np.argmax(sentence[1][1:]))
    return torch.tensor(features), torch.tensor(labels)

train_vocab = build_vocab(train_data)
train_features, train_labels = build_features_and_labels(train_vocab, train_data)
test_features, test_labels = build_features_and_labels(train_vocab, test_data)
train_set = Data.TensorDataset(train_features, train_labels)
test_set = Data.TensorDataset(train_features, train_labels)

In [4]:
train_set = Data.TensorDataset(train_features, train_labels)
test_set = Data.TensorDataset(train_features, train_labels)

batch_size = 13
train_iter = Data.DataLoader(train_set, batch_size, shuffle=True)
test_iter = Data.DataLoader(test_set, batch_size)

In [8]:
class BiRNN(nn.Module):
    def __init__(self, vocab_size, embed_size, num_hiddens, num_layers, num_emotions=7):
        '''
        @params:
            vocab: 在数据集上创建的词典，用于获取词典大小
            embed_size: 嵌入维度大小
            num_hiddens: 隐藏状态维度大小
            num_layers: 隐藏层个数
        '''
        super(BiRNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        
        # encoder-decoder framework
        # bidirectional设为True即得到双向循环神经网络
        self.encoder = nn.LSTM(input_size=embed_size, 
                                hidden_size=num_hiddens, 
                                num_layers=num_layers,
                                bidirectional=True)
        self.decoder = nn.Linear(4*num_hiddens, num_emotions) # 初始时间步和最终时间步的隐藏状态作为全连接层输入
        
    def forward(self, inputs):
        '''
        @params:
            inputs: 词语下标序列，形状为 (batch_size, seq_len) 的整数张量
        @return:
            outs: 对文本情感的预测，形状为 (batch_size, 2) 的张量
        '''
        # 因为LSTM需要将序列长度(seq_len)作为第一维，所以需要将输入转置
        embeddings = self.embedding(inputs.permute(1, 0)) # (seq_len, batch_size, d)
        # rnn.LSTM 返回输出、隐藏状态和记忆单元，格式如 outputs, (h, c)
        outputs, _ = self.encoder(embeddings) # (seq_len, batch_size, 2*h)
        encoding = torch.cat((outputs[0], outputs[-1]), -1) # (batch_size, 4*h)
        outs = self.decoder(encoding) # (batch_size, 2)
        return outs

embed_size, num_hiddens, num_layers = 100, 100, 2
net = BiRNN(train_vocab.size(), embed_size, num_hiddens, num_layers)

In [9]:
def evaluate_accuracy(data_iter, net, device=None):
    if device is None and isinstance(net, torch.nn.Module):
        device = list(net.parameters())[0].device 
    acc_sum, n = 0.0, 0
    with torch.no_grad():
        for X, y in data_iter:
            if isinstance(net, torch.nn.Module):
                net.eval()
                acc_sum += (net(X.to(device)).argmax(dim=1) == y.to(device)).float().sum().cpu().item()
                net.train()
            else:
                if('is_training' in net.__code__.co_varnames):
                    acc_sum += (net(X, is_training=False).argmax(dim=1) == y).float().sum().item() 
                else:
                    acc_sum += (net(X).argmax(dim=1) == y).float().sum().item() 
            n += y.shape[0]
    return acc_sum / n

def train(train_iter, test_iter, net, loss, optimizer, device, num_epochs):
    net = net.to(device)
    print("training on ", device)
    batch_count = 0
    for epoch in range(num_epochs):
        train_l_sum, train_acc_sum, n, start = 0.0, 0.0, 0, time.time()
        for X, y in train_iter:
            X = X.to(device)
            y = y.to(device)
            y_hat = net(X)
            l = loss(y_hat, y) 
            optimizer.zero_grad()
            l.backward()
            optimizer.step()
            train_l_sum += l.cpu().item()
            train_acc_sum += (y_hat.argmax(dim=1) == y).sum().cpu().item()
            n += y.shape[0]
            batch_count += 1
        test_acc = evaluate_accuracy(test_iter, net)
        print('epoch %d, loss %.4f, train acc %.3f, test acc %.3f, time %.1f sec'
              % (epoch + 1, train_l_sum / batch_count, train_acc_sum / n, test_acc, time.time() - start))

In [None]:
lr, num_epochs = 0.01, 5
optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, net.parameters()), lr=lr)
loss = nn.CrossEntropyLoss()

train(train_iter, test_iter, net, loss, optimizer, torch.device('cpu'), num_epochs)

training on  cpu
epoch 1, loss 1.6395, train acc 0.347, test acc 0.525, time 129.2 sec
epoch 2, loss 0.6162, train acc 0.533, test acc 0.696, time 130.0 sec
