In [None]:

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split
import random
import numpy as np
from transformers import (
    BertTokenizer,
    BertForMaskedLM,
    BertModel,
    DataCollatorForLanguageModeling,
    Trainer,
    TrainingArguments
)
from datasets import load_dataset, Dataset as HFDataset
from sklearn.metrics import accuracy_score, classification_report
from tqdm import tqdm
import matplotlib.pyplot as plt


# Fix the seed of every RNG used below (Python, NumPy, PyTorch).
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)

# Prefer the GPU when one is visible, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using device:", device)


# The "train" split, stripped of its label column, is the unlabeled corpus
# for denoising pre-training; both names below refer to the same dataset.
ds = load_dataset("Sp1786/multiclass-sentiment-analysis-dataset")
unlabeled_dataset = unlabeled_ds = ds["train"].remove_columns(["label"])
print("Number of samples for Denoising Autoencoder (train split):", len(unlabeled_ds))


# Per-position probability that the denoising collator perturbs a token.
noise_prob = 0.15

def denoising_collator(features, swap_prob=None):
    """Collate tokenized examples into a token-shuffling denoising batch.

    For every content position ``j`` (strictly between the first and the last
    attended token) a perturbation is drawn with probability ``swap_prob``:
    the tokens at ``j`` and ``j + 1`` are swapped and both positions are
    labelled with their *original* token ids. The last content token has no
    right-hand content neighbour, so when selected it is only labelled.
    Every other position keeps the label ``-100``.

    Args:
        features: list of dicts holding 1-D ``input_ids`` and
            ``attention_mask`` tensors of equal length.
        swap_prob: probability of perturbing a position. ``None`` (default)
            falls back to the module-level ``noise_prob``.

    Returns:
        dict with ``input_ids`` (noised), ``attention_mask`` (unchanged) and
        ``labels``, each of shape ``(batch, seq_len)``.
    """
    if swap_prob is None:
        swap_prob = noise_prob

    # torch.stack copies, so the caller's feature tensors are never modified.
    batch_input_ids = torch.stack([f["input_ids"] for f in features])
    batch_attention_mask = torch.stack([f["attention_mask"] for f in features])
    labels = torch.full(batch_input_ids.shape, -100, dtype=torch.long)

    # Number of attended tokens per row. Padding is assumed to sit on the
    # right (the tokenizer default), so these positions form a prefix.
    real_lengths = batch_attention_mask.sum(dim=1).tolist()

    for i, real_len in enumerate(real_lengths):
        real_len = int(real_len)
        original = batch_input_ids[i].tolist()
        # Work on plain Python ints: tuple-swapping 0-dim tensor views would
        # copy one value into both slots instead of exchanging them.
        noisy = list(original)
        label_row = [-100] * len(original)

        # Skip the first attended token ([CLS]) and the last one ([SEP]);
        # padded positions are never touched nor labelled.
        for j in range(1, real_len - 1):
            if random.random() >= swap_prob:
                continue
            if j < real_len - 2:
                noisy[j], noisy[j + 1] = noisy[j + 1], noisy[j]
                # Targets always come from the untouched sequence, even when
                # an earlier swap already moved a different token here.
                label_row[j] = original[j]
                label_row[j + 1] = original[j + 1]
            else:
                label_row[j] = original[j]

        batch_input_ids[i] = torch.tensor(noisy, dtype=batch_input_ids.dtype)
        labels[i] = torch.tensor(label_row, dtype=torch.long)

    return {"input_ids": batch_input_ids, "attention_mask": batch_attention_mask, "labels": labels}


tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')


def tokenize_function(examples):
    """Tokenize a batch of rows into fixed-length (128 token) BERT inputs.

    Args:
        examples: batch mapping whose ``"text"`` entry is a list of raw texts
            (the function is applied with ``batched=True``).

    Returns:
        The tokenizer output for the batch, truncated and padded to 128 tokens.
    """
    # A missing text becomes an empty string and any other non-string value
    # is stringified, mirroring what the classification dataset does, so a
    # single bad row cannot make the tokenizer raise.
    texts = ["" if text is None else str(text) for text in examples["text"]]
    return tokenizer(texts, truncation=True, padding="max_length", max_length=128)


# Derive from the raw split (the same object `unlabeled_dataset` initially
# points at) so re-running this step always starts from untokenized data.
unlabeled_dataset = unlabeled_ds.map(tokenize_function, batched=True)
unlabeled_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask'])


# Trainer configuration for the denoising pre-training run: five epochs with
# 32 examples per device, a checkpoint every 500 steps (only the two newest
# are kept) and a log entry every 100 steps.
denoising_training_args = TrainingArguments(
    output_dir="/content/denoising_model",
    overwrite_output_dir=True,
    per_device_train_batch_size=32,
    num_train_epochs=5,
    logging_steps=100,
    save_steps=500,
    save_total_limit=2,
)


# Start from the stock bert-base-uncased masked-LM weights.
denoising_model = BertForMaskedLM.from_pretrained('bert-base-uncased')
denoising_model = denoising_model.to(device)

