In [None]:
!pip install transformers

In [None]:
import numpy as np
import pandas as pd
import torch
import csv
from scipy import stats
from transformers import RobertaTokenizer, RobertaForSequenceClassification
import torch.nn as nn
import torch.optim as optim
from torchtext.legacy.data import Field, TabularDataset, BucketIterator, Iterator

In [None]:
# Seed PyTorch's global random number generator with a fixed value for run-to-run repeatability.
torch.manual_seed(0)

In [None]:
# Remote TSV sources in the MMU-TDMLab/CompLex GitHub repository ("single" and "multi" splits).
_COMPLEX_RAW_ROOT = "https://raw.githubusercontent.com/MMU-TDMLab/CompLex/master"
SINGLE_TRAIN_DATAPATH = _COMPLEX_RAW_ROOT + "/train/lcp_single_train.tsv"
SINGLE_TEST_DATAPATH = _COMPLEX_RAW_ROOT + "/test-labels/lcp_single_test.tsv"
MULTI_TRAIN_DATAPATH = _COMPLEX_RAW_ROOT + "/train/lcp_multi_train.tsv"
MULTI_TEST_DATAPATH = _COMPLEX_RAW_ROOT + "/test-labels/lcp_multi_test.tsv"

# Local CSV files that prepare_dataset writes and prepare_iterators reads back.
_LOCAL_CSV_DIR = "/content/sample_data"
single_train_filepath = _LOCAL_CSV_DIR + "/single_train.csv"
single_test_filepath = _LOCAL_CSV_DIR + "/single_test.csv"
multi_train_filepath = _LOCAL_CSV_DIR + "/multi_train.csv"
multi_test_filepath = _LOCAL_CSV_DIR + "/multi_test.csv"

In [None]:
# Number of times prepare_dataset wraps each sentence with its target token:
# every pass prepends "<token> [SEP] " and appends " [SEP] <token>".
k = 1

In [None]:
def _wrap_sentences_with_token(frame, repeats):
    """Return a new two-column frame (``complexity``, ``sentence``) built from ``frame``.

    Each sentence is wrapped ``repeats`` times as
    ``"<token> [SEP] <sentence> [SEP] <token>"``. The input frame is not modified.
    """
    # str() per element, exactly like the original loop (a token parsed as missing becomes "nan").
    token_text = frame['token'].map(str)
    wrapped = frame['sentence']
    # NOTE(review): " [SEP] " is kept verbatim; the tokenizer loaded later in this notebook is
    # RoBERTa's, so "[SEP]" is presumably tokenised as ordinary text — confirm this is intended.
    for _ in range(repeats):
        wrapped = token_text + " [SEP] " + wrapped + " [SEP] " + token_text
    return pd.DataFrame({
        'complexity': frame['complexity'].astype(float),
        'sentence': wrapped,
    })


def prepare_dataset(TRAIN_DATAPATH, TEST_DATAPATH, train_filepath, test_filepath, repeats=None):
    """Convert the tab-separated train/test sources into two-column CSV files.

    Both sources are read as TSV with quoting disabled, every sentence is wrapped
    with its target token (see ``_wrap_sentences_with_token``), and only the
    ``complexity`` and ``sentence`` columns are written, without the index.

    The wrapping is done column-wise on a fresh frame instead of assigning through
    chained indexing (``df['sentence'][i] = ...``), which pandas does not guarantee
    to write back to the original frame.

    Args:
        TRAIN_DATAPATH: path or URL of the training TSV.
        TEST_DATAPATH: path or URL of the test TSV.
        train_filepath: destination CSV path for the processed training data.
        test_filepath: destination CSV path for the processed test data.
        repeats: how many times to wrap each sentence with its token; when
            omitted, the module-level ``k`` is used (the previous behaviour).
    """
    if repeats is None:
        repeats = k
    # Read both sources before writing anything, so a failed read leaves no partial output.
    raw_train = pd.read_csv(TRAIN_DATAPATH, sep = '\t', quotechar = "'", quoting = csv.QUOTE_NONE)
    raw_test = pd.read_csv(TEST_DATAPATH, sep = '\t', quotechar = "'", quoting = csv.QUOTE_NONE)
    _wrap_sentences_with_token(raw_train, repeats).to_csv(train_filepath, index = False)
    _wrap_sentences_with_token(raw_test, repeats).to_csv(test_filepath, index = False)

