<a href="https://colab.research.google.com/github/alexlimatds/victor-doc_classification/blob/main/victor_doc_classification_CNN5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Document classification of Victor project using a CNN as machine learning model

The notebook replicates the document classification with a CNN described in the _VICTOR: a Dataset for Brazilian Legal Documents Classification_ paper. In addition, it uses weights in the loss function to compensate for the data imbalance.

- Deep learning library: PyTorch
- NLP Library: spaCy

### Installing dependencies

In [None]:
!pip install tqdm



### Mounting Google Drive

In [None]:
# Mount Google Drive at /content/gdrive, forcing a remount.
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)
# Base folder from which the dataset directory is derived in the parameters cell.
root_dir = '/content/gdrive/My Drive/'

Mounted at /content/gdrive


### Application parameters

In [None]:
from datetime import datetime

# --- Input / batching ---
S = 500              # sentence length (used as the fixed length of the text field)
BATCH_SIZE = 64
NUM_OF_CLASSES = 6

# --- Regularisation (passed to the optimizer as weight_decay) ---
L2 = 0

# --- Network sizes ---
MLP_HIDDEN_UNITS = 128
EMBEDDING_DIM = 50

# --- Dataset and checkpoint locations ---
dataset_dir = f'{root_dir}Machine Learning/Victor datasets/'
model_path = '/'
model_file = f'{model_path}pytorch_model-{S}-.pt'

# --- Pretrained word embeddings ---
we_path = f'{dataset_dir}word_embeddings/'
we_file = 'we-ft-N_50-MIN_COUNT_10-WINDOW_5-N_GRAMS_3_4-V_23606.vec'
# The embedding file name encodes its dimension as "-N_<dim>-"; fail early on a mismatch.
if f'-N_{EMBEDDING_DIM}-' not in we_file:
  raise ValueError('EMBEDDING_DIM does not match word embedding file name.')

# --- Report file, time-stamped so that successive runs do not overwrite each other ---
now = datetime.now().strftime('%Y%m%d_%H%M%S')
report_file = f'{dataset_dir}CNN_5/report-{S}-L2_{L2}-EMB_DIM_{EMBEDDING_DIM}-{now}.txt'

### Loading word embeddings

In [None]:
from gensim.models import KeyedVectors

# Load the pretrained word vectors (word2vec format) from the file chosen in the parameters cell.
gensim_model = KeyedVectors.load_word2vec_format(we_path + we_file)

### Loading and preprocessing datasets

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchtext import data
from tqdm.notebook import trange, tqdm_notebook
import numpy as np
from datetime import datetime
import zipfile

In [None]:
def read_zip(fname_prefix, extract_to='/'):
  """
  Extracts `<fname_prefix>.csv` from the archive `<dataset_dir><fname_prefix>.zip`.

  fname_prefix: file name shared by the zip archive and the CSV inside it (no extension).
  extract_to:   directory the CSV is extracted into (defaults to '/').

  Returns the path of the extracted CSV file.
  """
  # The context manager closes the archive handle even if the extraction fails.
  with zipfile.ZipFile(dataset_dir + fname_prefix + '.zip', 'r') as zip_file:
    return zip_file.extract(fname_prefix + '.csv', path=extract_to)

# Extract the three preprocessed splits; each name receives the path returned by read_zip.
fname_train, fname_valid, fname_test = [
    read_zip(f'{split}-tag_stop_words_False-lemmatize_True')
    for split in ('TRAIN', 'VALIDATION', 'TEST')
]

In [None]:
%%time

# Text field: lower-cased tokens with a fixed length of S.
TEXT = data.Field(lower=True, fix_length=S)
# Label field: non-sequential, and no unknown token is added to its vocabulary.
LABEL = data.Field(sequential=False, unk_token=None)

# Column layout of the CSV files: text first, label second.
csv_fields = [('text', TEXT), ('label', LABEL)]

train_data, valid_data, test_data = data.TabularDataset.splits(
    path='/',
    train=fname_train,
    validation=fname_valid,
    test=fname_test,
    format='csv',
    skip_header=True,
    fields=csv_fields)

CPU times: user 23.2 s, sys: 1.58 s, total: 24.8 s
Wall time: 24.7 s


In [None]:
# Both vocabularies are built from the training split only.
TEXT.build_vocab(train_data)
LABEL.build_vocab(train_data)