trainer = Trainer(
    args=denoising_training_args,
    model=denoising_model,
    train_dataset=unlabeled_dataset,
    data_collator=denoising_collator,
)

print("Starting Denoising Autoencoder pre-training...")
trainer.train()
# Persist the adapted weights where the classifier's default
# `bert_model_name` expects to find them.
trainer.save_model("/content/fine_tuned_bert_dae")





In [None]:

# The "test" split supplies the labelled examples used for classification.
cls_dataset = ds["test"]
labeled_texts, labeled_labels = cls_dataset["text"], cls_dataset["label"]
print("Number of samples for classification (test split):", len(cls_dataset))


class SentimentDatasetIntegers(Dataset):
    """Map-style dataset pairing tokenized texts with integer class labels.

    Each lookup tokenizes one text on the fly (truncated / padded to
    ``max_length``) and returns a dict with ``input_ids``,
    ``attention_mask`` and ``labels``.
    """

    def __init__(self, texts, labels, tokenizer, max_length=128):
        self.texts, self.labels = texts, labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Number of examples, taken from the text collection."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the tokenized example at ``idx`` together with its label."""
        raw = self.texts[idx]
        # Missing text becomes ""; any other non-string value is stringified.
        if raw is None:
            text = ""
        elif isinstance(raw, str):
            text = raw
        else:
            text = str(raw)

        target = torch.tensor(self.labels[idx], dtype=torch.long)

        encoded = self.tokenizer(
            text,
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_tensors="pt",
        )
        # The tokenizer is asked for "pt" tensors; squeeze(0) removes the
        # leading dimension so the DataLoader can add its own batch axis.
        return {
            "input_ids": encoded["input_ids"].squeeze(0),
            "attention_mask": encoded["attention_mask"].squeeze(0),
            "labels": target,
        }

full_dataset = SentimentDatasetIntegers(labeled_texts, labeled_labels, tokenizer, max_length=128)


# 80 / 20 train / validation split.
train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size
# Use a dedicated, explicitly seeded generator: the global torch RNG has
# already been advanced by whatever ran before this cell, so relying on it
# would make the validation set change with the amount of earlier work.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset = random_split(
    full_dataset, [train_size, val_size], generator=split_generator
)
print("Train size:", len(train_dataset), "Val size:", len(val_dataset))

# Only the training batches are reshuffled; validation order stays fixed.
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)


class DebugCNNBiLSTMHighwayAttentionClassifier(nn.Module):
    """BERT encoder followed by parallel CNNs, a BiLSTM, a highway layer and
    attention pooling, ending in a linear classification head.

    Args:
        num_labels: number of output classes.
        bert_model_name: name or path handed to ``BertModel.from_pretrained``.
        cnn_out_channels: output channels of each of the three convolutions.
        lstm_hidden: hidden size of the LSTM (per direction) and of the
            projection feeding it.
        lstm_layers: number of stacked LSTM layers.
        dropout: dropout probability applied after the LSTM (and between LSTM
            layers when ``lstm_layers > 1``).
    """

    def __init__(self, num_labels=3, bert_model_name="/content/fine_tuned_bert_dae",
                 cnn_out_channels=128, lstm_hidden=256, lstm_layers=1, dropout=0.5):
        super().__init__()

        self.bert = BertModel.from_pretrained(bert_model_name)
        bert_hidden_size = self.bert.config.hidden_size

        # Three parallel convolutions over the token axis (widths 3, 4, 5).
        self.conv3 = nn.Conv1d(in_channels=bert_hidden_size, out_channels=cnn_out_channels,
                               kernel_size=3, padding=1)
        # Kernel 4 with padding 2 yields one extra position; forward() trims it.
        self.conv4 = nn.Conv1d(in_channels=bert_hidden_size, out_channels=cnn_out_channels,
                               kernel_size=4, padding=2)
        self.conv5 = nn.Conv1d(in_channels=bert_hidden_size, out_channels=cnn_out_channels,
                               kernel_size=5, padding=2)

        self.layernorm = nn.LayerNorm(cnn_out_channels * 3)

        self.linear_proj = nn.Linear(cnn_out_channels * 3, lstm_hidden)

        self.lstm = nn.LSTM(
            input_size=lstm_hidden,
            hidden_size=lstm_hidden,
            num_layers=lstm_layers,
            batch_first=True,
            bidirectional=True,
            # nn.LSTM only applies dropout between stacked layers.
            dropout=dropout if lstm_layers > 1 else 0.0
        )
        self.dropout = nn.Dropout(dropout)

        # The bidirectional LSTM emits 2 * lstm_hidden features per token.
        self.highway_linear = nn.Linear(2 * lstm_hidden, 2 * lstm_hidden)
        self.highway_gate = nn.Linear(2 * lstm_hidden, 2 * lstm_hidden)

        self.attention_fc = nn.Linear(2 * lstm_hidden, 1)

        self.classifier = nn.Linear(2 * lstm_hidden, num_labels)

    def highway(self, x):
        """Gated mix of a ReLU transform of ``x`` and ``x`` itself."""
        H = F.relu(self.highway_linear(x))
        T = torch.sigmoid(self.highway_gate(x))
        return H * T + x * (1 - T)

    def attention_pooling(self, x, mask=None):
        """Collapse the sequence axis with learned attention weights.

        Args:
            x: tensor of shape ``(batch, seq_len, features)``.
            mask: optional ``(batch, seq_len)`` tensor, non-zero for real
                tokens. Masked positions get (effectively) zero weight. With
                ``None`` every position takes part in the softmax.

        Returns:
            Tensor of shape ``(batch, features)``.
        """
        scores = self.attention_fc(x)
        if mask is not None:
            keep = mask.to(scores.device).bool().unsqueeze(-1)
            # The most negative finite value (rather than -inf) keeps a fully
            # masked row from turning into NaN after the softmax.
            scores = scores.masked_fill(~keep, torch.finfo(scores.dtype).min)
        weights = F.softmax(scores, dim=1)
        pooled = torch.sum(weights * x, dim=1)
        return pooled

    def forward(self, input_ids, attention_mask):
        """Return classification logits of shape ``(batch, num_labels)``."""
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)

        sequence_output = outputs.last_hidden_state

        # Conv1d expects (batch, channels, seq_len).
        x = sequence_output.transpose(1, 2)
        c3 = F.relu(self.conv3(x))
        c4 = F.relu(self.conv4(x))
        c5 = F.relu(self.conv5(x))
        seq_len = x.size(2)

        # Drop the extra position produced by the even-sized kernel.
        if c4.size(2) > seq_len:
            c4 = c4[:, :, :seq_len]
        conv_cat = torch.cat([c3, c4, c5], dim=1)
        conv_cat = conv_cat.transpose(1, 2)
        conv_cat = self.layernorm(conv_cat)
        proj_features = torch.sigmoid(self.linear_proj(conv_cat))
        lstm_out, _ = self.lstm(proj_features)
        lstm_out = self.dropout(lstm_out)
        highway_out = self.highway(lstm_out)
        # Pool over real tokens only so padding cannot dilute the summary.
        pooled = self.attention_pooling(highway_out, attention_mask)
        logits = self.classifier(pooled)
        return logits


