# Fine-Tuning the GPT-2 Model

## Imports and Setup



In [56]:
# Import necessary libraries
import torch  # PyTorch for building and training neural networks
import numpy as np  # NumPy for numerical operations
import pandas as pd  # Pandas for data manipulation
import matplotlib.pyplot as plt  # Matplotlib for plotting graphs
from torch import nn  # Neural network module from PyTorch
from torch.optim import Adam  # Adam optimizer for training
from transformers import GPT2Model, GPT2Tokenizer  # GPT-2 model and tokenizer from Hugging Face
from tqdm import tqdm  # Tqdm for progress bars
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay  # Metrics for model evaluation
from sklearn.utils import resample  # Utility for data resampling
import os  # Operating system interfaces
from peft import get_peft_model, LoraConfig  # For handling parameter-efficient fine-tuning


## Data Loading and Preprocessing

This section loads the dataset from a CSV file, drops the unused columns, splits the `/`-separated label into one column per level, keeps only the second-level classes with at least 600 samples, and balances the dataset so that every remaining class has the same number of samples.


In [58]:
# Read the raw dataset from disk
file_path = 'est_food_bev_alc.csv'
original_df = pd.read_csv(file_path)

# Discard the columns that are not used further down
data_cleaned = original_df.drop(columns=['link', 'typeID'])

# Break the '/'-separated label into one column per level
# (label_part_1, label_part_2, ...)
label_split = data_cleaned['label'].str.split('/', expand=True)
label_split.columns = [f'label_part_{position}' for position in range(1, label_split.shape[1] + 1)]

# Swap the raw 'label' column for the per-level columns
features_without_label = data_cleaned.drop(columns=['label'])
data_expanded = pd.concat([features_without_label, label_split], axis=1)

# Keep only rows whose second-level label occurs at least 600 times
second_level = data_expanded['label_part_2']
occurrences_per_row = second_level.map(second_level.value_counts())
filtered_df = data_expanded[occurrences_per_row >= 600]

# Draw the same number of rows from every class: the size of the smallest one
class_sizes = filtered_df['label_part_2'].value_counts()
min_sample_size = class_sizes.min()
balanced_df_list = []
for label in filtered_df['label_part_2'].unique():
    label_df = filtered_df[filtered_df['label_part_2'] == label]
    # Sampling with replacement is only requested for a class smaller than the target size
    sample_with_replacement = len(label_df) < min_sample_size
    balanced_label_df = resample(
        label_df,
        replace=sample_with_replacement,
        n_samples=min_sample_size,
        random_state=42,
    )
    balanced_df_list.append(balanced_label_df)

# Stack the per-class samples into the final balanced frame
balanced_df = pd.concat(balanced_df_list)


##  Tokenization and Dataset Preparation

Here, we initialize the GPT-2 tokenizer, set up the tokenization parameters, and define a custom PyTorch dataset class to handle text and label data. We also split the dataset into training, validation, and test sets.


In [None]:
# Give every class name found in the balanced data an integer id (0, 1, 2, ...)
unique_labels = balanced_df['label_part_2'].unique()
label_to_number = dict(zip(unique_labels, range(len(unique_labels))))

# Number of target classes
num_classes = len(label_to_number)

# GPT-2 tokenizer: the end-of-sequence token doubles as the padding token,
# and padding is added on the left
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "left"

# Alias of the class-name -> id mapping, read by the Dataset class
labels = label_to_number

# Dataset wrapper around a dataframe of product texts and their labels
class Dataset(torch.utils.data.Dataset):
    """Map-style dataset pairing tokenized product text with an integer class id.

    Relies on the module-level ``tokenizer`` and on the module-level
    ``labels`` mapping (class name -> integer id).
    """

    def __init__(self, df):
        """Encode the labels and tokenize every product text up front.

        Args:
            df: dataframe with a 'product' text column and a
                'label_part_2' class-name column.
        """
        # Class names -> integer ids
        encoded_labels = []
        for class_name in df['label_part_2']:
            encoded_labels.append(labels[class_name])
        self.labels = encoded_labels

        # Every text is padded/truncated to 128 tokens and returned as PyTorch tensors
        encoded_texts = []
        for product_text in df['product']:
            encoding = tokenizer(
                product_text,
                padding='max_length',
                max_length=128,
                truncation=True,
                return_tensors="pt",
            )
            encoded_texts.append(encoding)
        self.texts = encoded_texts

    def classes(self):
        """Return the list of integer class ids, one per sample."""
        return self.labels

    def __len__(self):
        """Return the number of samples."""
        return len(self.labels)

    def get_batch_labels(self, idx):
        """Return the class id(s) at ``idx`` wrapped in a NumPy array."""
        selected = self.labels[idx]
        return np.array(selected)

    def get_batch_texts(self, idx):
        """Return the tokenizer output stored at ``idx``."""
        return self.texts[idx]

    def __getitem__(self, idx):
        """Return the ``(tokenized_text, label)`` pair for ``idx``."""
        return self.get_batch_texts(idx), self.get_batch_labels(idx)

