In [None]:
%pip install transformers
%pip install scikit-learn
%pip install modAL
%pip install datasets
%pip install accelerate -U

In [22]:
import transformers
from sklearn.model_selection import train_test_split
import pandas as pd
import torch
from tqdm import tqdm
from torch.utils.data import Dataset, DataLoader
from datasets import load_dataset
from transformers import TrainingArguments, Trainer, AutoModelForSequenceClassification
from transformers import AutoTokenizer

In [23]:
# Setting up the device for GPU usage
# `device` is the string 'cuda' when torch reports CUDA as available, otherwise 'cpu';
# later cells pass it to `.to(device)` for both the model and each batch.

from torch import cuda
device = 'cuda' if cuda.is_available() else 'cpu'
print(device)

cuda


In [None]:
# Define the mapping from emotion names to numbers
# Ids run from 1 to 28; process_csv encodes any emotion name missing from this dict as 0.
# NOTE(review): the pretrained checkpoint loaded later presumably ships its own 0-based
# label ids — confirm these numbers line up with the model's `id2label` (and stay below
# its number of output logits) before training with CrossEntropyLoss.
emotion_mapping = {
    'admiration': 1, 'amusement': 2, 'anger': 3, 'annoyance': 4,
    'approval': 5, 'caring': 6, 'confusion': 7, 'curiosity': 8,
    'desire': 9, 'disappointment': 10, 'disapproval': 11, 'disgust': 12,
    'embarrassment': 13, 'excitement': 14, 'fear': 15, 'gratitude': 16,
    'grief': 17, 'joy': 18, 'love': 19, 'nervousness': 20, 'optimism': 21,
    'pride': 22, 'realization': 23, 'relief': 24, 'remorse': 25, 'sadness': 26,
    'surprise': 27, 'neutral': 28
}

def process_csv(file_path, mapping=None):
    """Load a labelled CSV and encode its emotion names as integer ids.

    Parameters
    ----------
    file_path : str or path-like
        Comma-separated file with at least the columns ``text`` and ``emotion``.
    mapping : dict, optional
        Emotion name -> integer id. Defaults to the module-level
        ``emotion_mapping``.

    Returns
    -------
    pandas.DataFrame
        A new frame with exactly two columns: ``text`` (str) and ``emotion``
        (int). Emotion values that are missing or not present in the mapping
        are encoded as 0.
    """
    if mapping is None:
        mapping = emotion_mapping

    df = pd.read_csv(file_path, delimiter=',')

    # Vectorised lookup: names absent from the mapping become NaN, then 0.
    emotion_ids = df['emotion'].map(mapping).fillna(0).astype(int)

    # Build a fresh frame instead of assigning into a slice of `df`, which
    # would trigger pandas' SettingWithCopyWarning.
    return pd.DataFrame({
        'text': df['text'].astype(str),
        'emotion': emotion_ids,
    })

# Seed set of human-labelled examples; the path is relative to the notebook's working directory.
initial_labeled_data = process_csv('active_learning_emotions.csv')

In [None]:
# Sanity-check the labelled seed set: print its size, then show a short preview.
# (A bare `.shape` that is not the last expression of a cell is never displayed.)
print(initial_labeled_data.shape)
initial_labeled_data.head()

In [None]:
# Fetch the Enron corpus from the Hugging Face hub; only its 'train' split is used
enron_dataset = load_dataset('SetFit/enron_spam')
data = enron_dataset['train']
df = pd.DataFrame(data)

# Build the unlabeled pool from the message body and the dataset's own label column.
# NOTE(review): 'emotion' here carries the dataset's `label` values, not emotion ids —
# presumably a placeholder until a human supplies the real label; confirm.
X_unlabeled = pd.DataFrame({
    'text': df['message'].astype(str),
    'emotion': df['label'].astype(int),
})
print(X_unlabeled)