In [None]:
# Dimension of the pretrained vectors (`vector_size` exists in both gensim 3.x and 4.x).
W2V_SIZE = gensim_model.vector_size
if W2V_SIZE != EMBEDDING_DIM:
  raise ValueError('EMBEDDING_DIM values does not match word vector size from word embedding file.')
embedding_vectors = []

# One vector per vocabulary entry, appended in the iteration order of `stoi`.
for token, idx in tqdm_notebook(TEXT.vocab.stoi.items(), desc='Embedding vectors', unit='token', leave=False):
  # Membership is tested on the KeyedVectors object itself: the `.vocab` attribute
  # was removed in gensim 4, while `in` works in both major versions.
  if token in gensim_model:
    embedding_vectors.append(torch.FloatTensor(gensim_model[token]))
  else:
    # Tokens without a pretrained vector get an all-zero vector.
    embedding_vectors.append(torch.zeros(W2V_SIZE))

TEXT.vocab.set_vectors(TEXT.vocab.stoi, embedding_vectors, W2V_SIZE)

HBox(children=(FloatProgress(value=0.0, description='Embedding vectors', max=155891.0, style=ProgressStyle(des…

  




In [None]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Same batch size for the three splits.
dataset_splits = (train_data, valid_data, test_data)
train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
  dataset_splits,
  batch_sizes=(BATCH_SIZE,) * 3,
  sort=False,  # don't sort test/validation data
  device=device)

### Model

In [None]:
class VictorCNN(nn.Module):
  """
  Text classifier: an embedding layer feeding three parallel 1-D convolutional
  branches whose outputs are concatenated, max-pooled, flattened and classified
  by a two-layer MLP.
  """

  # (kernel_size, padding) of the three convolutional branches.
  BRANCHES = ((3, 1), (4, 2), (5, 2))
  N_FILTERS = 256   # filters per convolutional branch
  BRANCH_POOL = 2   # max-pooling kernel applied inside each branch
  FINAL_POOL = 50   # max-pooling kernel applied after concatenating the branches

  def __init__(self, sentence_len, vocab_size, embed_dim, n_classes, mlp_h):
    """
    sentence_len: the length of each input sentence.
    vocab_size:   the number of tokens in the vocabulary.
    embed_dim:    the dimension of each embedding word vector.
    n_classes:    number of classes, i.e., the output dimension of this NN.
    mlp_h:        number of hidden units of the MLP NN.

    Raises ValueError if sentence_len cannot be processed by the architecture.
    """
    super(VictorCNN, self).__init__()

    (k_a, p_a), (k_b, p_b), (k_c, p_c) = self.BRANCHES
    self.word_embeddings = nn.Embedding(vocab_size, embed_dim)
    self.cnn_a = self.create_cnn_layer(embed_dim, self.N_FILTERS, k_a, p_a)
    self.cnn_b = self.create_cnn_layer(embed_dim, self.N_FILTERS, k_b, p_b)
    self.cnn_c = self.create_cnn_layer(embed_dim, self.N_FILTERS, k_c, p_c)
    self.max_pool = nn.MaxPool1d(self.FINAL_POOL)
    # Input size of the MLP derived from sentence_len (3840 when sentence_len is 500).
    self.linear_h = nn.Linear(self._flattened_size(sentence_len), mlp_h)
    self.linear_o = nn.Linear(mlp_h, n_classes)

  def _flattened_size(self, sentence_len):
    """Number of features reaching the MLP for inputs of length sentence_len."""
    # Output length of each branch: stride-1 convolution followed by max pooling.
    branch_lengths = {
        (sentence_len + 2 * padding - kernel_size + 1) // self.BRANCH_POOL
        for kernel_size, padding in self.BRANCHES
    }
    if len(branch_lengths) != 1:
      # The branch outputs are concatenated along the channel axis, so their lengths must match.
      raise ValueError(
          f'sentence_len={sentence_len} makes the convolutional branches produce '
          f'different lengths {sorted(branch_lengths)}; use an even sentence length.')
    pooled_length = branch_lengths.pop() // self.FINAL_POOL
    if pooled_length < 1:
      raise ValueError(
          f'sentence_len={sentence_len} is too short for the pooling layers of this network.')
    return len(self.BRANCHES) * self.N_FILTERS * pooled_length

  def create_cnn_layer(self, n_channels, n_filters, kernel_size, padding):
    """Builds one branch: convolution, batch normalisation and max pooling."""
    return nn.Sequential(
        nn.Conv1d(n_channels, n_filters, kernel_size, padding=padding),
        nn.BatchNorm1d(n_filters),
        nn.MaxPool1d(self.BRANCH_POOL)
    )

  def forward(self, sentence):
    """
    sentence: tensor of token indices with shape (s_len, b_len).

    Returns the unnormalised class scores produced by the output linear layer.
    """
    # sentence.shape: (s_len, b_len)
    embeds = self.word_embeddings(sentence).permute(1, 2, 0) # embeds shape: (b_len, embedding_dim, s_len)
    a = self.cnn_a(embeds)
    b = self.cnn_b(embeds)
    c = self.cnn_c(embeds)
    x = torch.cat((a, b, c), dim=1)  # stack the filters of the three branches
    x = self.max_pool(x)
    x = torch.flatten(x, start_dim=1)
    x = F.relu(self.linear_h(x))
    x = self.linear_o(x)
    return x

### Training functions

In [None]:
from sklearn.metrics import f1_score

def compute_metrics(targets, predictions):
  """
  Macro-averaged F1 score.

  targets:     true class indices.
  predictions: per-class scores, one row per example; the highest score in a row is the predicted class.
  """
  predicted_classes = np.argmax(predictions, axis=1)
  return f1_score(targets, predicted_classes, average='macro')

def train(model, iterator, optimizer, criterion, epoch):
  """
  Runs one training pass over `iterator` and returns the mean batch loss.

  model:     network to optimise (switched to training mode).
  iterator:  yields batches exposing `text` and `label`.
  optimizer: optimizer stepping the model parameters.
  criterion: loss function called as criterion(predictions, labels).
  epoch:     not used by this function.
  """
  model.train()
  batch_losses = []
  for batch in tqdm_notebook(iterator, desc='Train', unit='batch', leave=False):
    optimizer.zero_grad()
    loss = criterion(model(batch.text), batch.label)
    loss.backward()
    optimizer.step()
    batch_losses.append(loss.item())

  return sum(batch_losses) / len(iterator)

def predict(model, iterator, set_name):
  """
  Runs the model over every batch of `iterator` without tracking gradients.

  model:    network to evaluate (switched to evaluation mode).
  iterator: yields batches exposing `text` and `label`.
  set_name: name of the data set, shown in the progress bar.

  Returns a tuple (predictions, targets) of numpy arrays: the model outputs and the
  labels of all batches, concatenated along the first axis in iteration order.
  Raises ValueError if the iterator yields no batch.
  """
  model.eval()
  batch_outputs = []
  batch_labels = []
  with torch.no_grad():
    for batch in tqdm_notebook(iterator, desc=f'Predicting ({set_name})', unit='batch', leave=False):
      # Results are moved to the CPU batch by batch and concatenated once at the end,
      # instead of re-concatenating a growing tensor on every iteration.
      batch_outputs.append(model(batch.text).cpu())
      batch_labels.append(batch.label.cpu())

  if not batch_outputs:
    raise ValueError(f'No batches available to predict on ({set_name}).')

  predictions = torch.cat(batch_outputs, dim=0)
  targets = torch.cat(batch_labels, dim=0)
  return predictions.numpy(), targets.numpy()

def evaluate(model, iterator, set_name):
  """Predicts over `iterator` and returns the metric computed by compute_metrics."""
  outputs = predict(model, iterator, set_name)
  # predict returns (predictions, targets); compute_metrics takes the targets first.
  return compute_metrics(outputs[1], outputs[0])


### Training

In [None]:
EPOCHS = 30
learning_rate = 1e-3

model = VictorCNN(S, len(TEXT.vocab), EMBEDDING_DIM, NUM_OF_CLASSES, MLP_HIDDEN_UNITS)
# Start the embedding layer from the pretrained vectors attached to the vocabulary;
# without this copy the loaded word embeddings are never used by the model.
with torch.no_grad():
  model.word_embeddings.weight.copy_(TEXT.vocab.vectors)
model = model.to(device)
# L2 regularisation is applied through the optimizer's weight decay.
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=L2)
# NOTE(review): the notebook introduction says the loss uses class weights to compensate
# the data imbalance, but no `weight` is passed here — confirm which one is intended.
criterion = nn.CrossEntropyLoss()
criterion = criterion.to(device)

In [None]:
%%time
import pandas as pd
from IPython.display import display, update_display

# Per-epoch metrics table, shown once and refreshed in place after every epoch.
metrics_df = pd.DataFrame(columns=['Epoch', 'Loss (train)', 'F1 macro (train)', 'F1 macro (validation)'])
metrics_display = display(metrics_df, display_id='metrics_table')

# Best validation F1 seen so far; a checkpoint is written only when it improves.
best_valid_f1 = 0.0

for epoch in range(EPOCHS):
  # One optimisation pass, then macro F1 on the train and validation sets.
  train_loss = train(model, train_iterator, optimizer, criterion, epoch)
  train_f1_m = evaluate(model, train_iterator, 'train set')
  valid_f1_m = evaluate(model, valid_iterator, 'validation set')
  
  # Save the weights whenever the validation F1 improves.
  if valid_f1_m > best_valid_f1:
    best_valid_f1 = valid_f1_m
    torch.save(model.state_dict(), model_file)

  # Append this epoch's row (epochs are reported 1-based) and refresh the displayed table.
  metrics_df.loc[epoch] = [epoch + 1, train_loss, train_f1_m, valid_f1_m]
  metrics_display.update(metrics_df)

Unnamed: 0,Epoch,Loss (train),F1 macro (train),F1 macro (validation)


HBox(children=(FloatProgress(value=0.0, description='Train', max=2332.0, style=ProgressStyle(description_width…



RuntimeError: ignored

### Evaluation

In [None]:
def load_saved_model(file_name):
  """
  Builds a VictorCNN with the notebook's parameters, loads into it the state
  dict stored in `file_name` and returns it in evaluation mode on `device`.
  """
  restored = VictorCNN(S, len(TEXT.vocab), EMBEDDING_DIM, NUM_OF_CLASSES, MLP_HIDDEN_UNITS).to(device)
  state_dict = torch.load(file_name, map_location=device)
  restored.load_state_dict(state_dict)
  # eval() returns the module itself.
  return restored.eval()

# Replace the in-memory model with the checkpoint saved during training.
model = load_saved_model(model_file)

In [None]:
# Model outputs and true labels for each split, as returned by predict.
train_predictions, train_targets = predict(model, train_iterator, 'train set')
valid_predictions, valid_targets = predict(model, valid_iterator, 'validation set')
test_predictions, test_targets = predict(model, test_iterator, 'test set')

In [None]:
from sklearn.metrics import classification_report

import os

def build_report(targets, predictions):
  """Classification report (4 decimal digits) for the model outputs of one split."""
  return classification_report(
      targets,
      np.argmax(predictions, axis=1),
      digits=4,
      target_names=LABEL.vocab.itos)

test_report = build_report(test_targets, test_predictions)
valid_report = build_report(valid_targets, valid_predictions)
train_report = build_report(train_targets, train_predictions)

def format_int(value):
  """Formats the epoch column of the training log as an integer padded to 2 characters."""
  value = int(value)
  return f'{value:2d}'

def format_float(value):
  """Formats a metric column of the training log with 4 decimal places."""
  return f'{value:.4f}'

print('Test\n' + test_report)

# Create the report folder if needed, so a missing directory does not lose the results.
report_dir = os.path.dirname(report_file)
if report_dir:
  os.makedirs(report_dir, exist_ok=True)

# The context manager closes the file even if one of the writes fails.
with open(report_file, "wt") as rep_file:
  rep_file.write('CNN 5 evaluation report\n')
  rep_file.write(f'L2 rate: {L2}\n')
  rep_file.write(f'learning rate: {learning_rate}\n')
  rep_file.write(f'optimizer: {type(optimizer).__name__}\n')
  rep_file.write(f'criterion: {type(criterion).__name__}\n')
  rep_file.write(f'MLP_HIDDEN_UNITS: {MLP_HIDDEN_UNITS}\n')
  rep_file.write(f'EMBEDDING_DIM: {EMBEDDING_DIM}\n')
  rep_file.write(f'Test\n{test_report}\n')
  rep_file.write(f'Validation\n{valid_report}\n')
  rep_file.write(f'Train\n{train_report}\n')
  rep_file.write(f'Train log\n {metrics_df.to_string(index=False, formatters=[format_int, format_float, format_float, format_float])}\n')

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Confusion matrix of the validation set, normalised over the true labels
valid_predicted_classes = np.argmax(valid_predictions, axis=1)
cm = confusion_matrix(valid_targets, valid_predicted_classes, normalize='true')

f, ax = plt.subplots(figsize=(10, 10))
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=LABEL.vocab.itos)
disp.plot(ax=ax, cmap=plt.cm.Blues, xticks_rotation='vertical')

# The figure is stored next to the text report, using the same file name plus '.pdf'.
f.savefig(report_file + '.pdf', bbox_inches='tight')