<a href="https://colab.research.google.com/github/KilopSahani/RahulKumarSahani-/blob/main/Roberta_Sentiment_analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#roberta
import os
import numpy as np
from transformers import RobertaConfig
from transformers import RobertaTokenizer, RobertaForSequenceClassification, AdamW
import torch
from torch.utils.data import DataLoader, TensorDataset, random_split
from tqdm import tqdm
import pandas as pd
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Load the data
def export(type_data='train', data_dir='/content'):
    """Load tweets and their integer emotion labels from one CSV split.

    The emotion name is read from the second column of the CSV and the tweet
    text from its last column.

    Args:
        type_data: Which split to load, ``'train'`` or ``'test'``
            (case-insensitive).
        data_dir: Directory holding ``final_train_data.csv`` and
            ``final_test_data.csv``. Defaults to ``/content``.

    Returns:
        A ``(tweets, labels)`` tuple of equal-length lists: the tweet texts
        and their class ids (0-6, see ``label_mapping`` below).

    Raises:
        ValueError: If ``type_data`` names an unknown split, or if the label
            column contains a value that has no entry in ``label_mapping``.
    """
    file_names = {
        'train': 'final_train_data.csv',
        'test': 'final_test_data.csv',
    }
    split = type_data.lower()
    if split not in file_names:
        # Without this guard an unknown split would surface later as an
        # UnboundLocalError on `final_data`.
        raise ValueError(
            "type_data must be 'train' or 'test', got {!r}".format(type_data)
        )
    final_data = pd.read_csv(os.path.join(data_dir, file_names[split]))

    label_mapping = {'anger': 0, 'joy': 1, 'fear': 2, 'sadness': 3, 'disgust': 4, 'shame': 5, 'guilt': 6}
    raw_labels = final_data.iloc[:, 1]
    mapped_labels = raw_labels.map(label_mapping)

    # Series.map() yields NaN for anything missing from the mapping, which
    # would silently turn every label into a float. Fail loudly instead.
    unknown = raw_labels[mapped_labels.isna()]
    if not unknown.empty:
        raise ValueError(
            "Unknown label(s) in {}: {}".format(
                file_names[split], sorted(set(map(str, unknown)))
            )
        )

    labels = mapped_labels.astype(int).tolist()
    tweets = final_data.iloc[:, -1].tolist()

    return tweets, labels

# Tokenize and prepare data for RoBERTa
def preprocess_for_roberta(tweets, labels, tokenizer, max_length=200):
    """Tokenize tweets and bundle them with their labels as a TensorDataset.

    Every tweet is truncated or padded to exactly ``max_length`` token ids
    (special tokens included).

    Args:
        tweets: Sequence of tweet strings.
        labels: Sequence of integer class ids, one per tweet.
        tokenizer: A Hugging Face tokenizer (callable on a list of strings).
        max_length: Fixed sequence length of every encoded tweet.

    Returns:
        ``TensorDataset(input_ids, attention_mask, labels)`` where the first
        two tensors come straight from the tokenizer and ``labels`` is a
        ``torch.long`` tensor.

    Raises:
        ValueError: If ``tweets`` is empty or its length differs from
            ``labels``.
    """
    tweets = list(tweets)
    label_tensor = torch.tensor(labels, dtype=torch.long)

    if not tweets:
        raise ValueError("preprocess_for_roberta() received no tweets")
    if label_tensor.dim() != 1 or label_tensor.size(0) != len(tweets):
        raise ValueError(
            "tweets and labels must have the same length, got {} tweets and "
            "labels of shape {}".format(len(tweets), tuple(label_tensor.shape))
        )

    # One batched call tokenizes every tweet and already returns stacked
    # (num_tweets, max_length) tensors, so no per-tweet loop or torch.cat.
    encoded = tokenizer(
        tweets,
        add_special_tokens=True,
        max_length=max_length,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt',
    )

    # The attention mask is deliberately left untouched: it must stay binary
    # (1 = real token, 0 = padding). The model converts it into an additive
    # bias on the attention scores, so any other value corrupts those scores
    # rather than "emphasising" a token.
    return TensorDataset(encoded['input_ids'], encoded['attention_mask'], label_tensor)


