In [1]:
def print_example(data, index, print_content=True, print_classification=True, print_rationales=True ):
    print(f'Retrieving Training Example [{index}].................\n')
    item = data[index]
    classification = item['classification']
    evidences = item['evidences']
    content = item['content']
    if print_content: print(f'Review content:\n{content}\n')
    if print_classification: print('----------------------------',
                                   '\n| Sentiment class:',
                                   classification,
                                   ("- NEG" if not classification else "- POS"),
                                   '|', '\n----------------------------')
    if print_rationales:
        print('\nHuman rationales / Supporting Evidence:')
        for evidence in evidences:
            print('     - ', evidence[0])

def get_content(data, index):
    item = data[index]
    content = item['content']
    return content

def get_classes(data, index):
    item = data[index]
    classification = item['classification']
    return torch.tensor(classification)

def get_annotations(data, index):
    item = data[index]
    content = item['evidences']
    annotations = [evidence for evidence in content]
    return annotations


In [2]:
import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from typing import List, Dict, Union
from transformers import BertTokenizerFast

tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased')

# With it being easy to generate batches of tokenized texts, it's actually easier
# not to do the tokenization beforehand, and just store texts
# It's a little bit slow though, so if you found this to be bottleneck
# you'd want to pre-tokenize everything and then batch/pad as necessary
class SST2TransformerDataset(Dataset):
  def __init__(self,
               labels=None,
               texts=None):

    self.y = torch.tensor(labels,dtype=torch.int64)
    self.texts = texts

  def __len__(self):
    return self.y.shape[0]

  def __getitem__(self, idx):
    rdict = {
      'y': self.y[idx],
      'text': self.texts[idx]
    }
    return rdict


def SST2_transformer_collate(batch:List[Dict[str, Union[torch.Tensor,str]]]):
  y_batch = torch.tensor([example['y'] for example in batch])

  # We'll just reuse the tokenizer we created earlier, since it doesn't change
  tokenized_batch = tokenizer.batch_encode_plus([example['text'] for example in batch],
                                                return_tensors='pt',
                                                padding=True,
                                                max_length=512,
                                                truncation=True)

  return {
      'y':y_batch,
      'input_ids':tokenized_batch['input_ids'],
      'attention_mask':tokenized_batch['attention_mask']
  }


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [3]:
from torch.utils.data import Dataset, DataLoader, TensorDataset

train_data_path = 'drive/MyDrive/google_colab_data/train_data_loader.pt'
test_data_path = 'drive/MyDrive/google_colab_data/test_data_loader.pt'
val_data_path = 'drive/MyDrive/google_colab_data/val_data_loader.pt'

def load_dataLoader(path):
  loaded_data = torch.load(path)

  # Recreate the TensorDataset
  loaded_input = loaded_data['input']
  loaded_Y_star = loaded_data['Y_star']
  loaded_dataset = SST2TransformerDataset(loaded_Y_star, loaded_input)

  # Recreate the DataLoader
  dataLoader = DataLoader(
      loaded_dataset,
      collate_fn = SST2_transformer_collate,
      batch_size=loaded_data['dataloader_params']['batch_size'],
      shuffle=loaded_data['dataloader_params']['shuffle'],  # Use shuffle from the saved data
      num_workers=loaded_data['dataloader_params']['num_workers']
  )
  return (dataLoader, len(loaded_input), loaded_input, loaded_Y_star)

(train_dataloader, train_size, train_in, train_classes) = load_dataLoader(train_data_path)
(dev_dataloader, test_size, test_in, test_classes) = load_dataLoader(test_data_path)
(val_dataloader, val_data, val_in, val_classes) = load_dataLoader(val_data_path)

  loaded_data = torch.load(path)


# BERT Classifier

In [4]:
! pip install --quiet "pytorch-lightning==1.9.4"


In [5]:
from transformers import BertModel
# Like the tokenizer, we can just download one of these from Hugging Face
bert = BertModel.from_pretrained('bert-base-uncased')

In [15]:
import pytorch_lightning as pl
from torchmetrics.classification import BinaryAccuracy
from transformers import BertModel
import torch

