In [3]:
from datasets import load_dataset
from transformers import GPT2Tokenizer, DataCollatorForLanguageModeling
import random
import numpy as np
import torch  # if you're using PyTorch
# import tensorflow as tf  # if you're using TensorFlow

# Set random seed
seed_value = 42  # or any other integer

random.seed(seed_value)
np.random.seed(seed_value)

if torch.cuda.is_available():  # PyTorch-specific
    torch.manual_seed(seed_value)
    torch.cuda.manual_seed_all(seed_value)

import torch

torch.autograd.set_detect_anomaly(True)
# Load dataset
dataset = load_dataset("dair-ai/emotion")
# Load tokenizer
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
special_tokens = '[Label]'

# Add the special tokens to the tokenizer
tokenizer.add_tokens(special_tokens)
tokenizer.pad_token = tokenizer.eos_token


special_tokens_dict = {}
new_tokens = []
label2text = dataset['train'].features['label'].names

for label in label2text:
    # Create special token format (with and without space)
    special_token = f'[{label}]'
    special_token_with_space = f'[{label}]'
    
    # Check if the label is already a single token in the tokenizer
    label_tokens = tokenizer.encode(label, add_special_tokens=False)
    is_single_token = len(label_tokens) == 1
    
    if is_single_token:
        print(f"'{label}' is already a single token (ID: {label_tokens[0]})")
    
    # Add both versions to new tokens list
    new_tokens.extend([special_token])

# Add the tokens to the tokenizer
num_added_tokens = tokenizer.add_tokens(new_tokens)
print(f"\nAdded {num_added_tokens} new tokens to the tokenizer")


'joy' is already a single token (ID: 2633)
'love' is already a single token (ID: 23205)
'anger' is already a single token (ID: 2564)

Added 6 new tokens to the tokenizer




In [4]:
import torch
from transformers import Trainer
from typing import Dict, Union, Any
class CustomTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False):
        # Get input IDs and create label masks
        input_ids = inputs.get("input_ids")
        attention_mask = inputs.get("attention_mask")
        batch_size = input_ids.shape[0]
        
        # Create label masks for each sequence in the batch
        label_masks = []
        for sequence in input_ids:
            # Find the position of [Label] token
            label_start = (sequence == tokenizer.convert_tokens_to_ids("[Label]")).nonzero(as_tuple=True)[0]
            if len(label_start) > 0:
                # Create mask that's 1 for tokens after [Label] and 0 elsewhere
                mask = torch.zeros_like(sequence)
                mask[label_start[0]:] = 1
                label_masks.append(mask)
            else:
                # If no [Label] token found, mask everything
                label_masks.append(torch.zeros_like(sequence))
        
        label_masks = torch.stack(label_masks)
        
        # Shift input_ids and labels for language modeling
        labels = input_ids.clone()
        labels = labels[:, 1:].contiguous()
        label_masks = label_masks[:, 1:].contiguous()
        
        # Get model outputs
        outputs = model(input_ids, attention_mask=attention_mask)
        logits = outputs.logits[:, :-1, :].contiguous()
        
        # Compute loss only on label tokens
        loss_fct = torch.nn.CrossEntropyLoss(reduction='none')
        loss = loss_fct(logits.view(-1, logits.size(-1)), labels.view(-1))
        
        # Apply label mask to loss
        loss = loss.view(batch_size, -1) * label_masks
        loss = loss.sum() / (label_masks.sum() + 1e-8)  # Add small epsilon to avoid division by zero
        
        return (loss, outputs) if return_outputs else loss

In [5]:
def format_data(examples):
    formatted_texts = []
    
    for text, label in zip(examples['text'], examples['label']):
        tok_text = tokenizer.encode(text, max_length=70, truncation=True)
        text = tokenizer.decode(tok_text)
        label_str = dataset['train'].features['label'].int2str(label)  # Convert label to string
        formatted_texts.append(f"{text}[Label][{label_str}<|endoftext|>]")
    return {'formatted_text': formatted_texts}  # Create a new field for the formatted text

# Apply formatting to the dataset
formatted_dataset = dataset.map(format_data, batched=True)

# Tokenize the formatted dataset
def tokenize_function(examples):
    return tokenizer(examples["formatted_text"], padding='max_length', max_length = 180, truncation=True)

tokenized_dataset = formatted_dataset.map(tokenize_function, batched=True)

# Data collator for language modeling
data_collator = DataCollatorForLanguageModeling(
    tokenizer=tokenizer, mlm=False
)

