In [None]:
# Notebook dependencies. googletrans is pinned to an exact version, while
# transformers is upgraded to whatever release is newest at install time.
# NOTE(review): the import cell pulls `AdamW` from transformers — confirm the
# upgraded release still exports it, or pin a known-good version here.
!pip install googletrans==3.1.0a0
!pip install --upgrade transformers

In [None]:
import datetime
import pandas as pd
from sklearn import metrics
import time
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModel, AdamW
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from tqdm import tqdm

pd.set_option("display.max_columns", None)

In [None]:
# Load the dataset from a Drive-style path.
# NOTE(review): later cells read `df["description"]` as the input text and treat
# every column after the first as a label — presumably column 0 is `description`.
df = pd.read_csv("./drive/MyDrive/CSC413_data/leetcode.csv")
# Alternative relative path (kept commented out):
# df = pd.read_csv("../data/leetcode.csv")
# Preview 10 random rows; no seed is passed, so the preview differs between runs.
df.sample(10)

In [None]:
# Label matrix: every column after the first one.
labels = df.iloc[:, 1:].to_numpy()
# Description strings, converted to lowercase.
inputs = [text.lower() for text in df["description"].values]

print(inputs[0])
print(labels[0])

In [None]:
# Whitespace-separated word count of each description, drawn as a 100-bin histogram.
inputs_len = list(map(lambda text: len(text.split()), inputs))
plt.hist(inputs_len, bins=100)

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

In [None]:
# Hyper-parameters and tokenizer shared by the dataset, training and inference cells.
LEARNING_RATE = 2e-5
TRAIN_BATCH_SIZE = VALID_BATCH_SIZE = 8
MAX_LEN = 512  # token length every encoded description is truncated / padded to
num_labels = len(labels[0])  # number of entries in one label row
tokenizer = AutoTokenizer.from_pretrained(
    "microsoft/codebert-base",
    do_lower_case=True,
)

In [None]:
# Names of the label columns (everything after the first column), in frame order.
target_cols = df.columns[1:].tolist()
print(target_cols)

In [None]:
# data augmentation
from googletrans import Translator
translator = Translator()
def transform(text):
    """Augment `text` by back-translation through a randomly chosen language.

    The text is translated to one of seven pivot languages and then back to
    English with the module-level `translator`.

    Args:
        text: The English string to augment.

    Returns:
        The back-translated string. If either translation call fails, or the
        round trip yields an empty result, the original `text` is returned
        unchanged so that a single failed request does not abort the caller.
    """
    import warnings

    # randomly select a language to translate to
    lang = np.random.choice(['fr', 'zh-cn', 'es', 'de', 'ru', 'ja', 'ko'])
    try:
        pivot = translator.translate(text, dest=lang)
        # then translate back to english
        restored = translator.translate(pivot.text, dest='en')
    except Exception as exc:
        # Augmentation is best-effort: keep the sample rather than crash the
        # data pipeline, but make the failure visible.
        warnings.warn(f"back-translation via '{lang}' failed ({exc!r}); using original text")
        return text
    return restored.text or text

In [None]:
class BERTDataset(Dataset):
    """Dataset that tokenizes one description per item and pairs it with its label row.

    Args:
        df: Frame with a `description` column and the columns named in `target_cols`.
        tokenizer: Object exposing `encode_plus` (a Hugging Face tokenizer in this notebook).
        max_len: Length every encoded sequence is truncated / padded to.
        transform: Optional callable applied to the raw text before tokenization
            (e.g. an augmentation). When `None`, the text is used as-is.
    """

    def __init__(self, df, tokenizer, max_len, transform=None):
        self.df = df
        self.max_len = max_len
        self.text = df.description
        self.tokenizer = tokenizer
        self.targets = df[target_cols].values
        self.transform = transform

    def __len__(self):
        """Number of rows in the wrapped frame."""
        return len(self.df)

    def __getitem__(self, index):
        """Return the encoded description and float targets for row position `index`."""
        # Positional lookup: `self.targets` is a plain array indexed by position,
        # so the text must be read by position too. A label-based lookup would
        # misalign text and targets whenever the frame's index is not 0..n-1.
        text = self.text.iloc[index]
        if self.transform is not None:
            text = self.transform(text)
        inputs = self.tokenizer.encode_plus(
            text,
            truncation=True,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            return_token_type_ids=True
        )
        ids = inputs['input_ids']
        mask = inputs['attention_mask']
        token_type_ids = inputs["token_type_ids"]

        return {
            'ids': torch.tensor(ids, dtype=torch.long),
            'mask': torch.tensor(mask, dtype=torch.long),
            'token_type_ids': torch.tensor(token_type_ids, dtype=torch.long),
            'targets': torch.tensor(self.targets[index], dtype=torch.float)
        }