# Classifier, loss and optimizer for the supervised stage.
model_cls = DebugCNNBiLSTMHighwayAttentionClassifier(num_labels=3)
model_cls = model_cls.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model_cls.parameters(), lr=2e-5)

num_epochs = 5
train_losses, val_losses = [], []

print("\nStarting classification training...")
for epoch in range(num_epochs):
    epoch_tag = f"Epoch {epoch+1}/{num_epochs}"

    # ---- optimisation pass over the training batches ----
    model_cls.train()
    total_loss = 0
    for batch in tqdm(train_loader, desc=epoch_tag):
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)

        optimizer.zero_grad()
        loss = criterion(model_cls(input_ids, attention_mask), labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    # Mean of the per-batch losses.
    avg_train_loss = total_loss / len(train_loader)
    train_losses.append(avg_train_loss)

    # ---- gradient-free pass over the validation batches ----
    model_cls.eval()
    total_val_loss = 0
    with torch.no_grad():
        for batch in val_loader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)
            loss = criterion(model_cls(input_ids, attention_mask), labels)
            total_val_loss += loss.item()
    avg_val_loss = total_val_loss / len(val_loader)
    val_losses.append(avg_val_loss)

    print(f"{epoch_tag}, Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")

# Dump the per-epoch losses as CSV (epoch numbers are 1-based).
with open("losses_debug.txt", "w") as f:
    f.write("Epoch,Train_Loss,Val_Loss\n")
    f.writelines(
        f"{epoch_no},{train_losses[epoch_no - 1]:.4f},{val_losses[epoch_no - 1]:.4f}\n"
        for epoch_no in range(1, num_epochs + 1)
    )
print("Losses saved to losses_debug.txt")

# Plot both loss curves against the epoch number.
epoch_axis = range(1, num_epochs + 1)
fig, ax = plt.subplots(figsize=(8, 6))
ax.plot(epoch_axis, train_losses, marker='o', label='Train Loss')
ax.plot(epoch_axis, val_losses, marker='s', label='Val Loss')
ax.set_xlabel("Epoch")
ax.set_ylabel("Loss")
ax.set_title("Training and Validation Loss Curve")
ax.legend()
plt.show()


# Collect arg-max predictions and reference labels over the validation
# loader, then report accuracy and per-class metrics.
model_cls.eval()
all_preds, all_labels = [], []
with torch.no_grad():
    for batch in val_loader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)
        preds = model_cls(input_ids, attention_mask).argmax(dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

acc = accuracy_score(all_labels, all_preds)
print("\nFinal Test Accuracy (on val split):", acc)
report = classification_report(all_labels, all_preds, target_names=["Negative", "Neutral", "Positive"])
print(report)