# Keep the original 'text' and 'label' columns intact
tokenized_dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "text", "label"])

from transformers import GPT2LMHeadModel as gt, Trainer, TrainingArguments
from models.gpt2 import GPT2LMHeadModel
# Load pre-trained GPT-2 model
model1 = gt.from_pretrained('gpt2')

model1.resize_token_embeddings(len(tokenizer))

model1.config.m_layer = 11
import os

base_path = os.path.join("model_weights", 'gpt2-emotion-classification')
if not os.path.exists(base_path):
    os.makedirs(base_path)

weights_path = os.path.join(base_path, "weights.pth")

torch.save(model1.state_dict(), weights_path)

model = GPT2LMHeadModel(model1.config)

model.load_state_dict(torch.load(weights_path))

# Modified training arguments
training_args = TrainingArguments(
    output_dir="./gpt2-emotion-classification",
    evaluation_strategy="epoch",
    learning_rate=2e-5,
    weight_decay=0.01,
    per_device_train_batch_size=64,
    per_device_eval_batch_size=64,
    num_train_epochs=3,
    logging_dir='./logs',
)

# Initialize custom trainer
trainer = CustomTrainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset['train'].remove_columns(['label', 'text', 'formatted_text']),
    eval_dataset=tokenized_dataset['test'].remove_columns(['label', 'text', 'formatted_text']),
    data_collator=data_collator,
    tokenizer=tokenizer,
)

# Start training
trainer.train()

torch.save(model.state_dict(), weights_path)

# model.load_state_dict(torch.load(weights_path))

Map:   0%|          | 0/16000 [00:00<?, ? examples/s]

Map:   0%|          | 0/2000 [00:00<?, ? examples/s]

Map:   0%|          | 0/2000 [00:00<?, ? examples/s]

Map:   0%|          | 0/16000 [00:00<?, ? examples/s]

Map:   0%|          | 0/2000 [00:00<?, ? examples/s]

Map:   0%|          | 0/2000 [00:00<?, ? examples/s]

  model.load_state_dict(torch.load(weights_path))
Detected kernel version 5.4.0, which is below the recommended minimum of 5.5.0; this can cause the process to hang. It is recommended to upgrade the kernel to the minimum version or higher.


Epoch,Training Loss,Validation Loss
1,No log,0.004898
2,0.640800,0.003087
3,0.640800,0.002693


In [2]:
tokenizer.eos_token

'<|endoftext|>'

In [2]:
for n,m in model.named_modules():
    print(n)


transformer
transformer.wte
transformer.wpe
transformer.drop
transformer.h
transformer.h.0
transformer.h.0.ln_1
transformer.h.0.attn
transformer.h.0.attn.c_attn
transformer.h.0.attn.c_proj
transformer.h.0.attn.attn_dropout
transformer.h.0.attn.resid_dropout
transformer.h.0.ln_2
transformer.h.0.mlp
transformer.h.0.mlp.c_fc
transformer.h.0.mlp.c_proj
transformer.h.0.mlp.act
transformer.h.0.mlp.dropout
transformer.h.1
transformer.h.1.ln_1
transformer.h.1.attn
transformer.h.1.attn.c_attn
transformer.h.1.attn.c_proj
transformer.h.1.attn.attn_dropout
transformer.h.1.attn.resid_dropout
transformer.h.1.ln_2
transformer.h.1.mlp
transformer.h.1.mlp.c_fc
transformer.h.1.mlp.c_proj
transformer.h.1.mlp.act
transformer.h.1.mlp.dropout
transformer.h.2
transformer.h.2.ln_1
transformer.h.2.attn
transformer.h.2.attn.c_attn
transformer.h.2.attn.c_proj
transformer.h.2.attn.attn_dropout
transformer.h.2.attn.resid_dropout
transformer.h.2.ln_2
transformer.h.2.mlp
transformer.h.2.mlp.c_fc
transformer.h.2.mlp

