In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from imblearn.over_sampling import SMOTE
from collections import Counter
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from transformers import BertTokenizer
import torch
from transformers import BertForSequenceClassification
from torch.utils.data import DataLoader
from torch.optim import AdamW # Import AdamW from torch.optim
from transformers import get_scheduler
from transformers import get_scheduler
from tqdm import tqdm

In [None]:
# Load the annotated reviews.
# The path must be a raw string: in a normal literal, "\n" (in "\notebooks") is a
# newline and "\a" (in "\annotated_data") is a bell character, so the original
# literal did not spell the intended Windows path.
# NOTE(review): this is an absolute, machine-specific path — consider a configurable
# data directory so the notebook runs on other machines.
df = pd.read_csv(r'E:\sentiment_analysis\notebooks\annotated_data.csv')
df.head(15)

In [None]:
# prompt: build insightful EDA best for sentiment analysis

import matplotlib.pyplot as plt
import seaborn as sns

# Distribution of Sentiment Labels
fig, ax = plt.subplots(figsize=(8, 6))
sns.countplot(x='sentiment_label', data=df, ax=ax)
ax.set_title('Distribution of Sentiment Labels')
ax.set_xlabel('Sentiment')
ax.set_ylabel('Count')
plt.show()

# Analyze Text Length
# `apply(len)` raises TypeError on missing (NaN) or non-string reviews; treat a
# missing review as an empty string and measure the string form of everything else.
df['text_length'] = df['Review'].fillna('').astype(str).str.len()
fig, ax = plt.subplots(figsize=(8, 6))
sns.histplot(df['text_length'], kde=True, ax=ax)
ax.set_title('Distribution of Text Length')
ax.set_xlabel('Text Length')
ax.set_ylabel('Frequency')
plt.show()

# Relationship between Text Length and Sentiment
fig, ax = plt.subplots(figsize=(10, 6))
sns.boxplot(x='sentiment_label', y='text_length', data=df, ax=ax)
ax.set_title('Text Length vs. Sentiment')
ax.set_xlabel('Sentiment')
ax.set_ylabel('Text Length')
plt.show()

# Word Cloud Visualization (requires wordcloud library)
#!pip install wordcloud
from wordcloud import WordCloud

# Combine all text for each sentiment.
# Missing values are dropped and everything else is coerced to str, because
# ' '.join raises TypeError on NaN/float entries.
sentiment_texts = {}
for sentiment in df['sentiment_label'].unique():
    sentiment_rows = df.loc[df['sentiment_label'] == sentiment, 'processed_text']
    sentiment_texts[sentiment] = ' '.join(sentiment_rows.dropna().astype(str))

