In [None]:
# Colab-only: attach Google Drive so the /content/gdrive/... paths used for
# data, checkpoints and results further down are readable and writable.
from google.colab import drive

GDRIVE_MOUNT_POINT = '/content/gdrive'
drive.mount(GDRIVE_MOUNT_POINT)

In [None]:
!pip install transformers

In [None]:
import re
import json
import torch
import string
import numpy as np
import pandas as pd
import torch.nn as nn
from tqdm.auto import tqdm
from matplotlib import pyplot as plt
from transformers import get_scheduler
from torch.utils.data import DataLoader, Dataset
from transformers import AutoTokenizer, AutoModelForSequenceClassification
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# --- Task / training hyperparameters ---------------------------------------
num_classes = 2    # output size of the classification head
num_epochs = 101   # epochs are numbered 0..100
batch_size = 32    # samples per DataLoader batch
hidden_dim = 256   # width of the hidden layer in the classification head
max_length = 256   # tokenised sequences are padded/truncated to this length

In [None]:
class MedicalTCDataset(Dataset):
    """Map-style dataset pairing each medical abstract with its condition label.

    Args:
        data: Table-like object (for example a pandas DataFrame) that exposes
            the ``'medical_abstract'`` and ``'condition_label'`` columns.

    Both columns are materialised as plain lists so that ``dataset[i]`` always
    means "the i-th row". Indexing a pandas Series with ``[]`` is label based
    and raises ``KeyError`` when the frame's index is not ``0..n-1`` (for
    example after rows were filtered or shuffled).
    """

    def __init__(self, data):
        self.data = list(data['medical_abstract'])
        self.labels = list(data['condition_label'])

    def __len__(self):
        """Return the number of (abstract, label) pairs."""
        return len(self.data)

    def __getitem__(self, index):
        """Return the ``(abstract, label)`` pair stored at position ``index``."""
        return self.data[index], self.labels[index]

In [None]:
def clean_medical_text(text):
    """Normalise a medical abstract before tokenisation.

    The text is lower-cased, every ASCII punctuation character is removed, a
    small fixed list of English stopwords is dropped and the remaining words
    are joined with single spaces.

    Args:
        text: Raw abstract as a ``str``.

    Returns:
        The cleaned string (empty when no words remain).
    """
    stopwords = {'the', 'a', 'an', 'in', 'on', 'is', 'are', 'was', 'were', 'to', 'for', 'of'}
    # str.translate deletes every character listed in string.punctuation.
    # Building a regex class as '[' + string.punctuation + ']' does not: the
    # "\]" inside it is parsed as an escaped bracket, so backslashes survive.
    text = text.lower().translate(str.maketrans('', '', string.punctuation))
    # split() with no argument splits on any run of whitespace and ignores
    # leading/trailing whitespace, so no separate collapsing pass is needed.
    return ' '.join(word for word in text.split() if word not in stopwords)

In [None]:
# Read the raw train/test splits and the label table from the mounted Drive.
train = pd.read_csv("/content/gdrive/MyDrive/XAI/data/medical_tc_train.csv")
test = pd.read_csv("/content/gdrive/MyDrive/XAI/data/medical_tc_test.csv")
labels = pd.read_csv("/content/gdrive/MyDrive/XAI/data/medical_tc_labels.csv")

# Binary task vocabulary: class id -> readable name ...
mappings = {0: "non-cardiovascular", 1: "cardiovascular"}
# ... and the reverse lookup (name -> class id), derived from the table above.
inverse_mapping = {class_name: class_id for class_id, class_name in mappings.items()}

In [None]:
def _to_binary_label(raw_label):
    """Map a raw condition id to its binary class name.

    Ids 1, 2, 3 and 5 become "non-cardiovascular"; every other id becomes
    "cardiovascular" (presumably only id 4 in this dataset; confirm against
    the labels CSV). A value that already is one of the two class names is
    returned unchanged, so re-running this cell cannot silently relabel every
    row as "cardiovascular".
    """
    if raw_label in inverse_mapping:
        return raw_label
    return mappings[0 if raw_label in [1, 2, 3, 5] else 1]


# Apply the same cleaning and label binarisation to both splits, in place.
for split in (train, test):
    split['medical_abstract'] = split['medical_abstract'].apply(clean_medical_text)
    split['condition_label'] = split['condition_label'].apply(_to_binary_label)

In [None]:
# Persist both splits as CSV files in the Drive data folder, without the
# DataFrame row index.
cleaned_outputs = (
    (train, "/content/gdrive/MyDrive/XAI/data/medical_tc_train_cleaned.csv"),
    (test, "/content/gdrive/MyDrive/XAI/data/medical_tc_test_cleaned.csv"),
)
for frame, destination in cleaned_outputs:
    frame.to_csv(destination, index=False)

### Fine Tuning

In [None]:
# Wrap both DataFrames so they can be batched by a DataLoader.
train_dataset = MedicalTCDataset(train)
test_dataset = MedicalTCDataset(test)

# Training batches are reshuffled every epoch.
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# The test split is iterated in dataset order: shuffling adds nothing to
# evaluation and makes anything derived from this loader change from run to run.
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Tokenizer and encoder weights come from the same BioBERT checkpoint.
biobert_checkpoint = "dmis-lab/biobert-base-cased-v1.1"
tokenizer = AutoTokenizer.from_pretrained(biobert_checkpoint)
model = AutoModelForSequenceClassification.from_pretrained(biobert_checkpoint)

# Swap the stock classification head for a small MLP:
# hidden_size -> hidden_dim -> num_classes, with ReLU and dropout in between.
head_layers = [
    nn.Linear(model.config.hidden_size, hidden_dim),
    nn.ReLU(),
    nn.Dropout(0.2),
    nn.Linear(hidden_dim, num_classes),
]
model.classifier = nn.Sequential(*head_layers)

