In [None]:
!pip install transformers
!pip install numpy==1.26.0
!pip install torch torchvision torchaudio
!pip install --upgrade ipywidgets
!pip install pandas
!pip install torchinfo

In [None]:
# Print the GPU status reported by the NVIDIA driver for this runtime.
!nvidia-smi

In [None]:
# Fetch the project repository; the following cell changes into it.
!git clone https://github.com/avyas21/interpretablellm.git

In [None]:
# Work from inside the cloned repository so that relative paths such as
# "data/train_data.csv" resolve, then list its contents.
%cd interpretablellm
!ls

## Data Preprocessing

In [1]:
import pandas as pd

In [2]:
# Load the train and test splits. Later cells read the 'Review' and 'Sentiment'
# columns of these frames.
train_data = pd.read_csv("data/train_data.csv")
test_data = pd.read_csv("data/test_data.csv")

In [3]:
# Label vocabularies consumed by `convert_lbl_to_int`: a lower-cased label found
# in POSITIVE_WORDS maps to 1, one found in NEGATIVE_WORDS maps to 0.
POSITIVE_WORDS = ["positive", "great", "good", "happy", "amazing", "fantastic", "yes"]
NEGATIVE_WORDS = ["negative", "bad", "sad", "terrible", "horrible", "no", "critical"]

In [4]:
def convert_lbl_to_int(label):
    """Map a sentiment label string to an integer class id.

    The lookup is case-insensitive.

    Args:
        label: label text (must support `.lower()`).

    Returns:
        1 if the label is listed in POSITIVE_WORDS, 0 if it is listed in
        NEGATIVE_WORDS, and -1 for anything else.
    """
    normalized = label.lower()
    if normalized in POSITIVE_WORDS:
        return 1
    return 0 if normalized in NEGATIVE_WORDS else -1

## Imports and Dataset

In [5]:
import numpy as np
print(np.__version__)
from transformers import BertModel, BertTokenizer, BertForSequenceClassification, BertConfig
from transformers import pipeline
from sklearn.metrics import f1_score
from torchinfo import summary
import torch.nn as nn
import torch
from torch.utils.data import Dataset, DataLoader
import random

1.26.0


In [6]:
class TextDataset(Dataset):
    """Map-style dataset that tokenizes one text per item, at access time.

    Args:
        texts: indexable collection of input strings.
        labels: indexable collection of labels, aligned with `texts`.
        tokenizer: callable invoked as
            `tokenizer(text, add_special_tokens=True, max_length=..., padding='max_length',
            truncation=True, return_tensors='pt')` and returning a mapping with
            'input_ids' and 'attention_mask' tensors.
        max_length: length every example is padded / truncated to.
    """

    def __init__(self, texts, labels, tokenizer, max_length):
        self.texts, self.labels = texts, labels
        self.tokenizer, self.max_length = tokenizer, max_length

    def __len__(self):
        """Number of examples (one per text)."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the flattened token ids, attention mask and label tensor of example `idx`."""
        text, label = self.texts[idx], self.labels[idx]
        tokenizer_options = dict(
            add_special_tokens=True,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt',
        )
        encoded = self.tokenizer(text, **tokenizer_options)
        # The tokenizer output is flattened to 1-D so that a DataLoader can
        # stack the examples into a batch.
        sample = {field: encoded[field].flatten() for field in ('input_ids', 'attention_mask')}
        sample['label'] = torch.tensor(label)
        return sample

In [7]:
def get_inputs_labels(df):
    """Extract model inputs and binary labels from a review DataFrame.

    Args:
        df: DataFrame with a 'Review' column (input text) and a 'Sentiment'
            column (label string).

    Returns:
        (inputs, labels): `inputs` is the list of 'Review' values in row order;
        `labels` holds 1 where 'Sentiment' equals exactly 'positive' and 0 for
        every other value.
    """
    # Read from the `df` argument: the previous version iterated the global
    # `train_data`, so any other frame passed in was silently ignored.
    inputs = df['Review'].tolist()
    labels = [1 if sentiment == 'positive' else 0 for sentiment in df['Sentiment']]
    return inputs, labels


In [9]:
# Shared tokenizer; `baseline_predict` and `predict` also read it as a global.
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
inputs, labels = get_inputs_labels(train_data)
# TextDataset pads / truncates every review to 512 tokens.
dataset = TextDataset(inputs, labels, tokenizer, 512)
# Two shuffled loaders over the same dataset: the batch-size-64 loader is passed
# to the baseline fine-tuning call, the batch-size-128 loader to the probe and
# scalar-mixing experiments.
baseline_dataloader = DataLoader(dataset, batch_size=64, shuffle=True)
dataloader = DataLoader(dataset, batch_size=128, shuffle=True)