In [4]:
def manual_generate(model, input_ids, attention_mask, max_length, class_token_index):
    device = input_ids.device
    batch_size = input_ids.shape[0]
    
    # Initialize the output tensor with the input_ids
    generated = input_ids.clone()
    
    # Create a tensor to keep track of which sequences have finished generating
    finished_sequences = torch.zeros(batch_size, dtype=torch.bool, device=device)
    
    with torch.no_grad():
        for _ in range(max_length - input_ids.shape[1]):
            # Forward pass
            outputs = model(input_ids=generated, attention_mask=attention_mask)
            next_token_logits = outputs.logits[:, -1, :]
            
            # Apply greedy decoding (argmax)
            next_tokens = torch.argmax(next_token_logits, dim=-1)
            
            # Check if the class token is generated
            class_token_generated = (next_tokens == class_token_index)
            finished_sequences = finished_sequences | class_token_generated
            
            # Append the new tokens
            generated = torch.cat([generated, next_tokens.unsqueeze(-1)], dim=-1)
            
            # Update attention mask
            attention_mask = torch.cat([attention_mask, torch.ones((batch_size, 1), dtype=torch.long, device=device)], dim=1)
            
            # Break if all sequences have finished
            if torch.all(finished_sequences):
                break
    
    return generated


In [2]:
import nethook
def manual_generate(model, tokenizer, input_ids, attention_mask, max_length):
    device = input_ids.device
    batch_size = input_ids.shape[0]
    
    # Initialize the output tensor with the input_ids
    generated = input_ids.clone()
    
    # Create a tensor to keep track of which sequences have finished generating
    finished_sequences = torch.zeros(batch_size, dtype=torch.bool, device=device)
    
    confidences = []
    
    all_fc_vals = []
    
    with torch.no_grad():
        for _ in range(max_length - input_ids.shape[1]):
            # Forward pass
            with nethook.TraceDict(model, ['transformer.mask_layer']) as ret:
                outputs = model(input_ids=generated, attention_mask=attention_mask)
                fc1_vals = [
                        ret[layer_fc1_vals].output[:,-1,:]#.transpose(0, 1)//works without transpose somehow
                        for layer_fc1_vals in ret
                    ]
                all_fc_vals.append(fc1_vals)
            next_token_logits = outputs.logits[:, -1, :]
            
            # Apply greedy decoding (argmax)
            next_tokens = torch.argmax(next_token_logits, dim=-1)
            
            # append the confidence of the predicted token
            confidences.append(torch.nn.functional.softmax(next_token_logits, dim=-1).max(dim=-1).values)
            
            # Check if the EOS token is generated
            eos_token_generated = (next_tokens == tokenizer.eos_token_id)
            finished_sequences = finished_sequences | eos_token_generated
            
            # Replace next token with EOS token if the sequence is finished
            next_tokens = torch.where(finished_sequences, tokenizer.eos_token_id, next_tokens)
            
            # Append the new tokens
            generated = torch.cat([generated, next_tokens.unsqueeze(-1)], dim=-1)
            
            # Update attention mask
            attention_mask = torch.cat([attention_mask, (~finished_sequences).unsqueeze(-1).long()], dim=1)
            
            # Break if all sequences have finished
            if torch.all(finished_sequences):
                break
    
    return generated, torch.stack(confidences, dim=1), all_fc_vals

In [3]:
import torch
from tqdm import tqdm
from sklearn.metrics import accuracy_score, classification_report
def collate_fn(batch):
    return {
        'input_ids': torch.stack([torch.tensor(item['input_ids']) for item in batch]),
        'attention_mask': torch.stack([torch.tensor(item['attention_mask']) for item in batch]),
    }

def evaluate_gpt2_classification(model, eval_dataset, tokenizer, batch_size=1):
    
    model.eval()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.config.pad_token_id = tokenizer.pad_token_id 
    
    all_predictions = []
    all_labels = []
    confidence = 0
    j = 0 
    all_hidden = []
    for item in tqdm(eval_dataset, desc="Evaluating"):
        input_ids = torch.tensor(item['input_ids']).unsqueeze(0).to(device)
        attention_mask = torch.tensor(item['attention_mask']).unsqueeze(0).to(device)
        
        generated_sequences, confidences, fc_vals = manual_generate(model,tokenizer,input_ids,attention_mask,150)
        
        generated_sequences = generated_sequences[:, input_ids.shape[1]:][0]
        
        

        label_token_ids = tokenizer.encode('[Label]', add_special_tokens=False)
        label_len = len(label_token_ids)

        label_positions = []

        for i in range(len(generated_sequences) - label_len + 1):
            if generated_sequences[i:i+label_len].tolist() == label_token_ids:
                label_positions.append(i)
                break


        for pos in label_positions:
            predicted_label = tokenizer.decode(generated_sequences[pos+1])
            
            hidden_dim = fc_vals[pos+1]
            confidence += confidences[0][pos+1]
            j += 1

        
        all_hidden.append(hidden_dim[0][0][0])
        full_text = tokenizer.decode(input_ids[0])
        true_label = full_text.split("[Label] ")[1].split("<|endoftext|>")[0]

        all_predictions.append(predicted_label)
        all_labels.append(true_label)
    
    if not all_labels or not all_predictions:
        print("No labels were extracted. Check if '[Label]' token exists in the tokenized text.")
        return 0, "No labels extracted", [], []

    accuracy = accuracy_score(all_labels, all_predictions)
    
    # Get unique labels
    unique_labels = list(set(all_labels + all_predictions))
    
    # Generate classification report
    try:
        report = classification_report(all_labels, all_predictions, labels=unique_labels, target_names=unique_labels)
    except ValueError as e:
        report = f"Unable to generate classification report: {str(e)}"
    
    return accuracy, confidence/j, all_hidden, report, all_labels, all_predictions 

