<a href="https://colab.research.google.com/github/TalCordova/PyTorch_Practice/blob/main/BERT_For_Text_Classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## BERT for Text Classification with PyTorch 🐍🔥🔥

In [1]:
import torch
import torch.nn as nn
import seaborn as sns
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import transformers
from transformers import BertTokenizer
from transformers import BertModel

## Get ATIS Data

In [2]:
# Colab-only step: mount Google Drive at /content/drive so files stored there can be read.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import os
# Directory holding the ATIS files (a path under the /content/drive mount point).
atis_dir = '/content/drive/MyDrive/atis'

In [4]:
# Print every entry of the dataset directory, one per line, to see which files and splits exist.
for item in os.listdir(atis_dir):
    print(item)

slot_label.txt
dev
intent_label.txt
test
train


In [5]:
def read_data(directory, filename):
    """Read the leading run of non-blank lines from a UTF-8 text file.

    Lines are collected from the top of the file, with surrounding whitespace
    stripped, until the first line that consists of a bare newline or until
    the end of the file, whichever comes first.

    Args:
        directory: Folder that contains the file.
        filename: Path of the file relative to ``directory``.

    Returns:
        A list with one stripped string per line read, or ``None`` when the
        file is completely empty. A file whose first line is blank yields an
        empty list.
    """
    path = os.path.join(directory, filename)
    collected = None
    with open(path, 'r', encoding='utf-8') as handle:
        for raw in handle:
            # Seeing any line at all means the file is not empty.
            if collected is None:
                collected = []
            # A bare newline ends the block that is read.
            if raw == "\n":
                break
            collected.append(raw.strip())
    return collected

In [6]:
# Load the input lines (seq.in) and the label lines (label) of the train and test splits.
# read_data returns a list with one stripped line per entry (None for an empty file).
train_data = read_data(atis_dir,'train/seq.in')
train_labels = read_data(atis_dir,'train/label')
test_data = read_data(atis_dir,'test/seq.in')
test_labels = read_data(atis_dir,'test/label')

KeyboardInterrupt: 

In [None]:
# Number of input lines loaded for the train and test splits.
len(train_data), len(test_data)

In [None]:
# Number of label lines loaded for the train and test splits.
len(train_labels), len(test_labels)

In [None]:
# Distinct label strings found in the training split (a set, so the order is arbitrary).
unique_items = list(set(train_labels))
print(unique_items)

## Create a Custom Dataset

In [None]:
class ATISDataset(torch.utils.data.Dataset):
  """One split of the ATIS data, tokenized and paired with integer labels.

  Inputs are read from ``<data_dir>/<split>/seq.in`` and labels from
  ``<data_dir>/<split>/label`` through ``read_data``; both files must hold the
  same number of lines. The whole split is tokenized once at construction.

  Args:
    data_dir: Root directory of the dataset.
    tokenizer: Callable tokenizer; invoked with the list of input lines and
      the ``padding``/``max_length``/``truncation``/``return_tensors`` keywords.
    split: Sub-directory to load, e.g. ``'train'`` or ``'test'``.
    max_length: Length every sequence is padded or truncated to.
    label_to_idx: Optional label -> index mapping to reuse. When omitted, the
      mapping is built from the sorted distinct labels of this split. Pass the
      training split's mapping when building other splits so that every split
      shares one index space.

  Attributes:
    encodings: Tokenizer output for the whole split.
    labels: 1-D tensor of label indices, aligned with ``encodings``.
    label_to_idx: Mapping used to encode the labels.
    num_classes: Number of entries in ``label_to_idx``.

  Raises:
    ValueError: If the split is empty, the input and label counts differ, or
      a supplied ``label_to_idx`` lacks a label that occurs in the split.
  """

  def __init__(self, data_dir, tokenizer, split = 'train', max_length = 512,
               label_to_idx = None):
    self.data_dir = data_dir
    self.tokenizer = tokenizer
    self.split = split
    self.max_length = max_length

    self.encodings, self.labels, self.label_to_idx = self._load_and_tokenize(
        self.split, label_to_idx)
    # Count classes from the vocabulary. Iterating the label tensor yields
    # 0-dim tensors that hash by identity, so len(set(self.labels)) would
    # equal the number of samples rather than the number of classes.
    self.num_classes = len(self.label_to_idx)

  def _load_and_tokenize(self, split, label_to_idx = None):
    """Read one split, encode its labels and tokenize its inputs.

    Returns:
      Tuple ``(encodings, label_tensor, label_to_idx)``.
    """
    data = read_data(self.data_dir, f"{split}/seq.in")
    labels = read_data(self.data_dir, f"{split}/label")

    # read_data returns None for an empty file; fail with a clear message
    # instead of an opaque TypeError further down.
    if not data or not labels:
      raise ValueError(f"No data found for split '{split}' in {self.data_dir}")
    if len(data) != len(labels):
      raise ValueError(
          f"Split '{split}' has {len(data)} inputs but {len(labels)} labels")

    if label_to_idx is None:
      unique_labels = sorted(set(labels))
      label_to_idx = {label: idx for idx, label in enumerate(unique_labels)}
    else:
      # Copy so later changes to the caller's dict do not affect this dataset.
      label_to_idx = dict(label_to_idx)
      unknown = sorted(set(labels) - set(label_to_idx))
      if unknown:
        raise ValueError(
            f"Split '{split}' contains labels missing from label_to_idx: {unknown}")

    label_indices = [label_to_idx[label] for label in labels]
    label_tensor = torch.tensor(label_indices)

    # padding='max_length' pads every sequence to max_length tokens, so the
    # tensors have a fixed width regardless of the actual sentence lengths.
    encodings = self.tokenizer(
        data,
        padding = 'max_length',
        max_length = self.max_length,
        truncation = 'longest_first',
        return_tensors = 'pt')

    return encodings, label_tensor, label_to_idx

  def __len__(self):
    """Number of samples in the split."""
    return len(self.labels)

  def __getitem__(self, idx):
    """Return the sample at ``idx`` as a dict of tensors."""
    return {
      'input_ids': self.encodings['input_ids'][idx],
      'attention_mask': self.encodings['attention_mask'][idx],
      'labels': self.labels[idx]
    }

