<a href="https://colab.research.google.com/github/garrettsomers/ManningLLMBook/blob/chapter3/notebooks/Chapter_3-Text_Classification_Tutorial.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install transformers
!pip install datasets
!pip install openprompt
!pip install accelerate -U

In [None]:
import copy
import torch
import numpy as np

# Notebook-wide flag: True when torch reports that CUDA is available. Later
# cells check it before moving models and batches to the GPU.
use_cuda = torch.cuda.is_available()

In [None]:
## DATA LOADING

In [None]:
from datasets import load_dataset

seed = 0
dataset = load_dataset('glue', 'sst2')

# The experiments are few-shot, so a small training pool is enough: shuffle
# with the fixed seed and keep the first 1000 rows, which also keeps the later
# preprocessing/tokenization fast.
dataset['train'] = (
    dataset['train']
    .shuffle(seed=seed)
    .select(range(1000))
)
dataset

In [None]:
# Class balance can matter a lot in few-shot training, so keep the positive
# (label 1) and negative (label 0) training rows in separate splits; the
# sampling cells below take the same number of rows from each.
train_subset = dataset['train']
dataset['pos_train'] = train_subset.filter(lambda row: row['label'] == 1)
dataset['neg_train'] = train_subset.filter(lambda row: row['label'] == 0)

In [None]:
# Sizes of the training samples to sweep, drawn evenly from both classes:
# successive powers of two starting at 16 (one entry per shot increment).
shot_increments = 5
sample_sizes = [16 << step for step in range(shot_increments)]
sample_sizes

[16, 32, 64, 128, 256]

In [None]:
## START OF PROMPTING

In [None]:
from openprompt.data_utils import InputExample

# Convert each split used by the prompt experiments into a list of OpenPrompt
# InputExamples: the sentence becomes text_a, the label is cast to int and the
# dataset's idx is reused as the guid.
prompt_dataset = {
    split: [
        InputExample(text_a=row['sentence'], label=int(row['label']), guid=row['idx'])
        for row in dataset[split]
    ]
    for split in ('pos_train', 'neg_train', 'validation', 'test')
}

# Print the first converted example of each class as a quick sanity check.
print(prompt_dataset['pos_train'][0])
print(prompt_dataset['neg_train'][0])


In [None]:
from openprompt.plms import load_plm
from openprompt.prompts import ManualTemplate, ManualVerbalizer
from openprompt import PromptDataLoader

# Load bert-base-cased through OpenPrompt; the call hands back the model, its
# tokenizer, its config and the tokenizer-wrapper class to use with it.
plm, tokenizer, model_config, WrapperClass = load_plm("bert", "bert-base-cased")

# Cloze-style prompt: the input sentence followed by "it is <mask> ."
template_text = '{"placeholder": "text_a"} it is {"mask"} .'
template = ManualTemplate(text=template_text, tokenizer=tokenizer)

# Stand-alone wrapped tokenizer (sequences capped at 128 tokens, truncating
# from the head).
wrapped_tokenizer = WrapperClass(
    tokenizer=tokenizer,
    max_seq_length=128,
    decoder_max_length=3,
    truncate_method="head",
)

# Verbalizer: the vocabulary word that stands for each class at the mask
# position -- 'terrible' for class 0 (negative), 'great' for class 1 (positive).
verbalizer = ManualVerbalizer(
    tokenizer,
    num_classes=2,
    label_words=[['terrible'], ['great']],
)

# Dataloader over the validation split; every accuracy number in the prompt
# experiments is measured on it.
val_dataloader = PromptDataLoader(
    prompt_dataset['validation'],
    template,
    tokenizer=tokenizer,
    tokenizer_wrapper_class=WrapperClass,
    batch_size=4,
)

In [None]:
# define the accuracy metric

