In [None]:
import pandas as pd
import os
from tqdm import tqdm
import random
import torch
import torch.nn as nn
import numpy as np
from transformers import AutoTokenizer, AutoModel, AdamW
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score, classification_report
from sklearn.metrics import confusion_matrix

class BERT(nn.Module):
    """Text classifier: a pre-trained encoder followed by a two-layer MLP head.

    Args:
        bert: Encoder module. It is called as ``bert(input_ids, attention_mask=...)``
            and its output must expose a ``pooler_output`` attribute.
        num_class: Number of target classes (width of the output layer).
    """

    def __init__(self, bert, num_class=5):
        super(BERT, self).__init__()
        self.bert = bert
        # Take the encoder width from its config when one is present; fall back
        # to 768 when the encoder carries no ``config.hidden_size``.
        hidden_size = getattr(getattr(bert, 'config', None), 'hidden_size', 768)
        # dense layer 1
        self.fc1 = nn.Linear(hidden_size, 512)
        # dropout layer
        self.dropout = nn.Dropout(0.1)
        # relu activation function
        self.relu = nn.ReLU()
        # dense layer 2 (output layer)
        self.fc2 = nn.Linear(512, num_class)
        # softmax is only applied on request (see ``forward``)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, input_ids, masks, return_probs=False):
        """Compute class scores for a batch.

        Args:
            input_ids: Token-id tensor; a singleton dimension 1 is squeezed away.
            masks: Attention-mask tensor, squeezed the same way as ``input_ids``.
            return_probs: When True, return softmax probabilities instead of
                raw logits.

        Returns:
            Tensor with one row per example and ``num_class`` columns. By
            default these are unnormalised logits, which is what
            ``nn.CrossEntropyLoss`` expects (it applies log-softmax itself);
            feeding it already-softmaxed values flattens the gradients.
        """
        input_ids = input_ids.squeeze(1)
        masks = masks.squeeze(1)

        pooled = self.bert(input_ids, attention_mask=masks).pooler_output
        hidden = self.dropout(self.relu(self.fc1(pooled)))
        logits = self.fc2(hidden)

        if return_probs:
            return self.softmax(logits)
        return logits

In [None]:
class Dataset(torch.utils.data.Dataset):
    """Pre-tokenised text-classification dataset built from a DataFrame.

    Every text is tokenised once, at construction time, and kept in memory.

    Args:
        tokenizer: Callable invoked as ``tokenizer(text, padding='max_length',
            max_length=..., truncation=True, return_tensors="pt")``.
        df: DataFrame providing a ``'text'`` and a ``'label'`` column.
        max_length: Length every text is padded/truncated to (default 512,
            the value that used to be hard-coded).
    """

    def __init__(self, tokenizer, df, max_length=512):
        self.tokenizer = tokenizer
        self.max_length = max_length

        # Read the columns directly instead of ``df.iterrows()``: this avoids
        # building a Series per row and the dtype upcasting that comes with it.
        self.labels = df['label'].tolist()
        self.texts = [
            self.tokenizer(
                str(text),  # non-string cells (numbers, NaN) are stringified
                padding='max_length',
                max_length=max_length,
                truncation=True,
                return_tensors="pt",
            )
            for text in df['text']
        ]

    def __len__(self):
        """Number of examples."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return the ``(tokenizer_output, label)`` pair stored at ``idx``."""
        return self.texts[idx], self.labels[idx]

In [None]:
def evaluate(test_dataloader, model, criterion, device, epoch):
    """Run ``model`` over ``test_dataloader`` and print classification metrics.

    Args:
        test_dataloader: Iterable of ``(encoding, labels)`` batches, where
            ``encoding`` maps ``'input_ids'`` and ``'attention_mask'`` to
            tensors and ``labels`` is a tensor of class ids.
        model: Module called as ``model(input_ids, masks)``; expected to return
            one row of per-class scores per example.
        criterion: Unused; kept so existing callers keep working.
        device: Device the input tensors are moved to.
        epoch: Epoch index, only used in the printed summary line.

    Returns:
        The macro-averaged F1 score over the five reported classes.
    """
    # Order in which classes are reported; position i pairs with target_names[i].
    label_order = [1, 0, 3, 2, 4]
    target_names = ["Diet", "Exercise", "Mental Health", "Other", "Unrelated"]

    model.eval()
    true_list, pred_list = [], []

    with torch.no_grad():
        for batch in test_dataloader:
            input_ids = batch[0]['input_ids'].to(device)
            masks = batch[0]['attention_mask'].to(device)

            # get model predictions for the current batch
            scores = model(input_ids, masks).detach().cpu().numpy()
            pred_list.extend(np.argmax(scores, axis=1).tolist())
            true_list.extend(batch[1].detach().cpu().tolist())

    # These three values were printed but never computed, which raised a
    # NameError on every call. Macro averaging over ``label_order`` keeps them
    # in line with the "macro avg" row of the classification report below.
    metric_kwargs = dict(labels=label_order, average='macro', zero_division=0)
    precision = precision_score(true_list, pred_list, **metric_kwargs)
    recall = recall_score(true_list, pred_list, **metric_kwargs)
    f1 = f1_score(true_list, pred_list, **metric_kwargs)

    print(pred_list)
    print('Epoch {}, Precision: {}, Recall: {}, F-1: {}\n'.format(epoch, precision, recall, f1))
    print(confusion_matrix(true_list, pred_list, labels=label_order))
    print(classification_report(true_list, pred_list, labels=label_order,
                                digits=4,
                                target_names=target_names))
    return f1