# Use the function
test_dataset = tokenized_dataset['test']
accuracy, confidence, all_hidden, report, true_labels, predicted_labels = evaluate_gpt2_classification(model, test_dataset, tokenizer)



print(f"Accuracy: {accuracy:.4f}")
print("Classification Report:")
print("confidence: ", confidence)
print(report)

# If you want to see the actual labels and predictions
print("\nSample of True Labels:", true_labels[:10])
print("Sample of Predicted Labels:", predicted_labels[:10])

# Check a few samples of the reconstructed text
print("\nSample of reconstructed texts:")
for i in range(5):
    full_text = tokenizer.decode(test_dataset[i]['input_ids'])
    print(f"Sample {i}: {full_text}")

# Print some statistics
print(f"\nTotal samples processed: {len(true_labels)}")
print(f"Unique true labels: {set(true_labels)}")
print(f"Unique predicted labels: {set(predicted_labels)}")

  input_ids = torch.tensor(item['input_ids']).unsqueeze(0).to(device)
  attention_mask = torch.tensor(item['attention_mask']).unsqueeze(0).to(device)
Evaluating: 100%|██████████| 2000/2000 [24:50<00:00,  1.34it/s]

Accuracy: 0.7980
Classification Report:
confidence:  tensor(0.7880, device='cuda:0')
              precision    recall  f1-score   support

     sadness       0.82      0.83      0.83       581
       anger       0.77      0.90      0.83       275
         joy       0.80      0.80      0.80       695
    surprise       0.77      0.76      0.76        66
        love       0.71      0.45      0.55       159
        fear       0.80      0.84      0.82       224

    accuracy                           0.80      2000
   macro avg       0.78      0.76      0.77      2000
weighted avg       0.80      0.80      0.79      2000


Sample of True Labels: [' sadness', ' sadness', ' sadness', ' joy', ' sadness', ' fear', ' anger', ' joy', ' joy', ' anger']
Sample of Predicted Labels: [' sadness', ' sadness', ' sadness', ' sadness', ' sadness', ' fear', ' anger', ' joy', ' joy', ' sadness']

Sample of reconstructed texts:
Sample 0: im feeling rather rotten so im not very ambitious right now [Label] 




torch.Size([768])

In [5]:
import torch
from tqdm import tqdm
from sklearn.metrics import accuracy_score, classification_report
from torch.utils.data import DataLoader

