# Experiment Data Preprocessing

In [4]:
import torch
import numpy as np
import pandas as pd
import os.path as osp

from torch.utils.data import Dataset
from torch.utils.data import DataLoader

import torch.nn as nn
import torch.nn.functional as F

from transformers import DistilBertModel, DistilBertTokenizer

import argparse


class BaseDataset(Dataset):
    """Paired-text dataset read from ``<file_path>/<split>.csv``.

    Every CSV row supplies a ``text_1`` / ``text_2`` pair and, for every split
    other than ``'test'``, an integer ``label``.  A pair is flagged as
    ``bad_data`` when either text is missing (both texts are then replaced by
    the empty string) or when either text is shorter than five characters.

    Texts are tokenised lazily in ``__getitem__`` with the
    ``distilbert-base-uncased`` tokenizer and padded/truncated to
    ``max_length`` token ids.

    Args:
        file_path: Directory that contains the split CSV files.
        split: Split name; it is also the stem of the CSV file to read.
        max_length: Number of token ids in every returned sequence.
    """

    # Texts shorter than this many characters are flagged as bad data.
    _MIN_TEXT_CHARS = 5

    def __init__(self, file_path, split, max_length=256):
        self.max_length = max_length  # the maximum number of tokens in the sequence
        self.split = split  # also read by __getitem__ to decide what to return

        # The file stem always equals the split name ('test' -> 'test.csv'),
        # so one read covers every split.  For the test split the label
        # column is not used below.
        df = pd.read_csv(
            osp.join(file_path, split + '.csv'),
            dtype={'text_1': str, 'text_2': str, 'label': int},
        )

        has_label = split != 'test'
        labels = list(df['label']) if has_label else None

        self.data = []
        for i, (text1, text2) in enumerate(zip(df['text_1'], df['text_2'])):
            if not isinstance(text1, str) or not isinstance(text2, str):
                # Missing cells are not str; blank out the whole pair.
                text1 = text2 = ''
                bad_data = True
            elif len(text1) < self._MIN_TEXT_CHARS or len(text2) < self._MIN_TEXT_CHARS:
                bad_data = True  # too short to be a usable sequence
            else:
                bad_data = False

            item = {
                'text1': text1,
                'text2': text2,
                'bad_data': bad_data,
            }
            if has_label:
                item['label'] = labels[i]
            self.data.append(item)

        self.tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')

        self.unk_token_idx = self.tokenizer.convert_tokens_to_ids(self.tokenizer.unk_token)
        self.sep_token_idx = self.tokenizer.convert_tokens_to_ids(self.tokenizer.sep_token)

    def __len__(self):
        """Return the number of text pairs."""
        return len(self.data)

    def aug(self, input_ids, p=0.5):
        """Augmentation hook applied to token ids in ``preprocess``.

        The base implementation returns ``input_ids`` unchanged so the class
        is usable on its own; subclasses override it to add augmentation.
        ``p`` is accepted for signature compatibility and is unused here.
        """
        return input_ids

    def preprocess(self, text):
        """Tokenise ``text`` and return its (augmented) token ids as a tensor."""
        tokens = self.tokenizer(text, padding='max_length', max_length=self.max_length, truncation=True)
        input_ids = self.aug(tokens['input_ids'])
        return torch.tensor(input_ids)

    def __getitem__(self, idx):
        """Return the tokenised pair at ``idx``.

        Returns:
            ``(token_idx_1, token_idx_2, bad_data)`` when ``split == 'test'``
            (no label is available), otherwise
            ``(token_idx_1, token_idx_2, label)``.
        """
        # idx is whatever the caller (e.g. a DataLoader sampler) supplies.
        record = self.data[idx]
        token_idx_1 = self.preprocess(record['text1'])
        token_idx_2 = self.preprocess(record['text2'])

        if self.split == 'test':
            return token_idx_1, token_idx_2, record['bad_data']

        return token_idx_1, token_idx_2, record['label']


class TestDataset(BaseDataset):
    """Evaluation dataset for the ``'dev'`` and ``'test'`` splits (no augmentation)."""

    def __init__(self, split, *args, **kwargs):
        # Only the evaluation splits are accepted by this class.
        assert split in ('dev', 'test')
        super().__init__(*args, split=split, **kwargs)

    def aug(self, input_ids, p=0.5):
        """Return ``input_ids`` untouched; ``p`` is accepted but not used."""
        return input_ids

