## 3. 使用CNN进行文本分类 

<img src='img/textcnn.jfif' width=500>

reference:
- https://pytorch.org/tutorials/beginner/text_sentiment_ngrams_tutorial.html
- https://github.com/649453932/Chinese-Text-Classification-Pytorch/

In [39]:
import argparse

# Hyper-parameters of the TextCNN experiment, collected in one namespace so
# that the following cells can read (and extend) them through attributes.
hparams = argparse.Namespace(
    batch_size=16,
    learning_rate=0.004,
    # max_grad_norm=1.,
    max_length=2000,  # width every id sequence is truncated / padded to
    dropout=0.2,
    embedding_dim=200,
    hidden_dim=200,
    seed=42,
    num_filters=200,  # output channels of each convolution
    filter_sizes=[1, 2, 3],  # one convolution is created per kernel height
    num_train_epochs=20,
    model_save_path='data/save_model/textcnn.path',
)

hparams

Namespace(batch_size=16, learning_rate=0.004, max_length=2000, dropout=0.2, embedding_dim=200, hidden_dim=200, seed=42, num_filters=200, filter_sizes=[1, 2, 3], num_train_epochs=20, model_save_path='data/save_model/textcnn.path')

### 加载数据

In [40]:
from nltk.corpus import movie_reviews
import random
# Seed Python's `random` module; its generator performs the shuffle inside
# load_movie_reviews(), so this fixes the train/test split.
random.seed(hparams.seed)


def load_movie_reviews():
    """Load the NLTK movie-review corpus and split it into train and test.

    All positive reviews are collected first, then all negative ones, each
    paired with its label string. The combined list is shuffled in place with
    the global ``random`` generator and cut at index 1600.

    Returns:
        A ``(train_reviews, test_reviews)`` tuple of lists containing
        ``(raw_text, label)`` pairs, where ``label`` is ``'positive'`` or
        ``'negative'``.
    """
    labelled_reviews = [
        (movie_reviews.raw(file_id), sentiment)
        for category, sentiment in (('pos', 'positive'), ('neg', 'negative'))
        for file_id in movie_reviews.fileids(category)
    ]

    random.shuffle(labelled_reviews)

    split_at = 1600
    return labelled_reviews[:split_at], labelled_reviews[split_at:]

train_reviews, test_reviews = load_movie_reviews()

# Report how many reviews landed in each split.
for tag, subset in (('train:', train_reviews), ('test:', test_reviews)):
    print(tag, len(subset))

train: 1600
test: 400


### Tokenize

In [41]:
from nltk import word_tokenize


def tokenize_reviews(reviews):
    """Tokenize raw reviews and convert their label strings to integers.

    Args:
        reviews: Iterable of ``(raw_text, label)`` pairs.

    Returns:
        A ``(tokenized_reviews, labels)`` tuple of parallel lists: the token
        list returned by ``word_tokenize`` for each review, and ``0`` for the
        label ``'negative'`` or ``1`` for any other label.
    """
    tokenized_reviews = []
    labels = []
    for text, label in reviews:
        tokenized_reviews.append(word_tokenize(text))
        labels.append(0 if label == 'negative' else 1)
    return tokenized_reviews, labels


# Both splits go through the same helper so their preprocessing cannot drift.
train_reviews_tokenized, train_labels = tokenize_reviews(train_reviews)
test_reviews_tokenized, test_labels = tokenize_reviews(test_reviews)

### 建立词表、将单词变成id

In [42]:
from collections import Counter
from torchtext.vocab import Vocab


# Count token frequencies over the training split only, so tokens that occur
# solely in the test split never enter the counter.
counter = Counter(
    token
    for tokens in train_reviews_tokenized
    for token in tokens
)

special_tokens = ['<unk>', '<pad>', '<sos>', '<eos>']
vocab = Vocab(counter, min_freq=1, specials=special_tokens)

# Store the vocabulary-dependent settings next to the other hyper-parameters.
hparams.vocab_size = len(vocab)
hparams.pad_id = vocab['<pad>']
hparams.num_classes = 2

print(hparams.vocab_size)

42013


In [43]:
# Translate every tokenized review into its list of vocabulary ids.
train_reviews_ids = list(map(vocab.lookup_indices, train_reviews_tokenized))
test_reviews_ids = list(map(vocab.lookup_indices, test_reviews_tokenized))

### 将数据打包为dataloader

In [44]:
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
import torch


