In [None]:
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import pandas as pd


In [None]:
# Mount Google Drive at /content/drive; the CSV loaded further down is read
# from a path under this mount point.
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


# Generate Dataset

In [None]:
import torch
from transformers import AutoTokenizer
from torch.utils.data import WeightedRandomSampler


def gen_sampler(dataset, label_name=None):
    """Build a ``WeightedRandomSampler`` that balances the label classes.

    Args:
        dataset: Object exposing its frame as ``dataset.data`` (a pandas
            DataFrame) and, when ``label_name`` is omitted, the label column
            name as ``dataset.label``.
        label_name (str, optional): Column holding the class labels.
            Defaults to ``dataset.label``.

    Returns:
        WeightedRandomSampler: Draws ``len(dataset.data)`` indices per pass,
        each row weighted by ``n_rows / count(rows sharing its label)``.
    """
    if label_name is None:
        label_name = dataset.label

    labels = dataset.data[label_name]

    # Look the count up by label *value*. ``value_counts()`` is ordered by
    # frequency, so indexing its list form with the label picks the wrong
    # class whenever the most frequent label is not 0.
    class_counts = labels.value_counts()
    weights = (len(labels) / labels.map(class_counts)).to_numpy(dtype='float64')

    return WeightedRandomSampler(
        torch.as_tensor(weights, dtype=torch.double), len(weights))

class binaryPersuasionDataset(Dataset):
    """Torch ``Dataset`` that tokenizes a text column and encodes its label.

    Args:
        data (pd.DataFrame): Frame with a ``text`` column and the label column.
        label (str): Name of the label column.
        tokenizer: Hugging Face checkpoint name (loaded with
            ``AutoTokenizer``) or an already constructed tokenizer callable.
        task (str): ``'binary_classification'`` one-hot encodes the label;
            any other value returns the label as a float scalar.
        max_length (int): Token length every example is padded/truncated to.

    Note:
        One-hot encoding assumes labels are integer class ids in
        ``[0, num_labels)`` -- TODO confirm against the CSV.
    """

    def __init__(
            self, data, label, tokenizer, task='binary_classification',
            max_length=60
            ):

        # Shuffle once up front; rows are addressed positionally afterwards.
        self.data = data.sample(frac=1)
        self.label = label
        self.num_labels = len(self.data[label].unique())
        self.task = task

        # AutoTokenizer resolves RoBERTa checkpoints too, so no model-specific
        # tokenizer class (imported only in a later cell) is needed here.
        if isinstance(tokenizer, str):
            self.tokenizer = AutoTokenizer.from_pretrained(tokenizer)
        else:
            self.tokenizer = tokenizer

        self.max_length = max_length

    def encode_label(self, label):
        """Return the label as a float tensor (one-hot for the binary task)."""
        if self.task == 'binary_classification':
            one_hot = torch.zeros(self.num_labels, dtype=torch.float32)
            one_hot[int(label)] = 1.0
            return one_hot
        return torch.tensor(label, dtype=torch.float32)

    def encode_text(self, text):
        """Tokenize ``text`` to exactly ``max_length`` tokens as 'pt' tensors."""
        # ``padding`` takes a strategy name; the target length is given
        # separately through ``max_length``.
        return self.tokenizer(
            text,
            add_special_tokens=True,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_token_type_ids=False,
            return_attention_mask=True,
            return_tensors='pt',
        )

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index: int):
        """Return one example as a dict of text preview, token tensors and label."""
        data_row = self.data.iloc[index]
        comment_text = data_row['text']
        # Read the configured label column, the same one ``num_labels`` was
        # derived from.
        label = self.encode_label(data_row[self.label])

        encoding = self.encode_text(comment_text)

        return dict(
          comment_text=comment_text[:10],  # short preview only
          input_ids=encoding["input_ids"].flatten(),
          attention_mask=encoding["attention_mask"].flatten(),
          labels=label
        )