# Split data into training, validation, and test sets (80% / 10% / 10%).
# The shuffle is seeded through random_state, so the global NumPy seed below
# does not influence the split; it is kept for any later code that draws from
# NumPy's global random generator.
np.random.seed(112)
shuffled_df = balanced_df.sample(frac=1, random_state=35)

# Positional slicing yields the same three partitions as
# np.split(shuffled_df, [train_end, val_end]) without routing a DataFrame
# through np.split, which relies on the deprecated DataFrame.swapaxes.
train_end = int(0.8 * len(shuffled_df))
val_end = int(0.9 * len(shuffled_df))
df_train = shuffled_df.iloc[:train_end]
df_val = shuffled_df.iloc[train_end:val_end]
df_test = shuffled_df.iloc[val_end:]

print(len(df_train), len(df_val), len(df_test))


## Model Definition

We define the GPT-2 based sequence classifier model. The model consists of a pre-trained GPT-2 backbone followed by a linear layer that outputs class predictions.


In [59]:
# Define the GPT-2 based sequence classifier
class GPT2SequenceClassifier(nn.Module):
    """GPT-2 backbone followed by a single linear classification layer.

    The hidden states of all positions are flattened into one vector per
    example before the linear layer, so every input must contain exactly
    ``max_seq_len`` tokens.
    """

    def __init__(self, hidden_size: int, num_classes: int, max_seq_len: int, gpt_model_name: str):
        """Build the classifier.

        Args:
            hidden_size: width of the backbone's hidden states.
            num_classes: number of output classes.
            max_seq_len: fixed token length of every input sequence.
            gpt_model_name: name or path given to ``GPT2Model.from_pretrained``.
        """
        super().__init__()
        # Load the pre-trained GPT-2 model
        self.gpt2model = GPT2Model.from_pretrained(gpt_model_name)
        # Linear layer applied to the flattened hidden states
        self.fc1 = nn.Linear(hidden_size * max_seq_len, num_classes)

    def forward(self, input_id, mask):
        """Return unnormalised class scores, one row per example.

        Args:
            input_id: token ids handed to the backbone as ``input_ids``.
            mask: attention mask handed to the backbone as ``attention_mask``.
        """
        backbone_outputs = self.gpt2model(input_ids=input_id, attention_mask=mask, return_dict=False)
        # Take the first element instead of unpacking exactly two values: the
        # tuple's length depends on which optional outputs the backbone returns.
        gpt_out = backbone_outputs[0]
        batch_size = gpt_out.shape[0]
        # reshape (unlike view) also accepts non-contiguous hidden states
        linear_output = self.fc1(gpt_out.reshape(batch_size, -1))
        return linear_output


## Training Setup



In [60]:
# Pick the compute device: Apple MPS first, then CUDA, otherwise the CPU
device = torch.device(
    "mps" if torch.backends.mps.is_available()
    else "cuda" if torch.cuda.is_available()
    else "cpu"
)

# Hyperparameters: learning rate and number of training epochs
LR, EPOCHS = 1e-5, 10

# Cross-entropy loss for the classification objective
criterion = nn.CrossEntropyLoss()
print(f"Using device: {device}")

# Build the classifier and an Adam optimizer over all of its parameters
model = GPT2SequenceClassifier(
    hidden_size=768,
    num_classes=num_classes,
    max_seq_len=128,
    gpt_model_name="gpt2",
)
optimizer = Adam(model.parameters(), lr=LR)