In [None]:
# Use the GPU when PyTorch can see one; otherwise fall back to CPU so the notebook still runs.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
batch_size = 4    # examples per mini-batch, shared by the train and test iterators
num_epochs = 8    # number of epochs each experiment loop runs

In [None]:
# Pretrained roberta-large tokenizer; its padding / unknown token ids are the
# values handed to the torchtext text fields in prepare_iterators.
tokenizer = RobertaTokenizer.from_pretrained("roberta-large")
PAD_INDEX = tokenizer.pad_token_id
UNK_INDEX = tokenizer.unk_token_id

In [None]:
def prepare_iterators(train_filepath, test_filepath):
    """Build length-bucketed batch iterators over the processed train and test CSVs.

    Each CSV is expected to have a header row followed by ``complexity, sentence``
    columns (the layout written by ``prepare_dataset``). Sentences are converted to
    token ids with the module-level ``tokenizer`` and padded with ``PAD_INDEX``;
    complexity values are loaded as float32 labels. Batch size and device come from
    the module-level ``batch_size`` and ``device``.

    Args:
        train_filepath: path of the training CSV.
        test_filepath: path of the test CSV.

    Returns:
        ``(train_iter, test_iter)`` — two ``BucketIterator`` objects whose batches
        expose ``.sentence`` and ``.complexity``.
    """
    def encode_truncated(sentence):
        # Truncate to the tokenizer's maximum model length so that an unusually long
        # sentence cannot produce more token ids than the model accepts.
        return tokenizer.encode(sentence, truncation = True)

    def build_iterator(csv_path, is_train):
        # Fresh Field objects per dataset, as in the original train/test duplication.
        label = Field(sequential = False, use_vocab = False, batch_first = True, dtype = torch.float32)
        text = Field(use_vocab = False, tokenize = encode_truncated, lower = False, batch_first = True, pad_token = PAD_INDEX, unk_token = UNK_INDEX)
        fields = [('complexity', label), ('sentence', text)]
        dataset = TabularDataset(path = csv_path, format = 'csv', skip_header = True, fields = fields)
        return BucketIterator(dataset, batch_size = batch_size, sort_key = lambda x: len(x.sentence), device = device, train = is_train, sort = True, sort_within_batch = True)

    train_iter = build_iterator(train_filepath, True)
    test_iter = build_iterator(test_filepath, False)
    return train_iter, test_iter

In [None]:
# Request the single-output head at load time: assigning `model.config.num_labels` after
# `from_pretrained` does not rebuild the already-constructed classification layer.
# Downstream code slices `logits[:, : 1]`, which works unchanged with a one-column output.
model = RobertaForSequenceClassification.from_pretrained("roberta-large", num_labels = 1)
model.to(device)
# Mean-squared error between predicted and gold complexity scores.
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr = 0.00001)

In [None]:
def train_model(model, iterator):
    """Run one training epoch and return the mean loss per batch.

    Relies on module-level state: ``optimizer`` and ``criterion`` perform the
    update, and every batch's predictions / gold scores are appended to the
    module-level ``predicted`` / ``labels`` lists (the caller resets those lists
    before each epoch).

    Args:
        model: callable as ``model(input_ids, attention_mask=...)`` returning an
            object with a ``logits`` attribute of shape (batch, n); only column 0
            is used. If ``model.config.pad_token_id`` is available it is used to
            build the attention mask.
        iterator: iterable of batches exposing ``.sentence`` (token ids) and
            ``.complexity`` (float targets).

    Returns:
        Sum of the batch losses divided by the number of batches, or 0.0 when the
        iterator yields no batches.
    """
    pad_id = getattr(getattr(model, 'config', None), 'pad_token_id', None)
    epoch_loss = 0.0
    num_batches = 0
    model.train()
    for batch in iterator:
        text = batch.sentence
        label = batch.complexity.reshape(-1)
        # Tell the model which positions are padding so they are not attended to.
        attention_mask = None if pad_id is None else (text != pad_id).long()
        optimizer.zero_grad()
        output = model(text, attention_mask = attention_mask)
        # Squeeze only the last axis: a plain squeeze() turns a size-1 batch into a
        # 0-d tensor, which previously raised TypeError and silently skipped the batch.
        scores = torch.sigmoid(output.logits[:, : 1].squeeze(-1))
        loss = criterion(scores, label)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
        predicted.extend(scores.detach().tolist())
        labels.extend(label.tolist())
        num_batches += 1
    return epoch_loss / num_batches if num_batches else 0.0