In [None]:
# The tokenizer must come from the same checkpoint as the encoder. BERTModel loads
# 'bert-base-uncased'; the cased checkpoint uses a different vocabulary, so its token
# ids would not match the encoder's embedding table.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
train_dataset = ATISDataset(atis_dir, tokenizer, split = 'train')
# NOTE(review): each dataset derives its label indices from its own split, so train
# and test indices only agree if both splits contain the same label set — verify.
test_dataset = ATISDataset(atis_dir, tokenizer, split = 'test')

In [None]:
# Inspect the first sample: a dict with 'input_ids', 'attention_mask' and 'labels'.
train_dataset[0]

## Turn into DataLoaders

In [None]:
from torch.utils.data import DataLoader
# drop_last avoids a trailing single-sample batch: the classifier head contains
# nn.BatchNorm1d, which raises in training mode when a batch holds only one sample.
train_dataloader = DataLoader(train_dataset, batch_size=2, shuffle=True, drop_last=True)
# Evaluation gains nothing from shuffling; a fixed order keeps runs comparable.
test_dataloader = DataLoader(test_dataset, batch_size=2, shuffle=False)

## Create the Model

In [None]:
# Output size of the classifier: the number of distinct labels in the training split.
num_classes = len(train_dataset.label_to_idx)
num_classes

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [None]:
from transformers import BertModel

class BERTModel(nn.Module):
  """Pretrained BERT encoder followed by a small MLP classification head.

  The encoder is loaded from the 'bert-base-uncased' checkpoint, so inputs
  must be tokenized with that checkpoint's vocabulary. The embeddings and the
  first 8 encoder layers are frozen; the remaining encoder layers and the
  head stay trainable.

  Args:
    num_classes: Size of the output layer.
    dropout: Dropout probability used by both dropout layers of the head.
      The default of 0.3 is the rate the head previously hard-coded while
      ignoring this argument, so default-constructed models are unchanged.
  """

  def __init__(self, num_classes, dropout = 0.3):
    super().__init__()
    self.bert = BertModel.from_pretrained('bert-base-uncased')

    modules_to_freeze = [
            self.bert.embeddings,
            *self.bert.encoder.layer[:8]  # Freeze first 8 layers
    ]

    for module in modules_to_freeze:
      for param in module.parameters():
        param.requires_grad = False

    # Read the encoder width from its config instead of hard-coding 768.
    hidden_size = self.bert.config.hidden_size

    self.classifier = nn.Sequential(
        nn.Dropout(dropout),
        nn.Linear(hidden_size, 256),
        nn.BatchNorm1d(256),
        nn.ReLU(),
        nn.Dropout(dropout),
        nn.Linear(256, num_classes)
    )

  def forward(self, input_ids, attention_mask, labels = None):
    """Compute class logits for a batch.

    Args:
      input_ids: Token id tensor passed to the encoder.
      attention_mask: Attention mask tensor passed to the encoder.
      labels: Accepted for call compatibility; not used by this method.

    Returns:
      Output of the classification head applied to the encoder's pooled output.
    """
    # With return_dict=False the encoder returns a tuple; only its second
    # element (the pooled output) feeds the head.
    _, pooled_output = self.bert(
     input_ids = input_ids,
     attention_mask = attention_mask,
     return_dict = False
    )

    final_layer = self.classifier(pooled_output)
    return final_layer

