This notebook implements the architecture for multi-label text classification described in this [paper](https://arxiv.org/pdf/2103.12607v1.pdf).

# Imports and connecting to Google Drive

In [21]:
# from google.colab import drive
# drive.mount("/content/drive")

In [22]:
# cd /content/drive/MyDrive/lab/

In [23]:
import pandas as pd
import os
import collections
import numpy as np
import zipfile
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

from sklearn.metrics import *
from sklearn.model_selection import train_test_split

In [24]:
# Use the first CUDA device when one is visible, otherwise fall back to the CPU.
dev = "cuda:0" if torch.cuda.is_available() else "cpu"
device = torch.device(dev)
device  # notebook display of the selected device

device(type='cuda', index=0)

In [25]:
def save_classification(y_test, y_pred, out_dir, labels):
  """Print and save a multilabel classification report.

  The dictionary returned by scikit-learn's ``classification_report`` is
  extended with three extra rows ('Exact Match Ratio', 'Hamming Loss' and
  'Accuracy'); each extra row repeats its single score in the precision,
  recall and f1-score columns.

  Args:
    y_test: ground-truth labels, indexed row by row (``y_test[i]``).
    y_pred: predictions; anything that is not an ``np.ndarray`` is converted
      through its ``.toarray()`` method.
    out_dir: destination handed to ``DataFrame.to_csv``.
    labels: names forwarded to the report as ``target_names``.

  Returns:
    The transposed report as a ``pandas.DataFrame`` (it is also printed and
    written to ``out_dir``).
  """
  if not isinstance(y_pred, np.ndarray):
    y_pred = y_pred.toarray()

  def accuracy(y_true, y_pred):
    # Per-sample |true AND pred| / |true OR pred|, averaged over all rows.
    # Rows whose union is empty add nothing to the sum but still count in
    # the mean.
    n_rows = y_true.shape[0]
    running = 0
    for row in range(n_rows):
      union = np.logical_or(y_true[row], y_pred[row]).sum()
      if union == 0:
        continue
      running += np.logical_and(y_true[row], y_pred[row]).sum() / union
    return running / n_rows

  out = classification_report(y_test, y_pred, output_dict=True, target_names=labels)
  total_support = out['samples avg']['support']

  mr = accuracy_score(y_test, y_pred)
  acc = accuracy(y_test, y_pred)
  hm = hamming_loss(y_test, y_pred)

  # Append the three summary rows in a fixed order.
  for row_name, score in (('Exact Match Ratio', mr), ('Hamming Loss', hm), ('Accuracy', acc)):
    out[row_name] = {'precision': score, 'recall': score, 'f1-score': score, 'support': total_support}

  out_df = pd.DataFrame(out).transpose()
  print(out_df)
  out_df.to_csv(out_dir)

  return out_df

# Extract and Read Data

In [26]:
# Directory that holds the pre-split CSV files listed by the next cell.
# NOTE(review): absolute, machine-specific path; the trailing slash matters
# because the loading cell appends file names with plain string concatenation.
data_folder = '/home/bkcs/HDD/secBertClassifier/Untitled Folder/'

# def extract_file():
#   file_zip = os.getcwd() + '/Data_Cleansing.zip'
#   with zipfile.ZipFile(file_zip, 'r') as zip_ref:
#     zip_ref.extractall(path=data_folder)

# extract_file()

In [27]:
# Display the files present in the data folder (notebook output only).
os.listdir(data_folder)

['X_val.csv',
 'misclassified-outdate.csv',
 'y_test.csv',
 'X_train.csv',
 'X_test.csv',
 'secbert-escort.pt',
 'y_val.csv',
 'misclassified-data.csv',
 'y_train.csv']

In [40]:
# Inputs: the 'BYTECODE' column of each split as a 1-D NumPy array.
X_train, X_test, X_val = (
    pd.read_csv(data_folder+file_name)['BYTECODE'].to_numpy()
    for file_name in ('X_train.csv', 'X_test.csv', 'X_val.csv')
)

# Targets: every column of each label file, as a 2-D NumPy array.
y_train, y_test, y_val = (
    pd.read_csv(data_folder+file_name).to_numpy()
    for file_name in ('y_train.csv', 'y_test.csv', 'y_val.csv')
)

In [41]:
# Number of raw training samples (before tokenisation and padding).
X_train.shape

(122280,)

In [42]:
class Tokenizer(object):
    """Minimal tokenizer that maps tokens to 1-based integer ids.

    Texts are split on a single space. Ids are assigned in order of first
    appearance during ``fit_on_texts``, starting at 1, so 0 never denotes a
    token and stays free for padding.
    """

    def __init__(self, num_words=None, lower=True) -> None:
        """
        Args:
            num_words: stored on the instance; no method of this class reads it.
            lower: when truthy, texts are lower-cased before splitting.
        """
        self.word_index = {}
        self.word_counts = {}
        self.num_words = num_words
        self.split = " "
        self.lower = lower

    def fit_on_texts(self, texts):
        """
        Count tokens and (re)build the vocabulary.

        Counts accumulate across calls; ``word_index`` is rebuilt from all
        tokens seen so far.

        Args:
            texts: iterable of strings.
        """
        for text in texts:
            for w in self.text_to_word_sequence(text):
                self.word_counts[w] = self.word_counts.get(w, 0) + 1
        self.word_index = {w: i for i, w in enumerate(self.word_counts, start=1)}

    def text_to_word_sequence(self, input_text):
        """Return the list of tokens of ``input_text`` (optionally lower-cased)."""
        if self.lower:
            input_text = input_text.lower()
        return input_text.split(self.split)

    def texts_to_sequences(self, texts):
        """Return one list of token ids per text in ``texts``."""
        return list(self.texts_to_sequences_generator(texts))

    def texts_to_sequences_generator(self, texts):
        """
        Yield the id sequence of each text.

        Tokens that are not in the fitted vocabulary are skipped. Emitting
        ``None`` for them would make the sequence impossible to convert to an
        integer array later on.
        """
        lookup = self.word_index.get
        for text in texts:
            ids = (lookup(w) for w in self.text_to_word_sequence(text))
            yield [i for i in ids if i is not None]

def pad_sequences(
    sequences,
    maxlen=None,
    dtype="int32",
    padding="pre",
    truncating="pre",
    value=0.0
):
    """
    Pad (and truncate) variable-length sequences to a common length.

    Args:
        sequences: List of sequences (each sequence is a list of integers).
        maxlen: Optional Int, maximum length of all sequences. If not provided,
            sequences will be padded to the length of the longest individual
            sequence.
        dtype: (Optional, defaults to `"int32"`). Type of the output sequences.
            To pad sequences with variable length strings, you can use `object`.
        padding: String, "pre" or "post" (optional, defaults to `"pre"`):
            pad either before or after each sequence.
        truncating: String, "pre" or "post" (optional, defaults to `"pre"`):
            remove values from sequences larger than
            `maxlen`, either at the beginning or at the end of the sequences.
        value: Float or String, padding value. (Optional, defaults to 0.)

    Returns:
        Numpy array with shape `(len(sequences), maxlen)`

    Raises:
        ValueError: In case of invalid values for `truncating` or `padding`,
            or in case of invalid shape for a `sequences` entry.
    """

    if not hasattr(sequences, "__len__"):
        raise ValueError("`sequences` must be iterable.")
    num_samples = len(sequences)

    lengths = []
    sample_shape = ()
    flag = True

    # take the sample shape from the first non empty sequence
    # checking for consistency in the main loop below.

    for x in sequences:
        try:
            lengths.append(len(x))
            if flag and len(x):
                sample_shape = np.asarray(x).shape[1:]
                flag = False
        except TypeError as e:
            raise ValueError(
                "`sequences` must be a list of iterables. "
                f"Found non-iterable: {str(x)}"
            ) from e

    if maxlen is None:
        maxlen = np.max(lengths)

    # `np.unicode_` was only an alias of `np.str_` and no longer exists in
    # NumPy 2.0, so a single check covers every string dtype.
    is_dtype_str = np.issubdtype(dtype, np.str_)
    if isinstance(value, str) and dtype != object and not is_dtype_str:
        raise ValueError(
            f"`dtype` {dtype} is not compatible with `value`'s type: "
            f"{type(value)}\nYou should set `dtype=object` for variable length "
            "strings."
        )

    # Start from an array filled with the padding value, then copy each
    # (possibly truncated) sequence into its row.
    x = np.full((num_samples, maxlen) + sample_shape, value, dtype=dtype)
    for idx, s in enumerate(sequences):
        if not len(s):
            continue  # empty list/array was found
        if truncating == "pre":
            trunc = s[-maxlen:]
        elif truncating == "post":
            trunc = s[:maxlen]
        else:
            raise ValueError(f'Truncating type "{truncating}" not understood')

        # check `trunc` has expected shape
        trunc = np.asarray(trunc, dtype=dtype)
        if trunc.shape[1:] != sample_shape:
            raise ValueError(
                f"Shape of sample {trunc.shape[1:]} of sequence at "
                f"position {idx} is different from expected shape "
                f"{sample_shape}"
            )

        if padding == "post":
            x[idx, : len(trunc)] = trunc
        elif padding == "pre":
            x[idx, -len(trunc) :] = trunc
        else:
            raise ValueError(f'Padding type "{padding}" not understood')
    return x

In [None]:
# Fixed length every id sequence is padded/truncated to.
input_size = 17500

# The vocabulary is built from the training texts only.
tokenizer = Tokenizer()
tokenizer.fit_on_texts(texts=X_train)

sequences_train, sequences_test, sequences_val = (
    tokenizer.texts_to_sequences(texts=split)
    for split in (X_train, X_test, X_val)
)

# NOTE(review): X_* are rebound from raw texts to padded id matrices here,
# so this cell cannot be re-run without reloading the CSV files first.
X_train, X_val, X_test = (
    pad_sequences(seqs, maxlen=input_size)
    for seqs in (sequences_train, sequences_val, sequences_test)
)

In [None]:
# Shape of the padded training matrix produced by the previous cell.
X_train.shape

# Multilabel deep learning

## Create DataLoader

In [33]:
# NOTE(review): stale cell — tensor_X_train is only created further down in
# this section, and the recorded output does not match the training-set size
# shown earlier; re-run the notebook top to bottom before trusting it.
tensor_X_train.shape

torch.Size([1, 4100])

In [34]:
# NOTE(review): executed before the cell below that defines tensor_Y_train;
# the recorded output comes from an earlier session.
tensor_Y_train.shape

torch.Size([122280, 4])

In [None]:
from torch.utils.data import TensorDataset, DataLoader

# Token-id matrices keep the dtype produced by pad_sequences.
tensor_X_train, tensor_X_val, tensor_X_test = (
    torch.tensor(features) for features in (X_train, X_val, X_test)
)
# Label matrices are converted to float tensors, as nn.BCELoss expects.
tensor_Y_train, tensor_Y_val, tensor_Y_test = (
    torch.FloatTensor(targets) for targets in (y_train, y_val, y_test)
)

train_dataset, val_dataset, test_dataset = (
    TensorDataset(features, targets)
    for features, targets in (
        (tensor_X_train, tensor_Y_train),
        (tensor_X_val, tensor_Y_val),
        (tensor_X_test, tensor_Y_test),
    )
)

# NOTE(review): no shuffle is requested, so every loader (training included)
# iterates in file order.
data_train_loader, data_val_loader, data_test_loader = (
    DataLoader(dataset, batch_size=32)
    for dataset in (train_dataset, val_dataset, test_dataset)
)

In [None]:
class Branch(nn.Module):
  """Classification head: Linear -> BatchNorm1d -> Dropout -> Linear.

  Args:
    input_size: number of input features.
    hidden_size: width of the first linear layer (and of the batch norm).
    dropout: drop probability applied after the batch norm.
    num_outputs: number of values produced by the final linear layer.
  """

  def __init__(self, input_size, hidden_size, dropout, num_outputs):
    super().__init__()

    self.dense1 = nn.Linear(input_size, hidden_size)
    self.batchnorm1 = nn.BatchNorm1d(hidden_size)
    self.dropout = nn.Dropout(p=dropout)
    self.dense2 = nn.Linear(hidden_size, num_outputs)

  def forward(self, x):
    """Return the raw (un-activated) outputs of the head for ``x``."""
    normalised = self.batchnorm1(self.dense1(x))
    return self.dense2(self.dropout(normalised))

In [None]:
# gru_hidden_size = 64
# dense1_size = 128
# dense2_size = 64
# num_outputs = 1
# embedd_size = 5

# vocab = len(tokenizer.word_index.keys())
# word_embeddings = nn.Embedding(input_size, embedd_size)
# gru = nn.GRU(embedd_size, gru_hidden_size, num_layers=1)
# branch_module = Branch(gru_hidden_size, dense1_size, dense2_size, num_outputs)
# branches = nn.ModuleList([Branch(gru_hidden_size, dense1_size, dense2_size, num_outputs) for _ in range(num_classes)])
# sigmoid = nn.Sigmoid()

# inputs = next(iter(data_train_loader))
# print(inputs[0])
# print(inputs[0].shape)
# print(inputs[1].shape)

# embeds = word_embeddings(inputs[0])
# gru_out, _ = gru(embeds)
# outputs = [branch(gru_out[:, -1, :]) for branch in branches]
# outputs = torch.cat(outputs, dim=1)
# sigmoid(outputs)

## Create Model

In [None]:
class Escort(nn.Module):
  """Shared embedding + GRU trunk followed by one ``Branch`` head per label.

  Args:
    vocab_size: number of rows of the embedding table; must be larger than
      the largest token id fed to ``forward``.
    embedd_size: embedding dimension.
    gru_hidden_size: hidden size of the GRU (and input size of every branch).
    n_layers: number of stacked GRU layers.
    num_classes: number of labels, i.e. number of branches.
  """

  def __init__(self, vocab_size, embedd_size, gru_hidden_size, n_layers, num_classes):
    super().__init__()
    self.word_embeddings = nn.Embedding(vocab_size, embedd_size)
    # batch_first=True: inputs arrive as (batch, seq) from the DataLoader and
    # forward() indexes dim 1 as the time axis.
    self.gru = nn.GRU(embedd_size, gru_hidden_size, num_layers=n_layers, batch_first=True)
    # Branch takes (input_size, hidden_size, dropout, num_outputs); the former
    # call passed a fifth positional argument and raised TypeError.
    self.branches = nn.ModuleList([Branch(gru_hidden_size, 128, 0.2, 1) for _ in range(num_classes)])
    self.sigmoid = nn.Sigmoid()

  def forward(self, sequence):
    """Return per-label probabilities, one column per branch.

    Args:
      sequence: integer tensor of token ids, laid out as (batch, seq).
    """
    embeds = self.word_embeddings(sequence)
    gru_out, _ = self.gru(embeds)
    # GRU output at the last time step summarises each sequence.
    last_step = gru_out[:, -1, :]
    output_branches = torch.cat([branch(last_step) for branch in self.branches], dim=1)
    return self.sigmoid(output_branches)

## Train model

In [None]:
# model = LSTMMultilabel(SIZE_OF_VOCAB, NUM_HIDDEN_NODES, NUM_OUTPUT_NODES, NUM_LAYERS, DROPOUT, True, torch.DoubleTensor(weight))
# model

In [None]:
# inputs, labels = next(iter(data_train_loader))
# preds = model(inputs)

### Train and Validation Steps

In [None]:
def calculate_score(y_true, preds):
    """Return ``accuracy_score(y_true, preds)`` for the given labels and predictions."""
    return accuracy_score(y_true, preds)

def train_steps(training_loader, model, loss_f, optimizer):
    """Train ``model`` for one pass over ``training_loader``.

    Args:
        training_loader: iterable of batches; ``batch[0]`` holds the inputs
            and ``batch[1]`` the labels.
        model: module called on the inputs; its outputs are thresholded at
            0.5 to compute the accuracy.
        loss_f: callable ``loss_f(preds, labels)`` returning the loss tensor.
        optimizer: optimizer stepping ``model``'s parameters.

    Returns:
        ``(epoch_loss, epoch_acc)``: loss and accuracy, each averaged over
        the batches of the epoch.
    """
    model.train()

    loss_sum = 0
    acc_sum = 0.
    num_batches = 0

    for batch in training_loader:
        # Move the batch to the globally selected device.
        inputs = batch[0].to(device)
        labels = batch[1].to(device)

        preds = model(inputs)
        loss = loss_f(preds, labels)
        loss_sum += loss.item()

        # Metrics are computed on detached NumPy copies.
        hard_preds = np.where(preds.detach().cpu().numpy() >= 0.5, 1, 0)
        acc_sum += calculate_score(labels.cpu().numpy(), hard_preds)
        num_batches += 1

        optimizer.zero_grad()
        loss.backward()
        # Clip the global gradient norm to 1.0 before the update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

    return loss_sum / num_batches, acc_sum / num_batches

def evaluate_steps(validating_loader, model, loss_f):
    """Evaluate ``model`` on ``validating_loader`` without updating it.

    Args:
        validating_loader: iterable of batches; ``batch[0]`` holds the inputs
            and ``batch[1]`` the labels.
        model: module to evaluate (switched to eval mode).
        loss_f: callable ``loss_f(preds, labels)`` returning the loss tensor.

    Returns:
        ``(avg_loss, acc_score)``: the loss averaged over the batches and the
        accuracy computed once over all collected predictions.
    """
    # Eval mode switches off dropout.
    model.eval()

    loss_sum = 0
    collected_preds = []
    collected_labels = []

    for batch in validating_loader:
        inputs = batch[0].to(device)
        labels = batch[1].to(device)

        # No gradients are needed for evaluation.
        with torch.no_grad():
            preds = model(inputs)
            loss_sum = loss_sum + loss_f(preds, labels).item()

            hard_preds = np.where(preds.detach().cpu().numpy() >= 0.5, 1, 0)
            collected_preds.extend(hard_preds)
            collected_labels.extend(labels.tolist())

    avg_loss = loss_sum / len(validating_loader)
    acc_score = calculate_score(collected_labels, collected_preds)

    return avg_loss, acc_score

### Training loop

In [None]:
import time

def train(epochs, model, optimizer, criterion):
  """Train ``model`` for ``epochs`` epochs and checkpoint the best weights.

  Uses the module-level ``data_train_loader`` and ``data_val_loader``. After
  each epoch, the state dict is saved to './trained/escort.pt' whenever the
  validation loss improves on the best one seen so far.

  Args:
    epochs: number of epochs to run.
    model: module to train.
    optimizer: optimizer passed to ``train_steps``.
    criterion: loss callable passed to ``train_steps`` and ``evaluate_steps``.

  Returns:
    ``(train_accuracies, valid_accuracies, train_losses, valid_losses)``,
    four lists with one entry per epoch.
  """
  checkpoint_path = './trained/escort.pt'
  # torch.save does not create missing directories; make sure the target
  # exists up front instead of failing after the first full epoch.
  os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)

  # set initial loss to infinite so the first epoch always checkpoints
  best_valid_loss = float('inf')
  train_losses = []
  valid_losses = []
  train_accuracies = []
  valid_accuracies = []

  for epoch in range(epochs):
    start_time = time.time()
    train_loss, train_acc = train_steps(data_train_loader, model, criterion, optimizer)

    valid_loss, valid_acc = evaluate_steps(data_val_loader, model, criterion)

    # save the best model
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), checkpoint_path)
    # append training and validation loss
    train_losses.append(train_loss)
    valid_losses.append(valid_loss)
    train_accuracies.append(train_acc)
    valid_accuracies.append(valid_acc)

    elapsed_time = time.time() - start_time

    print('Epoch {}/{} \t loss={:.4f} \t accuracy={:.4f} \t val_loss={:.4f}  \t val_acc={:.4f}  \t time={:.2f}s'.format(epoch + 1, epochs, train_loss, train_acc, valid_loss, valid_acc, elapsed_time))
  return train_accuracies, valid_accuracies, train_losses, valid_losses