# Train RoBERTa model
def train_roberta(tweets, labels, tokenizer, num_classes=7, epochs=5, batch_size=32,
                  max_length=200, learning_rate=0.01,
                  output_dir='roberta_sentiment_model_7_classes'):
    """Fine-tune ``roberta-base`` as a sequence classifier on labelled tweets.

    Per epoch, prints the average training loss and the training accuracy.
    After the last epoch the model is written to ``output_dir`` with
    ``save_pretrained``.

    Args:
        tweets: Sequence of tweet strings.
        labels: Sequence of integer class ids, one per tweet.
        tokenizer: Tokenizer forwarded to ``preprocess_for_roberta``.
        num_classes: Size of the classification head.
        epochs: Number of passes over the training data.
        batch_size: Mini-batch size of the training DataLoader.
        max_length: Token length every tweet is padded/truncated to.
        learning_rate: Learning rate of the SGD optimizer.
        output_dir: Directory the fine-tuned model is saved to.

    Returns:
        The fine-tuned model, left on the device it was trained on.

    Raises:
        ValueError: If ``tweets`` is empty.
    """
    if len(tweets) == 0:
        # Fail before downloading the model; an empty loader would also make
        # the per-epoch averages below divide by zero.
        raise ValueError("train_roberta() needs at least one training example")

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Preprocess data and create the shuffled training DataLoader
    dataset = preprocess_for_roberta(tweets, labels, tokenizer, max_length=max_length)
    train_dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # Load pre-trained RoBERTa with a freshly initialised classification head
    model = RobertaForSequenceClassification.from_pretrained('roberta-base', num_labels=num_classes)
    model.to(device)

    # A single optimizer: SGD with momentum.
    # NOTE(review): for transformer fine-tuning AdamW with a learning rate
    # around 2e-5 is the more common choice; consider switching once it has
    # been validated on held-out data.
    optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)

    # Training loop
    for epoch in range(epochs):
        model.train()
        total_loss = 0.0
        correct_predictions = 0
        total_samples = 0
        for batch in tqdm(train_dataloader, desc="Epoch {}".format(epoch + 1)):
            # Move each tensor to the device exactly once per step.
            input_ids, attention_mask, batch_labels = (t.to(device) for t in batch)

            optimizer.zero_grad()
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                labels=batch_labels,
            )
            loss = outputs.loss
            loss.backward()
            optimizer.step()

            # Running statistics for the epoch summary
            total_loss += loss.item()
            preds = torch.argmax(outputs.logits, dim=1)
            correct_predictions += (preds == batch_labels).sum().item()
            total_samples += batch_labels.size(0)

        avg_train_loss = total_loss / len(train_dataloader)
        training_accuracy = correct_predictions / total_samples

        print("Average training loss: {:.4f}".format(avg_train_loss))
        print("Training Accuracy: {:.4f}".format(training_accuracy))

    # Save the trained model
    model.save_pretrained(output_dir)

    return model

# Test RoBERTa model
def test_roberta(model, tweets, labels, tokenizer, batch_size=32, max_length=512):
    """Run a trained classifier over labelled tweets and collect predictions.

    Args:
        model: Sequence-classification model whose output exposes ``.logits``.
        tweets: Sequence of tweet strings.
        labels: Sequence of integer class ids, one per tweet.
        tokenizer: Tokenizer forwarded to ``preprocess_for_roberta``.
        batch_size: Mini-batch size of the evaluation DataLoader.
        max_length: Token length every tweet is padded/truncated to.
            NOTE(review): ``train_roberta`` preprocesses with 200; passing
            the same value here keeps truncation consistent and avoids
            padding every batch out to 512 tokens.

    Returns:
        ``(all_preds, all_labels)``: two lists of per-example class ids, in
        the original (unshuffled) order of ``tweets``.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # The model may arrive on another device (e.g. freshly loaded on CPU);
    # move it so it matches the batches sent to `device` below.
    model.to(device)

    # Preprocess test data
    dataset = preprocess_for_roberta(tweets, labels, tokenizer, max_length=max_length)
    test_dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch in tqdm(test_dataloader, desc="Testing"):
            input_ids, attention_mask, batch_labels = batch

            # Labels are not passed to the model: only the logits are
            # needed, so there is no reason to compute a loss here.
            outputs = model(
                input_ids=input_ids.to(device),
                attention_mask=attention_mask.to(device),
            )
            preds = torch.argmax(outputs.logits, dim=1).cpu().numpy()

            all_preds.extend(preds)
            all_labels.extend(batch_labels.cpu().numpy())

    return all_preds, all_labels

# Calculate metrics
def calculate_metrics(y_true, y_pred):
    """Score predictions against the ground truth.

    Args:
        y_true: Ground-truth class ids.
        y_pred: Predicted class ids, aligned with ``y_true``.

    Returns:
        ``(accuracy, precision, recall, f1)``; precision, recall and F1 use
        the ``'weighted'`` average across classes.
    """
    accuracy = accuracy_score(y_true, y_pred)

    # The three remaining metrics share the same call signature, so evaluate
    # them in one pass, in the order they are returned.
    weighted_scorers = (precision_score, recall_score, f1_score)
    precision, recall, f1 = [
        scorer(y_true, y_pred, average='weighted') for scorer in weighted_scorers
    ]

    return accuracy, precision, recall, f1

# Main execution
if __name__ == '__main__':
    # ---- Training phase ----
    # Read the training split: tweet texts plus their integer labels.
    train_tweets, train_labels = export('train')

    # Fetch the pretrained RoBERTa tokenizer and write a copy of it to the
    # local 'roberta_tokenizer' directory.
    tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
    tokenizer.save_pretrained('roberta_tokenizer')

    # Fine-tune the classifier with 7 output classes.
    trained_model = train_roberta(
        train_tweets,
        train_labels,
        tokenizer,
        num_classes=7,
    )

    # ---- Evaluation phase ----
    # Read the test split and collect the model's predictions for it.
    test_tweets, test_labels = export('test')
    predictions, true_labels = test_roberta(
        trained_model,
        test_tweets,
        test_labels,
        tokenizer,
    )

    # Report the test-set metrics.
    accuracy, precision, recall, f1 = calculate_metrics(true_labels, predictions)
    print(f"Test Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")


model.safetensors:   0%|          | 0.00/499M [00:00<?, ?B/s]

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Epoch 1:   2%|▏         | 4/190 [04:05<3:05:56, 59.98s/it]