# Place the model on the selected device
model.to(device)

# Folder that receives the training logs and model checkpoints
save_folder = 'training_logs'
os.makedirs(save_folder, exist_ok=True)


Using device: mps


## Training Loop

In [None]:
# Define the training function
def train(model, train_data, val_data, epochs, batch_size=2):
    """Fine-tune ``model`` and validate it after every epoch.

    Uses the module-level ``optimizer``, ``criterion``, ``device`` and
    ``save_folder``. The model weights are checkpointed after each epoch and
    the per-epoch losses are written to ``losses.txt`` once training ends.

    Args:
        model: classifier called as ``model(input_ids, attention_mask)``.
        train_data: dataframe accepted by ``Dataset`` for training.
        val_data: dataframe accepted by ``Dataset`` for validation.
        epochs: number of passes over the training data.
        batch_size: samples per batch for both data loaders.
    """
    # Prepare datasets and data loaders (named so the function itself is not shadowed)
    train_dataset, val_dataset = Dataset(train_data), Dataset(val_data)
    train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size)

    train_losses = []
    val_losses = []

    # Training over the specified number of epochs
    for epoch_num in range(epochs):
        total_acc_train = 0
        total_loss_train = 0.0

        model.train()  # Set the model to training mode
        for train_input, train_label in tqdm(train_dataloader, desc=f'Epoch {epoch_num+1} Training'):
            train_label = train_label.to(device)
            mask = train_input['attention_mask'].to(device)
            input_id = train_input["input_ids"].squeeze(1).to(device)

            model.zero_grad()  # Zero the gradients

            output = model(input_id, mask)  # Forward pass

            batch_loss = criterion(output, train_label)  # Mean loss over the batch
            # Weight the batch mean by the batch size so that dividing by the
            # number of samples below yields the true per-sample average.
            total_loss_train += batch_loss.item() * train_label.size(0)

            total_acc_train += (output.argmax(dim=1) == train_label).sum().item()

            batch_loss.backward()  # Backpropagation
            optimizer.step()  # Update model parameters

        # Record the average training loss per sample
        train_losses.append(total_loss_train / len(train_data))

        # Validation phase
        total_acc_val = 0
        total_loss_val = 0.0

        model.eval()  # Set the model to evaluation mode
        with torch.no_grad():  # No need to track gradients
            for val_input, val_label in tqdm(val_dataloader, desc=f'Epoch {epoch_num+1} Validation'):
                val_label = val_label.to(device)
                mask = val_input['attention_mask'].to(device)
                input_id = val_input['input_ids'].squeeze(1).to(device)

                output = model(input_id, mask)  # Forward pass

                batch_loss = criterion(output, val_label)
                total_loss_val += batch_loss.item() * val_label.size(0)

                total_acc_val += (output.argmax(dim=1) == val_label).sum().item()

        # Record the average validation loss per sample
        val_losses.append(total_loss_val / len(val_data))

        # Adjacent f-strings keep the message on one line without the runs of
        # spaces that a backslash continuation inside the literal would embed.
        print(
            f"Epochs: {epoch_num + 1} | Train Loss: {train_losses[-1]: .3f} "
            f"| Train Accuracy: {total_acc_train / len(train_data): .3f} "
            f"| Val Loss: {val_losses[-1]: .3f} "
            f"| Val Accuracy: {total_acc_val / len(val_data): .3f}"
        )

        # Save model checkpoint
        torch.save(model.state_dict(), os.path.join(save_folder, f'model_epoch_{epoch_num+1}.pt'))

    # Save loss data to a file
    with open(os.path.join(save_folder, 'losses.txt'), 'w') as f:
        for epoch_index, (train_loss, val_loss) in enumerate(zip(train_losses, val_losses), start=1):
            f.write(f'Epoch {epoch_index}, Train Loss: {train_loss}, Val Loss: {val_loss}\n')

# Call the training function
train(model, df_train, df_val, EPOCHS)


## Evaluation and Testing


In [None]:
from sklearn.metrics import accuracy_score, recall_score, f1_score, precision_score, confusion_matrix, ConfusionMatrixDisplay, classification_report