class TextDataset(Dataset):
    """Map-style dataset pairing each review with its label.

    Args:
        reviews: Sized, indexable collection with one entry per example.
        labels: Sized, indexable collection of labels parallel to ``reviews``.

    Raises:
        ValueError: If ``reviews`` and ``labels`` differ in length.
    """

    def __init__(self, reviews, labels):
        # Fail fast: without this check a mismatch only surfaces later as an
        # IndexError during iteration, or as silently unused labels.
        if len(reviews) != len(labels):
            raise ValueError(
                f'reviews and labels must have the same length, '
                f'got {len(reviews)} and {len(labels)}')
        self.reviews = reviews
        self.labels = labels

    def __getitem__(self, index):
        """Return the ``(review, label)`` pair stored at ``index``."""
        return self.reviews[index], self.labels[index]

    def __len__(self):
        """Return the number of examples."""
        return len(self.reviews)


def collate_to_max_length(batch):
    """Collate ``(ids, label)`` examples into fixed-width tensors.

    Each id list is cut to ``hparams.max_length`` entries, or right-padded
    with ``hparams.pad_id`` up to that length, so every row has equal width.

    Args:
        batch: Iterable of ``(ids, label)`` pairs, where ``ids`` is a list.

    Returns:
        A ``(X, y)`` tuple of tensors built with ``torch.tensor`` from the
        fixed-width id rows and from the labels, respectively.
    """
    width = hparams.max_length
    rows, targets = [], []
    for ids, target in batch:
        clipped = ids[:width]
        # The padding list is empty when the example already fills the width.
        rows.append(clipped + [hparams.pad_id] * (width - len(clipped)))
        targets.append(target)

    return torch.tensor(rows), torch.tensor(targets)


# Wrap both splits in datasets and batch them with the shared collate function.
train_dataset = TextDataset(train_reviews_ids, train_labels)
test_dataset = TextDataset(test_reviews_ids, test_labels)

loader_kwargs = dict(
    batch_size=hparams.batch_size,
    collate_fn=collate_to_max_length,
)

# Only the training batches are reshuffled; the test order stays fixed.
train_dataloader = DataLoader(dataset=train_dataset, shuffle=True, **loader_kwargs)
test_dataloader = DataLoader(dataset=test_dataset, shuffle=False, **loader_kwargs)

### 定义模型

In [45]:
from torch import nn
from torch.nn import functional as F


class TextCNN(nn.Module):
    """Convolutional text classifier over token-id sequences.

    Token ids are embedded, convolved with one ``Conv2d`` per entry of
    ``hparams.filter_sizes`` (every kernel spans the full embedding width),
    max-pooled over the sequence axis, concatenated, passed through dropout
    and projected to ``hparams.num_classes`` logits.

    Args:
        hparams: Object exposing ``vocab_size``, ``embedding_dim``, ``pad_id``,
            ``num_filters``, ``filter_sizes``, ``dropout`` and ``num_classes``.
    """

    def __init__(self, hparams):
        super().__init__()
        self.hparams = hparams

        self.embedding = nn.Embedding(
            hparams.vocab_size,
            hparams.embedding_dim,
            padding_idx=hparams.pad_id)

        # One convolution per kernel height k; the kernel width equals the
        # embedding size, so each filter slides along the sequence axis only.
        self.convs = nn.ModuleList([
            nn.Conv2d(1, hparams.num_filters, (k, hparams.embedding_dim))
            for k in hparams.filter_sizes
        ])
        self.dropout = nn.Dropout(hparams.dropout)

        hidden_size = hparams.num_filters * len(hparams.filter_sizes)
        self.classifier = nn.Linear(hidden_size, hparams.num_classes)

        # init_weights() is not invoked here, so PyTorch's default
        # initialisation applies unless the caller opts in explicitly.
        # self.init_weights()

    def init_weights(self):
        """Xavier-initialise every weight and zero every bias.

        The embedding row at ``padding_idx`` is reset to zero afterwards:
        the Xavier pass would otherwise overwrite it with random values.
        """
        for name, w in self.named_parameters():
            if 'weight' in name:
                nn.init.xavier_normal_(w)
            elif 'bias' in name:
                nn.init.zeros_(w)

        pad_idx = self.embedding.padding_idx
        if pad_idx is not None:
            with torch.no_grad():
                self.embedding.weight[pad_idx].fill_(0)

    def forward(self, x):
        """Compute class logits.

        Args:
            x: Integer tensor of token ids with shape ``(B, L)``.

        Returns:
            Tensor of shape ``(B, num_classes)`` holding unnormalised logits.
        """
        # (B, L) => (B, L, embedding_dim)
        embed = self.embedding(x)
        # Add the single input channel Conv2d expects: (B, 1, L, embedding_dim)
        embed = embed.unsqueeze(1)

        # [(B, num_filters), ...] => (B, num_filters * len(filter_sizes))
        hidden = torch.cat([self.conv_and_pool(embed, conv) for conv in self.convs], dim=1)
        hidden = self.dropout(hidden)
        logits = self.classifier(hidden)

        return logits

    def conv_and_pool(self, x, conv):
        """Apply ``conv`` + ReLU, then max-pool over all sequence positions.

        Args:
            x: Tensor of shape ``(B, 1, L, embedding_dim)``.
            conv: One of the ``Conv2d`` modules in ``self.convs`` with kernel
                height ``k``.

        Returns:
            Tensor of shape ``(B, num_filters)``.
        """
        # conv:       (B, 1, L, embedding_dim) => (B, num_filters, L-k+1, 1)
        # squeeze(3): (B, num_filters, L-k+1, 1) => (B, num_filters, L-k+1)
        x = F.relu(conv(x).squeeze(3))
        # max_pool1d over the whole remaining axis: => (B, num_filters, 1)
        # squeeze(2): (B, num_filters, 1) => (B, num_filters)
        x = F.max_pool1d(x, x.size(2)).squeeze(2)
        return x

