In [1]:
import gensim
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from collections import Counter
from torch.utils.data import TensorDataset, DataLoader

In [2]:
def build_word2id(save_to_path=None, paths=None):
    """
    Build the vocabulary {word: id} from whitespace-tokenised corpus files.

    Every line is expected to look like ``<label> <word> <word> ...``; the
    first token is skipped as the label. Ids are handed out in order of first
    appearance, and id 0 is reserved for the padding token '_PAD_'.

    :param save_to_path: path to save word2id; when given, one
        ``word<TAB>id`` line is written per entry
    :param paths: optional iterable of corpus files to scan; defaults to the
        train and validation files under ./Dataset
    :return: word2id dictionary {word: id}
    """
    if paths is None:
        paths = ['./Dataset/train.txt', './Dataset/validation.txt']

    word2id = {'_PAD_': 0}
    for corpus_path in paths:
        with open(corpus_path, encoding='utf-8') as f:
            # Stream the file line by line instead of loading it whole.
            for line in f:
                # split() with no argument already discards surrounding
                # whitespace; a blank line simply yields no words.
                for word in line.split()[1:]:
                    if word not in word2id:
                        word2id[word] = len(word2id)

    if save_to_path:
        with open(save_to_path, 'w', encoding='utf-8') as f:
            for word, idx in word2id.items():
                f.write('{}\t{}\n'.format(word, idx))

    return word2id

In [3]:
# Build the vocabulary from the corpus and persist it next to the dataset.
word2id = build_word2id(save_to_path='./Dataset/word2id.txt')
# Sanity check: container type and vocabulary size (padding entry included).
print(type(word2id), len(word2id))

In [4]:
def build_word2vec(fname, word2id, save_to_path=None):
    """
    Assemble an embedding matrix aligned with ``word2id`` from a pre-trained
    binary word2vec file.

    Every row starts out as values drawn uniformly from [-1, 1); the row of
    each word known to the pre-trained model is then overwritten with that
    word's vector, so unknown words keep their random initialisation.

    :param fname: pre-trained word2vec file (binary format)
    :param word2id: vocabulary {word: id}; ids are used as row indices
    :param save_to_path: path to save word2vec; when given, the matrix is
        written as text, one space-separated row per line
    :return: numpy array of shape (max id + 1, vector size) whose row i is the
        vector of the word with id i
    """
    vocab_rows = max(word2id.values()) + 1
    pretrained = gensim.models.KeyedVectors.load_word2vec_format(fname, binary=True)
    embedding_matrix = np.array(
        np.random.uniform(-1., 1., [vocab_rows, pretrained.vector_size]))

    for token, row in word2id.items():
        try:
            vector = pretrained[token]
        except KeyError:
            # Token absent from the pre-trained model: keep the random row.
            continue
        embedding_matrix[row] = vector

    if save_to_path:
        with open(save_to_path, 'w', encoding='utf-8') as f:
            for row_values in embedding_matrix:
                f.write(' '.join(map(str, row_values)))
                f.write('\n')

    return embedding_matrix

In [5]:
# Load the pre-trained 50-dimensional vectors and align them with the vocabulary.
word2vec = build_word2vec('./Dataset/wiki_word2vec_50.bin', word2id)
# One row per vocabulary entry: derive the expected row count from word2id
# instead of hard-coding the size of one particular dataset.
assert word2vec.shape == (len(word2id), 50), word2vec.shape
print(word2vec)

In [6]:
def class_to_id(classes=None):
    """
    Map class labels to consecutive integer ids.

    :param classes: labels to index; when omitted (or empty) the default
        ['0', '1'] is used, i.e. 0:pos, 1:neg
    :return: (classes, clas2id), e.g. (['0', '1'], {'0': 0, '1': 1})
    """
    classes = classes if classes else ['0', '1']
    clas2id = {}
    for position, label in enumerate(classes):
        clas2id[label] = position
    return classes, clas2id

In [7]:
def load_corpus(path, word2id, max_sen_len=50):
    """
    Read a labelled corpus file and convert it into fixed-length id arrays.

    Each non-blank line is expected to look like ``<label> <word> <word> ...``.
    Words are mapped through ``word2id`` (unknown words fall back to the
    padding id) and every sentence is truncated or right-padded with the
    padding id to exactly ``max_sen_len`` ids. The total sample count and the
    count per label are printed.

    :param path: sample corpus file
    :param word2id: vocabulary {word: id}; must contain the key '_PAD_'
    :param max_sen_len: number of ids kept per sentence
    :return:
        contents: array, text to id, (len, max_sen_len)
        labels_arr: array of class ids, (len,)
        labels_onehot: array, onehot format, (len, number of classes)
    :raises KeyError: if a label is not one of the classes returned by
        ``class_to_id``
    """
    _, clas2id = class_to_id()
    pad_id = word2id['_PAD_']
    contents, labels = [], []
    with open(path, encoding='utf-8') as f:
        for line in f:
            sp = line.split()
            if not sp:
                # Blank line: there is no label to read, so skip it rather
                # than failing with an IndexError.
                continue
            content = [word2id.get(w, pad_id) for w in sp[1:]][:max_sen_len]
            # Multiplying by a non-positive count yields [], so sentences
            # already at full length are left untouched.
            content += [pad_id] * (max_sen_len - len(content))
            labels.append(sp[0])
            contents.append(content)

    counter = Counter(labels)
    print('总样本数为：%d' % (len(labels)))
    print('各个类别样本数如下：')
    for w in counter:
        print(w, counter[w])

    # Explicit dtype/shape keep the arrays well-formed even for an empty file.
    contents = np.asarray(contents, dtype=int).reshape(len(contents), max_sen_len)
    labels_arr = np.array([clas2id[l] for l in labels], dtype=int)

    # One column per known class; set the column of each sample's class id.
    labels_onehot = np.zeros((len(labels), len(clas2id)), dtype=int)
    labels_onehot[np.arange(len(labels)), labels_arr] = 1

    return contents, labels_arr, labels_onehot

