In [None]:
import torch
import torch.autograd as autograd
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# Seed PyTorch's global random number generator so the random steps further
# down (parameter initialisation, dropout, batch shuffling) are repeatable.
torch.manual_seed(seed=1)

# Trailing expression: shows which PyTorch version the notebook ran against.
torch.__version__

# CNN text classification model

In [None]:
# B: Batch size
# E: Embedding dim, also the input width
# L: The input sequence length, also the input height
# K: Number of kernels (one per window size)
# F: Number of features per kernel
class CNNText(nn.Module):
    """CNN sentence classifier: embedding -> parallel convolutions -> max-over-time -> linear.

    One ``Conv2d`` layer is created per window size. Each layer spans the full
    embedding width, so it effectively performs a 1-d convolution over the
    token axis. The max-pooled features of all layers are concatenated,
    passed through dropout and projected to ``nclasses`` logits.

    Args:
        vocab_size: Number of rows of the embedding table.
        embedding_dim: Size of each word embedding (E).
        wind_sizes: Iterable of window heights, i.e. how many consecutive
            tokens each kernel covers. Must not be empty.
        feat_per_wind: Number of output channels (features) per window size (F).
        drop_prob: Dropout probability applied before the final linear layer.
        nclasses: Number of output classes.

    Raises:
        ValueError: If ``wind_sizes`` is empty.
    """

    def __init__(self, vocab_size, embedding_dim, wind_sizes, feat_per_wind, drop_prob, nclasses):
        super(CNNText, self).__init__()
        wind_sizes = list(wind_sizes)
        if not wind_sizes:
            raise ValueError('wind_sizes must contain at least one window size')

        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.word_embeddings = nn.Embedding(self.vocab_size, self.embedding_dim)

        # Use Conv2d to do 1d convolution: every kernel is (window, E), so it
        # can only slide along the token axis.
        self.kernel_sizes = [(s, self.embedding_dim) for s in wind_sizes]
        # Shortest sequence that every convolution can process; shorter inputs
        # are zero-padded up to this length in forward().
        self.min_seq_len = max(wind_sizes)
        self.conv2d_layers = nn.ModuleList(
            [nn.Conv2d(in_channels=1, out_channels=feat_per_wind, kernel_size=s) for s in self.kernel_sizes]
        )
        for conv2d in self.conv2d_layers:
            nn.init.xavier_normal_(conv2d.weight)

        self.linear = nn.Linear(in_features=len(self.conv2d_layers) * feat_per_wind, out_features=nclasses)
        nn.init.normal_(self.linear.weight)

        self.dropout = nn.Dropout(p=drop_prob)

        self.loss_fn = nn.CrossEntropyLoss()

    def forward(self, words_indices):
        """Compute class logits for a batch of token-index sequences.

        Args:
            words_indices: Integer tensor of word indices with shape [B, L].

        Returns:
            Float tensor of logits with shape [B, nclasses].
        """
        # [B, L] -> [B, L, E] -> [B, 1, L, E]
        embeddings = self.word_embeddings(words_indices).unsqueeze(1)

        # A kernel taller than the input makes Conv2d raise, so extend short
        # sequences with zero vectors at the end of the token axis.
        missing = self.min_seq_len - embeddings.size(2)
        if missing > 0:
            # F.pad takes pairs starting from the last dim: (E_left, E_right, L_left, L_right)
            embeddings = F.pad(embeddings, (0, 0, 0, missing))

        pooled = []
        for conv2d in self.conv2d_layers:
            x = F.relu(conv2d(embeddings)).squeeze(dim=3)    # [B, 1, L, E] -> [B, F, L', 1] -> [B, F, L']
            x, _ = torch.max(x, dim=2)                       # max over time: [B, F]
            pooled.append(x)

        y = torch.cat(pooled, dim=1)           # [B, F*K]
        logits = self.linear(self.dropout(y))  # [B, nclasses]
        return logits

    def loss(self, words_indices, labels):
        """Return the cross-entropy loss of the model on one batch.

        Args:
            words_indices: Integer tensor of word indices with shape [B, L].
            labels: Integer tensor of class indices with shape [B].
        """
        # Call the module itself (not .forward) so registered hooks still run.
        logits = self(words_indices)
        return self.loss_fn(logits, labels)

# Dataset

In [None]:
from torch.utils.data import Dataset, DataLoader

# TODO: build the token and tag vocabularies.
    
