In [None]:
import torch
import pandas as pd

# Assume you have a pandas dataframe 'input_df' with a column named 'input_tensors'.
# Every entry of that column is indexed below as
# (input_ids, token_type_ids, attention_mask).
input_tensors_col = input_df['input_tensors']

input_ids_list = []
token_type_ids_list = []
attention_mask_list = []

# Split every row into its three fields. The loop variable is deliberately not
# called `input_tensors`, so it cannot be confused with the stacked result
# that is assigned to that name at the end of this cell.
for row_fields in input_tensors_col:
    input_ids_list.append(row_fields[0])
    token_type_ids_list.append(row_fields[1])
    attention_mask_list.append(row_fields[2])


def _stack_rows(rows):
    """Stack per-row sequences (tensors or plain lists) into one tensor along dim 0.

    torch.tensor() cannot build a tensor from a list of multi-element tensors,
    so every row is normalised with torch.as_tensor() and the rows are joined
    with torch.stack(). All rows must have the same shape.
    """
    return torch.stack([torch.as_tensor(row) for row in rows])


# Convert each list to a tensor with one row per dataframe entry
input_ids_tensor = _stack_rows(input_ids_list)
token_type_ids_tensor = _stack_rows(token_type_ids_list)
attention_mask_tensor = _stack_rows(attention_mask_list)

# Stack the three fields along dim=1 so that, for every example,
# index 0 is input_ids, index 1 is token_type_ids and index 2 is attention_mask.
input_tensors = torch.stack([input_ids_tensor, token_type_ids_tensor, attention_mask_tensor], dim=1)


In [None]:
import torch
from torch.utils.data import DataLoader, TensorDataset
from transformers import BertForSequenceClassification, AdamW

# define the BERT model
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)

# define the optimizer
optimizer = AdamW(model.parameters(), lr=2e-5, eps=1e-8)

# define the loss function
loss_fn = torch.nn.CrossEntropyLoss()

# split the input data into train and test sets
# NOTE(review): train_test_split is not imported in the visible cells -
# presumably sklearn.model_selection.train_test_split; confirm it is in scope.
train_inputs, test_inputs, train_labels, test_labels = train_test_split(input_tensors, labels, test_size=0.2, random_state=42)

# create data loaders for train and test sets
train_data = TensorDataset(train_inputs, train_labels)
train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
test_data = TensorDataset(test_inputs, test_labels)
test_loader = DataLoader(test_data, batch_size=32)


def _forward_logits(batch_inputs):
    """Run the model on one stacked batch and return its first output (the logits).

    `input_tensors` was built by stacking input_ids, token_type_ids and
    attention_mask along dim=1, so the three fields are sliced back out here
    and passed by name instead of handing the whole stack to the model as
    if it were input_ids.
    """
    outputs = model(
        input_ids=batch_inputs[:, 0],
        token_type_ids=batch_inputs[:, 1],
        attention_mask=batch_inputs[:, 2],
    )
    return outputs[0]


# train the BERT model
epochs = 10
model.train()  # from_pretrained() hands the model back in eval mode; enable dropout for fine-tuning
for epoch in range(epochs):
    # Batch variables get their own names so the global `labels` used for the
    # split above is not overwritten (which would break a re-run of this cell).
    for batch_inputs, batch_labels in train_loader:
        optimizer.zero_grad()
        loss = loss_fn(_forward_logits(batch_inputs), batch_labels)
        loss.backward()
        optimizer.step()

# predict on the test set
model.eval()  # disable dropout so predictions are deterministic
predictions = []
with torch.no_grad():
    for batch_inputs, _ in test_loader:
        predictions += _forward_logits(batch_inputs).argmax(dim=1).tolist()

# calculate accuracy
# Comparing two Python lists with == yields a single bool (which has no .mean()),
# so the comparison is done element-wise on tensors instead.
accuracy = (torch.tensor(predictions) == test_labels).float().mean().item()
print('Accuracy:', accuracy)


In [None]:
from torch import nn
from transformers import BertModel

class BertClassifier(nn.Module):
    """BERT encoder followed by dropout, a linear classification head and a ReLU.

    Args:
        dropout: Dropout probability applied to BERT's pooled output.
        num_classes: Number of output units of the linear head (default 5).
        pretrained_name: Checkpoint name passed to ``BertModel.from_pretrained``
            (default ``'bert-base-cased'``).
    """

    def __init__(self, dropout=0.5, num_classes=5, pretrained_name='bert-base-cased'):

        super().__init__()

        self.bert = BertModel.from_pretrained(pretrained_name)
        self.dropout = nn.Dropout(dropout)
        # Read the encoder width from the loaded config instead of hard-coding
        # 768, so checkpoints with a different hidden size work as well.
        self.linear = nn.Linear(self.bert.config.hidden_size, num_classes)
        self.relu = nn.ReLU()

    def forward(self, input_id, mask):
        """Return one row of ``num_classes`` non-negative scores per example.

        Args:
            input_id: Token ids, forwarded to BERT as ``input_ids``.
            mask: Attention mask, forwarded to BERT as ``attention_mask``.
        """
        # With return_dict=False the encoder returns a tuple; only its second
        # element (the pooled output) is used here.
        _, pooled_output = self.bert(input_ids=input_id, attention_mask=mask, return_dict=False)
        dropout_output = self.dropout(pooled_output)
        linear_output = self.linear(dropout_output)
        # NOTE(review): the ReLU clamps negative logits to zero before they reach
        # the loss; kept to preserve existing behaviour - confirm it is intended.
        final_layer = self.relu(linear_output)

        return final_layer