In [None]:
def gen_datasets(df, label, tokenizer, task='binary_classification'):
    """Split ``df`` 80/20 and wrap each part in a ``binaryPersuasionDataset``.

    Args:
        df (pd.DataFrame): Full labelled frame.
        label (str): Name of the label column.
        tokenizer: Tokenizer argument forwarded to the dataset class.
        task (str): Task name forwarded to the dataset class; it controls
            how labels are encoded.

    Returns:
        tuple: ``(train_ds, test_ds)``.
    """
    # Fixed seed so the split is the same on every run.
    train_df, test_df = train_test_split(
        df,
        test_size=0.2,
        random_state=42,
        shuffle=True,
    )

    # The dataset constructor takes ``task`` as well; forward it to both splits.
    train_ds = binaryPersuasionDataset(train_df, label, tokenizer, task)
    test_ds = binaryPersuasionDataset(test_df, label, tokenizer, task)

    return train_ds, test_ds


In [None]:
def gen_dataloaders(dataset, batch_size, test=None):
    """Wrap ``dataset`` in a ``DataLoader``.

    Args:
        dataset: Dataset to load from. For training it must expose ``data``
            and ``label`` so a class-balancing sampler can be built.
        batch_size (int): Number of examples per batch.
        test: When truthy, no sampler is used and examples come in dataset
            order; otherwise the weighted sampler from ``gen_sampler`` is used.

    Returns:
        DataLoader: Loader over ``dataset``.
    """
    # ``gen_sampler`` needs to know which column holds the labels.
    sampler = None if test else gen_sampler(dataset, dataset.label)

    return DataLoader(
        dataset,
        sampler=sampler,
        batch_size=batch_size
    )


In [None]:
# Labelled sentences, read from a CSV on the mounted Drive.
bin_df = pd.read_csv('/content/drive/Othercomputers/My Mac/binary_sentence_level/3-EMOTIONbinary_sentence.csv')
# NOTE(review): the gen_datasets call below passes the literal 'emotion', not
# this variable -- confirm which spelling matches the CSV column.
label = 'Emotion'
# Checkpoint name string; reused later as the model name as well.
tokenizer = 'roberta-base'


In [None]:
# Split the frame into train/test datasets, then build the loaders: batches of
# 32 for training, single examples (test=True, so no sampler) for evaluation.
# NOTE(review): the label column is passed as the literal 'emotion' rather
# than the `label` variable ('Emotion') -- confirm the intended column name.
train_dataset, test_dataset = gen_datasets(bin_df, 'emotion', tokenizer)
train_dataloader = gen_dataloaders(train_dataset, 32)
test_dataloader = gen_dataloaders(test_dataset, 1, test=True)


In [None]:
import torch.nn as nn
from transformers import RobertaTokenizer, RobertaForSequenceClassification

# Custom classifier: a linear head on top of a pre-trained encoder (the encoder is loaded inside the class)


class BertClassifier(nn.Module):

    """
    Transformer-encoder classifier: encoder -> dropout -> linear head.

    Args:
        name (str): Encoder family, one of 'bert', 'distilbert' or
            'roberta'. Selects how the sentence representation is taken
            from the encoder output.
        bert_model (str): The pre-trained checkpoint to load with AutoModel.
        num_labels (int): The number of output labels.

    Attributes:
        name (str): The encoder family given at construction.
        bert: The encoder returned by ``AutoModel.from_pretrained``.
        dropout (nn.Dropout): Dropout layer for regularization.
        classifier (nn.Linear): Linear layer for classification.

    """

    # Families whose sentence vector is read from ``pooler_output``.
    _POOLER_MODELS = ('bert',)
    # Families whose sentence vector is the first token's hidden state.
    _FIRST_TOKEN_MODELS = ('distilbert', 'roberta')

    def __init__(self, name, bert_model, num_labels):
        super(BertClassifier, self).__init__()
        self.name = name
        self.bert = AutoModel.from_pretrained(bert_model)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.bert.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask):

        """
        Forward pass of the classifier.

        Args:
            input_ids (torch.Tensor): The input token IDs.
            attention_mask (torch.Tensor): The attention mask.

        Returns:
            torch.Tensor: The logits for each class.

        Raises:
            ValueError: If ``self.name`` is not a supported encoder family.

        """
        if self.name not in self._POOLER_MODELS + self._FIRST_TOKEN_MODELS:
            raise ValueError("Invalid model name")

        outputs = self.bert(input_ids=input_ids,
                            attention_mask=attention_mask)

        if self.name in self._POOLER_MODELS:
            pooled_output = outputs.pooler_output
        else:
            # First-token hidden state as the sentence representation.
            pooled_output = outputs.last_hidden_state[:, 0, :]

        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)
        return logits


