In [None]:
pip install torch transformers scikit-learn pandas matplotlib


In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from torch.utils.data import DataLoader, Dataset
from transformers import RobertaTokenizer, RobertaForSequenceClassification, AdamW, get_linear_schedule_with_warmup

# Set device: use the GPU when one is visible to PyTorch, otherwise the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load RoBERTa tokenizer
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')

# Load the dataset from CSV
df = pd.read_csv('reviews.csv')  # Ensure the CSV has 'Sentence' and 'Aspects' columns

# Rows with an empty sentence or aspect cell are read as NaN (a float), which
# would crash the .split(", ") calls below, so they are dropped up front.
df = df.dropna(subset=['Sentence', 'Aspects'])

# Extract columns
sentences = df['Sentence'].tolist()
aspects = df['Aspects'].tolist()

# Define unique aspects (classes).
# Each 'Aspects' cell is a ", "-separated list of aspect names. The vocabulary
# is sorted so that the aspect -> index mapping is identical on every run;
# iterating a plain set of strings yields an order that changes between
# interpreter sessions.
unique_aspects = sorted({aspect for aspect_list in aspects for aspect in aspect_list.split(", ")})
aspect_to_id = {aspect: idx for idx, aspect in enumerate(unique_aspects)}
id_to_aspect = {idx: aspect for aspect, idx in aspect_to_id.items()}

# Convert aspects to multi-label format
def encode_aspects(aspect_list):
    """Build the multi-hot label vector for one ", "-separated aspect string.

    Args:
        aspect_list: Aspect names joined by ", " (comma followed by a space).

    Returns:
        A 1-D float tensor with one slot per entry of ``unique_aspects``;
        a slot is 1.0 when the corresponding aspect is named in
        ``aspect_list`` and 0.0 otherwise. Names that are not keys of
        ``aspect_to_id`` are skipped.
    """
    multi_hot = torch.zeros(len(unique_aspects), dtype=torch.float)
    for name in aspect_list.split(", "):
        position = aspect_to_id.get(name)
        if position is not None:
            multi_hot[position] = 1.0
    return multi_hot

# Custom Dataset class for RoBERTa
class RoBertaAspectDataset(Dataset):
    """Map-style dataset yielding tokenized sentences with their aspect labels.

    Args:
        sentences: Indexable collection of input sentences.
        aspects: Indexable collection of aspect strings, aligned with
            ``sentences`` by position.
        tokenizer: Callable tokenizer invoked on one sentence at a time.
        max_length: Length every sentence is padded or truncated to.
    """

    def __init__(self, sentences, aspects, tokenizer, max_length=128):
        self.sentences, self.aspects = sentences, aspects
        self.tokenizer, self.max_length = tokenizer, max_length

    def __len__(self):
        """Return the number of sentences held by the dataset."""
        return len(self.sentences)

    def __getitem__(self, idx):
        """Return the model inputs and label vector for the sample at ``idx``.

        Returns:
            A dict with keys 'input_ids', 'attention_mask' and 'labels'.
        """
        # Tokenize to a fixed length so samples can be stacked into a batch
        tokens = self.tokenizer(
            self.sentences[idx],
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

        # squeeze(0) removes the leading dimension of the returned tensors
        # (presumably the size-1 batch axis added by return_tensors='pt')
        sample = {key: tokens[key].squeeze(0) for key in ('input_ids', 'attention_mask')}

        # Multi-hot encoding of the aspect string
        sample['labels'] = encode_aspects(self.aspects[idx])
        return sample

# Create Dataset and DataLoader
dataset = RoBertaAspectDataset(sentences, aspects, tokenizer)
dataloader = DataLoader(dataset, batch_size=8, shuffle=True)

# Load the pre-trained RoBERTa model with one output logit per aspect
model = RobertaForSequenceClassification.from_pretrained('roberta-base', num_labels=len(unique_aspects))
model.to(device)

# Number of passes over the data. The scheduler decays the learning rate over
# exactly len(dataloader) * epochs optimizer steps, so this must match the
# number of epochs actually run by the training loop.
epochs = 4

# Define optimizer and scheduler.
# torch.optim.AdamW is used instead of the deprecated transformers.AdamW;
# weight_decay=0.0 keeps the default that the transformers implementation had
# (torch's own default is 0.01).
optimizer = optim.AdamW(model.parameters(), lr=2e-5, eps=1e-8, weight_decay=0.0)
total_steps = len(dataloader) * epochs
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)