In [46]:
# vocab.load_vectors('glove.6B.200d')

In [47]:
# Seed PyTorch's RNG as well: seeding Python's `random` module alone leaves
# weight initialisation and DataLoader shuffling different on every run.
torch.manual_seed(hparams.seed)

model = TextCNN(hparams)

# Optional: start from pre-trained vectors and freeze the embedding layer.
# model.embedding.weight.data.copy_(vocab.vectors)
# model.embedding.weight.requires_grad = False
if torch.cuda.is_available():
    model.cuda()

# Display the architecture as the cell output.
model

TextCNN(
  (embedding): Embedding(42013, 200, padding_idx=1)
  (convs): ModuleList(
    (0): Conv2d(1, 200, kernel_size=(1, 200), stride=(1, 1))
    (1): Conv2d(1, 200, kernel_size=(2, 200), stride=(1, 1))
    (2): Conv2d(1, 200, kernel_size=(3, 200), stride=(1, 1))
  )
  (dropout): Dropout(p=0.2, inplace=False)
  (classifier): Linear(in_features=600, out_features=2, bias=True)
)

In [48]:
def _lr_factor(epoch):
    """Return the factor applied to the base learning rate: ``0.95 ** epoch``."""
    return 0.95**epoch


loss_func = nn.CrossEntropyLoss()

# SGD with momentum over all model parameters.
optimizer = torch.optim.SGD(
    model.parameters(),
    lr=hparams.learning_rate,
    momentum=0.9,
)

# The base learning rate is rescaled by _lr_factor on every scheduler step.
scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=_lr_factor)

In [49]:
from tqdm import tqdm

def train(model, dataloader, loss_func, optimizer, epoch_idx, hparams):
    """Run one training epoch over ``dataloader``.

    Args:
        model: Module to optimise; it is switched to training mode.
        dataloader: Iterable yielding ``(X, y)`` tensor batches.
        loss_func: Callable mapping ``(output, y)`` to a scalar loss tensor.
        optimizer: Optimizer that updates the model parameters.
        epoch_idx: Epoch number shown in the progress-bar description.
        hparams: Kept for interface compatibility; not read by this function.

    Returns:
        The mean of the per-batch losses of this epoch, or ``0.0`` when the
        dataloader produced no batch.
    """
    model.train()

    # Follow the model's device instead of the global CUDA availability, so a
    # model kept on the CPU still trains on a machine that has a GPU.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    pbar = tqdm(dataloader)
    pbar.set_description(f'Epoch {epoch_idx}')

    loss_sum = 0.
    num_batches = 0
    for X, y in pbar:
        X = X.to(device)
        y = y.to(device)

        optimizer.zero_grad()
        output = model(X)  # (B, num_classes)
        loss = loss_func(output, y)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        loss_sum += batch_loss
        num_batches += 1
        pbar.set_postfix(loss=batch_loss)

    return loss_sum / num_batches if num_batches else 0.