# Set Hyperparameters

In [None]:
from torch.optim import lr_scheduler
from transformers import AutoModel
from tqdm.auto import tqdm


# Run on the GPU when one is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
num_epochs = 10
# NOTE(review): this criterion is handed to train_model, but the training loop
# there backpropagates the model's own `outputs.loss` -- confirm it is still needed.
criterion = nn.BCEWithLogitsLoss()  # Binary cross-entropy loss for multilabel classification

# `tokenizer` holds the checkpoint name string, so it doubles as the model name.
model = RobertaForSequenceClassification.from_pretrained(tokenizer)
# model = BertClassifier('roberta', tokenizer, 2)
model.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=2e-5)


Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


# Train

In [None]:
def forward_pass(model_name, batch, model):
    """Run ``model`` on one batch and return ``(outputs, labels)``.

    Args:
        model_name (str): 'rnn' feeds ``model.embd(input_ids)`` to the model;
            'bert', 'distilbert' and 'roberta' call it with ``input_ids``
            and ``attention_mask`` keywords.
        batch (dict): Batch with 'input_ids', a label tensor under 'labels'
            (or the older 'binary_label' key) and, for transformer models,
            'attention_mask'.
        model (nn.Module): The model to run.

    Returns:
        tuple: ``(outputs, labels)`` with ``labels`` on the model's device.

    Raises:
        ValueError: If ``model_name`` is not one of the supported names.
    """
    if model_name not in ('rnn', 'bert', 'distilbert', 'roberta'):
        raise ValueError(f"Unsupported model name: {model_name!r}")

    # Put the inputs wherever the model's weights live instead of relying on
    # a notebook-level ``device`` global.
    first_param = next(model.parameters(), None)
    target = first_param.device if first_param is not None else torch.device('cpu')

    input_ids = batch['input_ids'].to(target)
    # The dataset emits 'labels'; keep accepting the older key as a fallback.
    label_key = 'labels' if 'labels' in batch else 'binary_label'
    labels = batch[label_key].to(target)

    if model_name == 'rnn':
        outputs = model(model.embd(input_ids))
    else:
        attention_mask = batch['attention_mask'].to(target)
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)

    return outputs, labels

def update_pb(epoch_loss, epoch, batch):
    """Format the progress-bar text for the current training step.

    Args:
        epoch_loss (list): Batch losses recorded so far in this epoch.
        epoch (int): Current epoch index.
        batch (int): Current batch index.

    Returns:
        str: Epoch, batch and running mean loss (rounded to 4 decimals).
    """
    running_mean = round(sum(epoch_loss)/len(epoch_loss), 4)
    pieces = [
        f'Epoch: {epoch} | ',
        f'Batch {batch} | ',
        f'Average Loss: {running_mean}',
    ]
    return ''.join(pieces)