In [27]:
# Hyper-parameters used by the dataset, data-loader and optimizer cells
MAX_LEN = 256            # max_length handed to the tokenizer (sequences are padded/truncated to it)
TRAIN_BATCH_SIZE = 8
VALID_BATCH_SIZE = 4
# EPOCHS = 1
LEARNING_RATE = 1e-05

# Checkpoint id on the Hugging Face hub, and the tokenizer that goes with it
model_name = "SamLowe/roberta-base-go_emotions"
tokenizer = AutoTokenizer.from_pretrained(model_name)
# model = transformers.AutoModelForSequenceClassification.from_pretrained(model_name)

In [28]:
# tokenizer = AutoTokenizer.from_pretrained(model_name)  # Use your model_name

# def tokenize_function(examples):
#     return tokenizer(examples, padding='max_length', truncation=True)

# tokenized_dataset = initial_labeled_data['text'].map(tokenize_function)

# Updated logic for accessing tokenized data (assuming PyTorch)
# X_train = [encoding['input_ids'] for encoding in tokenized_dataset]
# y_train = [encoding['input_ids'] for encoding in initial_labeled_data['emotion'].map(tokenize_function)]

class SentimentData(Dataset):
    """Torch dataset that tokenizes the ``text`` column of a DataFrame on access.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Must expose ``text`` and ``emotion`` columns. Its index may be
        arbitrary; rows are addressed by position.
    tokenizer :
        Object providing ``encode_plus`` and returning ``input_ids``,
        ``attention_mask`` and ``token_type_ids``.
    max_len : int
        Length every sequence is padded or truncated to.
    """

    def __init__(self, dataframe, tokenizer, max_len):
        self.tokenizer = tokenizer
        self.data = dataframe
        self.text = dataframe.text
        self.targets = self.data.emotion
        self.max_len = max_len

    def __len__(self):
        return len(self.text)

    def __getitem__(self, index):
        """Return the tokenized example at *position* ``index`` as a dict of tensors."""
        # Positional lookup: a label-based `self.text[index]` raises KeyError (or
        # silently returns another row) when the frame's index is not 0..n-1.
        text = str(self.text.iloc[index])
        # Collapse every run of whitespace into a single space.
        text = " ".join(text.split())

        inputs = self.tokenizer.encode_plus(
            text,
            None,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            return_token_type_ids=True,
            truncation=True
        )
        ids = inputs['input_ids']
        mask = inputs['attention_mask']
        token_type_ids = inputs["token_type_ids"]

        return {
            'ids': torch.tensor(ids, dtype=torch.long),
            'mask': torch.tensor(mask, dtype=torch.long),
            'token_type_ids': torch.tensor(token_type_ids, dtype=torch.long),
            # Kept as float for backward compatibility; the training loop casts it to long.
            'targets': torch.tensor(self.targets.iloc[index], dtype=torch.float)
        }


In [29]:
# Give both frames a fresh 0..n-1 index before wrapping them
train_data = initial_labeled_data.reset_index(drop=True)
test_data = X_unlabeled.reset_index(drop=True)

# The labelled seed set is used in full; only the first 100 pool rows are wrapped
training_set = SentimentData(train_data, tokenizer, max_len=MAX_LEN)
testing_set = SentimentData(test_data.iloc[:100], tokenizer, max_len=MAX_LEN)

In [30]:
# Training batches are reshuffled every epoch.
train_params = {'batch_size': TRAIN_BATCH_SIZE,
                'shuffle': True,
                'num_workers': 0
                }

# The pool loader must NOT shuffle: least_confidence_sampling returns positions in
# the order the loader yields examples, and those positions are then used to index
# `test_data`. With shuffling the selected rows would be effectively random.
test_params = {'batch_size': VALID_BATCH_SIZE,
                'shuffle': False,
                'num_workers': 0
                }

training_loader = DataLoader(training_set, **train_params)
testing_loader = DataLoader(testing_set, **test_params)

