In [1]:
import sys
import os
from logging import getLogger, ERROR

notebook_dir = os.getcwd()
parent_dir = os.path.dirname(notebook_dir)
# Fix module imports
if parent_dir not in sys.path:
    sys.path.append(parent_dir)

# Disable Hugging Face warnings
getLogger("transformers.modeling_utils").setLevel(ERROR)

In [30]:
from pandas import read_csv
from datasets import Dataset
from sklearn.metrics import f1_score
import numpy
import torch
from transformers import GPT2Tokenizer
from model.qgpt2_models import SingleHeadQGPT2Model
import time

tokenizer = GPT2Tokenizer.from_pretrained("openai-community/gpt2")
model = SingleHeadQGPT2Model.from_pretrained("openai-community/gpt2", n_bits=8,use_cache=False)

for param in model.parameters():
    param.requires_grad = False

tokenizer.pad_token = tokenizer.eos_token
model.config.pad_token_id = model.config.eos_token_id

In [4]:
df = read_csv("../data/Tweets.csv")


df["airline_sentiment"] = df["airline_sentiment"].replace(
    ["negative", "neutral", "positive"], [0, 1, 2]
)

dataset = Dataset.from_pandas(df)
dataset = dataset.select_columns(["text", "airline_sentiment"])
dataset = dataset.rename_column("airline_sentiment", "label")

ds_dict = dataset.train_test_split(test_size=0.1, seed=42)
train_ds = ds_dict["train"]
eval_ds = ds_dict["test"]

In [25]:
# Function that transforms a list of texts to their representation
# learned by the transformer.

def get_hidden_states(
    inputs: list,
    transformer_model,
    tokenizer: GPT2Tokenizer,
    device: str = "cuda",
):
    # Tokenize each text in the list one by one
    tokenized = map(lambda x: tokenizer.encode(x, return_tensors="pt"), inputs)

    # Send the model to the device
    transformer_model = transformer_model.to(device)
    output_hidden_states_list = []

    for tokenized_x in tokenized:
        # Pass the tokens through the transformer model and get the hidden states
        # Only keep the last hidden layer state for now
        output_hidden_states = transformer_model(tokenized_x.to(device), output_hidden_states=True)[0]
        # Average over the tokens axis to get a representation at the text level.
        output_hidden_states = output_hidden_states.mean(dim=1)
        output_hidden_states = output_hidden_states.detach().cpu().numpy()
        output_hidden_states_list.append(output_hidden_states)

    return numpy.concatenate(output_hidden_states_list, axis=0)

hidden_states = get_hidden_states(train_ds["text"], model, tokenizer)
x_test_states = get_hidden_states(eval_ds["text"], model, tokenizer)

In [6]:
numpy.savetxt("train_hidden_states.csv", hidden_states, delimiter=",")
numpy.savetxt("test_hidden_states.csv", x_test_states, delimiter=",")

## Using nn.Linear

In [31]:
import torch

y_train = torch.tensor(train_ds["label"])
y_test = torch.tensor(eval_ds["label"])

# num_labels is 3
score = torch.nn.Linear(768, 3, bias=False)

torch.cuda.manual_seed_all(42)

criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(
    score.parameters(),
    lr=2e-5,
    weight_decay=0.01,
) 

batch_size = 16
dataset = torch.utils.data.TensorDataset(torch.tensor(hidden_states), y_train)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size)

# Training loop
num_epochs = 3

for epoch in range(num_epochs):
    for inputs, labels in dataloader:
        # Forward pass
        outputs = score(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")


# Evaluation
with torch.no_grad():
    start = time.perf_counter()
    outputs = score(torch.tensor(x_test_states))
    end = time.perf_counter()
    _, y_pred = torch.max(outputs, 1)
    f1 = f1_score(y_test, y_pred, average="macro")

    f1s = f1_score(y_test, y_pred, average=None)

print(f"Run time: {end - start:.4f} seconds")
print(f"Macro F1: {f1:.4f}")
print(f"F1 score for negative class: " f"{f1s[0]:.4f}")
print(f"F1 score for neutral class: " f"{f1s[1]:.4f}")
print(f"F1 score for positive class: " f"{f1s[2]:.4f}")

torch.Size([13176])
Epoch [1/3], Loss: 0.9978
Epoch [2/3], Loss: 0.9681
Epoch [3/3], Loss: 0.9563
Run time: 0.0005 seconds
Macro F1: 0.4268
F1 score for negative class: 0.8131
F1 score for neutral class: 0.2613
F1 score for positive class: 0.2059