def train_model(
        model, training_dataloader, num_epochs, device, optimizer, criterion):
    """Fine-tune ``model`` and report the last epoch's mean batch loss.

    Args:
        model: Model called with ``input_ids``, ``attention_mask`` and
            ``labels`` keywords; its output must expose ``.loss``.
        training_dataloader: Loader yielding dict batches with 'input_ids',
            'attention_mask' and 'labels'.
        num_epochs (int): Number of passes over the loader.
        device: Device the batch tensors are moved to.
        optimizer: Optimizer stepped once per batch.
        criterion: Unused; kept so existing calls keep working. The loss is
            the one the model computes itself.

    Returns:
        tuple: ``(avg_loss, model)`` where ``avg_loss`` is the mean batch
        loss of the last epoch that produced batches (0.0 if none did).
    """
    avg_loss = 0.0

    model.train()

    with tqdm(total=num_epochs, desc='Average Epoch Loss: ') as pbar1:
        for e in range(num_epochs):
            epoch_loss = []

            with tqdm(total=len(training_dataloader), desc='Loss: 0') as pbar2:
                for b, batch in enumerate(training_dataloader):

                    input_ids = batch['input_ids'].to(device)
                    attention_mask = batch['attention_mask'].to(device)
                    labels = batch['labels'].to(device)

                    optimizer.zero_grad()

                    outputs = model(
                        input_ids=input_ids,
                        attention_mask=attention_mask,
                        labels=labels
                        )

                    loss = outputs.loss
                    loss.backward()
                    optimizer.step()

                    epoch_loss.append(loss.item())

                    pbar2.set_description(update_pb(epoch_loss, e, b))
                    pbar2.update()

            # Record the epoch mean so the returned value reflects training;
            # skip the division when the loader yielded nothing.
            if epoch_loss:
                avg_loss = sum(epoch_loss)/len(epoch_loss)

            pbar1.set_description(
                f'Average Epoch Loss: {round(avg_loss, 4)}')
            pbar1.update()

    return avg_loss, model


In [None]:
# Fine-tune `model` on the training loader for `num_epochs` epochs and keep the
# returned model as `trained_model` for the evaluation cells below.
avg_loss, trained_model = train_model(model, train_dataloader, num_epochs, device, optimizer, criterion)


Average Epoch Loss:   0%|          | 0/10 [00:00<?, ?it/s]

Loss: 0:   0%|          | 0/217 [00:00<?, ?it/s]

Loss: 0:   0%|          | 0/217 [00:00<?, ?it/s]

Loss: 0:   0%|          | 0/217 [00:00<?, ?it/s]

Loss: 0:   0%|          | 0/217 [00:00<?, ?it/s]

Loss: 0:   0%|          | 0/217 [00:00<?, ?it/s]

Loss: 0:   0%|          | 0/217 [00:00<?, ?it/s]

Loss: 0:   0%|          | 0/217 [00:00<?, ?it/s]

Loss: 0:   0%|          | 0/217 [00:00<?, ?it/s]

Loss: 0:   0%|          | 0/217 [00:00<?, ?it/s]

Loss: 0:   0%|          | 0/217 [00:00<?, ?it/s]

In [None]:
def eval_model(model, eval_loader, device):
    """
    Evaluate the performance of a model on the evaluation data.

    Args:
        model (torch.nn.Module): The model to evaluate. Its output is either
        an object exposing ``.logits`` or the logits tensor itself.
        eval_loader (torch.utils.data.DataLoader): The data loader for
        the evaluation data; any batch size is supported.
        device (torch.device): The device to run the evaluation on.

    Returns:
        tuple: A tuple containing the true labels and predicted labels,
        both as flat lists of class indices (one entry per example).
    """

    true_labels = []
    predicted_labels = []

    model.eval()
    with torch.no_grad():
        for batch in tqdm(eval_loader):

            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            outputs = model(input_ids=input_ids,
                            attention_mask=attention_mask)
            logits = getattr(outputs, 'logits', outputs)

            # One argmax over the class axis per example. Sigmoid is
            # monotonic, so the raw logits give the same winner. (Taking
            # argmax of an already-reduced scalar always yields 0.)
            predicted_labels.extend(logits.argmax(dim=-1).tolist())

            if labels.dim() > 1:
                # One-hot rows -> class index.
                true_labels.extend(labels.argmax(dim=-1).tolist())
            else:
                # Labels already stored as one scalar per example.
                true_labels.extend(labels.long().tolist())
    return true_labels, predicted_labels