# Size the subplot grid from the number of labels instead of assuming at most four
# (a fixed 2x2 grid makes plt.subplot fail on a fifth label).
n_cols = 2
n_rows = max(1, -(-len(sentiment_texts) // n_cols))  # ceiling division without importing math
plt.figure(figsize=(15, 5 * n_rows))
for i, (sentiment, text) in enumerate(sentiment_texts.items()):
    plt.subplot(n_rows, n_cols, i + 1)
    if text.strip():
        wordcloud = WordCloud(width=800, height=400, background_color='white').generate(text)
        plt.imshow(wordcloud, interpolation='bilinear')
    else:
        # Nothing to draw for this label; leave the panel blank rather than letting
        # WordCloud.generate fail on empty input.
        print(f"No text available for {sentiment} sentiment; leaving panel empty.")
    plt.title(f'Word Cloud for {sentiment} Sentiment')
    plt.axis('off')
plt.tight_layout()
plt.show()


#Explore most frequent words for each sentiment
from collections import Counter

def plot_most_frequent_words(sentiment, n=10):
    """Plot a bar chart of the `n` most frequent words for one sentiment label.

    Reads the module-level `df`: rows whose 'sentiment_label' equals `sentiment`
    are selected, their 'processed_text' values are joined, lower-cased and split
    on whitespace, and the most common tokens are plotted.

    Args:
        sentiment: Value of 'sentiment_label' to filter on.
        n: Number of top words to show (default 10).

    Returns:
        None. Displays a matplotlib figure, or prints a notice when there are no
        words to plot.
    """
    sentiment_rows = df.loc[df['sentiment_label'] == sentiment, 'processed_text']
    # Drop NaN and coerce to str: ' '.join raises TypeError on non-string entries.
    words = ' '.join(sentiment_rows.dropna().astype(str)).lower().split()
    most_common_words = Counter(words).most_common(n)

    if not most_common_words:
        # zip(*[]) yields nothing, so plt.bar() would be called without arguments.
        print(f"No words found for {sentiment} sentiment; skipping plot.")
        return

    top_words, counts = zip(*most_common_words)
    plt.figure(figsize=(8, 6))
    plt.bar(top_words, counts)
    plt.title(f'Most frequent words for {sentiment} sentiment')
    plt.xlabel('Word')
    plt.ylabel('Count')
    plt.xticks(rotation=45, ha='right')
    plt.show()

# Draw the top-word bar chart once for every distinct sentiment label in the data.
for sentiment in pd.unique(df['sentiment_label']):
    plot_most_frequent_words(sentiment)


In [None]:
import pandas as pd
from imblearn.over_sampling import SMOTE
from collections import Counter
from sklearn.feature_extraction.text import TfidfVectorizer

# Display original class distribution
print("Original class distribution:", Counter(df['sentiment_label']))

# Convert text data into numerical format (for SMOTE)
df['cleaned_review'] = df['cleaned_review'].astype(str)  # Ensure all reviews are string type
vectorizer = TfidfVectorizer()
X = vectorizer.fit_transform(df['cleaned_review'])  # Fit and transform text data into TF-IDF matrix
y = df['sentiment_label']

# Apply SMOTE to balance classes.
# The original cell created and fit SMOTE twice with identical arguments (a
# copy-paste duplicate); one pass with the same random_state is sufficient.
# NOTE(review): oversampling happens before the train/val/test split, so synthetic
# rows derived from a review can land in a different split than that review —
# consider splitting first and resampling only the training portion.
smote = SMOTE(sampling_strategy='auto', random_state=42)
X_resampled, y_resampled = smote.fit_resample(X, y)

# Create a balanced DataFrame.
# inverse_transform yields, per row, the vocabulary terms with non-zero weight, so
# the rebuilt "Review" is a bag of terms rather than the original sentence.
reviews = [' '.join(terms) for terms in vectorizer.inverse_transform(X_resampled)]
df_balanced = pd.DataFrame({'Review': reviews, 'Sentiment': y_resampled})

# Display new class distribution
print("Balanced class distribution:", Counter(df_balanced['Sentiment']))

# Save the new balanced dataset
df_balanced.to_csv("balanced_reviews.csv", index=False)
print("Balanced dataset saved as 'balanced_reviews.csv'")

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split

# Load the balanced dataset (columns: 'Review' and 'Sentiment')
df = pd.read_csv('balanced_reviews.csv')

# A review that is read back as NaN would later be stringified to the literal word
# "nan" by astype(str); store it as an empty string instead.
# NOTE(review): presumably these come from rows whose rebuilt review text was empty — confirm.
df['Review'] = df['Review'].fillna('')

# Encode the string labels as integer class ids
label_mapping = {"Negative": 0, "Neutral": 1, "Positive": 2}
encoded_sentiment = df['Sentiment'].map(label_mapping)
if encoded_sentiment.isna().any():
    # Fail with the offending values instead of an opaque NaN-to-int cast error.
    unknown_labels = sorted(df.loc[encoded_sentiment.isna(), 'Sentiment'].astype(str).unique())
    raise ValueError(f"Sentiment labels missing from label_mapping: {unknown_labels}")
df['Sentiment'] = encoded_sentiment.astype(int)

# Splitting into train, validation, and test sets (80-10-10 split):
# first hold out 20%, then cut that hold-out in half.
train_texts, test_texts, train_labels, test_labels = train_test_split(
    df['Review'], df['Sentiment'], test_size=0.2, random_state=42, stratify=df['Sentiment']
)

val_texts, test_texts, val_labels, test_labels = train_test_split(
    test_texts, test_labels, test_size=0.5, random_state=42, stratify=test_labels
)

# Convert to DataFrame and Save
train_df = pd.DataFrame({'Review': train_texts, 'Sentiment': train_labels})
val_df = pd.DataFrame({'Review': val_texts, 'Sentiment': val_labels})
test_df = pd.DataFrame({'Review': test_texts, 'Sentiment': test_labels})

train_df.to_csv('train.csv', index=False)
val_df.to_csv('val.csv', index=False)
test_df.to_csv('test.csv', index=False)

print("Data Splitting Completed! Train:", len(train_df), "Val:", len(val_df), "Test:", len(test_df))


In [None]:
from transformers import BertTokenizer

# Load Pretrained BERT Tokenizer
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

# Tokenizing function
def tokenize_data(texts, labels):
    """Tokenize a collection of review texts with the module-level BERT tokenizer.

    Args:
        texts: Review texts (a pandas Series or array-like). Missing values are
            treated as empty strings; other non-string values are stringified.
        labels: Labels aligned with `texts`; returned unchanged.

    Returns:
        (encodings, labels): the tokenizer output for all texts (truncated to at
        most 512 tokens, padded to the longest sequence) and the labels as given.
    """
    # Fill NaN before astype(str): otherwise a missing review becomes the literal
    # text "nan" and is tokenized as if it were a real word.
    texts = pd.Series(texts).fillna('').astype(str)
    encodings = tokenizer(texts.tolist(), truncation=True, padding=True, max_length=512)
    return encodings, labels

# Tokenizing Train, Validation, and Test sets
train_encodings, train_labels = tokenize_data(train_df['Review'], train_df['Sentiment'])
val_encodings, val_labels = tokenize_data(val_df['Review'], val_df['Sentiment'])
test_encodings, test_labels = tokenize_data(test_df['Review'], test_df['Sentiment'])

In [None]:
import torch

class SentimentDataset(torch.utils.data.Dataset):
    """Map-style dataset pairing tokenizer encodings with integer labels.

    Args:
        encodings: Mapping from field name (e.g. "input_ids") to a sequence with
            one entry per example; must support `.items()`.
        labels: Integer class ids, one per example. Accepts a pandas Series (or
            anything exposing `.to_numpy()`), a list, a NumPy array, or a tensor.

    Raises:
        ValueError: If any encoding field has a different number of examples than
            there are labels.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings

        if isinstance(labels, torch.Tensor):
            # Copy so the dataset does not alias the caller's tensor.
            self.labels = labels.clone().to(torch.long)
        else:
            # Series is converted through NumPy first so its (possibly
            # non-contiguous) index is ignored and only positions matter.
            label_values = labels.to_numpy() if hasattr(labels, "to_numpy") else labels
            self.labels = torch.tensor(label_values, dtype=torch.long)

        for field, values in self.encodings.items():
            if len(values) != len(self.labels):
                raise ValueError(
                    f"Encoding field '{field}' has {len(values)} examples "
                    f"but {len(self.labels)} labels were given."
                )

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        # Build one tensor per encoding field for this example, plus its label.
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item["labels"] = self.labels[idx]
        return item


# Wrap each tokenized split (train / validation / test) in a SentimentDataset.
train_dataset, val_dataset, test_dataset = (
    SentimentDataset(split_encodings, split_labels)
    for split_encodings, split_labels in (
        (train_encodings, train_labels),
        (val_encodings, val_labels),
        (test_encodings, test_labels),
    )
)

print("Datasets Prepared!")


In [None]:
from transformers import BertForSequenceClassification

# The classification head gets one output per distinct label seen in the training split.
distinct_train_labels = set(train_labels)
num_labels = len(distinct_train_labels)

# Start from the pretrained "bert-base-uncased" weights with a classification head of that size.
model = BertForSequenceClassification.from_pretrained(
    "bert-base-uncased",
    num_labels=num_labels,
)

print("Model Loaded Successfully!")


In [None]:
from torch.utils.data import DataLoader

# Examples per batch (previously 16)
batch_size = 8

# Training batches are reshuffled; validation batches keep dataset order.
train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=batch_size)
val_loader = DataLoader(dataset=val_dataset, shuffle=False, batch_size=batch_size)

print("DataLoaders Ready!")


In [None]:
import torch
import matplotlib.pyplot as plt
from transformers import get_scheduler
from tqdm import tqdm

# Pick the GPU when CUDA is available, otherwise stay on the CPU, and move the model there.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
model.to(device)

# Cross-entropy over the classifier logits
loss_fn = torch.nn.CrossEntropyLoss()

# Per-epoch history, filled in by the training loop and plotted afterwards
train_losses = []
val_losses = []
train_accuracies = []
val_accuracies = []

# Training loop
epochs = 3

# `optimizer` and `lr_scheduler` are stepped below but were not created anywhere in
# the visible notebook, so a fresh "Restart & Run All" stopped with NameError at the
# first training step. Create them here, after the model has been moved to `device`.
# NOTE(review): lr=2e-5 and a linear decay without warmup are common BERT
# fine-tuning choices, not values recovered from an earlier run — confirm.
optimizer = AdamW(model.parameters(), lr=2e-5)
lr_scheduler = get_scheduler(
    "linear",
    optimizer=optimizer,
    num_warmup_steps=0,
    num_training_steps=epochs * len(train_loader),  # the scheduler is stepped once per batch
)

for epoch in range(epochs):
    print(f"\nEpoch {epoch+1}/{epochs}")

    # ---- Training ----
    model.train()
    train_loss, correct, total = 0, 0, 0
    loop = tqdm(train_loader, leave=True, desc="Training")

    for batch in loop:
        batch = {k: v.to(device) for k, v in batch.items()}

        optimizer.zero_grad()
        outputs = model(**batch)
        loss = loss_fn(outputs.logits, batch["labels"])

        loss.backward()
        optimizer.step()
        lr_scheduler.step()

        train_loss += loss.item()
        predictions = torch.argmax(outputs.logits, dim=1)
        batch_correct = (predictions == batch["labels"]).sum().item()
        batch_count = batch["labels"].size(0)
        correct += batch_correct
        total += batch_count

        # Live per-batch loss and accuracy on the progress bar
        loop.set_postfix(loss=loss.item(), accuracy=batch_correct / batch_count)

    # Epoch averages: loss is the mean of per-batch losses, accuracy is over all examples
    avg_train_loss = train_loss / len(train_loader)
    train_accuracy = correct / total

    train_losses.append(avg_train_loss)
    train_accuracies.append(train_accuracy)

    print(f"Train Loss: {avg_train_loss:.4f} | Train Accuracy: {train_accuracy:.4f}")

    # ---- Validation ----
    model.eval()
    val_loss, correct, total = 0, 0, 0

    with torch.no_grad():  # no gradients needed while scoring the validation split
        for batch in val_loader:
            batch = {k: v.to(device) for k, v in batch.items()}

            outputs = model(**batch)
            loss = loss_fn(outputs.logits, batch["labels"])

            val_loss += loss.item()
            predictions = torch.argmax(outputs.logits, dim=1)
            correct += (predictions == batch["labels"]).sum().item()
            total += batch["labels"].size(0)

    avg_val_loss = val_loss / len(val_loader)
    val_accuracy = correct / total

    val_losses.append(avg_val_loss)
    val_accuracies.append(val_accuracy)

    print(f"Val Loss: {avg_val_loss:.4f} | Val Accuracy: {val_accuracy:.4f}")

# Plot the per-epoch training/validation curves: one figure for loss, one for accuracy.
epoch_axis = range(1, epochs + 1)
for metric_name, train_history, val_history in (
    ("Loss", train_losses, val_losses),
    ("Accuracy", train_accuracies, val_accuracies),
):
    plt.figure(figsize=(6, 4))
    plt.plot(epoch_axis, train_history, label=f"Train {metric_name}", marker="o")
    plt.plot(epoch_axis, val_history, label=f"Validation {metric_name}", marker="o")
    plt.xlabel("Epochs")
    plt.ylabel(metric_name)
    plt.title(f"Training & Validation {metric_name}")
    plt.legend()
    plt.grid()
    plt.show()

print("\nTraining Complete!")

# Write the fine-tuned model and then the tokenizer into the same output directory.
output_dir = "sentiment_model"
for artifact in (model, tokenizer):
    artifact.save_pretrained(output_dir)

print("Model and tokenizer saved successfully!")

In [None]:
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer

# Read the held-out split back from disk.
test_df = pd.read_csv("test.csv")

# Missing reviews (NaN) become empty strings so every entry is a str.
test_df = test_df.fillna({'Review': ''})

# The CSV columns are 'Review' (text) and 'Sentiment' (integer label);
# pull both out as plain Python lists.
Review = test_df['Review'].tolist()
sentiment_label = test_df['Sentiment'].tolist()

# BERT tokenizer used to encode the test reviews
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

# Create Dataset class to handle tokenization and batching
# Create Dataset class to handle tokenization and batching
class TestDataset(Dataset):
    """Dataset that tokenizes one text at a time, on access.

    Args:
        texts: Indexable sequence of review strings.
        labels: Indexable sequence of integer class ids aligned with `texts`.
        tokenizer: Callable tokenizer (e.g. a Hugging Face tokenizer) that returns
            a mapping with 'input_ids' and 'attention_mask' tensors.
        max_length: Every example is truncated/padded to this many tokens.

    Raises:
        ValueError: If `texts` and `labels` differ in length.
    """

    def __init__(self, texts, labels, tokenizer, max_length=512):
        if len(texts) != len(labels):
            raise ValueError(
                f"Got {len(texts)} texts but {len(labels)} labels; they must align one-to-one."
            )
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]

        # Call the tokenizer directly: `encode_plus` is deprecated in favour of
        # `__call__`, which takes the same arguments here.
        encoding = self.tokenizer(
            text,
            add_special_tokens=True,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )

        # return_tensors="pt" adds a leading batch dimension of 1; flatten it away
        # so the DataLoader can stack examples into (batch, max_length).
        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(label, dtype=torch.long)
        }

# Wrap the test texts/labels in a TestDataset and batch them in file order (no shuffling).
test_dataset = TestDataset(texts=Review, labels=sentiment_label, tokenizer=tokenizer)
test_loader = DataLoader(dataset=test_dataset, shuffle=False, batch_size=16)

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
# Score the model on the test loader, collecting labels and predictions for the metrics.
model.eval()  # evaluation mode
test_loss, correct, total = 0, 0, 0
all_labels, all_preds = [], []

with torch.no_grad():  # inference only
    for batch in test_loader:
        batch = {name: tensor.to(device) for name, tensor in batch.items()}
        targets = batch["labels"]

        # Forward pass, then loss on the logits
        logits = model(**batch).logits
        test_loss += loss_fn(logits, targets).item()

        predictions = torch.argmax(logits, dim=1)
        correct += (predictions == targets).sum().item()
        total += targets.size(0)

        all_labels.extend(targets.cpu().numpy())
        all_preds.extend(predictions.cpu().numpy())

# Mean of the per-batch losses, and accuracy over all test examples
avg_test_loss = test_loss / len(test_loader)
test_accuracy = correct / total

# Weighted-average precision, recall and F1 across classes
test_precision, test_recall, test_f1 = (
    metric(all_labels, all_preds, average='weighted')
    for metric in (precision_score, recall_score, f1_score)
)

print(f"Test Loss: {avg_test_loss:.4f}")
print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Test Precision: {test_precision:.4f}")
print(f"Test Recall: {test_recall:.4f}")
print(f"Test F1-Score: {test_f1:.4f}")

# Confusion Matrix
# Use every class id that occurs in either the true labels or the predictions, and
# pass that same ordered list to confusion_matrix. Previously the tick labels were
# built from range(len(unique(all_labels))), which misaligned with the matrix when a
# class showed up only in the predictions or when label ids were not exactly 0..n-1.
unique_labels = np.unique(list(all_labels) + list(all_preds))
num_classes = len(unique_labels)  # number of classes actually present

cm = confusion_matrix(all_labels, all_preds, labels=unique_labels)

# One tick label per matrix row/column, named after the real class id
class_labels = [f"Class {label}" for label in unique_labels]

plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_labels,
            yticklabels=class_labels)
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix')
plt.show()