## Baseline Model

In [10]:
def get_baseline_model():
  """Load `bert-base-uncased` with a 2-label sequence-classification head.

  Returns:
      The model, moved to the CUDA device when CUDA is available and returned
      as loaded otherwise.
  """
  classifier = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)
  if not torch.cuda.is_available():
    return classifier
  return classifier.to(torch.device("cuda"))

In [11]:
# Instantiate the baseline classifier and print its layer / parameter summary.
baseline_model = get_baseline_model()
summary(baseline_model)

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Layer (type:depth-idx)                                       Param #
BertForSequenceClassification                                --
├─BertModel: 1-1                                             --
│    └─BertEmbeddings: 2-1                                   --
│    │    └─Embedding: 3-1                                   23,440,896
│    │    └─Embedding: 3-2                                   393,216
│    │    └─Embedding: 3-3                                   1,536
│    │    └─LayerNorm: 3-4                                   1,536
│    │    └─Dropout: 3-5                                     --
│    └─BertEncoder: 2-2                                      --
│    │    └─ModuleList: 3-6                                  85,054,464
│    └─BertPooler: 2-3                                       --
│    │    └─Linear: 3-7                                      590,592
│    │    └─Tanh: 3-8                                        --
├─Dropout: 1-2                                               --
├─L

In [12]:
def baseline_predict(model, df):
    """Predict a class id for every row of `df` with a sequence-classification model.

    Each review is tokenized with the module-level `tokenizer` (padded /
    truncated to 512 tokens) and classified one row at a time.

    Args:
        model: module called as `model(input_ids, attention_mask)` whose result
            exposes a `.logits` tensor; the arg-max over dim 1 is the prediction.
        df: DataFrame with a 'Review' text column and a 'Sentiment' label column.

    Returns:
        (predictions, labels): two lists in row order. `labels` holds the result
        of `convert_lbl_to_int` for each row's 'Sentiment'.

    Side effects:
        Switches `model` to eval mode and leaves it there.
    """
    model.eval()
    predictions = []
    labels = []

    # Send inputs to the device the model actually lives on. Keying this on
    # CUDA availability alone fails when a CPU model runs on a CUDA machine.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        device = first_param.device
    else:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    with torch.no_grad():
        for _, row in df.iterrows():
            encoding = tokenizer(row['Review'], add_special_tokens=True, max_length = 512, padding='max_length', truncation=True, return_tensors='pt')
            # Each tensor is transferred once (the old code copied them twice).
            input_ids = encoding['input_ids'].to(device)
            attention_mask = encoding['attention_mask'].to(device)
            output = model(input_ids, attention_mask)

            _, prediction = torch.max(output.logits, dim=1)
            predictions.append(prediction.item())
            labels.append(convert_lbl_to_int(row['Sentiment']))

    return predictions, labels

In [13]:
def baseline_score_model(model, train_df, test_df):
    """Compute micro-averaged F1 of the baseline classifier on the given splits.

    Args:
        model: model accepted by `baseline_predict`.
        train_df: training DataFrame, or None to skip the train score.
        test_df: test DataFrame, or None to skip the test score.

    Returns:
        (train_f1, test_f1); an entry is None when its DataFrame was None.
    """
    def micro_f1(frame):
        # A missing split is reported as None instead of being scored.
        if frame is None:
            return None
        predicted, expected = baseline_predict(model, frame)
        return f1_score(expected, predicted, average='micro')

    train_f1 = micro_f1(train_df)
    test_f1 = micro_f1(test_df)
    return train_f1, test_f1