def evaluate(model, val_dataloader):
    """Return the classification accuracy of ``model`` over ``val_dataloader``.

    The model is switched to eval mode (and left there) and run without
    gradient tracking. Each batch must expose its gold labels under the
    ``'label'`` key; the prediction is the argmax over the last logit axis.

    Args:
        model: callable torch module mapping a batch to a logits tensor.
        val_dataloader: iterable of batches supporting ``batch['label']`` and,
            when the model lives on a GPU, ``batch.cuda()``.

    Returns:
        Fraction of correct predictions as a float, or ``nan`` when the
        dataloader yields no examples (accuracy is undefined in that case).
    """
    model.eval()

    # Follow the device the model is actually on rather than a notebook-level
    # flag, so the function also works for a CPU model on a CUDA machine.
    first_param = next(model.parameters(), None)
    on_gpu = first_param is not None and first_param.device.type == 'cuda'

    allpreds = []
    alllabels = []
    with torch.no_grad():
        for inputs in val_dataloader:
            if on_gpu:
                inputs = inputs.cuda()
            logits = model(inputs)
            labels = inputs['label']
            alllabels.extend(labels.cpu().tolist())
            allpreds.extend(torch.argmax(logits, dim=-1).cpu().tolist())

    # An empty dataloader used to raise ZeroDivisionError here.
    if not allpreds:
        return float('nan')
    n_correct = sum(int(pred == gold) for pred, gold in zip(allpreds, alllabels))
    return n_correct / len(allpreds)

In [None]:
from openprompt import PromptForClassification

# Zero-shot baseline: score the prompt model on the evaluation set before any
# fine-tuning. The PLM is deep-copied so the weights held in `plm` stay
# untouched for the few-shot runs.

prompt_model = PromptForClassification(plm=copy.deepcopy(plm), template=template,
                                       verbalizer=verbalizer)
# Only move to the GPU when one is available; an unconditional .cuda() fails
# on CPU-only machines even though the rest of the notebook checks use_cuda.
if use_cuda:
    prompt_model = prompt_model.cuda()
prompt_scores = [evaluate(prompt_model, val_dataloader)]
prompt_scores

In [None]:
# Build the prompt-based learning curve: for every k, fine-tune a fresh copy of
# the PLM on k positive + k negative examples (2k in total) and record the
# accuracy on the validation split.

# Keep only the zero-shot score so that re-running this cell does not keep
# appending results (the plot expects exactly one score per sample size).
del prompt_scores[1:]

for k in sample_sizes:
  # they are already shuffled, we can simply select the first k examples each time
  train_sample = prompt_dataset['pos_train'][:k] + prompt_dataset['neg_train'][:k]
  train_dataloader = PromptDataLoader(train_sample, template, tokenizer=tokenizer,
                                      tokenizer_wrapper_class=WrapperClass, shuffle=True,
                                      batch_size=4, seed=seed)

  # Start every run from the same pretrained weights.
  prompt_model = PromptForClassification(plm=copy.deepcopy(plm), template=template,
                                         verbalizer=verbalizer, freeze_plm=False)
  # Guarded like the batch transfers below so the cell also runs without a GPU.
  if use_cuda:
    prompt_model = prompt_model.cuda()

  loss_func = torch.nn.CrossEntropyLoss()
  # Biases and LayerNorm weights are excluded from weight decay.
  no_decay = ['bias', 'LayerNorm.weight']
  optimizer_grouped_parameters = [
      {'params': [p for n, p in prompt_model.named_parameters() if not any(nd in n for nd in no_decay)], 'weight_decay': 0.01},
      {'params': [p for n, p in prompt_model.named_parameters() if any(nd in n for nd in no_decay)], 'weight_decay': 0.0}
  ]

  # transformers.AdamW is deprecated in favour of the PyTorch implementation;
  # eps=1e-6 keeps the epsilon the deprecated class used by default.
  optimizer = torch.optim.AdamW(optimizer_grouped_parameters, lr=2e-5, eps=1e-6)

  # Put the model in training mode before optimizing; evaluate() below
  # switches it to eval mode for scoring.
  prompt_model.train()
  for epoch in range(5):
    for inputs in train_dataloader:
      if use_cuda:
        inputs = inputs.cuda()
      logits = prompt_model(inputs)
      labels = inputs['label']
      loss = loss_func(logits, labels)
      loss.backward()
      optimizer.step()
      optimizer.zero_grad()

  # Accuracy is measured on the validation dataloader.
  accuracy = evaluate(prompt_model, val_dataloader)
  prompt_scores.append(accuracy)
  print('Test set accuracy:', accuracy)
  torch.cuda.empty_cache()


In [None]:
## START OF FINETUNE

In [None]:
from transformers import BertForSequenceClassification, BertTokenizer 

