In [54]:
#!git clone https://github.com/SIDN-IAP/global-model-repr
!pip install transformers

import torch
from transformers import BertTokenizer, BertModel
import numpy as np
import sys
sys.path.append('global-model-repr/')
# sys.path.append('..')
from probing.utils import get_sentence_repr, get_model_and_tokenizer, get_pos_data

if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print("device:", device)

device: cuda


# Get data for part-of-speech tagging

In [55]:
train_sentences, train_labels, test_sentences, test_labels, _, _, label2index = get_pos_data("global-model-repr/probing", frac=0.1)
# train_sentences, train_labels, test_sentences, test_labels, _, _, label2index = get_pos_data("../probing", frac=0.1)
num_labels = len(label2index)
print("Training sentences:", len(train_sentences), "Test sentences:", len(test_sentences))
print("Unique labels:", num_labels)

Training sentences: 1254 Test sentences: 208
Unique labels: 17


# Set up model

In [56]:
class Classifier(torch.nn.Module):
    
    def __init__(self, input_dim, output_dim):
        super(Classifier, self).__init__()
        
        self.linear = torch.nn.Linear(input_dim, output_dim)
        
    def forward(self, input):
        output = self.linear(input)
        return output
    

class NonlinearClassifier(torch.nn.Module):
    
    def __init__(self, input_dim, output_dim):
        super(NonlinearClassifier, self).__init__()
        
        self.input2hidden = torch.nn.Linear(input_dim, input_dim)
        self.hidden2output = torch.nn.Linear(input_dim, output_dim)
        
    def forward(self, input):
        hidden = self.input2hidden(input)
        output = self.hidden2output(hidden)
        return output
    
    
def build_classifier(emb_dim, num_labels, device='cpu'):

    classifier = Classifier(emb_dim, num_labels).to(device)
    criterion = torch.nn.CrossEntropyLoss().to(device)
    optimizer = torch.optim.Adam(classifier.parameters())

    return classifier, criterion, optimizer


def build_nonlinear_classifier(emb_dim, num_labels, device='cpu'):

    classifier = NonlinearClassifier(emb_dim, num_labels).to(device)
    criterion = torch.nn.CrossEntropyLoss().to(device)
    optimizer = torch.optim.Adam(classifier.parameters())

    return classifier, criterion, optimizer


model_name = 'bert-base-cased'
# get model and tokenizer from Transformers
model, tokenizer, sep, emb_dim = get_model_and_tokenizer(model_name, device)
# build classifier
classifier, criterion, optimizer = build_classifier(emb_dim, num_labels, device)

In [57]:
print(model)