class BertClassifier(pl.LightningModule):
  def __init__(self,
               learning_rate:float,
               num_classes:int,
               freeze_bert:bool=False,
               **kwargs):
    super().__init__(**kwargs)

    # Like with the LSTM, we'll define a central BERT we're gonna use
    # Again, this will download this from Hugging Face in the background
    self.bert = BertModel.from_pretrained('bert-base-uncased')

    # If we want to speed up training, we can freeze the BERT module and train
    # just the output layer. This will hurt accuracy though.
    if freeze_bert:
      for param in self.bert.parameters():
        param.requires_grad = False

    # Then the only other thing we need is an output layer, whose input size will
    # be the BERT's output size (768), which can can find as follows:
    self.output_layer = torch.nn.Linear(self.bert.config.hidden_size, num_classes)

    self.learning_rate = learning_rate
    self.train_accuracy = BinaryAccuracy()
    self.val_accuracy = BinaryAccuracy()
    self.test_accuracy = BinaryAccuracy()


  def forward(self, y:torch.Tensor, input_ids:torch.Tensor,
              attention_mask:torch.Tensor):
    # And then the forward function is pretty simple--
    # way simpler than with the LSTM
    bert_result = self.bert(input_ids=input_ids,
                            attention_mask=attention_mask)

    # Typically we just use the pooler output for classification
    # Which, again, is the hidden state output for the [CLS] token
    cls_output = bert_result['pooler_output']

    py_logits = self.output_layer(cls_output)
    probs = torch.sigmoid(py_logits).view(-1)  # Convert logits to probabilities

    py = (probs > 0.5).float()
    if(y is not None):
      loss = torch.nn.functional.binary_cross_entropy_with_logits(py_logits.view(-1), y.float())
    else:
      loss = None
    return {'py':py,
            'probs':probs,
            'loss':loss}

  # Then do all the usual PyTorch Lightning functions
  def configure_optimizers(self):
    return [torch.optim.Adam(self.parameters(), lr=self.learning_rate)]

  def training_step(self, batch, batch_idx):
    result = self.forward(**batch)
    loss = result['loss']
    self.log('train_loss', result['loss'])
    self.train_accuracy.update(result['py'], batch['y'])
    return loss

  def training_epoch_end(self, outs):
    print(f' Epoch {self.current_epoch} training accuracy:', self.train_accuracy.compute())
    self.train_accuracy.reset()

  def validation_step(self, batch, batch_idx):
    # with torch.enable_grad():  # Enable gradient calculation during validation step
    result = self.forward(**batch)
    self.val_accuracy.update(result['py'], batch['y'])
    return result['loss']

  def validation_epoch_end(self, outs):
    print(f'Epoch {self.current_epoch} step {self.global_step} validation accuracy:', self.val_accuracy.compute())
    self.val_accuracy.reset()

  def test_step(self, batch, batch_idx):
    result = self.forward(**batch)
    self.test_accuracy.update(result['py'], batch['y'])
    return result['loss']

  def test_epoch_end(self, outs):
    print(f'Test accuracy:', self.test_accuracy.compute())
    self.test_accuracy.reset()

In [16]:
classifier_model = BertClassifier(learning_rate=2e-5, #if we were fine-tuning the BERT, we'd want to use something like 2e-5
                            num_classes=1)
# classifier_model = classifier_model.to('cuda')
print('Model:')
print(classifier_model)