In [None]:
# do train valid split
# 80% of the rows (fixed seed) form the training set; the rows not drawn form validation.
df_train = df.sample(frac=0.8, random_state=42)
df_valid = df.drop(index=df_train.index)

# Renumber both frames from zero so rows can be addressed by position.
df_train, df_valid = (part.reset_index(drop=True) for part in (df_train, df_valid))

df_train.head()

In [None]:
def _no_augmentation(text):
    """Return `text` unchanged; used where back-translation must not run."""
    return text

# Augment only the training split. Validation is scored on the original
# descriptions so its metrics are repeatable and reflect the un-augmented text
# that `predict_single` receives at inference time.
train_dataset = BERTDataset(df_train, tokenizer, MAX_LEN, transform)
valid_dataset = BERTDataset(df_valid, tokenizer, MAX_LEN, _no_augmentation)

In [None]:
# Batch iterators over the two datasets: training batches are shuffled,
# validation batches keep dataset order.
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=TRAIN_BATCH_SIZE,
    shuffle=True,
    num_workers=4,
    pin_memory=True,
)
valid_loader = DataLoader(
    dataset=valid_dataset,
    batch_size=VALID_BATCH_SIZE,
    shuffle=False,
    num_workers=4,
    pin_memory=True,
)

In [None]:
class BERTClass(torch.nn.Module):
    """CodeBERT encoder followed by a small feed-forward head with `num_labels` outputs."""

    def __init__(self):
        super().__init__()
        self.bert = AutoModel.from_pretrained("microsoft/codebert-base")
        # Head: 768 -> 512 -> num_labels with ReLU and dropout in between.
        head_layers = [
            torch.nn.Linear(768, 512),
            torch.nn.ReLU(),
            torch.nn.Dropout(0.2),
            torch.nn.Linear(512, num_labels),
        ]
        self.fc = torch.nn.Sequential(*head_layers)

    def forward(self, ids, mask, token_type_ids):
        """Encode the batch and return the head's raw (pre-sigmoid) outputs."""
        encoder_out = self.bert(
            ids,
            attention_mask=mask,
            token_type_ids=token_type_ids,
            return_dict=False,
        )
        # With return_dict=False the encoder returns a 2-tuple; only the second
        # element (presumably the pooled sentence representation) feeds the head.
        _, features = encoder_out
        return self.fc(features)

# Build the classifier, move it to the selected device, and display its layout.
model = BERTClass().to(device)
model

In [None]:
def loss_fn(outputs, targets):
    """Mean binary cross-entropy between raw logits `outputs` and float `targets`."""
    return torch.nn.functional.binary_cross_entropy_with_logits(outputs, targets)

In [None]:
# Use PyTorch's AdamW: the AdamW class exported by `transformers` is deprecated in
# favour of this one. eps=1e-6 keeps the epsilon that the transformers optimizer
# used by default, so the update rule stays as close as possible to the original.
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=1e-2, eps=1e-6)

In [None]:
def format_time(elapsed):
    """Format a duration as an ``h:mm:ss`` string.

    Args:
        elapsed: Duration in seconds (int or float).

    Returns:
        The duration rounded to the nearest whole second, rendered by
        `datetime.timedelta` (e.g. ``"0:01:00"``).
    """
    # Round to the nearest second. The previous `int(round(elapsed, 2))` rounded
    # to two decimals and then truncated, so 59.9 s was reported as 0:00:59.
    elapsed_rounded = int(round(elapsed))
    return str(datetime.timedelta(seconds=elapsed_rounded))