In [None]:
def plot_graph(epochs, train, valid, tittle, ylabel='loss'):
    """Plot a training curve and a validation curve against the epoch number.

    Args:
        epochs: number of epochs; the x axis runs from 1 to ``epochs``.
        train: per-epoch values of the training curve.
        valid: per-epoch values of the validation curve.
        tittle: figure title.
        ylabel: y-axis label. Defaults to 'loss' (the previous hard-coded
            value) so existing calls are unchanged; pass e.g. 'accuracy'
            when plotting accuracies.
    """
    plt.figure(figsize=(12,12))
    plt.title(tittle)
    epoch_axis = list(np.arange(epochs) + 1)
    plt.plot(epoch_axis, train, label='train')
    plt.plot(epoch_axis, valid, label='validation')
    plt.xlabel('num_epochs', fontsize=12)
    plt.ylabel(ylabel, fontsize=12)
    plt.legend(loc='best')


## Test Model

In [None]:
def predict(testing_loader, model, loss_f):
    """Run ``model`` over ``testing_loader`` and collect thresholded predictions.

    Args:
        testing_loader: iterable of batches; ``batch[0]`` holds the inputs
            and ``batch[1]`` the labels.
        model: module to evaluate (switched to eval mode).
        loss_f: accepted but not used by this function.

    Returns:
        ``(total_preds, total_labels, execution_time)``: the per-sample
        predictions (outputs thresholded at 0.5), the per-sample labels, and
        the elapsed wall-clock time divided by the number of samples.
    """
    # Eval mode switches off dropout.
    model.eval()

    collected_preds, collected_labels = [], []
    started = time.time()

    for batch in testing_loader:
        inputs = batch[0].to(device)
        labels = batch[1].to(device)

        # No gradients are needed for inference.
        with torch.no_grad():
            probabilities = model(inputs).detach().cpu().numpy()
            collected_preds.extend(np.where(probabilities >= 0.5, 1, 0))
            collected_labels.extend(labels.tolist())

    execution_time = (time.time() - started) / len(collected_labels)
    return collected_preds, collected_labels, execution_time