In [None]:
def train(train_dataloader, test_dataloader, model, tokenizer, criterion, device):
    """Fine-tune ``model`` for ``EPOCHS`` epochs, evaluating after each epoch.

    Args:
        train_dataloader: Iterable of ``(encoding, labels)`` training batches,
            where ``encoding`` maps ``'input_ids'`` and ``'attention_mask'``
            to tensors.
        test_dataloader: Batches handed to ``evaluate`` after every epoch.
        model: Module called as ``model(input_ids, masks)``.
        tokenizer: Unused; kept so existing callers keep working.
        criterion: Loss called as ``criterion(preds, labels)``.
        device: Device the batch tensors are moved to.

    Reads the module-level ``LEARNING_RATE`` and ``EPOCHS`` settings. Prints the
    mean training loss of each epoch, followed by the evaluation report.
    """
    # NOTE(review): ``AdamW`` here is the one imported from ``transformers``;
    # recent releases deprecate it in favour of ``torch.optim.AdamW`` — confirm
    # against the installed version.
    optimizer = AdamW(model.parameters(), lr=LEARNING_RATE)

    for epoch in tqdm(range(EPOCHS)):
        # evaluate() switches the model to eval mode at the end of every epoch.
        # Re-enter train mode here, otherwise dropout stays disabled from the
        # second epoch onwards.
        model.train()
        print('Training, Epoch: {}'.format(epoch))
        total_loss = 0

        # iterate over batches
        for batch in train_dataloader:
            input_ids = batch[0]['input_ids'].to(device)
            masks = batch[0]['attention_mask'].to(device)
            labels = batch[1].to(device)

            # clear previously calculated gradients
            optimizer.zero_grad()
            preds = model(input_ids, masks)
            loss = criterion(preds, labels)
            total_loss = total_loss + loss.item()

            # update parameters
            loss.backward()
            optimizer.step()

        print(total_loss/len(train_dataloader))
        evaluate(test_dataloader, model, criterion, device, epoch)

In [None]:
# ---- Run configuration ----
# Checkpoint names handed to ``from_pretrained``, keyed by a short alias.
MODELS = dict(
    PubMedBERT='microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract-fulltext',
    BioBERT='dmis-lab/biobert-base-cased-v1.1',
    ClinicalBERT='emilyalsentzer/Bio_ClinicalBERT',
)

# Alias (a key of MODELS) of the pre-trained BERT model to use.
MODEL = 'BioBERT'

# Path to the data .csv file (a file path, despite the name).
DATA_DIR = 'onestep.csv'

# Optimisation settings.
LEARNING_RATE = 2e-5
EPOCHS = 20

In [None]:
# Seed every RNG before the model is built, so the randomly initialised
# classifier head and dropout are as repeatable as the backend allows after a
# Restart & Run All.
SEED = 0
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Prefer the GPU when one is available.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
print('Device:', device)

# initialize BERT tokenizer and model
tokenizer = AutoTokenizer.from_pretrained(MODELS[MODEL])
bert = AutoModel.from_pretrained(MODELS[MODEL])
model = BERT(bert, num_class=5) # specify how many classes to classify, here for example we want to classify 5 classes
model.to(device)
criterion = nn.CrossEntropyLoss().to(device)

In [None]:
# Read the full dataset; rows whose 'fold' equals 1 are held out for testing,
# every other row is used for training.
df = pd.read_csv(DATA_DIR)

train_df = df[df['fold'].ne(1)]
test_df = df[df['fold'].eq(1)]

# Shuffle the training rows once, with a fixed seed.
train_df = train_df.sample(frac=1, random_state=0)

# Tokenise both splits up front.
train_dataset = Dataset(tokenizer, train_df)
test_dataset = Dataset(tokenizer, test_df)

# Batch both splits, 16 examples per batch.
train_dataloader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=16)
test_dataloader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=16)
print('*' * 40)

In [None]:
# Start fine-tuning; per-epoch loss and evaluation reports are printed as it runs.
train(
    train_dataloader=train_dataloader,
    test_dataloader=test_dataloader,
    model=model,
    tokenizer=tokenizer,
    criterion=criterion,
    device=device,
)