In [None]:
def validation(valid_loader, model, epoch):
    """Evaluate `model` on every batch of `valid_loader` without gradient tracking.

    Args:
        valid_loader: Iterable of batches; each batch is a mapping with the keys
            'ids', 'mask', 'token_type_ids' and 'targets'.
        model: Callable taking (ids, mask, token_type_ids) and returning logits.
        epoch: Zero-based epoch number, used only for the progress-bar label.

    Returns:
        Tuple `(accuracy, mean_loss)`. `accuracy` is `metrics.accuracy_score` of the
        targets against the sigmoid outputs thresholded at 0.5 (for multi-label
        indicator input, scikit-learn documents this as exact-match accuracy);
        `mean_loss` is the per-batch loss averaged over the batches seen.
    """
    model.eval()
    total_loss = 0.0
    cnt = 0
    fin_targets = []
    fin_outputs = []
    t0 = time.time()
    with tqdm(valid_loader, unit="batch") as valid_pbar:
        with torch.no_grad():
            for data in valid_pbar:
                valid_pbar.set_description(f"Validation (epoch {epoch + 1})")
                ids = data['ids'].to(device, dtype=torch.long)
                mask = data['mask'].to(device, dtype=torch.long)
                token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
                targets = data['targets'].to(device, dtype=torch.float)
                # The model returns a single logits tensor; unpacking it into
                # (outputs, loss) raised for any batch that is not exactly 2 rows.
                outputs = model(ids, mask, token_type_ids)
                loss = loss_fn(outputs, targets)
                total_loss += loss.item()
                cnt += 1
                fin_targets.extend(targets.cpu().numpy().tolist())
                fin_outputs.extend(torch.sigmoid(outputs).cpu().numpy().tolist())
    # Turn probabilities into hard 0/1 decisions before scoring.
    outputs = np.array(fin_outputs) >= 0.5
    accuracy = metrics.accuracy_score(np.array(fin_targets), outputs)
    print("  * Average validation loss: {0:.2f}".format(total_loss/cnt))
    print("  * Accuracy: {0:.2f}".format(accuracy))
    print("  * Validation took: {:}".format(format_time(time.time() - t0)))
    return accuracy, total_loss/cnt

In [None]:
def train(train_loader, model, epoch):
    """Run one optimisation pass over `train_loader` and return the mean batch loss.

    Each batch is moved to `device`, scored with `loss_fn`, back-propagated, and
    applied through the module-level `optimizer`, whose gradients are cleared
    right after the step. `epoch` is zero-based and only labels the progress bar.
    """
    model.train()
    running_loss = 0.0
    batches_seen = 0
    started = time.time()
    with tqdm(train_loader, unit="batch") as progress:
        for batch in progress:
            progress.set_description(f"Training (epoch {epoch + 1})")
            ids, mask, token_type_ids = (
                batch[key].to(device, dtype=torch.long)
                for key in ('ids', 'mask', 'token_type_ids')
            )
            targets = batch['targets'].to(device, dtype=torch.float)

            loss = loss_fn(model(ids, mask, token_type_ids), targets)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

            running_loss += loss.item()
            batches_seen += 1

    train_loss = running_loss / batches_seen
    print("  * Average training loss: {0:.2f}".format(train_loss))
    print("  * Training epoch took: {:}".format(format_time(time.time() - started)))
    print("Running Validation...")

    return train_loss

In [None]:
def train_epoch(epochs, train_loader,valid_loader, model):
    """Alternate one training pass and one validation pass for `epochs` rounds.

    Returns three lists with one entry per epoch, in this order:
    training losses, validation losses, validation accuracies.
    """
    history = {"train_loss": [], "val_loss": [], "accuracy": []}
    for epoch in range(epochs):
        print(f'Epoch {epoch+1}/{epochs}')
        epoch_train_loss = train(train_loader, model, epoch)
        epoch_accuracy, epoch_val_loss = validation(valid_loader, model, epoch)
        # Record the epoch only after both passes have finished.
        for key, value in (
            ("train_loss", epoch_train_loss),
            ("val_loss", epoch_val_loss),
            ("accuracy", epoch_accuracy),
        ):
            history[key].append(value)
    return history["train_loss"], history["val_loss"], history["accuracy"]

In [None]:
def plot_loss_and_acc(loss_vals, eval_accs):
    """Plot training loss and validation accuracy on one figure with two y-axes.

    Args:
        loss_vals: Sequence of per-epoch training losses (left axis, blue).
        eval_accs: Sequence of per-epoch validation accuracies (right axis, yellow).

    Side effects:
        Applies the seaborn 'darkgrid' theme, sets the global default figure
        size to 12x6, and shows the figure.
    """
    sns.set(style='darkgrid', font_scale=1.5)
    plt.rcParams["figure.figsize"] = (12,6)
    fig, ax1 = plt.subplots(1,1)
    ax1.plot(loss_vals, 'b-o', label = 'training loss')
    ax2 = ax1.twinx()
    ax2.plot(eval_accs, 'y-o', label = 'validation accuracy')
    ax2.set_title("Training loss and validation accuracy")
    # The x-label must go on the primary axes: `twinx` hides the twin's x-axis,
    # so a label set on `ax2` is never drawn.
    ax1.set_xlabel("Epoch")
    ax1.set_ylabel("Loss", color='b')
    ax2.set_ylabel("Accuracy", color='y')
    ax1.tick_params(axis='y', rotation=0, labelcolor='b' )
    ax2.tick_params(axis='y', rotation=0, labelcolor='y' )
    plt.show()