In [8]:
# Convert each split into (ids, labels); the one-hot labels are discarded here.
print('train corpus load: ')
train_contents, train_labels, _ = load_corpus(
    path='./Dataset/train.txt', word2id=word2id, max_sen_len=50)

print('\nvalidation corpus load: ')
val_contents, val_labels, _ = load_corpus(
    path='./Dataset/validation.txt', word2id=word2id, max_sen_len=50)

print('\ntest corpus load: ')
test_contents, test_labels, _ = load_corpus(
    path='./Dataset/test.txt', word2id=word2id, max_sen_len=50)

In [9]:
class CONFIG():
    """Hyper-parameters read by TextCNN."""
    update_w2v = True                           # update w2v during training or not
    pretrained_embed = word2vec                 # pretrained word embedding matrix
    # Both sizes must match the embedding matrix exactly, so read them from it
    # instead of repeating dataset-specific numbers.
    vocab_size = pretrained_embed.shape[0]      # vocabulary size (rows of the matrix)
    embedding_dim = pretrained_embed.shape[1]   # dimension of wordvector
    n_class = 2                                 # the # of classes
    # Dropout setting handed to TextCNN, named as a keep probability; at 0.5
    # the keep and drop rates coincide.
    drop_keep_prob = 0.5
    num_filters = 256                           # the # of filter in convolution layer
    kernel_size = 3                             # the size of kernel in convolution layer

In [10]:
class TextCNN(nn.Module):
    """
    Convolutional sentence classifier with a single kernel size.

    Pipeline: embedding lookup -> one 2-D convolution spanning the full
    embedding width -> ReLU -> max pooling over the sequence axis -> dropout
    -> linear layer.
    """

    def __init__(self, config):
        """
        :param config: object exposing update_w2v, vocab_size, n_class,
            embedding_dim, num_filters, kernel_size, drop_keep_prob and
            pretrained_embed (numpy array of shape (vocab_size, embedding_dim),
            or None to keep the random initialisation)
        """
        super().__init__()
        update_w2v = config.update_w2v
        vocab_size = config.vocab_size
        n_class = config.n_class
        embedding_dim = config.embedding_dim
        num_filters = config.num_filters
        kernel_size = config.kernel_size
        drop_keep_prob = config.drop_keep_prob
        pretrained_embed = config.pretrained_embed

        # Use the pre-trained wordvector
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        if pretrained_embed is not None:
            self.embedding.weight.data.copy_(torch.from_numpy(pretrained_embed))
        self.embedding.weight.requires_grad = update_w2v
        # Convolution layer: the kernel covers kernel_size tokens and the
        # whole embedding width, so the last output axis collapses to 1.
        self.conv = nn.Conv2d(1, num_filters, (kernel_size, embedding_dim))
        # Dropout: nn.Dropout expects the probability of *dropping* a unit,
        # so convert the configured keep probability.
        self.dropout = nn.Dropout(1 - drop_keep_prob)
        # Full connection layer
        self.fc = nn.Linear(num_filters, n_class)

    def forward(self, x):
        """
        :param x: token ids of shape (batch, seq_len); cast to int64 here
        :return: unnormalised class scores of shape (batch, n_class)
        """
        x = x.to(torch.int64)
        x = self.embedding(x)                      # (batch, seq_len, embedding_dim)
        x = x.unsqueeze(1)                         # (batch, 1, seq_len, embedding_dim)
        x = F.relu(self.conv(x)).squeeze(3)        # (batch, num_filters, seq_len - kernel_size + 1)
        x = F.max_pool1d(x, x.size(2)).squeeze(2)  # (batch, num_filters)
        x = self.dropout(x)
        return self.fc(x)

In [11]:
# Run-level settings read by the training and evaluation cells.
config = CONFIG()          # model hyper-parameters
epochs = 4                 # number of passes over the training data
batch_size = 32            # samples per mini-batch
learning_rate = 0.001      # optimizer learning rate
verbose = True             # print the training progress
model_path = None          # checkpoint to load before training; None skips loading
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
print(device)

