In [1]:
from transformers import GPT2Model,GPT2Config
import torch
import torch.nn as nn

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
import pandas as pd
import tiktoken
import torch

class Tokenizer:
    """Map-style dataset of fixed-length token-id sequences and their labels.

    Reads a CSV containing a ``tweet`` text column and a ``label`` column,
    encodes every tweet with ``tokenizer.encode``, then truncates and
    right-pads each encoding to a common length so that items can be stacked
    into batches. Despite its name, this class is a dataset wrapper (it
    implements ``__getitem__`` and ``__len__``), not a tokenizer.

    Args:
        file: Path or file-like object accepted by ``pandas.read_csv``.
        tokenizer: Object exposing ``encode(text)`` that returns a list of ints.
        max_len: Target sequence length. ``None`` uses the longest encoding
            found in this file.
        pad_token_id: Id appended to sequences shorter than ``max_len``.

    Raises:
        ValueError: If ``max_len`` is negative.
    """

    def __init__(self, file, tokenizer, max_len=None, pad_token_id=50256):
        # A negative length would slice from the end and yield ragged rows.
        if max_len is not None and max_len < 0:
            raise ValueError(f"max_len must be non-negative, got {max_len}")

        # Load dataset
        self.file = pd.read_csv(file)
        self.tokenizer = tokenizer
        self.pad_token_id = pad_token_id

        # Cache the labels once: building a full row with ``iloc`` on every
        # ``__getitem__`` call is needlessly slow inside a DataLoader loop.
        self.labels = self.file["label"].to_numpy()

        # Empty CSV cells are read as NaN, which ``encode`` cannot handle;
        # treat them as empty text so rows stay aligned with their labels.
        texts = self.file["tweet"].fillna("").astype(str)
        self.encodings = [self.tokenizer.encode(text) for text in texts]

        # Determine the common length before any padding is applied.
        self.max_len = self.get_max_len() if max_len is None else max_len

        # Truncate and pad sequences
        self.encodings = [self._fit_to_length(enc) for enc in self.encodings]

    def _fit_to_length(self, enc):
        """Return ``enc`` cut to ``max_len`` and right-padded with the pad id."""
        clipped = enc[:self.max_len]
        return clipped + [self.pad_token_id] * (self.max_len - len(clipped))

    # PyTorch-style getter
    def __getitem__(self, idx):
        """Return ``(token_ids, label)`` tensors for the example at ``idx``."""
        return torch.tensor(self.encodings[idx]), torch.tensor(self.labels[idx])

    # PyTorch-style length
    def __len__(self):
        """Return the number of examples."""
        return len(self.encodings)

    def get_max_len(self):
        """Return the length of the longest stored encoding (0 if there are none).

        Once ``__init__`` has finished, every encoding has been padded or
        truncated, so this returns the common padded length rather than the
        longest raw tweet.
        """
        return max((len(enc) for enc in self.encodings), default=0)


  # Usage
# GPT-2 byte-pair encoder; the same object is reused for every dataset split.
tokenizer = tiktoken.get_encoding("gpt2")

# Encode the training split. No max_len is passed, so the sequence length is
# taken from the longest encoded tweet in this file.
train_tokenizer = Tokenizer(
    file="../data/new_dataset/train_data.csv",
    tokenizer=tokenizer,
)
train_max_len = train_tokenizer.get_max_len()
print("Max length in dataset:", train_max_len)


Max length in dataset: 277


In [3]:
# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class LanguageClassifier(nn.Module):
    """GPT-2 backbone followed by a linear classification head.

    Args:
        num_labels: Number of output classes (width of the logits).
        model_path: Local directory holding the GPT-2 config and weights.
            Both are loaded with ``local_files_only=True``, so nothing is
            downloaded.
    """

    def __init__(self, num_labels, model_path="./gpt2_models"):
        super().__init__()
        self.config = GPT2Config.from_pretrained(model_path, local_files_only=True)
        self.pretrained_model = GPT2Model.from_pretrained(model_path, local_files_only=True)
        self.classifier_head = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask=None):
        """Return classification logits of shape ``(batch, num_labels)``.

        Args:
            input_ids: Integer token ids, indexed as ``(batch, seq_len)``.
            attention_mask: Optional ``(batch, seq_len)`` mask with 1 for real
                tokens and 0 for padding. When given, each row is pooled at
                its last attended position instead of the final position.
        """
        outputs = self.pretrained_model(input_ids=input_ids, attention_mask=attention_mask)
        last_hidden_state = outputs.last_hidden_state

        if attention_mask is None:
            # No padding information: summarise each sequence by its final position.
            pooled_output = last_hidden_state[:, -1, :]
        else:
            # With right-padded batches the final position is a pad token, so
            # pick the highest index whose mask entry is non-zero. Rows that
            # are entirely masked fall back to position 0.
            seq_len = last_hidden_state.size(1)
            positions = torch.arange(seq_len, device=last_hidden_state.device)
            attended = attention_mask.to(last_hidden_state.device) != 0
            last_token_idx = (attended.long() * positions).argmax(dim=1)
            batch_idx = torch.arange(last_hidden_state.size(0), device=last_hidden_state.device)
            pooled_output = last_hidden_state[batch_idx, last_token_idx]

        return self.classifier_head(pooled_output)