# instantiate a BERT tokenizer and tokenize the dataset

def tokenize_function(example):
    """Run the notebook-level ``tokenizer`` over the ``sentence`` field.

    ``example`` is indexed with the key ``"sentence"``; truncation is enabled
    in the tokenizer call and its result is returned unchanged.
    """
    sentences = example["sentence"]
    return tokenizer(sentences, truncation=True)

tokenizer = BertTokenizer.from_pretrained('bert-base-cased')

# Tokenize every split in batches, drop the raw text/index columns, rename
# "label" to "labels", then switch the splits to PyTorch tensor output.
tokenized_datasets = (
    dataset
    .map(tokenize_function, batched=True)
    .remove_columns(["sentence", "idx"])
    .rename_column("label", "labels")
)
tokenized_datasets.set_format("torch")
tokenized_datasets["train"].column_names

In [None]:
# define the accuracy metric

def compute_acc(eval_preds):
  """Accuracy of a prediction output.

  Reads ``eval_preds.predictions`` (per-class scores, argmax taken over the
  last axis) and ``eval_preds.label_ids`` (gold labels) and returns the share
  of positions where the two agree.
  """
  predicted = np.argmax(eval_preds.predictions, axis=-1)
  expected = eval_preds.label_ids
  n_correct = 0
  for guess, truth in zip(predicted, expected):
    if guess == truth:
      n_correct += 1
  return n_correct / len(expected)

In [None]:
from transformers import Trainer, TrainingArguments
import datasets

# "zero-shot" i.e. the head has random weights and no training is done 
training_args = TrainingArguments("trainer")
finetune_model = BertForSequenceClassification.from_pretrained('bert-base-cased', num_labels=2)
if use_cuda:
  finetune_model = finetune_model.cuda()

# The Trainer is only used for prediction here; no explicit data collator is
# passed, so the Trainer's default is used.
trainer = Trainer(
    model=finetune_model,
    args=training_args,
    train_dataset=tokenized_datasets['train'],
    tokenizer=tokenizer,
)

# trainer.train() is deliberately never called: score the untrained model on
# the validation split and use that as the first point of the curve.
preds = trainer.predict(tokenized_datasets['validation'])
acc = compute_acc(preds)
finetune_scores = [acc]
print(finetune_scores)

In [None]:
# Build the fine-tuning learning curve: for every k, train a fresh copy of the
# classifier on k positive + k negative examples (2k in total) and record the
# accuracy on the validation split.

# Keep only the zero-shot score so that re-running this cell does not keep
# appending results (the plot expects exactly one score per sample size).
del finetune_scores[1:]

for k in sample_sizes:
  train_sample = datasets.concatenate_datasets([tokenized_datasets['pos_train'].select(range(k)),
                                                tokenized_datasets['neg_train'].select(range(k))])
  training_args = TrainingArguments("trainer")
  # Copy the untrained model so every run starts from the same weights.
  model = copy.deepcopy(finetune_model)
  if use_cuda:
    model = model.cuda()

  trainer = Trainer(
    model,
    training_args,
    train_dataset=train_sample,
    tokenizer=tokenizer
  )

  trainer.train()
  preds = trainer.predict(tokenized_datasets['validation'])
  acc = compute_acc(preds)
  finetune_scores.append(acc)
  print(finetune_scores)
  torch.cuda.empty_cache()

In [None]:
## LEARNING CURVE COMPARISON

In [None]:
import matplotlib.pyplot as plt

%matplotlib inline

In [None]:
# Draw the two learning curves (standard fine-tuning vs. prompt-based
# training) on shared axes. Points are placed at evenly spaced positions and
# the ticks are relabelled with the sample sizes, 0 being the zero-shot score.

fig1, ax11 = plt.subplots()

x = [0] + sample_sizes
positions = range(len(x))
curves = (
    (finetune_scores, 'r', 'finetuned model'),
    (prompt_scores, 'b', 'prompt model'),
)
for scores, color, label in curves:
    ax11.plot(positions, scores, color=color, label=label)
    ax11.scatter(positions, scores, color=color)

ax11.set_xticks(positions)
ax11.set_xticklabels(x)
ax11.set_xlabel('Number of training examples (per class)')
ax11.set_ylabel('Test Set Accuracy')
ax11.legend()