Model:
BertClassifier(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30522, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSdpaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12, ele

In [8]:
torch.random.manual_seed(10)
first_train_batch = next(iter(train_dataloader))
print('First training batch:')
print(first_train_batch)

print('First training batch sizes:')
print({key:value.shape for key, value in first_train_batch.items()})

# first_dev_batch = next(iter(dev_dataloader))
# print('First training batch:')
# print(first_dev_batch)

# print('First training batch sizes:')
# print({key:value.shape for key, value in first_dev_batch.items()})

First training batch:
{'y': tensor([1, 1, 0, 0, 1, 1, 1, 1, 0, 0]), 'input_ids': tensor([[  101,  2096,  3898,  ..., 16344,  1029,   102],
        [  101, 18269,  1024,  ...,  2872,  1999,   102],
        [  101, 19962, 22599,  ...,  1055,  1037,   102],
        ...,
        [  101,  8383,  1006,  ..., 11721, 15378,   102],
        [  101,  2009,  2003,  ...,  2187,  2017,   102],
        [  101,  2045,  2001,  ...,  1010, 19031,   102]]), 'attention_mask': tensor([[1, 1, 1,  ..., 1, 1, 1],
        [1, 1, 1,  ..., 1, 1, 1],
        [1, 1, 1,  ..., 1, 1, 1],
        ...,
        [1, 1, 1,  ..., 1, 1, 1],
        [1, 1, 1,  ..., 1, 1, 1],
        [1, 1, 1,  ..., 1, 1, 1]])}
First training batch sizes:
{'y': torch.Size([10]), 'input_ids': torch.Size([10, 512]), 'attention_mask': torch.Size([10, 512])}


In [9]:
from pprint import pprint
with torch.no_grad():
  first_train_output = classifier_model(**first_train_batch)

print('First training output:')
pprint(first_train_output)

print('Output item shapes:')
pprint({key:value.shape for key, value in first_train_output.items()})

First training output:
{'loss': tensor(0.6750),
 'probs': tensor([0.5592, 0.5766, 0.5555, 0.5811, 0.5488, 0.5768, 0.5724, 0.5656, 0.5539,
        0.5733]),
 'py': tensor([1., 1., 1., 1., 1., 1., 1., 1., 1., 1.])}
Output item shapes:
{'loss': torch.Size([]), 'probs': torch.Size([10]), 'py': torch.Size([10])}


In [10]:
torch.cuda.is_available()
# torch.set_float32_matmul_precision('medium')

False

# LOAD PARAMS

In [11]:
import collections
import json

def stringify_ordered_dict_with_tensors(ordered_dict):
    serializable_dict = {k: v.tolist() for k, v in ordered_dict.items()}  # Convert tensors to lists
    return json.dumps(serializable_dict)

# Function to parse the string back to an OrderedDict of tensors
def parse_ordered_dict_with_tensors(stringified):
    deserialized_dict = json.loads(stringified)
    return collections.OrderedDict({k: torch.tensor(v) for k, v in deserialized_dict.items()})

In [21]:
params_path = 'drive/MyDrive/google_colab_data/params_std.txt'
data = ''
with open(params_path, 'r') as file:
  for line in file:
    data = data + line
# print(data)
parsed = parse_ordered_dict_with_tensors(data)
classifier_model.load_state_dict(parsed)

<All keys matched successfully>

# TRAIN

In [18]:
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks.progress import TQDMProgressBar

# And then training is easy with our old friend PyTorch Lightning
classifier_trainer = Trainer(
    accelerator="auto",
    devices=1 if torch.cuda.is_available() else None,
    max_epochs=1,
    callbacks=[TQDMProgressBar(refresh_rate=20)],
    val_check_interval = 0.2,
    )

# Note that this is the best accuracy we've seen on this dataset, by a pretty wide margin

INFO:pytorch_lightning.utilities.rank_zero:GPU available: False, used: False
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs


In [22]:
classifier_trainer.fit(model=classifier_model,
            train_dataloaders=train_dataloader,
            val_dataloaders=dev_dataloader)


  rank_zero_warn(f"Checkpoint directory {dirpath} exists and is not empty.")
INFO:pytorch_lightning.callbacks.model_summary:
  | Name           | Type           | Params
--------------------------------------------------
0 | bert           | BertModel      | 109 M 
1 | output_layer   | Linear         | 769   
2 | train_accuracy | BinaryAccuracy | 0     
3 | val_accuracy   | BinaryAccuracy | 0     
4 | test_accuracy  | BinaryAccuracy | 0     
--------------------------------------------------
109 M     Trainable params
0         Non-trainable params
109 M     Total params
437.932   Total estimated model params size (MB)


Sanity Checking: 0it [00:00, ?it/s]

  rank_zero_warn("Detected KeyboardInterrupt, attempting graceful shutdown...")


In [23]:
val_results = classifier_trainer.validate(model=classifier_model, dataloaders=val_dataloader)

# Print the validation results
print("Validation Results:", val_results)

Validation: 0it [00:00, ?it/s]

Epoch 1 step 160 validation accuracy: tensor(0.8550)
Validation Results: [{}]


# SAVE PARAMS

In [None]:
params_path = 'drive/MyDrive/google_colab_data/params_std.txt'
!mkdir -p params_path
with open(params_path, 'w') as f:
  f.write(stringify_ordered_dict_with_tensors(classifier_model.state_dict()))