# Define the testing function
def test(model, test_data):
    """Evaluate ``model`` on ``test_data`` and print classification metrics.

    Relies on the module-level ``Dataset``, ``device`` and ``unique_labels``
    (class names ordered by their integer id).

    Args:
        model: classifier called as ``model(input_ids, attention_mask)``.
        test_data: dataframe accepted by ``Dataset``.

    Returns:
        Tuple ``(all_labels, all_preds)`` with the true and the predicted
        class ids, one entry per sample.
    """
    # Create a dataset and dataloader for the test data
    test_dataset = Dataset(test_data)
    test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=2)

    model.eval()  # Set the model to evaluation mode

    all_preds = []  # Predicted class ids
    all_labels = []  # True class ids

    with torch.no_grad():  # No need to track gradients during testing
        for test_input, test_label in tqdm(test_dataloader, desc='Testing'):
            test_label = test_label.to(device)
            mask = test_input['attention_mask'].to(device)
            input_id = test_input['input_ids'].squeeze(1).to(device)

            output = model(input_id, mask)  # Forward pass to get model predictions

            preds = output.argmax(dim=1)  # Highest-scoring class per sample
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(test_label.cpu().numpy())

    # Calculate evaluation metrics (weighted by class support)
    accuracy = accuracy_score(all_labels, all_preds)
    recall = recall_score(all_labels, all_preds, average='weighted')
    f1 = f1_score(all_labels, all_preds, average='weighted')
    precision = precision_score(all_labels, all_preds, average='weighted')

    # Print evaluation results
    print(f'Accuracy: {accuracy:.3f}')
    print(f'Recall: {recall:.3f}')
    print(f'F1 Score: {f1:.3f}')
    print(f'Precision: {precision:.3f}')

    # Pin the reported classes to every known class id. Without `labels`,
    # the report infers the classes from the data and raises when their
    # number differs from len(target_names), e.g. when a class is absent
    # from this split.
    class_ids = list(range(len(unique_labels)))
    class_names = [str(name) for name in unique_labels]
    print("\nClassification Report:")
    print(classification_report(all_labels, all_preds, labels=class_ids,
                                target_names=class_names, zero_division=0))

    return all_labels, all_preds  # Return true labels and predictions for further analysis

# Call the test function to evaluate the model on the test dataset
all_labels, all_preds = test(model, df_test)


In [53]:
# Restore the saved weights. map_location is an argument of torch.load (it
# places the loaded tensors on `device`); load_state_dict does not accept it.
model.load_state_dict(torch.load('gpt2_classifier.pth', map_location=device))
def test(model, test_data):
    """Evaluate ``model`` on ``test_data`` and print classification metrics.

    Relies on the module-level ``Dataset``, ``device`` and ``unique_labels``
    (class names ordered by their integer id).

    Args:
        model: classifier called as ``model(input_ids, attention_mask)``.
        test_data: dataframe accepted by ``Dataset``.

    Returns:
        Tuple ``(all_labels, all_preds)`` with the true and the predicted
        class ids, one entry per sample.
    """
    test_dataset = Dataset(test_data)
    test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=2)

    model.eval()  # Evaluation mode

    all_preds = []
    all_labels = []

    with torch.no_grad():  # Gradients are not needed for evaluation
        for test_input, test_label in tqdm(test_dataloader, desc='Testing'):
            test_label = test_label.to(device)
            mask = test_input['attention_mask'].to(device)
            input_id = test_input['input_ids'].squeeze(1).to(device)

            output = model(input_id, mask)

            preds = output.argmax(dim=1)  # Highest-scoring class per sample
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(test_label.cpu().numpy())

    # Metrics weighted by class support
    accuracy = accuracy_score(all_labels, all_preds)
    recall = recall_score(all_labels, all_preds, average='weighted')
    f1 = f1_score(all_labels, all_preds, average='weighted')
    precision = precision_score(all_labels, all_preds, average='weighted')

    print(f'Accuracy: {accuracy:.3f}')
    print(f'Recall: {recall:.3f}')
    print(f'F1 Score: {f1:.3f}')
    print(f'Precision: {precision:.3f}')

    # Pin the reported classes to every known class id. Without `labels`,
    # the report infers the classes from the data and raises when their
    # number differs from len(target_names), e.g. when a class is absent
    # from this split.
    class_ids = list(range(len(unique_labels)))
    class_names = [str(name) for name in unique_labels]
    print("\nClassification Report:")
    print(classification_report(all_labels, all_preds, labels=class_ids,
                                target_names=class_names, zero_division=0))

    return all_labels, all_preds