In [4]:
# Build one dataset per split and wrap each in a DataLoader before training.
from torch.utils.data import DataLoader

# Validation and test sequences are padded/truncated to the training length so
# that every split produces tensors of the same width.
train_dataset = Tokenizer("../data/new_dataset/train_data.csv", tokenizer=tokenizer, max_len=None)
val_dataset = Tokenizer("../data/new_dataset/val_data.csv", tokenizer=tokenizer, max_len=train_dataset.max_len)
test_dataset = Tokenizer("../data/new_dataset/test_data.csv", tokenizer=tokenizer, max_len=train_dataset.max_len)

# Fix the seed so the training shuffle order is reproducible.
torch.manual_seed(42)

train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=8,
    num_workers=0,
    drop_last=True,
    shuffle=True,
)

# Evaluation loaders keep every example and a fixed order: dropping the last
# batch or reshuffling would make the reported accuracy depend on which
# examples happened to be drawn on a given call.
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=8,
    num_workers=0,
    drop_last=False,
    shuffle=False,
)
val_loader = DataLoader(
    dataset=val_dataset,
    batch_size=8,
    num_workers=0,
    drop_last=False,
    shuffle=False,
)

In [5]:
def cal_accuracy(data_loader, model, device, num_batches=None):
    """Return the fraction of correctly classified examples.

    The model is switched to eval mode and is left in eval mode on return.

    Args:
        data_loader: Sized iterable yielding ``(input_ids, labels)`` batches.
        model: Callable mapping ``input_ids`` to logits with classes on dim 1.
        device: Device the batches are moved to before the forward pass.
        num_batches: Evaluate at most this many batches; ``None`` evaluates
            the whole loader. Values larger than the loader are clamped.

    Returns:
        Accuracy in ``[0, 1]``, or ``nan`` when no example was evaluated
        (empty loader or ``num_batches=0``).
    """
    model.eval()
    if num_batches is None:
        num_batches = len(data_loader)
    else:
        num_batches = min(num_batches, len(data_loader))

    correct_pred, num_examples = 0, 0
    with torch.no_grad():
        for i, (input_ids, labels) in enumerate(data_loader):
            if i >= num_batches:
                break
            input_ids = input_ids.to(device)
            labels = labels.to(device)
            # argmax over raw logits picks the same class as argmax over
            # softmax probabilities, so the softmax is skipped.
            label_preds = torch.argmax(model(input_ids), dim=1)
            correct_pred += (label_preds == labels).sum().item()
            num_examples += labels.size(0)

    # Nothing evaluated: report NaN instead of raising ZeroDivisionError.
    if num_examples == 0:
        return float("nan")
    return correct_pred / num_examples
       
    

In [6]:
# model=LanguageClassifier(num_labels=4).to(device)
# torch.manual_seed(42)
# train_accuracy=cal_accuracy(train_loader,model,device,num_batches=10)
# val_accuracy=cal_accuracy(val_loader,model,device,num_batches=10)
# test_accuracy=cal_accuracy(test_loader,model,device,num_batches=10)
# print(f"Train accuracy:{train_accuracy*100:.2f}%")
# print(f"Validation accuracy:{val_accuracy*100:.2f}%")
# print(f"Test accuracy:{test_accuracy*100:.2f}%")