# Model Architecture

In [5]:
class BertVerifier(nn.Module):
    """DistilBERT encoder with a projection head that scores a pair of texts.

    Both inputs are encoded by the same ``distilbert-base-uncased`` encoder;
    the first-token hidden state of each sequence is projected, L2-normalised
    and the two projections are compared with a dot product (i.e. cosine
    similarity).

    Args:
        output_size: Width of the final projection.
        hidden_size: Width of the intermediate head layer.
        dropout: Dropout probability applied to the pooled encoder output.
    """

    def __init__(self, output_size, hidden_size=768, dropout=0.0):
        super().__init__()

        self.encoder = DistilBertModel.from_pretrained('distilbert-base-uncased')
        self.pooling_fn = self.pooling_fn_cls_token

        # Freeze the embeddings and the first three transformer layers to
        # limit overfitting on a small training set; lower layers mostly carry
        # basic linguistic knowledge (https://aclanthology.org/N18-1202/).
        frozen_modules = [self.encoder.embeddings, *self.encoder.transformer.layer[:3]]
        for module in frozen_modules:
            for param in module.parameters():
                param.requires_grad = False

        self.fc1 = nn.Linear(768, hidden_size)
        self.bn1 = nn.BatchNorm1d(hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)
        self.dropout = nn.Dropout(dropout)

    def pooling_fn_cls_token(self, outputs):
        """Return the first-position vector of the encoder's first output.

        ``outputs[0]`` is the sequence of last-layer hidden states, see
        https://huggingface.co/docs/transformers/main/en/model_doc/distilbert#transformers.DistilBertModel
        """
        last_hidden_state = outputs[0]
        return last_hidden_state[:, 0]

    def forward(self, x1, x2):
        """Return one similarity score per ``(x1[i], x2[i])`` pair.

        NOTE: only token ids are passed to the encoder; no attention mask is
        supplied, so padded positions are not masked out.
        """
        # Stack both texts along the batch axis so the encoder runs once:
        # (M, a) + (M, a) -> (2M, a).
        token_ids = torch.cat((x1, x2), dim=0).long()
        pooled = self.pooling_fn(self.encoder(token_ids))

        # Prediction head.
        hidden = self.fc1(self.dropout(pooled))
        hidden = F.gelu(self.bn1(hidden))
        projected = F.normalize(self.fc2(hidden), p=2, dim=1)  # unit length, so dot product == cosine

        # First half of the batch belongs to text1, second half to text2.
        first_half, second_half = projected.chunk(2)
        return (first_half * second_half).sum(1)


class LSTMVerifier(nn.Module):
    """Bidirectional-LSTM encoder with a projection head that scores a text pair.

    Token ids are embedded with the ``embeddings`` sub-module of
    ``distilbert-base-uncased`` (the transformer layers are not used), encoded
    by a bi-LSTM, mean-pooled over the sequence axis, projected,
    L2-normalised and compared with a dot product (cosine similarity).

    Args:
        output_size: Width of the final projection.
        hidden_size: LSTM hidden width per direction, and width of the
            intermediate head layer.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability between LSTM layers.
    """

    def __init__(self, output_size, hidden_size=768, num_layers=3, dropout=0.0):
        super(LSTMVerifier, self).__init__()
        # Reuse DistilBERT's input embedding module; 768 is its output width.
        self.emb_layer = DistilBertModel.from_pretrained('distilbert-base-uncased').embeddings
        self.encoder = nn.LSTM(768, hidden_size, num_layers, batch_first=True, bidirectional=True, dropout=dropout)  # bi-LSTM

        # A bidirectional LSTM emits 2 * hidden_size features per step, so
        # the head must follow hidden_size instead of assuming 768 * 2.
        self.fc1 = nn.Linear(2 * hidden_size, hidden_size)
        self.bn1 = nn.BatchNorm1d(hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self, x1, x2):
        """Return one similarity score per ``(x1[i], x2[i])`` pair."""
        # Stack both texts along the batch axis so the encoder runs once.
        inputs = torch.cat([x1, x2], dim=0).long()  # index values of tokens
        inputs = self.emb_layer(inputs)
        outputs, _ = self.encoder(inputs)
        # Average over the sequence axis to get one vector per text.
        emb = outputs.mean(1)

        # Prediction head: same shape as the DistilBERT head, but without dropout.
        emb = self.fc1(emb)
        emb = self.bn1(emb)
        emb = F.gelu(emb)
        emb = self.fc2(emb)
        emb = F.normalize(emb, p=2, dim=1)  # unit length, so dot product == cosine

        # First half of the batch belongs to text1, second half to text2.
        emb1, emb2 = emb.chunk(2)
        sim = (emb1 * emb2).sum(1)

        return sim