# Define the loss function: an independent binary cross-entropy per aspect,
# applied directly to the raw logits
loss_fn = nn.BCEWithLogitsLoss()

# Training loop: fine-tune for a fixed number of epochs and record the mean
# batch loss of each epoch in train_losses.
epochs = 4
train_losses = []
for epoch in range(epochs):
    model.train()
    step_losses = []
    for batch in dataloader:
        # Move the batch to the same device as the model
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        targets = batch['labels'].to(device)

        # Clear gradients left over from the previous step
        model.zero_grad()
        logits = model(input_ids=input_ids, attention_mask=attention_mask).logits

        step_loss = loss_fn(logits, targets)
        step_losses.append(step_loss.item())
        step_loss.backward()

        # The scheduler is stepped once per optimizer step, not once per epoch
        optimizer.step()
        scheduler.step()

    avg_train_loss = sum(step_losses) / len(dataloader)
    train_losses.append(avg_train_loss)
    print(f"Epoch {epoch + 1}, Training Loss: {avg_train_loss:.4f}")

# Plot the learning curve: mean training loss against the epoch index
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(train_losses, label='Training Loss')
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.set_title('Learning Curve')
ax.legend()
plt.show()

# Evaluation on the dataset: run the model over the same dataloader used for
# training and collect, per sample, the thresholded predictions and the true
# multi-hot labels.
model.eval()
predictions, actuals = [], []
with torch.no_grad():  # no gradients are needed for inference
    for batch in dataloader:
        logits = model(
            input_ids=batch['input_ids'].to(device),
            attention_mask=batch['attention_mask'].to(device),
        ).logits

        # Per-aspect probabilities, brought back to the CPU as a NumPy array
        probabilities = torch.sigmoid(logits).cpu().numpy()

        # Convert to binary predictions based on a threshold of 0.5;
        # extend() appends one row (one sample) at a time
        predictions.extend((probabilities > 0.5).astype(int))
        actuals.extend(batch['labels'].cpu().numpy())

# Evaluation Metrics
# Flatten the (sample, aspect) matrices once so that every label slot is scored
# as a single binary decision; the four metrics below share these arrays.
# NOTE(review): because the scores are taken over flattened slots with
# average='weighted', the usually far more frequent "aspect absent" (0) class
# carries most of the weight - consider per-aspect averaging on the 2-D arrays
# if per-aspect quality is what matters.
y_true = np.asarray(actuals).ravel()
y_pred = np.asarray(predictions).ravel()

accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='weighted', zero_division=1)
recall = recall_score(y_true, y_pred, average='weighted', zero_division=1)
f1 = f1_score(y_true, y_pred, average='weighted', zero_division=1)

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

# Plotting a simplified confusion matrix (adjust labels if there are more classes)
# argmax reduces each multi-hot row to a single class index: the first aspect
# that is set, or index 0 when no aspect is set at all. This is a
# simplification - samples with several aspects contribute only their first.
true_class = np.asarray(actuals).argmax(axis=1)
predicted_class = np.asarray(predictions).argmax(axis=1)

# Pin the label set so the matrix always has one row/column per aspect, even
# for aspects that never occur after the argmax reduction; otherwise the matrix
# can be smaller than unique_aspects and the tick labels below no longer line up.
class_ids = list(range(len(unique_aspects)))
conf_matrix = confusion_matrix(true_class, predicted_class, labels=class_ids)

plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=unique_aspects, yticklabels=unique_aspects)
plt.xlabel('Predicted Aspect')
plt.ylabel('True Aspect')
plt.title('Confusion Matrix')
plt.show()