In [7]:
class TrainClassifier:
    """Fine-tune a classifier with AdamW and cross-entropy, tracking accuracy.

    Args:
        model: Module called as ``model(input_ids)`` that returns logits.
        train_loader: Sized iterable of ``(input_ids, labels)`` training batches.
        val_loader: Loader used for the per-epoch validation accuracy.
        test_loader: Loader used for the per-epoch test accuracy.
        device: Device the batches are moved to.
        learning_rate: AdamW learning rate.
        epochs: Number of passes over ``train_loader``.
        save_path: File the final ``state_dict`` is written to after training.
        eval_batches: Number of batches per loader used for each accuracy
            estimate; ``None`` evaluates every batch.

    Attributes:
        train_loss: Mean training loss per epoch.
        train_acc, val_acc, test_acc: Accuracy per epoch for each split.
        epochs_list: 1-based epoch numbers matching the lists above.
        val_loss, test_loss: Created empty and never filled by ``train``.
    """

    def __init__(self, model, train_loader, val_loader, test_loader, device,
                 learning_rate=3e-5, epochs=10,
                 save_path="language_classifier_model.pth", eval_batches=10):
        self.model = model
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.test_loader = test_loader
        self.device = device
        self.learning_rate = learning_rate
        self.epochs = epochs
        self.save_path = save_path
        self.eval_batches = eval_batches

        self.train_loss, self.val_loss, self.test_loss = [], [], []
        self.train_acc, self.val_acc, self.test_acc = [], [], []
        self.epochs_list = []

        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = torch.optim.AdamW(self.model.parameters(), lr=self.learning_rate)

    def _train_one_epoch(self):
        """Run one optimisation pass over ``train_loader``; return the mean batch loss."""
        self.model.train()
        running_loss = 0.0

        for input_ids, labels in self.train_loader:
            input_ids = input_ids.to(self.device)
            labels = labels.to(self.device)

            self.optimizer.zero_grad()
            logits = self.model(input_ids)
            loss = self.criterion(logits, labels)
            loss.backward()
            self.optimizer.step()

            running_loss += loss.item()

        return running_loss / len(self.train_loader)

    def _evaluate(self):
        """Return ``(train_acc, val_acc, test_acc)`` estimated on ``eval_batches`` batches each."""
        self.model.eval()
        with torch.no_grad():
            return tuple(
                cal_accuracy(loader, self.model, self.device, num_batches=self.eval_batches)
                for loader in (self.train_loader, self.val_loader, self.test_loader)
            )

    def train(self):
        """Train for ``epochs`` epochs, then save the model weights to ``save_path``.

        Returns:
            ``(train_acc, val_acc, test_acc, epochs_list)`` - the per-epoch
            history lists stored on the instance.

        Raises:
            ValueError: If ``train_loader`` yields no batches.
        """
        # Fail early with a clear message instead of dividing by zero when
        # the mean epoch loss is computed.
        if len(self.train_loader) == 0:
            raise ValueError("train_loader is empty; nothing to train on")

        for epoch in range(self.epochs):
            epoch_loss = self._train_one_epoch()
            self.train_loss.append(epoch_loss)
            print(f"Epoch {epoch+1}/{self.epochs}, Loss={epoch_loss:.4f}")

            # Evaluate
            train_acc, val_acc, test_acc = self._evaluate()
            self.train_acc.append(train_acc)
            self.val_acc.append(val_acc)
            self.test_acc.append(test_acc)
            self.epochs_list.append(epoch + 1)

            print(f"Epoch {epoch+1}/{self.epochs}, "
                  f"Train Acc: {train_acc*100:.2f}%, "
                  f"Val Acc: {val_acc*100:.2f}%, "
                  f"Test Acc: {test_acc*100:.2f}%")

        torch.save(self.model.state_dict(), self.save_path)
        return self.train_acc, self.val_acc, self.test_acc, self.epochs_list


In [None]:
import time

# perf_counter is monotonic, so the measured duration cannot be skewed by
# system clock adjustments made while training runs.
start_time = time.perf_counter()

Train = TrainClassifier(
    model=LanguageClassifier(num_labels=4).to(device),
    train_loader=train_loader,
    val_loader=val_loader,
    test_loader=test_loader,
    device=device,
    learning_rate=3e-5,
    epochs=3,
)
train_acc, val_acc, test_acc, epoch_list = Train.train()

end_time = time.perf_counter()
execution_time = (end_time - start_time) / 60  # seconds -> minutes
print(f"Training time: {execution_time} minutes")