# Demo

In [6]:
import os
from google.colab import drive
from tqdm import tqdm
# Mount Google Drive so that paths under /content/drive/MyDrive are reachable.
drive.mount('/content/drive')

def _write_predictions(preds, output_path):
    """Write ``preds`` one per line to ``output_path`` below a ``prediction`` header."""
    out_dir = os.path.dirname(output_path)
    if out_dir:  # a bare file name has no directory to create
        os.makedirs(out_dir, exist_ok=True)
    with open(output_path, 'w') as f:
        f.write("prediction\n")
        f.write('\n'.join(str(p) for p in preds))


@torch.no_grad()
def inference(args):
    """Run the selected verifier on the test split and write its predictions.

    Args:
        args: Namespace providing ``model_type``, ``emb_size``,
            ``hidden_size``, ``num_layers``, ``data_path``, ``max_length``,
            ``bs`` and ``output_path``.

    Raises:
        NotImplementedError: If ``args.model_type`` is neither
            ``'distilbert'`` nor ``'lstm'``.
    """
    # init model
    if args.model_type == 'distilbert':
        model = BertVerifier(args.emb_size, args.hidden_size)
        pretrained_path = '/content/drive/MyDrive/save/distilbert/best_weights.pth'
    elif args.model_type == 'lstm':
        model = LSTMVerifier(args.emb_size, args.hidden_size, args.num_layers)
        pretrained_path = '/content/drive/MyDrive/save/lstm/best_weights.pth'
    else:
        raise NotImplementedError("Model type not implemented.")

    # Use the GPU when one is present, otherwise fall back to the CPU.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # NOTE: torch.load unpickles the file; only load checkpoints you trust.
    model.load_state_dict(torch.load(pretrained_path, map_location='cpu'))
    model.to(device)
    model.eval()

    # load data
    test_set = TestDataset('test', args.data_path, max_length=args.max_length)
    test_loader = DataLoader(
        test_set,
        batch_size=args.bs,
        pin_memory=device.type == 'cuda',  # pinning only helps host-to-GPU copies
        shuffle=False,
        drop_last=False,
    )

    threshold = 0.18  # similarity above this is predicted as label 1

    preds_all = []
    for text1, text2, bad_data in tqdm(test_loader):
        scores = model(text1.to(device), text2.to(device))
        preds = (scores > threshold).long().cpu()
        preds *= 1 - bad_data.long()  # every invalid pair is predicted as 0
        preds_all.append(preds)

    # torch.cat rejects an empty list, so an empty test set yields no rows.
    preds_all = torch.cat(preds_all).numpy() if preds_all else []

    _write_predictions(preds_all, args.output_path)

def main():
    """Build the command-line options, parse them and run ``inference``."""
    # (flag, keyword arguments for ``add_argument``), registered in this order.
    option_specs = (
        ("--hidden_size", dict(type=int, default=768, help="embedding dimensions of hidden layers")),
        ("--emb_size", dict(type=int, default=256)),
        ("--num_layers", dict(type=int, default=3)),
        ("--bs", dict(type=int, default=16, help="batch size per gpu")),
        ("--max_length", dict(type=int, default=256, help="max length of tokens")),
        ("--model_type", dict(type=str, choices=['distilbert', 'lstm'], default='distilbert')),
        ("--data_path", dict(type=str, default='./test_data/AV')),
        ("--output_path", dict(type=str, default='./outputs/distilbert/Group_52_C.csv')),
    )

    parser = argparse.ArgumentParser()
    for flag, spec in option_specs:
        parser.add_argument(flag, **spec)

    # parse_known_args returns unrecognised arguments separately instead of
    # failing on them; they are discarded here.
    args, _ = parser.parse_known_args()

    inference(args)

# Run the inference pipeline with the default / command-line options.
main()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


  0%|          | 0/375 [00:00<?, ?it/s]We strongly recommend passing in an `attention_mask` since your input_ids may be padded. See https://huggingface.co/docs/transformers/troubleshooting#incorrect-output-when-padding-tokens-arent-masked.
100%|██████████| 375/375 [00:57<00:00,  6.55it/s]