all_labels, all_preds = test(model, df_test)


Testing: 100%|██████████| 619/619 [00:29<00:00, 20.97it/s]

Accuracy: 0.834
Recall: 0.834
F1 Score: 0.831
Precision: 0.840

Classification Report:
               precision    recall  f1-score   support

         beer       0.95      0.89      0.92        93
      spirits       0.94      0.90      0.92       115
         wine       0.88      0.99      0.93       100
        juice       0.94      0.95      0.95       103
          tea       0.96      0.96      0.96        84
         deli       0.64      0.71      0.68        97
  baked goods       0.64      0.86      0.73        83
        dairy       0.81      0.95      0.87       102
         meat       0.90      0.83      0.86        95
      seafood       0.85      0.86      0.85        71
fresh produce       0.90      0.79      0.84       109
  frozen food       0.79      0.48      0.60        93
       pantry       0.67      0.65      0.66        93

     accuracy                           0.83      1238
    macro avg       0.84      0.83      0.83      1238
 weighted avg       0.84      0




In [39]:
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from sklearn.metrics import confusion_matrix


# Class ids seen in either the true or the predicted labels. They are kept
# under their own name so the global `unique_labels` (the class names used by
# test() for the classification report) is not overwritten with integers.
class_ids = sorted(set(all_labels) | set(all_preds))
cm = confusion_matrix(all_labels, all_preds, labels=class_ids)

# Normalise every row by its number of true samples. A class that only
# appears among the predictions has an all-zero row, which is left at 0
# instead of being divided by zero.
row_totals = cm.sum(axis=1, keepdims=True)
cm_normalized = np.divide(
    cm,
    row_totals,
    out=np.zeros(cm.shape, dtype=float),
    where=row_totals != 0,
)

# A class id is the position of its name in `unique_labels`
class_names = [str(unique_labels[class_id]) for class_id in class_ids]

fig = go.Figure(
    data=go.Heatmap(
        z=cm_normalized,
        x=class_names,
        y=class_names,
        colorscale='Viridis',
        showscale=True,
        zmin=0,
        zmax=1,
        hoverongaps=False,
        colorbar=dict(title='Proportion')
    )
)

# Write the proportion into every cell. Viridis runs from dark (low values)
# to bright (high values), so bright cells get black text and dark cells white.
for row, true_name in enumerate(class_names):
    for col, predicted_name in enumerate(class_names):
        proportion = cm_normalized[row, col]
        fig.add_annotation(
            x=predicted_name,
            y=true_name,
            text=f'{proportion:.2f}',
            showarrow=False,
            font=dict(size=12, color="black" if proportion > 0.5 else "white")
        )

fig.update_layout(
    title='Normalized Confusion Matrix',
    # Categorical axes show the class names as tick labels, even when a
    # name happens to look numeric.
    xaxis=dict(type='category', title='Predicted Label'),
    yaxis=dict(type='category', title='True Label'),
    autosize=False,
    width=800,
    height=800,
    margin=dict(l=100, r=100, t=100, b=100)
)

fig.show()


In [51]:
# Confusion matrix with every row scaled by its number of true samples
cm = confusion_matrix(all_labels, all_preds)
cm_normalized = cm.astype(float) / cm.sum(axis=1, keepdims=True)

# Cell captions: the proportions rounded to two decimals, as strings
cm_text = np.round(cm_normalized, 2).astype(str)

# Heatmap trace that prints the caption inside each cell
heatmap_trace = go.Heatmap(
    z=cm_normalized,
    x=unique_labels,
    y=unique_labels,
    text=cm_text,
    texttemplate="%{text}",
    hoverongaps=False,
    colorscale='Viridis',
    zmin=0,
    zmax=1,
)
fig = go.Figure(data=heatmap_trace)

# Titles, margins and a fixed 800x800 canvas
fig.update_layout(
    title='Normalized Confusion Matrix',
    xaxis={'title': 'Predicted Label'},
    yaxis={'title': 'True Label'},
    margin={'l': 50, 'r': 50, 't': 50, 'b': 50},
    height=800,
    width=800,
)

fig.show()