In [None]:
def test_model(model, iterator):
    model.eval()
    with torch.no_grad():
        for batch in iterator:
            text = batch.sentence
            label = batch.complexity
            output = model(text)
            logits = output.logits[:, : 1]                    
            logits = torch.sigmoid(torch.squeeze(logits))
            try:
                test_predicted.extend(logits.tolist())
                test_labels.extend(label.tolist())
            except TypeError:
                pass

In [None]:
def calculate_metrics(y, y_hat):
    """Print agreement and error metrics between gold values ``y`` and predictions ``y_hat``.

    Both arguments are arrays (anything with ``.astype``) and are cast to float.
    Printed, in order: Pearson's R, the full result object of ``stats.spearmanr``,
    R squared (the square of Pearson's R), mean squared error and mean absolute
    error. Nothing is returned.
    """
    gold = y.astype(float)
    guess = y_hat.astype(float)
    residual = gold - guess

    pearson = np.corrcoef(gold, guess)[0, 1]
    # (template, value) pairs in the order they are reported.
    report = (
        ("Pearson's R: {}", pearson),
        ("Spearman's rho: {}", stats.spearmanr(gold, guess)),
        ("R Squared: {}", pearson ** 2),
        ("MSE: {}", np.mean(residual ** 2)),
        ("MAE: {}", np.mean(np.absolute(residual))),
    )
    for template, value in report:
        print(template.format(value))

In [None]:
# Write the processed CSVs for the "single" split, then build the train/test iterators over them.
prepare_dataset(SINGLE_TRAIN_DATAPATH, SINGLE_TEST_DATAPATH, single_train_filepath, single_test_filepath)
train_iter, test_iter = prepare_iterators(single_train_filepath, single_test_filepath)

In [None]:
# Train and evaluate on the "single" split for num_epochs epochs, printing metrics after each epoch.
print("++++++Running for single+++++")
for epoch in range(num_epochs):
    # train_model appends to the module-level `labels` / `predicted` lists,
    # so they must be reset before each epoch's call.
    labels = []
    predicted = []
    train_loss = train_model(model, train_iter)
    print(f'\t Epoch: {epoch + 1} | Train Loss: {train_loss: }')
    print("------Metrics for train------")
    calculate_metrics(np.array(labels), np.array(predicted))
    # Same protocol for evaluation: test_model appends to `test_labels` / `test_predicted`.
    test_labels = []
    test_predicted = []
    test_model(model, test_iter)
    print("------Metrics for test-------")
    calculate_metrics(np.array(test_labels), np.array(test_predicted))

In [None]:
# Write the processed CSVs for the "multi" split; `train_iter` / `test_iter` are rebound
# to iterators over the multi data, replacing the single-split iterators.
prepare_dataset(MULTI_TRAIN_DATAPATH, MULTI_TEST_DATAPATH, multi_train_filepath, multi_test_filepath)
train_iter, test_iter = prepare_iterators(multi_train_filepath, multi_test_filepath)

In [None]:
# Train and evaluate on the "multi" split for num_epochs epochs, printing metrics after each epoch.
# NOTE(review): in the cells shown here `model` and `optimizer` are not re-created after the
# "single" run, so this loop continues from the weights left by that run — confirm this is intended.
print("++++++Running for multi++++++")
for epoch in range(num_epochs):
    # train_model appends to the module-level `labels` / `predicted` lists,
    # so they must be reset before each epoch's call.
    labels = []
    predicted = []
    train_loss = train_model(model, train_iter)
    print(f'\t Epoch: {epoch + 1} | Train Loss: {train_loss: }')
    print("------Metrics for train------")
    calculate_metrics(np.array(labels), np.array(predicted))
    # Same protocol for evaluation: test_model appends to `test_labels` / `test_predicted`.
    test_labels = []
    test_predicted = []
    test_model(model, test_iter)
    print("------Metrics for test-------")
    calculate_metrics(np.array(test_labels), np.array(test_predicted))