BertModel(
  (embeddings): BertEmbeddings(
    (word_embeddings): Embedding(28996, 768, padding_idx=0)
    (position_embeddings): Embedding(512, 768)
    (token_type_embeddings): Embedding(2, 768)
    (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (encoder): BertEncoder(
    (layer): ModuleList(
      (0): BertLayer(
        (attention): BertAttention(
          (self): BertSelfAttention(
            (query): Linear(in_features=768, out_features=768, bias=True)
            (key): Linear(in_features=768, out_features=768, bias=True)
            (value): Linear(in_features=768, out_features=768, bias=True)
            (dropout): Dropout(p=0.1, inplace=False)
          )
          (output): BertSelfOutput(
            (dense): Linear(in_features=768, out_features=768, bias=True)
            (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
            (dropout): Dropout(p=0.1, inplace=False)
          

In [58]:
print(classifier)

Classifier(
  (linear): Linear(in_features=768, out_features=17, bias=True)
)


# Train 

In [59]:
def train(num_epochs, train_representations, train_labels, 
          model, tokenizer, sep, model_name, device, 
          classifier, criterion, optimizer, batch_size=32):
    
    num_total = train_representations.shape[0] 
    for i in range(num_epochs):
        total_loss = 0.
        num_correct = 0.
        for batch in range(0, num_total, batch_size):
            batch_repr = train_representations[batch: batch+batch_size]
            batch_labels = train_labels[batch: batch+batch_size]

            optimizer.zero_grad()
            
            out = classifier(batch_repr)
            pred = out.max(1)[1]
            num_correct += pred.long().eq(batch_labels.long()).cpu().sum().item()
            loss = criterion(out, batch_labels)
            total_loss += loss.item()

            loss.backward()
            optimizer.step()
#         print('Training epoch: {}, loss: {}, accuracy: {}'.format(i, total_loss/num_total, num_correct/num_total))
    return total_loss/num_total, num_correct/num_total

# Evaluate

In [60]:
def evaluate(test_representations, test_labels, 
             model, tokenizer, sep, model_name, device, 
             classifier, criterion, batch_size=32):
    
    num_correct = 0.
    num_total = test_representations.shape[0]
    total_loss = 0.
    with torch.no_grad():
        for batch in range(0, num_total, batch_size):
            batch_repr = test_representations[batch: batch+batch_size]
            batch_labels = test_labels[batch: batch+batch_size]
            
            out = classifier(batch_repr)
            pred = out.max(1)[1]
            num_correct += pred.long().eq(batch_labels.long()).cpu().sum().item()
            total_loss += criterion(out, batch_labels)

#     print('Testing loss: {}, accuracy: {}'.format(total_loss/num_total, num_correct/num_total))
    return total_loss/num_total, num_correct/num_total

# Generate representations with pretrained model

In [61]:
# top-level list: sentences, second-level lists: layers, third-level tensors of num_words x representation_dim
train_sentence_representations = [get_sentence_repr(sentence, model, tokenizer, sep, model_name, device) 
                                  for sentence in train_sentences]
test_sentence_representations = [get_sentence_repr(sentence, model, tokenizer, sep, model_name, device) 
                                  for sentence in test_sentences]

# top-level list: layers, second-level lists: sentences
train_sentence_representations = [list(l) for l in zip(*train_sentence_representations)]
test_sentence_representations = [list(l) for l in zip(*test_sentence_representations)]

# concatenate all word represenations
train_representations_all = [torch.tensor(np.concatenate(train_layer_representations, 0)).to(device) for train_layer_representations in train_sentence_representations]
test_representations_all = [torch.tensor(np.concatenate(test_layer_representations, 0)).to(device) for test_layer_representations in test_sentence_representations]
# concatenate all labels
train_labels = torch.tensor(np.concatenate(train_labels, 0)).to(device)
test_labels = torch.tensor(np.concatenate(test_labels, 0)).to(device)


# Experiment 1: Evaluate representation for POS quality

In [62]:
# Take final layer representations
train_representations = train_representations_all[-1]
test_representations = test_representations_all[-1]

# train
train_loss, train_accuracy = train(10, train_representations, train_labels, 
          model, tokenizer, sep, model_name, device, 
          classifier, criterion, optimizer)
# test
test_loss, test_accuracy = evaluate(test_representations, test_labels, 
         model, tokenizer, sep, model_name, device, 
         classifier, criterion)
print("Train accuracy: {}, Test accuracy: {}".format(train_accuracy, test_accuracy))

Train accuracy: 0.9263512762568074, Test accuracy: 0.9164086687306502


# Experiment 2: Compare representation quality across layers

In [63]:
num_layers = len(train_representations_all)
train_accs, test_accs = [], []
for l in range(num_layers):
    # build new classifier for every layer experiment
    classifier, criterion, optimizer = build_classifier(emb_dim, num_labels, device)
    # get layer representation 
    train_representations = train_representations_all[l]
    test_representations = test_representations_all[l]
    
    # train
    train_loss, train_accuracy = train(2, train_representations, train_labels, 
          model, tokenizer, sep, model_name, device, 
          classifier, criterion, optimizer)
    train_accs.append(train_accuracy)
    # test
    test_loss, test_accuracy = evaluate(test_representations, test_labels, 
         model, tokenizer, sep, model_name, device, 
         classifier, criterion)
    test_accs.append(test_accuracy)
    print("layer: {}, train accuracy: {}, test accuracy: {}".format(l, train_accuracy, test_accuracy))

layer: 0, train accuracy: 0.8647797577149632, test accuracy: 0.8465280849181778
layer: 1, train accuracy: 0.8850072240951358, test accuracy: 0.8785935426802299
layer: 2, train accuracy: 0.9242025710369355, test accuracy: 0.9303405572755418
layer: 3, train accuracy: 0.9298336605786686, test accuracy: 0.9290137107474569
layer: 4, train accuracy: 0.9328344385581447, test accuracy: 0.9294559929234851
layer: 5, train accuracy: 0.9327973919164227, test accuracy: 0.930561698363556
layer: 6, train accuracy: 0.931648946023043, test accuracy: 0.9303405572755418
layer: 7, train accuracy: 0.9309080131886045, test accuracy: 0.9270234409553295
layer: 8, train accuracy: 0.924461897528989, test accuracy: 0.9221583370190182
layer: 9, train accuracy: 0.9146815841144, test accuracy: 0.9155241043785936
layer: 10, train accuracy: 0.9033453117474901, test accuracy: 0.9013710747456878
layer: 11, train accuracy: 0.8943800244507836, test accuracy: 0.8971693940734189
layer: 12, train accuracy: 0.864964990923572

# Experiment 3: Non-linear classifier

In [64]:
num_layers = len(train_representations_all)
train_accs, test_accs = [], []
for l in range(num_layers):
    # build non-linear classifier
    classifier, criterion, optimizer = build_nonlinear_classifier(emb_dim, num_labels, device)
    # get layer representation 
    train_representations = train_representations_all[l]
    test_representations = test_representations_all[l]
    
    # train
    train_loss, train_accuracy = train(2, train_representations, train_labels, 
          model, tokenizer, sep, model_name, device, 
          classifier, criterion, optimizer)
    train_accs.append(train_accuracy)
    # test
    test_loss, test_accuracy = evaluate(test_representations, test_labels, 
         model, tokenizer, sep, model_name, device, 
         classifier, criterion)
    test_accs.append(test_accuracy)
    print("layer: {}, train accuracy: {}, test accuracy: {}".format(l, train_accuracy, test_accuracy))

layer: 0, train accuracy: 0.8584077353387916, test accuracy: 0.8268465280849182
layer: 1, train accuracy: 0.8772274293335309, test accuracy: 0.8361344537815126
layer: 2, train accuracy: 0.9190160411958656, test accuracy: 0.8967271118973905
layer: 3, train accuracy: 0.923980291186604, test accuracy: 0.8991596638655462
layer: 4, train accuracy: 0.9252769236468714, test accuracy: 0.9033613445378151
layer: 5, train accuracy: 0.9281665617011818, test accuracy: 0.897390535161433
layer: 6, train accuracy: 0.9291297743859519, test accuracy: 0.8951791242812914
layer: 7, train accuracy: 0.9256473900640907, test accuracy: 0.9040247678018576
layer: 8, train accuracy: 0.9219427258918979, test accuracy: 0.8892083149049094
layer: 9, train accuracy: 0.9127551587448598, test accuracy: 0.8759398496240601
layer: 10, train accuracy: 0.9030489386137147, test accuracy: 0.8673153471915082
layer: 11, train accuracy: 0.8948616307931686, test accuracy: 0.8591331269349846
layer: 12, train accuracy: 0.87885748156

# Experiment 4: Control labels

In this experiment we test to see how much of the good performance from Experiments 2 and 3 actually come from things the POS model learned, and how much of it just comes from the probe model. To test this, we use a method from Hewitt and Liang (https://arxiv.org/pdf/1909.03368.pdf). We make a <i>control task</i> which is unrelated to the POS task and do the same probing procedure on the control task. We then measure the <i>selectivity</i> of layers; the difference between their probed accuracy on the POS task and on the control task. If a layer has learned substantial things about the POS task in particular, it should be much better at the POS task than the control task; i.e. it should have high selectivity.

Following Hewitt and Liang, we use the following control task for POS tagging. Each word identity will be assigned a random POS tag, with the distribution of POS tags weighted according to their actual appearance. Each word identity will always have the same tag every time it appears. We then train and test the layers on predicting this tag from the embedding. Note that this tag is a deterministic function of the word identity, so high selectivity means the embedding actually has forgotten something about the word identity.



In [65]:
import random

vocabulary = set(
    word
      for sentence in (train_sentences + test_sentences)
      for word in sentence
)
# all_labels = sum((x.tolist() for x in train_labels), [])
all_labels = train_labels.tolist()
control_map = {word: random.choice(all_labels) for word in vocabulary}

control_train_labels = [torch.tensor([control_map[word] for word in sentence]) for sentence in train_sentences]
control_test_labels = [torch.tensor([control_map[word] for word in sentence]) for sentence in test_sentences]
control_train_labels = torch.tensor(np.concatenate(control_train_labels, 0)).to(device)
control_test_labels = torch.tensor(np.concatenate(control_test_labels, 0)).to(device)


In [66]:
num_layers = len(train_representations_all)
for l in range(num_layers):
    classifier, criterion, optimizer = build_nonlinear_classifier(emb_dim, num_labels, device)
    # get layer representation 
    train_representations = train_representations_all[l]
    test_representations = test_representations_all[l]
    
    # train
    train_loss, train_accuracy = train(2, train_representations, control_train_labels, 
          model, tokenizer, sep, model_name, device, 
          classifier, criterion, optimizer)
    train_accs.append(train_accuracy)
    # test
    test_loss, test_accuracy = evaluate(test_representations, control_test_labels, 
         model, tokenizer, sep, model_name, device, 
         classifier, criterion)
    test_accs.append(test_accuracy)    
    print("layer: {}, test accuracy: {}".format(l, test_accuracy))

layer: 0, test accuracy: 0.7167182662538699
layer: 1, test accuracy: 0.7054400707651481
layer: 2, test accuracy: 0.6824413976116762
layer: 3, test accuracy: 0.6720477664750111
layer: 4, test accuracy: 0.6534719150818222
layer: 5, test accuracy: 0.6360017691287041
layer: 6, test accuracy: 0.6043785935426802
layer: 7, test accuracy: 0.5760725342768687
layer: 8, test accuracy: 0.5457762052189297
layer: 9, test accuracy: 0.5265369305616984
layer: 10, test accuracy: 0.4955771782397169
layer: 11, test accuracy: 0.4834144183989385