In [50]:
def evaluate(model, dataloader, loss_func):
    """Compute the average loss and the accuracy of ``model`` on ``dataloader``.

    Args:
        model: Module to evaluate; it is switched to evaluation mode.
        dataloader: Loader yielding ``(X, y)`` tensor batches; its ``dataset``
            attribute must support ``len``.
        loss_func: Callable mapping ``(output, y)`` to a scalar loss tensor.
            It is assumed to return the mean over the batch (the default of
            ``nn.CrossEntropyLoss``).

    Returns:
        A ``(avg_loss, accuracy)`` tuple, both averaged over the number of
        examples in ``dataloader.dataset``.
    """
    model.eval()
    size = len(dataloader.dataset)
    total_loss = 0.
    correct_num = 0

    # Follow the model's device instead of the global CUDA availability.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    with torch.no_grad():
        pbar = tqdm(dataloader)
        pbar.set_description('Valid')
        for X, y in pbar:
            X = X.to(device)
            y = y.to(device)
            output = model(X)

            # Weight each batch mean by its size so that a smaller final
            # batch does not distort the per-example average.
            total_loss += loss_func(output, y).item() * y.size(0)
            correct_num += (output.argmax(1) == y).sum().item()

    avg_loss = total_loss / size
    accuracy = correct_num / size

    return avg_loss, accuracy

In [51]:
import os

best_val_loss = None
accuracy_at_lowest_loss = 0
best_accuracy = 0

# torch.save does not create missing directories, so make sure the checkpoint
# directory exists before the first save.
save_dir = os.path.dirname(hparams.model_save_path)
if save_dir:
    os.makedirs(save_dir, exist_ok=True)

# NOTE: the test dataloader doubles as the validation set for checkpoint
# selection in this loop.
for epoch_idx in range(hparams.num_train_epochs):
    train(model, train_dataloader, loss_func, optimizer, epoch_idx+1, hparams)
    # Stepping before the report means the printed LR is the one the next
    # epoch will use.
    scheduler.step()
    val_loss, accuracy = evaluate(model, test_dataloader, loss_func)
    best_accuracy = max(best_accuracy, accuracy)
    print(f'\r[Validation] loss: {val_loss:.4f}, accuracy: {accuracy:.4f}, LR: {scheduler.get_last_lr()}     ')

    # Compare against None explicitly: a best loss of exactly 0.0 is falsy and
    # would otherwise be treated as "no checkpoint yet".
    if best_val_loss is None or val_loss < best_val_loss:
        torch.save(model.state_dict(), hparams.model_save_path)
        print(f'\rsave model to {hparams.model_save_path}\n\n')
        best_val_loss = val_loss
        accuracy_at_lowest_loss = accuracy

print(f'accuracy_at_lowest_loss: {accuracy_at_lowest_loss}, best_accuracy: {best_accuracy}')


Epoch 1: 100%|██████████| 100/100 [00:12<00:00,  7.84it/s, loss=1.06]
Valid: 100%|██████████| 25/25 [00:01<00:00, 24.06it/s]
[Validation] loss: 2.4536, accuracy: 0.5050, LR: [0.0038]     
save model to data/save_model/textcnn.path


Epoch 2: 100%|██████████| 100/100 [00:13<00:00,  7.62it/s, loss=0.582]
Valid: 100%|██████████| 25/25 [00:01<00:00, 23.36it/s]
[Validation] loss: 0.5370, accuracy: 0.7475, LR: [0.00361]     
save model to data/save_model/textcnn.path


Epoch 3: 100%|██████████| 100/100 [00:13<00:00,  7.45it/s, loss=2.07]
Valid: 100%|██████████| 25/25 [00:01<00:00, 23.80it/s]
[Validation] loss: 2.1426, accuracy: 0.5375, LR: [0.0034295]     
Epoch 4: 100%|██████████| 100/100 [00:13<00:00,  7.24it/s, loss=0.557]
Valid: 100%|██████████| 25/25 [00:01<00:00, 21.25it/s]
[Validation] loss: 0.6729, accuracy: 0.7575, LR: [0.0032580249999999995]     
Epoch 5: 100%|██████████| 100/100 [00:13<00:00,  7.29it/s, loss=0.168]
Valid: 100%|██████████| 25/25 [00:01<00:00, 23.74it/s]
[Validation

In [None]:
### 拓展

- 如何理解textcnn中的卷积核和pooling层
- 如何确定卷积核的大小，调参？
    - RCNN[1]

[1] Lai, Siwei, et al. "Recurrent convolutional neural networks for text classification." AAAI2015.