model.to(device)
# Ending the cell on print() means the cell shows only a blank line rather than
# a module repr.
print()

In [None]:
model.train()

# Partial fine-tuning: the first 185 parameter tensors of the encoder (in the
# order yielded by .parameters()) are frozen, the remaining ones stay trainable.
for position, tensor in enumerate(model.bert.parameters()):
    tensor.requires_grad = position >= 185
# The classification head is always trained.
for tensor in model.classifier.parameters():
    tensor.requires_grad = True

optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5)

# Linear decay of the learning rate over every optimisation step, no warm-up.
num_training_steps = len(train_dataloader) * num_epochs
lr_scheduler = get_scheduler(
    name="linear",
    optimizer=optimizer,
    num_warmup_steps=0,
    num_training_steps=num_training_steps,
)

In [None]:
# Fine-tune the model and record the loss of every optimisation step.
criterion = torch.nn.CrossEntropyLoss()
train_loss = []  # one float per optimisation step, in execution order
model.to(device)
model.train()
progress_bar = tqdm(range(num_training_steps))
for epoch in tqdm(range(num_epochs)):
    # The label batch is named batch_labels so it does not overwrite the
    # `labels` DataFrame loaded together with the train/test splits.
    for text, batch_labels in train_dataloader:
        optimizer.zero_grad()
        # Let the tokenizer build the attention mask instead of inferring it
        # from "token id != 0", which silently assumes the pad id is 0.
        encoded = tokenizer(
            list(text),
            padding='max_length',
            max_length=max_length,
            truncation=True,
            return_tensors='pt',
        )
        # Move tensors to the configured device rather than calling .cuda(),
        # so the loop also runs when no GPU is available.
        input_ids = encoded['input_ids'].to(device)
        attention_mask = encoded['attention_mask'].to(device)
        # Element 0 of the model output holds the classification logits.
        outputs = model(input_ids, attention_mask=attention_mask)[0]
        # Class names -> integer class ids expected by CrossEntropyLoss.
        targets = torch.tensor(
            [inverse_mapping[name] for name in batch_labels],
            dtype=torch.long,
            device=device,
        )
        loss = criterion(outputs, targets)
        loss.backward()
        train_loss.append(loss.item())
        optimizer.step()
        lr_scheduler.step()
        progress_bar.update(1)
    # Checkpoint every 10th epoch (0, 10, 20, ...); 'loss' is the loss of the
    # last batch of that epoch.
    if epoch % 10 == 0:
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'loss': loss.item(),
        }, '/content/gdrive/MyDrive/XAI/models/biobert_fine_tuned_epoch_{}.pt'.format(epoch))
np.save("/content/gdrive/MyDrive/XAI/results/biobert_fine_tuned_loss.npy", np.array(train_loss))

In [None]:
# Evaluate on the test split: one loss value per batch, no parameter updates.
# The progress bar covers exactly one pass over the test loader. No LR
# scheduler or optimizer.zero_grad() is needed because nothing is trained here.
num_testing_steps = len(test_dataloader)
progress_bar = tqdm(range(num_testing_steps))

model.eval()
eval_loss = []  # one float per test batch
with torch.no_grad():
    for text, batch_labels in test_dataloader:
        # Same tokenisation as during training; the tokenizer supplies the
        # attention mask.
        encoded = tokenizer(
            list(text),
            padding='max_length',
            max_length=max_length,
            truncation=True,
            return_tensors='pt',
        )
        # Use the configured device instead of a hard-coded .cuda().
        input_ids = encoded['input_ids'].to(device)
        attention_mask = encoded['attention_mask'].to(device)
        # Logits are moved to the CPU, where the loss is computed.
        outputs = model(input_ids, attention_mask=attention_mask)[0].cpu()
        targets = torch.tensor(
            [inverse_mapping[name] for name in batch_labels], dtype=torch.long
        )
        loss = criterion(outputs, targets)
        eval_loss.append(loss.item())
        progress_bar.update(1)
# Unweighted mean of the per-batch losses.
average_eval_loss = np.mean(eval_loss)
print(f"Average eval loss: {average_eval_loss}")
np.save("/content/gdrive/MyDrive/XAI/results/biobert_fine_tuned_eval_loss.npy", np.array(eval_loss))

In [None]:
# Export every test abstract, together with its whitespace-split words.
# The dataset is read by position so the exported order is deterministic and
# does not depend on how a DataLoader happens to batch or shuffle the rows.
test_abstracts = [test_dataset[row][0] for row in range(len(test_dataset))]
input_json = [
    {'text': abstract, 'words': abstract.split()}
    for abstract in test_abstracts
]

with open('/content/gdrive/MyDrive/XAI/data/input.json', 'w') as f:
    json.dump(input_json, f)

# Store the encoder configuration as JSON as well.
with open('/content/gdrive/MyDrive/XAI/data/model_config.json', 'w') as f:
    json.dump(model.bert.config.to_dict(), f)

In [None]:
# Load the per-step training losses from the results folder, which is where
# the training loop saves them.
loss = np.load('/content/gdrive/MyDrive/XAI/results/biobert_fine_tuned_loss.npy')
# One loss was recorded per optimisation step, i.e. len(train_dataloader)
# values per epoch: put one epoch per row and average each row.
loss = np.mean(loss.reshape(-1, len(train_dataloader)), axis=1)

In [None]:
# Training-loss curve (one point per entry of `loss`), drawn on an explicit Axes.
fig, ax = plt.subplots()
ax.plot(loss, color='red')
ax.set_title('Train loss characteristics')
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.grid(True)