## Run

In [None]:
epochs = 10
# Token ids produced by the tokenizer start at 1 and pad_sequences fills with
# 0, so the embedding table needs one row more than the number of distinct
# tokens; without the +1 the largest id is out of range for nn.Embedding.
SIZE_OF_VOCAB = len(tokenizer.word_index) + 1
EMBEDDED_SIZE = 5
GRU_HIDDEN_SIZE = 64
NUM_OUTPUT_NODES = 4
NUM_LAYERS = 1
DROPOUT = 0.2  # NOTE(review): not passed to Escort, which sets its own dropout rate
model = Escort(SIZE_OF_VOCAB, EMBEDDED_SIZE, GRU_HIDDEN_SIZE, NUM_LAYERS, NUM_OUTPUT_NODES)
model.to(device)
print(model)

In [None]:
# Binary cross-entropy on probabilities: the model already applies a sigmoid.
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
train_accuracies, valid_accuracies, train_losses, valid_losses = train(
    epochs, model, optimizer, criterion
)

In [None]:
# One figure for the losses, one for the accuracies.
# NOTE(review): both calls rely on plot_graph's default y-axis label, which
# is 'loss', so the accuracy figure is labelled 'loss' as well.
plot_graph(epochs, train_losses, valid_losses, "Train/Validation Loss")
plot_graph(epochs, train_accuracies, valid_accuracies, "Train/Validation Accuracy")

In [None]:
# NOTE(review): `model` holds the weights from the last epoch; the best
# checkpoint written during training is never loaded back — confirm which
# one the reported test metrics should use.
total_preds, total_labels, execution_time = predict(data_test_loader, model, criterion)

In [None]:
# Average wall-clock seconds per test sample, as computed by predict().
execution_time

In [None]:
# `labels` was never defined at notebook level. The label names are the
# column headers of the label CSV: every column of that file is loaded as a
# target. nrows=0 reads the header only.
labels = pd.read_csv(data_folder+'y_test.csv', nrows=0).columns.tolist()
save_classification(y_pred=np.array(total_preds), y_test=np.array(total_labels), labels=labels, out_dir='escort.csv')