class MRDataset(Dataset):
    """Sentence-polarity dataset read from one positive and one negative text file.

    Every non-empty line of a file is one example, tokenised on whitespace.
    Lines from ``pos_path`` get label 1 and lines from ``neg_path`` get label 0.
    The vocabulary is built while reading: words are numbered in order of
    first appearance (positive file first), then ``<pad>`` and ``<unk>`` are
    registered.

    Args:
        pos_path: Path of the file holding positive examples.
        neg_path: Path of the file holding negative examples.
        encoding: Text encoding used to read both files.
    """

    DEFAULT_POS_PATH = 'data/rt-polaritydata/rt-polaritydata/rt-polarity.pos'
    DEFAULT_NEG_PATH = 'data/rt-polaritydata/rt-polaritydata/rt-polarity.neg'
    PAD_TOKEN = '<pad>'
    UNK_TOKEN = '<unk>'

    def __init__(self, pos_path=DEFAULT_POS_PATH, neg_path=DEFAULT_NEG_PATH, encoding='latin-1'):
        self.dataset = []    # list of (tokens, label) pairs
        self.word2idx = {}   # word -> integer index

        self._read_file(pos_path, 1, encoding)
        self._read_file(neg_path, 0, encoding)

        # setdefault keeps the existing index if the corpus itself contains a
        # literal '<pad>' / '<unk>' token; plain assignment would overwrite it
        # and leave duplicate indices and a gap in the index range.
        for special in (self.PAD_TOKEN, self.UNK_TOKEN):
            self.word2idx.setdefault(special, len(self.word2idx))

    def _read_file(self, path, label, encoding):
        """Append every non-empty line of ``path`` as an example with ``label`` and extend the vocabulary."""
        # The context manager closes the file even if reading fails part-way.
        with open(path, 'r', encoding=encoding) as corpus:
            for line in corpus:
                line = line.strip()
                if line == '':
                    continue

                tokens = line.split()
                self.dataset.append((tokens, label))
                for w in tokens:
                    if w not in self.word2idx:
                        self.word2idx[w] = len(self.word2idx)

    def __len__(self):
        """Number of examples."""
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return ``(words_indices, label)`` for example ``idx``; the index list is freshly built on each call."""
        tokens, label = self.dataset[idx]
        words_indices = [self.word2idx[w] for w in tokens]

        return words_indices, label

    def vocab_size(self):
        """Number of vocabulary entries, special tokens included."""
        return len(self.word2idx)

    def vocab(self):
        """Return a copy of the word -> index mapping."""
        return self.word2idx.copy()


def mrdataset_collate(batch, pad_idx=None):
    """Collate a list of examples into padded tensors.

    Args:
        batch: A list of ``(word_indices, label)`` tuples, where
            ``word_indices`` is a list of ints.
        pad_idx: Index used to right-pad shorter sequences. When omitted, the
            ``'<pad>'`` index of the notebook-level ``dataset`` object is used,
            so the function can still be passed directly as ``collate_fn``.

    Returns:
        A tuple ``(tokens, labels)`` of int64 tensors with shapes [B, L] and
        [B], where L is the length of the longest sequence in the batch.
    """
    if pad_idx is None:
        # Fallback: relies on the global `dataset` defined later in the notebook.
        pad_idx = dataset.word2idx['<pad>']

    max_len = max(len(tokens) for tokens, _ in batch)

    # Build new rows instead of extending the incoming lists, so the caller's
    # data is never modified by collation.
    batched_tokens = [list(tokens) + [pad_idx] * (max_len - len(tokens)) for tokens, _ in batch]  # [B, L]
    batched_labels = [label for _, label in batch]                                                # [B]

    return torch.tensor(batched_tokens, dtype=torch.int64), torch.tensor(batched_labels, dtype=torch.int64)


In [None]:
# Read the corpus and build its vocabulary.
dataset = MRDataset()

# Serve the examples in shuffled mini-batches of 32; mrdataset_collate pads
# each batch and converts it to tensors.
loader = DataLoader(
    dataset,
    collate_fn=mrdataset_collate,
    shuffle=True,
    batch_size=32,
)

# Training

In [None]:
# Binary classifier: 32-d embeddings, windows of 3/4/5 tokens with 64
# feature maps each, and 50% dropout before the output layer.
model = CNNText(
    vocab_size=dataset.vocab_size(),
    embedding_dim=32,
    wind_sizes=[3, 4, 5],
    feat_per_wind=64,
    drop_prob=0.5,
    nclasses=2,
)

# Adam over all model parameters.
optimizer = optim.Adam(model.parameters(), lr=1e-3)

In [None]:
NUM_EPOCHS = 10   # passes over the whole dataset
LOG_EVERY = 10    # print progress every this many optimisation steps
EMA_DECAY = 0.9   # smoothing factor of the running loss average

# Make sure dropout is active even if the model was switched to eval mode by
# an earlier run of some other cell.
model.train()

running_avg_loss = 0.
step = 0
for i in range(NUM_EPOCHS):
    print('epoch: ', i)
    for tokens, labels in loader:
        step += 1
        loss = model.loss(tokens, labels)
        step_loss = loss.item()  # read the scalar once and reuse it below

        running_avg_loss = EMA_DECAY * running_avg_loss + (1 - EMA_DECAY) * step_loss
        if step % LOG_EVERY == 0:
            # The average starts at 0, so early values are biased towards 0;
            # dividing by (1 - decay^step) removes that bias (same correction
            # as used for Adam's moment estimates).
            print('training loss: ', running_avg_loss / (1 - EMA_DECAY ** step))
            print('step loss: ', step_loss)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()