In [None]:
def create_model(num_classes, device):
  """Build a BERTModel on `device` and print a summary of its parameter counts.

  Args:
    num_classes: Number of output classes passed to BERTModel.
    device: Device the model is moved to.

  Returns:
    The model, already moved to `device`.
  """
  model = BERTModel(num_classes).to(device)

  # Tally all parameters and the subset that still receives gradients.
  total_params = 0
  trainable_params = 0
  for param in model.parameters():
    count = param.numel()
    total_params += count
    if param.requires_grad:
      trainable_params += count

  print(f"Total parameters: {total_params:,}")
  print(f"Trainable parameters: {trainable_params:,}")
  print(f"Percentage trainable: {(trainable_params/total_params)*100:.2f}%")

  return model

In [None]:
# Build the classifier, move it to `device`, and print its parameter counts.
model = create_model(num_classes, device)

## Create Train and Test Loop

In [None]:
def accuracy_fn(y_true, y_pred):
    """Percentage of positions at which the prediction equals the true label.

    Args:
        y_true (torch.Tensor): Ground-truth labels.
        y_pred (torch.Tensor): Predicted labels to compare against `y_true`.

    Returns:
        float: Accuracy as a percentage, e.g. 78.45.
    """
    hits = torch.eq(y_true, y_pred)
    n_correct = hits.sum().item()
    fraction = n_correct / len(y_pred)
    return fraction * 100

In [None]:
# AdamW receives every model parameter, including those BERTModel froze with requires_grad=False.
optimizer = torch.optim.AdamW(model.parameters(), lr = 1e-5, weight_decay = 0.01)
# Cross-entropy over the class logits returned by the model.
loss_fn = nn.CrossEntropyLoss().to(device)

In [None]:
# Multiply the learning rate by 0.2 after `patience` epochs without improvement of the
# monitored metric; mode='max' because the training loop feeds it an accuracy.
# The `verbose` argument is omitted: it is deprecated in PyTorch's LR schedulers and
# rejected by newer releases, and the training loop already prints the learning rate.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode = 'max',
    factor = 0.2,
    patience = 2
)

In [None]:
def train_step(model: torch.nn.Module,
              data_loader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module,
              optimizer: torch.optim.Optimizer,
              accuracy_fn,
              device:torch.device = device):
  """
  Runs one training pass of the model over data_loader.

  Args:
    model: Module called with `input_ids` and `attention_mask` keyword arguments.
    data_loader: Iterable of batches; each batch is a dict with the keys
      'input_ids', 'attention_mask' and 'labels'.
    loss_fn: Callable computing the loss from (outputs, labels).
    optimizer: Optimizer stepped once per batch.
    accuracy_fn: Callable taking `y_true` and `y_pred` keywords and returning a number.
    device: Device the batch tensors are moved to.

  Returns:
    Tuple (train_loss, train_acc): the per-batch mean loss and accuracy,
    mirroring what test_step returns.
  """
  train_loss, train_acc = 0, 0
  model.train()

  for batch in data_loader:
    input_ids = batch['input_ids'].to(device)
    attention_mask = batch['attention_mask'].to(device)
    labels = batch['labels'].to(device)
    # 1. Forward pass
    outputs = model(
        input_ids = input_ids,
        attention_mask = attention_mask
    )
    # 2. Calculate the loss
    loss = loss_fn(outputs, labels)
    train_loss += loss.item()
    train_acc += accuracy_fn(y_true = labels, y_pred = outputs.argmax(dim = 1))
    # 3. Clear gradients left over from the previous batch
    optimizer.zero_grad()
    # 4. Backpropagate
    loss.backward()
    # 5. Update the trainable parameters
    optimizer.step()

  # Average the accumulated metrics over the number of batches
  train_loss /= len(data_loader)
  train_acc /= len(data_loader)
  print(f"Train loss: {train_loss:.5f} | Train acc: {train_acc:.2f}")

  # Returned so callers can log or plot training metrics, as with test_step.
  return train_loss, train_acc


In [None]:
def test_step(model: torch.nn.Module,
              data_loader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module,
              accuracy_fn,
              device: torch.device = device):
    """Evaluate `model` on every batch of `data_loader` under inference mode.

    Each batch is expected to be a dict with the keys 'input_ids',
    'attention_mask' and 'labels'. The per-batch mean loss and accuracy are
    printed and returned.

    Returns:
        Tuple (test_loss, test_acc).
    """
    model.eval()
    loss_total, acc_total = 0, 0

    with torch.inference_mode():
        for sample_batch in data_loader:
            token_ids = sample_batch['input_ids'].to(device)
            mask = sample_batch['attention_mask'].to(device)
            targets = sample_batch['labels'].to(device)

            logits = model(input_ids = token_ids, attention_mask = mask)

            # .item() turns the scalar loss tensor into a Python number
            loss_total += loss_fn(logits, targets).item()
            acc_total += accuracy_fn(y_true=targets, y_pred=logits.argmax(dim=1))

    num_batches = len(data_loader)
    test_loss = loss_total / num_batches
    test_acc = acc_total / num_batches
    print(f"Test loss: {test_loss:.5f} | Test acc: {test_acc:.2f}")

    return test_loss, test_acc