In [None]:
from torch.optim import Adam
from tqdm import tqdm

def train(model, train_data, val_data, learning_rate, epochs):
    """Fine-tune ``model`` and print loss/accuracy for every epoch.

    Both data arguments are wrapped in ``Dataset`` (defined outside this cell)
    and served in batches of 2. Each batch is unpacked as ``(inputs, label)``
    where ``inputs`` is indexed with ``'attention_mask'`` and ``'input_ids'``.

    Args:
        model: Module called as ``model(input_id, mask)`` that returns one row
            of class scores per example.
        train_data: Training data; must support ``len()``.
        val_data: Validation data; must support ``len()``.
        learning_rate: Learning rate for the Adam optimizer.
        epochs: Number of passes over the training data.

    Returns:
        None. Metrics are printed. When ``epochs > 0`` the model is left in
        eval mode, because validation is the last step of every epoch.
    """

    # Named *_dataset so the locals do not shadow this function's own name.
    train_dataset, val_dataset = Dataset(train_data), Dataset(val_data)

    train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=2, shuffle=True)
    val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=2)

    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")

    criterion = nn.CrossEntropyLoss()

    if use_cuda:
        model = model.cuda()
        criterion = criterion.cuda()

    optimizer = Adam(model.parameters(), lr=learning_rate)

    for epoch_num in range(epochs):

            total_acc_train = 0
            total_loss_train = 0

            # Enable dropout (and any other train-only layers) for the update
            # pass; the validation pass below switches them off again.
            model.train()

            for train_input, train_label in tqdm(train_dataloader):

                train_label = train_label.to(device)
                # NOTE(review): input_ids is squeezed on dim 1 but the mask is
                # not - presumably the model accepts the extra dim; confirm.
                mask = train_input['attention_mask'].to(device)
                input_id = train_input['input_ids'].squeeze(1).to(device)

                output = model(input_id, mask)

                batch_loss = criterion(output, train_label.long())
                total_loss_train += batch_loss.item()

                acc = (output.argmax(dim=1) == train_label).sum().item()
                total_acc_train += acc

                optimizer.zero_grad()
                batch_loss.backward()
                optimizer.step()

            total_acc_val = 0
            total_loss_val = 0

            # Without eval mode dropout would stay active and randomly perturb
            # the validation loss and accuracy.
            model.eval()

            with torch.no_grad():

                for val_input, val_label in val_dataloader:

                    val_label = val_label.to(device)
                    mask = val_input['attention_mask'].to(device)
                    input_id = val_input['input_ids'].squeeze(1).to(device)

                    output = model(input_id, mask)

                    batch_loss = criterion(output, val_label.long())
                    total_loss_val += batch_loss.item()

                    acc = (output.argmax(dim=1) == val_label).sum().item()
                    total_acc_val += acc

            # The f-string is kept verbatim: the indentation of its continued
            # lines is part of the printed text. Note that the losses are sums
            # of per-batch mean losses divided by the number of examples.
            print(
                f'Epochs: {epoch_num + 1} | Train Loss: {total_loss_train / len(train_data): .3f} \
                | Train Accuracy: {total_acc_train / len(train_data): .3f} \
                | Val Loss: {total_loss_val / len(val_data): .3f} \
                | Val Accuracy: {total_acc_val / len(val_data): .3f}')
                  
# Fine-tuning hyperparameters.
LR = 1e-6
EPOCHS = 5

# Build a classifier with its default settings and fine-tune it on df_train,
# using df_val as the validation data.
model = BertClassifier()
train(model, df_train, df_val, learning_rate=LR, epochs=EPOCHS)

In [None]:
def evaluate(model, test_data):
    """Print the accuracy of ``model`` on ``test_data``.

    ``test_data`` is wrapped in ``Dataset`` (defined outside this cell) and
    served in batches of 2. Each batch is unpacked as ``(inputs, label)`` where
    ``inputs`` is indexed with ``'attention_mask'`` and ``'input_ids'``.

    The model is switched to eval mode for the duration of the call so that
    dropout does not perturb the predictions; the mode it had on entry is
    restored before returning.

    Args:
        model: Module called as ``model(input_id, mask)`` that returns one row
            of class scores per example.
        test_data: Test data; must support ``len()``.

    Returns:
        None. The accuracy is printed.
    """

    test = Dataset(test_data)

    test_dataloader = torch.utils.data.DataLoader(test, batch_size=2)

    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")

    if use_cuda:

        model = model.cuda()

    # Remember the caller's mode so evaluation has no lasting side effect on it.
    was_training = model.training
    model.eval()

    total_acc_test = 0
    try:
        with torch.no_grad():

            for test_input, test_label in test_dataloader:

                test_label = test_label.to(device)
                mask = test_input['attention_mask'].to(device)
                input_id = test_input['input_ids'].squeeze(1).to(device)

                output = model(input_id, mask)

                # Count the examples whose highest-scoring class equals the label.
                acc = (output.argmax(dim=1) == test_label).sum().item()
                total_acc_test += acc
    finally:
        model.train(was_training)

    print(f'Test Accuracy: {total_acc_test / len(test_data): .3f}')
    
# Report the fine-tuned model's accuracy on df_test.
evaluate(model, test_data=df_test)