In [31]:
class RobertaEmotionsClass(torch.nn.Module):
    """Wrapper around the pretrained sequence-classification checkpoint ``model_name``.

    ``forward`` returns the wrapped model's logits unchanged. The additional
    ``pre_classifier``, ``dropout`` and ``classifier`` layers are constructed
    (and therefore show up in ``parameters()``) but ``forward`` does not use them.
    """

    def __init__(self):
        super().__init__()
        self.l1 = AutoModelForSequenceClassification.from_pretrained(model_name)
        # Custom head layers — currently bypassed by forward().
        self.pre_classifier = torch.nn.Linear(768, 768)
        self.dropout = torch.nn.Dropout(0.3)
        self.classifier = torch.nn.Linear(768, 29)

    def forward(self, input_ids, attention_mask, token_type_ids):
        """Run the wrapped model and return its ``logits``."""
        backbone_output = self.l1(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
        )
        return backbone_output.logits

In [None]:
# Instantiate the wrapper and move its weights onto the selected device
print(f"MODEL NAME {model_name}")
model = RobertaEmotionsClass()
model.to(device)

## Fine-tune the model

In [33]:
# Cross-entropy over the model's logits, optimised with Adam at LEARNING_RATE
loss_function = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

In [34]:
def calcuate_accuracy(preds, targets):
    """Count the positions where ``preds`` equals ``targets``.

    Despite the name, the result is a raw count of matches, not a ratio.
    """
    matches = preds == targets
    return matches.sum().item()

In [35]:
# Training function: one pass over `training_loader` to fine-tune the emotion classifier

def train(epoch):
    """Run one training epoch over the module-level ``training_loader``.

    Relies on the module-level ``model``, ``loss_function``, ``optimizer`` and
    ``device``. Prints the running loss/accuracy every 5000 steps (including
    step 0) and the epoch totals at the end.

    Parameters
    ----------
    epoch : int
        Epoch number, used only in the progress bar and the printed summary.

    Returns
    -------
    None
    """
    tr_loss = 0.0
    n_correct = 0
    nb_tr_steps = 0
    nb_tr_examples = 0
    model.train()
    # Wrap the loader itself (not enumerate) so tqdm knows the number of batches.
    for step, batch in enumerate(tqdm(training_loader, desc=f"Epoch {epoch}")):
        ids = batch['ids'].to(device, dtype=torch.long)
        mask = batch['mask'].to(device, dtype=torch.long)
        token_type_ids = batch['token_type_ids'].to(device, dtype=torch.long)
        # CrossEntropyLoss expects integer class indices.
        targets = batch['targets'].to(device, dtype=torch.long)

        outputs = model(ids, mask, token_type_ids)
        loss = loss_function(outputs, targets)
        tr_loss += loss.item()
        _, predicted = torch.max(outputs.detach(), dim=1)
        n_correct += calcuate_accuracy(predicted, targets)

        nb_tr_steps += 1
        nb_tr_examples += targets.size(0)

        if step % 5000 == 0:
            loss_step = tr_loss / nb_tr_steps
            accu_step = (n_correct * 100) / nb_tr_examples
            print(f"Training Loss per 5000 steps: {loss_step}")
            print(f"Training Accuracy per 5000 steps: {accu_step}")

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # An empty loader would otherwise end in a ZeroDivisionError below.
    if nb_tr_examples == 0:
        print(f"Epoch {epoch}: training_loader produced no batches; nothing to train on.")
        return

    epoch_loss = tr_loss / nb_tr_steps
    epoch_accu = (n_correct * 100) / nb_tr_examples
    print(f'The Total Accuracy for Epoch {epoch}: {epoch_accu}')
    print(f"Training Loss Epoch: {epoch_loss}")
    print(f"Training Accuracy Epoch: {epoch_accu}")

    return