In [None]:
from timeit import default_timer as timer
def print_train_time(start: float, end: float, device: torch.device = None):
  """
  Prints and returns the time elapsed between two timer readings.

  Args:
    start: Timer reading taken before training.
    end: Timer reading taken after training.
    device: Device name shown in the printed message.

  Returns:
    The elapsed time, `end - start`.
  """
  total_time = end - start
  print(f"Train time on {device}: {total_time:.3f} seconds")
  return total_time

## Train the Model

In [None]:
from tqdm.auto import tqdm
# NOTE(review): the model was already created in an earlier cell, so this seed
# does not influence its weight initialisation.
torch.manual_seed(42)

# Start the wall-clock timer (the "_on_gpu" names are used even when `device` is 'cpu')
train_time_start_on_gpu = timer()

epochs = 10

for epoch in range(epochs):
  print(f"Epoch: {epoch}\n-----")
  train_step(model = model,
             data_loader = train_dataloader,
             loss_fn = loss_fn,
             optimizer = optimizer,
             accuracy_fn = accuracy_fn)
  test_loss, test_acc = test_step(model = model,
                        loss_fn = loss_fn,
                        data_loader = test_dataloader,
                        accuracy_fn = accuracy_fn)

  # The plateau scheduler is driven by test accuracy.
  # NOTE(review): this tunes the learning rate on the test split; the data
  # directory also lists a `dev` split that could be used instead — confirm.
  scheduler.step(test_acc)
  current_lr = optimizer.param_groups[0]['lr']
  print(f"Current learning rate: {current_lr}")

# Stop the timer and report the total training time
train_time_end_on_gpu = timer()
total_train_time_model_1 = print_train_time(start = train_time_start_on_gpu,
                                            end = train_time_end_on_gpu,
                                            device = device)

## Evaluate the Model

In [None]:
torch.manual_seed(42)
def eval_model(model: torch.nn.Module,
               data_loader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               accuracy_fn,
               device = device):
  """
  Returns a dictionary containing the results of model predicting on data_loader.

  Batches may be dicts with the keys 'input_ids', 'attention_mask' and
  'labels' (the model is then called with the first two as keyword
  arguments), or (X, y) pairs (the model is then called as model(X)).

  Args:
    model: Module to evaluate; it is switched to eval mode.
    data_loader: Iterable of batches in one of the two formats above.
    loss_fn: Callable computing the loss from (predictions, labels).
    accuracy_fn: Callable taking `y_true` and `y_pred` keywords and returning a number.
    device: Device the batch tensors are moved to.

  Returns:
    Dict with "model_name" (class name of the model), "model_loss" and
    "model_acc" (per-batch means over data_loader).
  """
  test_loss, test_acc = 0.0, 0.0
  # Put the model in eval mode so dropout and batch-norm do not behave as in training.
  model.eval()
  with torch.inference_mode():
    for batch in tqdm(data_loader):
      if isinstance(batch, dict):
        # Dict batches, as yielded by the loaders used with train_step/test_step
        y = batch['labels'].to(device)
        y_pred = model(input_ids = batch['input_ids'].to(device),
                       attention_mask = batch['attention_mask'].to(device))
      else:
        # (X, y) batches, the format this function originally handled
        X, y = batch
        X, y = X.to(device), y.to(device)
        y_pred = model(X)

      # Accumulate the loss and acc values per batch as plain Python numbers
      test_loss += loss_fn(y_pred, y).item()
      test_acc += accuracy_fn(y_true = y,
                              y_pred = y_pred.argmax(dim = 1))

  # Scale the loss and acc to find the average loss/acc per batch
  num_batches = len(data_loader)
  test_loss /= num_batches
  test_acc /= num_batches

  return {"model_name": model.__class__.__name__, # Only works when model was created with a class
          "model_loss": test_loss,
          "model_acc": test_acc}

In [None]:
# Evaluate the trained model on the test loader and display the results dictionary
# (model name, mean loss, mean accuracy).
results = eval_model(model = model,
                             data_loader = test_dataloader,
                             loss_fn = loss_fn,
                             accuracy_fn = accuracy_fn,
                             device = device)
results