def evaluate_gpt2_classification(model, eval_dataset, tokenizer, batch_size=8):
    model.eval()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.config.pad_token_id = tokenizer.pad_token_id 
    
    all_predictions = []
    all_labels = []
    
    # Create a DataLoader for batch processing
    dataloader = DataLoader(eval_dataset, batch_size=batch_size, shuffle=False)
    j = 0
    confidence_t = 0
    for batch in tqdm(dataloader, desc="Evaluating"):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)

        with torch.no_grad():
            generated_sequences, confidences, all_fc_vals = manual_generate(model,tokenizer,input_ids,attention_mask,150)

        label_token_ids = tokenizer.encode('[Label]', add_special_tokens=False)
        label_len = len(label_token_ids)
        endoftext_ids = tokenizer.encode('<|endoftext|>', add_special_tokens=False)
        endoftext_len = len(endoftext_ids)

        for sequence, orig_sequence, conf, fc_vals in zip(generated_sequences, input_ids, confidences, all_fc_vals[0]):
            label_positions = []
            endoftext_positions = []

            for i in range(len(orig_sequence) - label_len + 1):
                if orig_sequence[i:i+label_len].tolist() == label_token_ids:
                    label_positions.append(i)

            for i in range(len(orig_sequence) - endoftext_len + 1):
                if orig_sequence[i:i+endoftext_len].tolist() == endoftext_ids:
                    endoftext_positions.append(i)
                    break

            for pos, end in zip(label_positions, endoftext_positions):
                predicted_label = tokenizer.decode(sequence[pos+1:end])
                confidence_t += conf[pos+1:end].item()
                token_fc = fc_vals[0][pos+1:end]
                j += 1
                

            full_text = tokenizer.decode(orig_sequence)
            true_label = full_text.split("[Label] ")[1].split("<|endoftext|>")[0]

            all_predictions.append(predicted_label)
            all_labels.append(true_label)
    
    if not all_labels or not all_predictions:
        print("No labels were extracted. Check if '[Label]' token exists in the tokenized text.")
        return 0, "No labels extracted", [], []

    accuracy = accuracy_score(all_labels, all_predictions)
    
    unique_labels = list(set(all_labels + all_predictions))
    
    try:
        report = classification_report(all_labels, all_predictions, labels=unique_labels, target_names=unique_labels)
    except ValueError as e:
        report = f"Unable to generate classification report: {str(e)}"
    
    return accuracy, report, all_labels, all_predictions, confidence_t/j

# Usage
test_dataset = tokenized_dataset['test']
accuracy, report, true_labels, predicted_labels, confidence = evaluate_gpt2_classification(model, test_dataset, tokenizer, batch_size=8)

print(f"Accuracy: {accuracy:.4f}")
print("Classification Report:")

print(f"Confidence: {confidence:.4f}")
print(report)

print("\nSample of True Labels:", true_labels[:10])
print("Sample of Predicted Labels:", predicted_labels[:10])

print("\nSample of reconstructed texts:")
for i in range(5):
    full_text = tokenizer.decode(test_dataset[i]['input_ids'])
    print(f"Sample {i}: {full_text}")

print(f"\nTotal samples processed: {len(true_labels)}")
print(f"Unique true labels: {set(true_labels)}")
print(f"Unique predicted labels: {set(predicted_labels)}")

Evaluating:   0%|          | 0/250 [00:00<?, ?it/s]

Evaluating:   2%|▏         | 4/250 [00:52<53:31, 13.06s/it]  


KeyboardInterrupt: 

In [4]:
from datasets import load_dataset
from transformers import GPT2Tokenizer

# Load dataset and tokenizer
dataset = load_dataset("fancyzhx/dbpedia_14")
dataset = dataset.rename_column("content", "text")
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')

# Get unique labels and their text representations
label2text = dataset['train'].features['label'].names
print("Original labels:", label2text)

# Create special tokens for each label
special_tokens_dict = {}
new_tokens = []

for label in label2text:
    # Create special token format (with and without space)
    special_token = f'[{label}]'
    special_token_with_space = f'[{label}]'
    
    # Check if the label is already a single token in the tokenizer
    label_tokens = tokenizer.encode(label, add_special_tokens=False)
    is_single_token = len(label_tokens) == 1
    
    if is_single_token:
        print(f"'{label}' is already a single token (ID: {label_tokens[0]})")
    
    # Add both versions to new tokens list
    new_tokens.extend([special_token])

# Add the tokens to the tokenizer
num_added_tokens = tokenizer.add_tokens(new_tokens)
print(f"\nAdded {num_added_tokens} new tokens to the tokenizer")

# Print some examples of the new tokens
print("\nExample tokenization:")
for label in label2text[:3]:  # Show first 3 labels as examples
    special_token = f'[{label}]'
    special_token_with_space = f'[{label} ]'
    print(f"\nTokenizing '{special_token}':")
    print(tokenizer.encode(special_token))

Original labels: ['Company', 'EducationalInstitution', 'Artist', 'Athlete', 'OfficeHolder', 'MeanOfTransportation', 'Building', 'NaturalPlace', 'Village', 'Animal', 'Plant', 'Album', 'Film', 'WrittenWork']
'Company' is already a single token (ID: 39154)
'Artist' is already a single token (ID: 43020)
'Building' is already a single token (ID: 25954)
'Animal' is already a single token (ID: 40002)
'Film' is already a single token (ID: 39750)

Added 14 new tokens to the tokenizer

Example tokenization:

Tokenizing '[Company]':
[50257]

Tokenizing '[EducationalInstitution]':
[50258]

Tokenizing '[Artist]':
[50259]