In [12]:
# Mix the contents & labels of train & validation dataset as train_dataloader
contents = np.vstack([train_contents, val_contents])
labels = np.concatenate([train_labels, val_labels])
# Token ids are indices into the embedding table, so keep them as integers
# (torch.long) rather than round-tripping through float32, which cannot
# represent every integer above 2**24.
train_dataset = TensorDataset(torch.from_numpy(contents).type(torch.long),
                              torch.from_numpy(labels).type(torch.long))
train_dataloader = DataLoader(dataset=train_dataset, batch_size=batch_size,
                              shuffle=True, num_workers=2)

In [13]:
def train(dataloader):
    """
    Train a TextCNN on ``dataloader`` and save its weights to 'model.pth'.

    Reads the notebook-level names ``config``, ``model_path``, ``device``,
    ``learning_rate``, ``epochs`` and ``verbose``.

    :param dataloader: iterable of (batch_x, batch_y) tensor pairs; its
        ``dataset`` attribute is used only for the progress message
    :return: None
    """
    # config the model, load the pretrained model if model_path
    model = TextCNN(config)
    if model_path:
        # map_location lets a checkpoint saved on one device be loaded on another.
        model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)
    model.train()  # make sure dropout is active while training

    # set optimizer & loss
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss()

    # training loop
    for epoch in range(epochs):
        for batch_idx, (batch_x, batch_y) in enumerate(dataloader):
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            output = model(batch_x)
            loss = criterion(output, batch_y)

            # `and`, not `&`: `&` binds tighter than `==`, so the previous
            # `batch_idx % 200 == 0 & verbose` compared against `0 & verbose`
            # (always 0) and logged regardless of `verbose`.
            if verbose and batch_idx % 200 == 0:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch+1, batch_idx * len(batch_x), len(dataloader.dataset),
                    100. * batch_idx / len(dataloader), loss.item()))

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    # save model
    torch.save(model.state_dict(), 'model.pth')

In [14]:
# Train on the combined train + validation loader; train() saves the weights.
train(dataloader=train_dataloader)

In [15]:
# Evaluation settings: the checkpoint file to load and the test batch size.
model_path, batch_size = 'model.pth', 32

In [16]:
# load test dataset as test_dataloader
# Token ids are indices into the embedding table, so keep them as integers
# (torch.long) rather than round-tripping through float32.
test_dataset = TensorDataset(torch.from_numpy(test_contents).type(torch.long),
                             torch.from_numpy(test_labels).type(torch.long))
test_dataloader = DataLoader(dataset=test_dataset, batch_size=batch_size,
                             shuffle=False, num_workers=2)

In [21]:
def predict(dataloader):
    """
    Evaluate the saved TextCNN on ``dataloader`` and print its metrics.

    Prints accuracy, precision, recall, F1 score and the confusion matrix,
    treating class 0 as the positive class. Reads the notebook-level names
    ``config``, ``model_path`` and ``device``.

    :param dataloader: iterable of (batch_x, batch_y) tensor pairs
    :return: None
    """
    # load trained model
    model = TextCNN(config)
    # map_location lets a checkpoint saved on one device be loaded on another.
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)
    model.eval()

    count, correct = 0, 0
    # real_predict_RP counts samples whose real label is R and whose predicted
    # label is P. The previous counting had the two roles swapped, which
    # exchanged precision with recall and transposed the confusion matrix.
    real_predict_00, real_predict_01, real_predict_10, real_predict_11 = 0, 0, 0, 0

    # No gradients are needed for evaluation.
    with torch.no_grad():
        for batch_x, batch_y in dataloader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            predicted = model(batch_x).argmax(1)
            count += len(batch_x)
            correct += (predicted == batch_y).sum().item()
            real_predict_00 += ((batch_y == 0) & (predicted == 0)).sum().item()
            real_predict_01 += ((batch_y == 0) & (predicted == 1)).sum().item()
            real_predict_10 += ((batch_y == 1) & (predicted == 0)).sum().item()
            real_predict_11 += ((batch_y == 1) & (predicted == 1)).sum().item()

    # calculate accuracy, precision, recall, F1_score, confusion_matrix
    # (each ratio falls back to 0.0 when its denominator is empty)
    predicted_positive = real_predict_00 + real_predict_10
    real_positive = real_predict_00 + real_predict_01
    accuracy = correct / count if count else 0.0
    precision = real_predict_00 / predicted_positive if predicted_positive else 0.0
    recall = real_predict_00 / real_positive if real_positive else 0.0
    F1_score = 2*precision*recall/(precision+recall) if (precision + recall) else 0.0
    # rows: real label, columns: predicted label
    confusion_matrix = [[real_predict_00, real_predict_01], [real_predict_10, real_predict_11]]
    print('The accuracy, precision, recall, F1_score, confusion_matrix of test is\n{:.2f}% \n{} \n{} \n{} \n{}.'.format(100*accuracy, precision, recall, F1_score, confusion_matrix))

In [22]:
# Print accuracy, precision, recall, F1 and the confusion matrix for the test loader.
predict(dataloader=test_dataloader)