In [36]:
def get_new_labels_from_human(unlabeled_examples, valid_labels=None):
    """Interactively ask a human annotator for one label per example.

    Parameters
    ----------
    unlabeled_examples : mapping or DataFrame
        Object whose ``"text"`` entry is an iterable of texts to label.
    valid_labels : iterable, optional
        Accepted label names. Defaults to the keys of the module-level
        ``emotion_mapping``.

    Returns
    -------
    list
        One accepted label per example, in input order, spelled exactly as
        it appears in ``valid_labels``.
    """
    if valid_labels is None:
        valid_labels = [*emotion_mapping]
    else:
        valid_labels = list(valid_labels)

    # Match ignoring surrounding whitespace and letter case, so " Joy " is
    # accepted as "joy" instead of forcing the annotator to retype it.
    canonical = {str(label).strip().lower(): label for label in valid_labels}

    new_labels = []

    print("EMOTIONS:", valid_labels)
    for example in unlabeled_examples["text"]:
        print("Text:", example)
        prompt = "Text: {}: ".format(example)
        label_choice = input(prompt).strip().lower()
        while label_choice not in canonical:
            print("Invalid input. Please enter one of the valid choices.")
            label_choice = input(prompt).strip().lower()
        new_labels.append(canonical[label_choice])

    return new_labels


In [37]:
def least_confidence_sampling(model, testing_loader, query_size):
    """Pick the examples whose top softmax probability is lowest.

    Puts ``model`` in eval mode, scores every batch of ``testing_loader``
    without gradients, and returns a tensor holding the positions of the
    ``query_size`` least confident examples. Positions refer to the order in
    which the loader yielded the examples, so they only map back to dataset
    rows if the loader does not shuffle.
    """
    model.eval()
    # Start from an empty tensor so an empty loader still yields an (empty) result.
    confidence_chunks = [torch.tensor([], device=device)]

    with torch.no_grad():
        for _step, batch in tqdm(enumerate(testing_loader, 0)):
            input_ids = batch['ids'].to(device, dtype=torch.long)
            attention_mask = batch['mask'].to(device, dtype=torch.long)
            segment_ids = batch['token_type_ids'].to(device, dtype=torch.long)

            logits = model(input_ids, attention_mask, segment_ids)
            probabilities = logits.softmax(dim=1)
            # Confidence = probability assigned to the predicted class.
            confidence_chunks.append(probabilities.max(dim=1).values)

    all_confidences = torch.cat(confidence_chunks, dim=0)
    # Ascending sort puts the least confident examples first.
    ranked = torch.argsort(all_confidences)
    return ranked[:query_size]

In [None]:
EPOCHS = 2
query_size = 1  # number of instances to query per iteration

# The pool must be scored in row order: least_confidence_sampling returns positions
# in loader order, and those positions are used below to index `test_data`.
pool_params = {**test_params, 'shuffle': False}
testing_loader = DataLoader(testing_set, **pool_params)

# NOTE: this cell mutates train_data / test_data, so re-running it continues from
# the current state rather than starting over.
for epoch in range(EPOCHS):
    train(epoch)

    # Query strategy: Uncertainty Sampling
    query_idx = least_confidence_sampling(model, testing_loader, query_size)

    # Convert to NumPy array and move to CPU
    query_idx_numpy = query_idx.cpu().numpy()

    # Get labels from a human annotator for the selected instances
    queried_rows = test_data.iloc[query_idx_numpy].copy()
    new_labels = get_new_labels_from_human(queried_rows)
    print(new_labels)

    # Store the annotator's answers (as integer ids) on the queried rows; otherwise
    # the rows would enter the training set with the pool's original 'emotion' values.
    queried_rows['emotion'] = [emotion_mapping[label] for label in new_labels]

    # Move the newly labelled rows from the pool to the training set
    train_data = pd.concat([train_data, queried_rows], ignore_index=True)
    test_data = test_data.drop(test_data.index[query_idx_numpy]).reset_index(drop=True)

    # Update DataLoaders
    training_set = SentimentData(train_data, tokenizer, MAX_LEN)
    testing_set = SentimentData(test_data[:100], tokenizer, MAX_LEN)
    training_loader = DataLoader(training_set, **train_params)
    testing_loader = DataLoader(testing_set, **pool_params)