In [14]:
def baseline_train_test(model, epochs, dataloader, train_df, test_df, learning_rate = 1e-3, calc_train_f1 = True):
    """Fine-tune the baseline classifier with Adam, then report its test F1.

    Args:
        model: module called as `model(input_ids, attention_mask=..., labels=...)`
            whose result exposes a `.loss` tensor.
        epochs: number of passes over `dataloader`.
        dataloader: iterable of batches with 'input_ids', 'attention_mask' and 'label'.
        train_df: DataFrame scored after each epoch when `calc_train_f1` is true.
        test_df: DataFrame scored once after the last epoch.
        learning_rate: Adam learning rate.
        calc_train_f1: whether to compute and print the train F1 after every epoch.

    Prints the loss every 10th optimisation step (counted across epochs), the
    optional per-epoch train F1, and the final test F1. Returns None.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    global_step = 0
    batch_fields = ('input_ids', 'attention_mask', 'label')
    for epoch in range(epochs):
        # Scoring at the end of an epoch puts the model in eval mode, so
        # training mode is re-enabled at the start of every epoch.
        model.train()
        for batch in dataloader:
            on_device = {field: batch[field].to(device) for field in batch_fields}

            optimizer.zero_grad()
            loss = model(
                on_device['input_ids'],
                attention_mask=on_device['attention_mask'],
                labels=on_device['label'],
            ).loss
            loss.backward()
            optimizer.step()

            global_step += 1
            if global_step % 10 == 0:
                print(loss)

        if calc_train_f1:
            train_f1, _ = baseline_score_model(model, train_df, None)
            print("Epoch: " + str(epoch) + " F1: " + str(train_f1) + " LOSS: " + str(loss))

    _, test_f1 = baseline_score_model(model, None, test_df)
    print("TEST F1: " + str(test_f1))

In [None]:
# Fine-tune the baseline classifier for one epoch at lr=1e-4 on the batch-size-64
# loader; train F1 is printed after the epoch and test F1 at the end.
baseline_train_test(baseline_model, 1, baseline_dataloader, train_data, test_data, learning_rate = 1e-4)

tensor(0.3674, device='cuda:0', grad_fn=<NllLossBackward0>)
tensor(0.2621, device='cuda:0', grad_fn=<NllLossBackward0>)


## Model Utilities

In [None]:
def predict(model, df):
    """Predict a binary label for every row of `df` with a probability-output model.

    Each review is tokenized with the module-level `tokenizer` (padded /
    truncated to 512 tokens) and scored one row at a time.

    Args:
        model: module called as `model(input_ids, attention_mask)` that returns
            a single-element tensor per example; a value above 0.5 is read as
            class 1, anything else as class 0.
        df: DataFrame with a 'Review' text column and a 'Sentiment' label column.

    Returns:
        (predictions, labels): two lists in row order. `labels` holds the result
        of `convert_lbl_to_int` for each row's 'Sentiment'.

    Side effects:
        Switches `model` to eval mode and leaves it there.
    """
    model.eval()
    predictions = []
    labels = []

    # Send inputs to the device the model actually lives on. Keying this on
    # CUDA availability alone fails when a CPU model runs on a CUDA machine.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        device = first_param.device
    else:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    with torch.no_grad():
        for _, row in df.iterrows():
            encoding = tokenizer(row['Review'], add_special_tokens=True, max_length = 512, padding='max_length', truncation=True, return_tensors='pt')
            # Each tensor is transferred once (the old code copied them twice).
            input_ids = encoding['input_ids'].to(device)
            attention_mask = encoding['attention_mask'].to(device)
            probability = model(input_ids, attention_mask)
            predictions.append(1 if probability.item() > 0.5 else 0)
            labels.append(convert_lbl_to_int(row['Sentiment']))

    return predictions, labels

In [None]:
def score_model(model, train_df, test_df):
    """Compute micro-averaged F1 of a probability-output model on the given splits.

    Args:
        model: model accepted by `predict`.
        train_df: training DataFrame, or None to skip the train score.
        test_df: test DataFrame, or None to skip the test score.

    Returns:
        (train_f1, test_f1); an entry is None when its DataFrame was None.
    """
    scores = []
    # Train split first, then test split — the same order the results are returned in.
    for frame in (train_df, test_df):
        if frame is None:
            scores.append(None)
            continue
        predicted, expected = predict(model, frame)
        scores.append(f1_score(expected, predicted, average='micro'))

    train_f1, test_f1 = scores
    return train_f1, test_f1

In [None]:
def train_model(model, epochs, dataloader, train_df, learning_rate = 1e-3, calc_train_f1 = True):
    """Train a probability-output model with binary cross-entropy and Adam.

    Only parameters with `requires_grad=True` are optimised.

    Args:
        model: module called as `model(input_ids, attention_mask)` returning
            probabilities that `nn.BCELoss` can compare with a (batch, 1) float target.
        epochs: number of passes over `dataloader`.
        dataloader: iterable of batches with 'input_ids', 'attention_mask' and 'label'.
        train_df: DataFrame scored with `score_model` after each epoch when
            `calc_train_f1` is true.
        learning_rate: Adam learning rate.
        calc_train_f1: whether to compute and print the train F1 after every epoch.

    Prints the loss every 20th step of each epoch and, optionally, the train F1
    per epoch. Returns None.
    """
    criterion = nn.BCELoss() ## If required define your own criterion
    optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr = learning_rate)

    # Send batches to the device the model actually lives on; fall back to the
    # CUDA-if-available rule only for a model without parameters.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        device = first_param.device
    else:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    for epoch in range(epochs):
        # `score_model` (run at the end of each epoch) switches the model to
        # eval mode. Without this call every epoch after the first would train
        # with dropout disabled. Note that this puts the whole model, including
        # frozen sub-modules, in training mode.
        model.train()
        for step, batch in enumerate(dataloader):
            # BCELoss needs float targets shaped like the model output: (batch, 1).
            targets = torch.as_tensor(batch['label']).unsqueeze(1).float().to(device)
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids, attention_mask)
            loss = criterion(outputs, targets)
            loss.backward()
            if step % 20 == 0:
                print(loss)
            optimizer.step()

        if calc_train_f1:
            train_f1, _ = score_model(model, train_df, None)
            print("Epoch: " + str(epoch) + " F1: " + str(train_f1) + " LOSS: " + str(loss))

## Probe Models

In [None]:
class CustomBERTModel(nn.Module):
    """Probe: the first `n` BERT encoder layers followed by a two-layer sigmoid head.

    The head reads the hidden state at sequence position 0 of the last kept
    layer and maps it 768 -> 256 -> 1, with dropout (p=0.5) before each linear
    layer and a sigmoid on the output.

    Args:
        n: number of leading encoder layers to keep.
        randomized: if True, build BERT from a default `BertConfig` (no
            pretrained weights) and re-initialise the kept encoder layers;
            otherwise load `bert-base-uncased`.
        linear_1: optional module to use as the first head layer instead of a
            new `nn.Linear(768, 256)`.
        linear_2: optional module to use as the second head layer instead of a
            new `nn.Linear(256, 1)`.
        requires_grad: value assigned to `requires_grad` of every BERT parameter
            (False freezes BERT).
    """

    def __init__(self, n, randomized=False, linear_1 = None, linear_2 = None, requires_grad = False):
        super(CustomBERTModel, self).__init__()
        if randomized:
            # Do not force the model onto CUDA here: that crashes on CPU-only
            # hosts, and `get_custom_bert_model` already moves the whole model.
            self.bert = BertModel(BertConfig())
        else:
            self.bert = BertModel.from_pretrained("bert-base-uncased")

        # Keep only the first n encoder layers.
        self.bert.encoder.layer = nn.ModuleList(self.bert.encoder.layer[:n])

        if randomized:
            # Reinitialize weights for each layer in the encoder.
            # `modules()` already walks a layer recursively, so a single pass
            # visits every sub-module exactly once.
            for layer in self.bert.encoder.layer:
                for mod in layer.modules():
                    if isinstance(mod, (nn.Linear, nn.Conv2d)):
                        nn.init.xavier_uniform_(mod.weight)
                        if mod.bias is not None:
                            nn.init.zeros_(mod.bias)
                    elif isinstance(mod, nn.LayerNorm):
                        nn.init.ones_(mod.weight)
                        nn.init.zeros_(mod.bias)

        for param in self.bert.parameters():
            param.requires_grad = requires_grad

        self.dropout = nn.Dropout(0.5)

        ### New layers:
        self.linear_1 = linear_1 if linear_1 is not None else nn.Linear(768, 256)
        self.linear_2 = linear_2 if linear_2 is not None else nn.Linear(256, 1)

        self.sigmoid = nn.Sigmoid()

    def forward(self, ids, mask):
        """Return a (batch, 1) tensor of sigmoid outputs for token ids `ids` and mask `mask`."""
        # Only the last hidden state is used, so the per-layer hidden states
        # are not requested.
        output = self.bert(input_ids=ids, attention_mask=mask)
        first_token_state = output.last_hidden_state[:, 0, :]
        linear1_output = self.linear_1(self.dropout(first_token_state))
        linear2_output = self.linear_2(self.dropout(linear1_output))
        return self.sigmoid(linear2_output)


In [None]:
def get_custom_bert_model(num_bert_layers, randomized_weights = False, linear_1 = None, linear_2 = None, requires_grad = False):
  """Build a `CustomBERTModel` and place it on the GPU when one is available.

  All arguments are forwarded positionally to `CustomBERTModel`
  (layer count, randomized flag, optional head layers, BERT `requires_grad`).

  Returns:
      The model on the CUDA device if CUDA is available, otherwise as constructed.
  """
  probe = CustomBERTModel(num_bert_layers, randomized_weights, linear_1, linear_2, requires_grad)
  if not torch.cuda.is_available():
    return probe
  return probe.to(torch.device("cuda"))

## Baseline Interpretability

In [None]:
# Build a 2-layer probe with randomly initialised encoder weights.
# NOTE(review): this rebinds `baseline_model`, which earlier held the fine-tuned
# BertForSequenceClassification. When cells run top to bottom, the later
# `summary(baseline_model)` describes this probe instead — confirm that is intended.
baseline_model = get_custom_bert_model(2, True)

In [None]:
def baseline_interpretability(dataloader, train_df, test_df, min_n = 1, max_n = 12):
    """Score one head, trained on 12 BERT layers, on top of shallower BERT stacks.

    A 12-layer probe is trained for 10 epochs at lr=1e-4. Its two head layers
    are then reused, without further training, on pretrained models truncated
    to n layers for every n in [min_n, max_n]; the train and test F1 of each
    are printed.

    Args:
        dataloader: batches used to train the 12-layer probe.
        train_df: DataFrame used for the train F1 (during training and per n).
        test_df: DataFrame used for the test F1 per n.
        min_n: smallest layer count to evaluate (inclusive).
        max_n: largest layer count to evaluate (inclusive).
    """
    reference_probe = get_custom_bert_model(12)
    train_model(reference_probe, 10, dataloader, train_df, 1e-4)
    head_first, head_second = reference_probe.linear_1, reference_probe.linear_2

    for layer_count in range(min_n, max_n + 1):
        print("N: " + str(layer_count))
        truncated = get_custom_bert_model(layer_count, False, head_first, head_second)
        scores = score_model(truncated, train_df, test_df)
        train_f1, test_f1 = scores
        print("TRAIN F1: " + str(train_f1) + " TEST F1: " + str(test_f1))
    

In [None]:
# Run the shared-head experiment over the default range of 1 to 12 BERT layers.
baseline_interpretability(dataloader, train_data, test_data)

## Baseline Interpretability - Randomized weights

In [None]:
def score_all_randomized_models(dataloader, train_df, test_df, epochs, max_n, learning_rate = 1e-3):
    """Train and test a randomly-initialised probe for every depth from 1 to `max_n`.

    Args:
        dataloader: training batches passed to `train_model`.
        train_df: DataFrame passed to `train_model` for its per-epoch train F1.
        test_df: DataFrame the test F1 is computed on.
        epochs: training epochs per probe.
        max_n: largest number of BERT layers to try (inclusive).
        learning_rate: learning rate passed to `train_model`.

    Returns:
        A list of `[n, test_f1]` pairs, one per depth, in increasing n.
    """
    results = []
    layer_count = 1
    while layer_count <= max_n:
        print("N: " + str(layer_count))
        probe = get_custom_bert_model(layer_count, True)
        train_model(probe, epochs, dataloader, train_df, learning_rate)
        test_f1 = score_model(probe, None, test_df)[1]
        print("TEST F1: " + str(test_f1))
        results.append([layer_count, test_f1])
        layer_count += 1

    return results

In [None]:
# Randomized-weights control: 10 epochs per probe, depths 1 to 12, default learning rate.
model_scores = score_all_randomized_models(dataloader, train_data, test_data, 10, 12)

## Probes

In [None]:
def score_all_probe_models(dataloader, train_df, test_df, epochs, min_n, max_n, learning_rate = 1e-4):
    """Train and test a pretrained-BERT probe for every depth in [min_n, max_n].

    Args:
        dataloader: training batches passed to `train_model`.
        train_df: DataFrame passed to `train_model` for its per-epoch train F1.
        test_df: DataFrame the test F1 is computed on.
        epochs: training epochs per probe.
        min_n: smallest number of BERT layers to try (inclusive).
        max_n: largest number of BERT layers to try (inclusive).
        learning_rate: learning rate passed to `train_model`.

    Returns:
        A list of `[n, test_f1]` pairs, one per depth, in increasing n.
    """
    def run_single_depth(layer_count):
        # Build, train and test one probe; returns the [n, test_f1] record.
        print("N: " + str(layer_count))
        probe = get_custom_bert_model(layer_count)
        train_model(probe, epochs, dataloader, train_df, learning_rate)
        _, test_f1 = score_model(probe, None, test_df)
        print("TEST F1: " + str(test_f1))
        return [layer_count, test_f1]

    return [run_single_depth(layer_count) for layer_count in range(min_n, max_n + 1)]

In [None]:
# Pretrained probes for depths 1 to 11, 10 epochs each at lr=1e-3. The returned
# list of [n, test_f1] pairs is not assigned, so it only appears as the cell output.
score_all_probe_models(dataloader, train_data, test_data, 10, 1, 11, 1e-3)

## Scalar Mixing Weights

In [None]:
class ScalarMixingWeightModel(nn.Module):
    """Binary classifier over a learned, softmax-weighted mix of BERT layer outputs.

    The first `n` encoder layers of a frozen `bert-base-uncased` are kept. Their
    hidden states are combined as `gamma * sum_k softmax(layer_weights)[k] * h_k`,
    the vector at sequence position 0 is taken, and a stack of `i` linear layers
    followed by a sigmoid produces the output.

    Args:
        n: number of leading BERT encoder layers to keep and mix.
        i: number of linear layers in the head (1 to 4).

    Raises:
        ValueError: if `i` is not 1-4 or `n` is outside 1..(available layers).

    Attributes:
        layer_weights: learnable pre-softmax mixing weight per kept layer.
        gamma: learnable scalar multiplier of the mix.
        linear1 .. linear<i>: the head layers.
    """

    # Output width of each head layer, keyed by head depth `i`; the first
    # layer always takes the 768-wide mixed representation as input.
    _HEAD_WIDTHS = {
        1: (1,),
        2: (384, 1),
        3: (384, 192, 1),
        4: (384, 192, 96, 1),
    }

    def __init__(self, n, i):
        super(ScalarMixingWeightModel, self).__init__()
        # Fail at construction; previously an unsupported `i` only surfaced as
        # an unbound-variable error inside forward().
        if i not in self._HEAD_WIDTHS:
            raise ValueError("i must be one of 1, 2, 3, 4; got " + repr(i))

        self.bert = BertModel.from_pretrained("bert-base-uncased")
        available_layers = len(self.bert.encoder.layer)
        if not 1 <= n <= available_layers:
            raise ValueError(
                "n must be between 1 and " + str(available_layers) + "; got " + repr(n)
            )
        self.bert.encoder.layer = nn.ModuleList(self.bert.encoder.layer[:n])
        self.n = n
        self.layer_weights = nn.Parameter(torch.ones(self.n))
        self.softmax = nn.Softmax(dim=0)
        self.i = i

        self.gamma = nn.Parameter(torch.ones(1))
        self.dropout = nn.Dropout(0.5)

        # BERT stays frozen; only the mixing weights, gamma and the head train.
        for param in self.bert.parameters():
            param.requires_grad = False

        ### New layers: linear1 .. linear<i>, registered under the same
        ### attribute names as before.
        in_features = 768
        for position, out_features in enumerate(self._HEAD_WIDTHS[i], start=1):
            setattr(self, "linear" + str(position), nn.Linear(in_features, out_features))
            in_features = out_features

        self.sigmoid = nn.Sigmoid()

    def forward(self, ids, mask):
        """Return a (batch, 1) tensor of sigmoid outputs for token ids `ids` and mask `mask`."""
        outputs = self.bert(input_ids=ids, attention_mask=mask, output_hidden_states=True)

        # Entry 0 of `hidden_states` is skipped (presumably the embedding
        # output — confirm against the transformers docs); entries 1..n are mixed.
        hidden_states = outputs.hidden_states[1:1 + self.n]

        normalized_weights = self.softmax(self.layer_weights)
        mixed = self.gamma * sum(normalized_weights[k] * hidden_states[k] for k in range(self.n))
        features = mixed[:, 0, :]

        if self.i == 1:
            # Dropout goes on the input of the only layer. Applying it to the
            # output logit (as before) zeroed the logit for about half of the
            # training samples, forcing their prediction to exactly 0.5.
            logits = self.linear1(self.dropout(features))
        else:
            logits = self.linear1(features)
            for position in range(2, self.i + 1):
                logits = getattr(self, "linear" + str(position))(self.dropout(logits))

        return self.sigmoid(logits)


In [None]:
def get_scalar_mixing_model(num_bert_layers, i):
  """Build a `ScalarMixingWeightModel` and place it on the GPU when one is available.

  Args:
      num_bert_layers: number of BERT layers the model keeps and mixes.
      i: number of linear layers in the model's head.

  Returns:
      The model on the CUDA device if CUDA is available, otherwise as constructed.
  """
  mixer = ScalarMixingWeightModel(num_bert_layers, i)
  return mixer.to(torch.device("cuda")) if torch.cuda.is_available() else mixer

In [None]:
# Train a single-linear-layer scalar-mixing model on the first 11 and on the
# first 12 BERT layers (10 epochs, lr=0.1) and keep each model's learned
# `layer_weights` (the raw values, before the softmax), keyed by layer count.
weights = {}
for i in range(11,13):
  test_scalar = get_scalar_mixing_model(i,1)
  train_model(test_scalar, 10, dataloader, train_data, learning_rate = 0.1)
  print(i, ":", test_scalar.layer_weights)
  weights[i] = test_scalar.layer_weights

In [None]:
# Grid over learning rate x head depth (1-4 linear layers) for a 2-layer
# scalar-mixing model. `train_model` prints losses and per-epoch train F1; the
# final print labels the configuration that has just finished.
lrs = [0.1, 0.01, 0.001, 0.0001]
for lr in lrs:
  for i in range(1,5):
    test_scalar = get_scalar_mixing_model(2, i)
    train_model(test_scalar, 10, dataloader, train_data, learning_rate = lr)
    print("Learning rate:", lr, "Number of layers", i)

## Final Model

In [None]:
class FinalModel(nn.Module):
    """First six layers of `bert-base-uncased` followed by a small sigmoid head.

    The head maps 768 -> 256 -> 1 with a ReLU in between and dropout (p=0.5)
    before each linear layer. BERT's parameters are left trainable here.
    """

    def __init__(self):
        super().__init__()
        bert = BertModel.from_pretrained("bert-base-uncased")
        # Drop every encoder layer after the sixth.
        bert.encoder.layer = nn.ModuleList(bert.encoder.layer[:6])
        self.bert = bert

        self.dropout = nn.Dropout(0.5)
        self.relu = nn.ReLU()
        ### New layers:
        self.linear_1 = nn.Linear(768, 256)
        self.linear_2 = nn.Linear(256, 1)

        self.sigmoid = nn.Sigmoid()

    def forward(self, ids, mask):
        """Return a (batch, 1) tensor of sigmoid outputs for token ids `ids` and mask `mask`."""
        bert_output = self.bert(input_ids=ids, attention_mask=mask, output_hidden_states=True)

        # Element 1 of the BERT output feeds the head (presumably the pooled
        # output — confirm against the transformers docs).
        head_input = bert_output[1]
        hidden = self.relu(self.linear_1(self.dropout(head_input)))
        logit = self.linear_2(self.dropout(hidden))
        return self.sigmoid(logit)


In [None]:
def get_final_model():
  """Build a `FinalModel` and place it on the GPU when one is available.

  Returns:
      The model on the CUDA device if CUDA is available, otherwise as constructed.
  """
  final_model = FinalModel()
  if not torch.cuda.is_available():
    return final_model
  return final_model.to(torch.device("cuda"))

In [None]:
import torch.nn as nn
import torch
from transformers import AutoTokenizer, BertModel
import pandas as pd

class ScalarMixingWeightModel(nn.Module):
    """Binary classifier over a learned, softmax-weighted mix of BERT layer outputs.

    The first `n` encoder layers of a frozen `bert-base-uncased` are kept. Their
    hidden states are combined as `gamma * sum_k softmax(layer_weights)[k] * h_k`,
    the vector at sequence position 0 is taken, and a stack of `i` linear layers
    followed by a sigmoid produces the output.

    Args:
        n: number of leading BERT encoder layers to keep and mix.
        i: number of linear layers in the head (1 to 4).

    Raises:
        ValueError: if `i` is not 1-4 or `n` is outside 1..(available layers).

    Attributes:
        layer_weights: learnable pre-softmax mixing weight per kept layer.
        gamma: learnable scalar multiplier of the mix.
        linear1 .. linear<i>: the head layers.
    """

    # Output width of each head layer, keyed by head depth `i`; the first
    # layer always takes the 768-wide mixed representation as input.
    _HEAD_WIDTHS = {
        1: (1,),
        2: (384, 1),
        3: (384, 192, 1),
        4: (384, 192, 96, 1),
    }

    def __init__(self, n, i):
        super(ScalarMixingWeightModel, self).__init__()
        # Fail at construction; previously an unsupported `i` only surfaced as
        # an unbound-variable error inside forward().
        if i not in self._HEAD_WIDTHS:
            raise ValueError("i must be one of 1, 2, 3, 4; got " + repr(i))

        self.bert = BertModel.from_pretrained("bert-base-uncased")
        available_layers = len(self.bert.encoder.layer)
        if not 1 <= n <= available_layers:
            raise ValueError(
                "n must be between 1 and " + str(available_layers) + "; got " + repr(n)
            )
        self.bert.encoder.layer = nn.ModuleList(self.bert.encoder.layer[:n])
        self.n = n
        self.layer_weights = nn.Parameter(torch.ones(self.n))
        self.softmax = nn.Softmax(dim=0)
        self.i = i

        self.gamma = nn.Parameter(torch.ones(1))
        self.dropout = nn.Dropout(0.5)

        # BERT stays frozen; only the mixing weights, gamma and the head train.
        for param in self.bert.parameters():
            param.requires_grad = False

        ### New layers: linear1 .. linear<i>, registered under the same
        ### attribute names as before.
        in_features = 768
        for position, out_features in enumerate(self._HEAD_WIDTHS[i], start=1):
            setattr(self, "linear" + str(position), nn.Linear(in_features, out_features))
            in_features = out_features

        self.sigmoid = nn.Sigmoid()

    def forward(self, ids, mask):
        """Return a (batch, 1) tensor of sigmoid outputs for token ids `ids` and mask `mask`."""
        outputs = self.bert(input_ids=ids, attention_mask=mask, output_hidden_states=True)

        # Entry 0 of `hidden_states` is skipped (presumably the embedding
        # output — confirm against the transformers docs); entries 1..n are mixed.
        hidden_states = outputs.hidden_states[1:1 + self.n]

        normalized_weights = self.softmax(self.layer_weights)
        mixed = self.gamma * sum(normalized_weights[k] * hidden_states[k] for k in range(self.n))
        features = mixed[:, 0, :]

        if self.i == 1:
            # Dropout goes on the input of the only layer. Applying it to the
            # output logit (as before) zeroed the logit for about half of the
            # training samples, forcing their prediction to exactly 0.5.
            logits = self.linear1(self.dropout(features))
        else:
            logits = self.linear1(features)
            for position in range(2, self.i + 1):
                logits = getattr(self, "linear" + str(position))(self.dropout(logits))

        return self.sigmoid(logits)


In [None]:
def get_scalar_mixing_model(num_bert_layers, i):
  """Build a `ScalarMixingWeightModel` and place it on the GPU when one is available.

  Args:
      num_bert_layers: number of BERT layers the model keeps and mixes.
      i: number of linear layers in the model's head.

  Returns:
      The model on the CUDA device if CUDA is available, otherwise as constructed.
  """
  mixer = ScalarMixingWeightModel(num_bert_layers, i)
  return mixer.to(torch.device("cuda")) if torch.cuda.is_available() else mixer

In [None]:
# Train a single-linear-layer scalar-mixing model over all 12 BERT layers
# (10 epochs, lr=0.1) and keep its learned `layer_weights` (the raw values,
# before the softmax), keyed by layer count.
weights = {}
for i in range(12,13):
  test_scalar = get_scalar_mixing_model(i,1)
  train_model(test_scalar, 10, dataloader, train_data, learning_rate = 0.1)
  print(i, ":", test_scalar.layer_weights)
  weights[i] = test_scalar.layer_weights

In [None]:
# Grid over learning rate x head depth (1-4 linear layers) for a 2-layer
# scalar-mixing model. `train_model` prints losses and per-epoch train F1; the
# final print labels the configuration that has just finished.
lrs = [0.1, 0.01, 0.001, 0.0001]
for lr in lrs:
  for i in range(1,5):
    test_scalar = get_scalar_mixing_model(2, i)
    train_model(test_scalar, 10, dataloader, train_data, learning_rate = lr)
    print("Learning rate:", lr, "Number of layers", i)

## Analysis

In [None]:
# Layer / parameter summary of whatever `baseline_model` currently refers to.
# NOTE(review): an earlier cell rebinds `baseline_model` to a randomized 2-layer
# probe, so check which model is being summarised here.
summary(baseline_model)

In [None]:
# Layer / parameter summary of a freshly built 10-layer pretrained probe.
summary( get_custom_bert_model(10))