In [None]:
# Collect the reference and predicted labels over the test loader.
true_labels, predicted_labels = eval_model(trained_model, test_dataloader, device)


  0%|          | 0/1730 [00:00<?, ?it/s]

In [None]:
from sklearn.metrics import classification_report, accuracy_score, f1_score, recall_score, precision_score

# true_labels / predicted_labels are the flat per-example lists returned by
# eval_model. All three scores use the same support-weighted averaging so they
# are comparable, and zero_division=0 scores a class that is never predicted
# as 0 instead of a perfect 1.
accuracy = accuracy_score(
    true_labels,
    predicted_labels)
precision = precision_score(true_labels, predicted_labels, average='weighted', zero_division=0)
recall = recall_score(true_labels, predicted_labels, average='weighted', zero_division=0)
f1 = f1_score(true_labels, predicted_labels, average='weighted', zero_division=0)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1)


Accuracy: 0.8583815028901735
Precision: 1.0
Recall: 0.8583815028901735
F1 Score: 0.7929682934941882


In [None]:
# Per-class precision / recall / F1. zero_division=0 reports an undefined
# score (a class that is never predicted) as 0 rather than as a perfect 1.
report = classification_report(
    true_labels, predicted_labels,
    zero_division=0)

print(report)


              precision    recall  f1-score   support

           0       0.86      1.00      0.92      1485
           1       1.00      0.00      0.00       245

    accuracy                           0.86      1730
   macro avg       0.93      0.50      0.46      1730
weighted avg       0.88      0.86      0.79      1730



In [None]:
import torch.nn.functional as F

def encode_dext(comment_text, tokenizer):
    """Tokenize one text with the fixed inference settings.

    Args:
        comment_text (str): Text to encode.
        tokenizer: Object exposing ``encode_plus``.

    Returns:
        Whatever ``tokenizer.encode_plus`` returns for a 60-token,
        max-length-padded, truncated, 'pt'-tensor encoding.
    """
    options = dict(
        add_special_tokens=True,
        max_length=60,
        return_token_type_ids=False,
        padding="max_length",
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt',
    )
    return tokenizer.encode_plus(comment_text, **options)

def test_model_text(text, model, tokenizer):
    """Predict the class index of a single text.

    Args:
        text (str): Text to classify.
        model (nn.Module): Model to run. Its output is either an object
            exposing ``.logits`` or the logits tensor itself.
        tokenizer: Tokenizer exposing ``encode_plus``, or a checkpoint name
            string, which is loaded with ``AutoTokenizer`` first.

    Returns:
        int: Index of the highest-scoring class.
    """
    if isinstance(tokenizer, str):
        tokenizer = AutoTokenizer.from_pretrained(tokenizer)
    encoding = encode_dext(text, tokenizer)

    # Use the model that was passed in, and move the inputs to its device
    # rather than moving the model itself to the CPU.
    first_param = next(model.parameters(), None)
    target = first_param.device if first_param is not None else torch.device('cpu')

    model.eval()
    with torch.no_grad():
        outputs = model(
            encoding['input_ids'].to(target),
            encoding['attention_mask'].to(target))

    # Softmax does not change the argmax, so pick the class from the logits.
    logits = getattr(outputs, 'logits', outputs)
    return logits.argmax(dim=-1).item()


In [None]:
# Peek at the (shuffled) training frame held by the dataset.
# NOTE(review): the original line referenced an undefined name and attribute;
# this assumes the intent was to inspect the training data -- confirm.
train_dataset.data.head()


NameError: name 'training_ds' is not defined

In [None]:
# Smoke test on an empty string.
# NOTE(review): `tokenizer` here is the checkpoint-name string assigned in the
# config cell, not a tokenizer instance -- confirm test_model_text accepts that.
test = ''
test_model_text(test, trained_model, tokenizer)


1