In [None]:
# Fine-tune for a fixed number of epochs and keep the per-epoch history for plotting.
EPOCHS = 3
train_losses, val_losses, accuracies = train_epoch(
    epochs=EPOCHS,
    train_loader=train_loader,
    valid_loader=valid_loader,
    model=model,
)

In [None]:
# Visualise the per-epoch training loss against the validation accuracy.
plot_loss_and_acc(loss_vals=train_losses, eval_accs=accuracies)

## Predict a single description, then evaluate predictions using the original tag counts

In [None]:
def predict_single(input_text, threshold = 0.5):
    """Predict which labels apply to one piece of text.

    The text is encoded with the module-level `tokenizer` (truncated / padded to
    `MAX_LEN`), run through the module-level `model` in eval mode without
    gradients, and the sigmoid of each output is compared with `threshold`.

    Returns:
        A boolean numpy array with one entry per model output; True where the
        probability is greater than or equal to `threshold`.
    """
    model.eval()
    with torch.no_grad():
        encoded = tokenizer.encode_plus(
            input_text,
            truncation=True,
            add_special_tokens=True,
            max_length=MAX_LEN,
            padding='max_length',
            return_token_type_ids=True
        )

        def as_batch(key):
            # Wrap one encoded field as a (1, seq_len) long tensor on the device.
            return torch.tensor(encoded[key], dtype=torch.long).unsqueeze(0).to(device)

        logits = model(
            as_batch('input_ids'),
            as_batch('attention_mask'),
            as_batch('token_type_ids'),
        )
        probabilities = torch.sigmoid(logits).cpu().detach().numpy().tolist()[0]
        return np.array(probabilities) >= threshold

In [None]:
def label_to_text(labels):
    """Return the `target_cols` names at the positions where `labels` is truthy."""
    names = []
    for position, flag in enumerate(labels):
        if flag:
            names.append(target_cols[position])
    return names

In [None]:
test_input = """
Given a string s, find the longest palindromic subsequence's length in s.

A subsequence is a sequence that can be derived from another sequence by deleting some or no elements without changing the order of the remaining elements.

 

Example 1:

Input: s = "bbbab"
Output: 4
Explanation: One possible longest palindromic subsequence is "bbbb".
Example 2:

Input: s = "cbbd"
Output: 2
Explanation: One possible longest palindromic subsequence is "bb".
 

Constraints:

1 <= s.length <= 1000
s consists only of lowercase English letters.
"""
# Classify the sample problem above and print the names of the predicted tags.
res = predict_single(test_input, threshold=0.5)
print(label_to_text(res))

In [None]:
# test label count accuracy, select top number of labels from prediction based on original label count
def _predict_probs(text):
    """Return the per-label sigmoid probabilities for one description as a 1-D numpy array."""
    model.eval()
    with torch.no_grad():
        encoded = tokenizer.encode_plus(
            text,
            truncation=True,
            add_special_tokens=True,
            max_length=MAX_LEN,
            padding='max_length',
            return_token_type_ids=True
        )
        ids = torch.tensor(encoded['input_ids'], dtype=torch.long).unsqueeze(0).to(device)
        mask = torch.tensor(encoded['attention_mask'], dtype=torch.long).unsqueeze(0).to(device)
        token_type_ids = torch.tensor(encoded['token_type_ids'], dtype=torch.long).unsqueeze(0).to(device)
        return torch.sigmoid(model(ids, mask, token_type_ids))[0].cpu().numpy()

valid_inputs = df_valid['description'].values
valid_labels = df_valid.iloc[:, 1:].values
# NOTE: this rebinds `accuracies`, which earlier held the per-epoch validation accuracies.
accuracies = []
for i, descrip in enumerate(valid_inputs):
    # Rank labels by probability. Ranking the thresholded True/False vector
    # (as before) cannot order labels within each group, so the "top k" picked
    # there did not depend on the model's scores.
    probs = _predict_probs(descrip)
    label_count = int(valid_labels[i].sum())
    top_labels = np.argsort(probs)[::-1][:label_count]
    top_preds = np.zeros(len(probs), dtype=int)
    top_preds[top_labels] = 1
    # fraction of label positions where the top-k prediction matches the truth
    accu = metrics.accuracy_score(valid_labels[i], top_preds)
    accuracies.append(accu)
print(f"Average accuracy: {np.mean(accuracies)}")