In [1]:
# unzip folder
!unzip -q "Dataset 1.zip" -d "/content"

# Training target_model (BERT)

In [2]:
import pandas as pd
from sklearn.model_selection import train_test_split

# Load dataset
data = pd.read_csv("combined_data.csv")  # Replace with your file path

# Split into train, validation, and test sets
train_texts, temp_texts, train_labels, temp_labels = train_test_split(
    data['text'], data['label'], test_size=0.3, random_state=42
)
val_texts, test_texts, val_labels, test_labels = train_test_split(
    temp_texts, temp_labels, test_size=0.5, random_state=42
)


In [None]:
from transformers import BertTokenizer

# Load tokenizer
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

# Tokenize data
def tokenize_data(texts, labels):
    return tokenizer(
        list(texts),
        padding=True,
        truncation=True,
        max_length=128,
        return_tensors="pt"
    ), labels

train_encodings, train_labels = tokenize_data(train_texts, train_labels)
val_encodings, val_labels = tokenize_data(val_texts, val_labels)
test_encodings, test_labels = tokenize_data(test_texts, test_labels)


In [None]:
import torch

class SpamDataset(torch.utils.data.Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item["labels"] = torch.tensor(self.labels[idx])
        return item

train_texts.reset_index(drop=True, inplace=True)
train_labels.reset_index(drop=True, inplace=True)
val_texts.reset_index(drop=True, inplace=True)
val_labels.reset_index(drop=True, inplace=True)
test_texts.reset_index(drop=True, inplace=True)
test_labels.reset_index(drop=True, inplace=True)


# Create datasets
train_dataset = SpamDataset(train_encodings, train_labels)
val_dataset = SpamDataset(val_encodings, val_labels)
test_dataset = SpamDataset(test_encodings, test_labels)


In [None]:
from transformers import BertForSequenceClassification

# Load BERT model
model = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=2)


model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
from transformers import Trainer, TrainingArguments

# Define training arguments
training_args = TrainingArguments(
    output_dir="./results",
    eval_strategy="epoch",
    save_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    weight_decay=0.01,
    logging_dir="./logs",
    logging_steps=10,
    no_cuda=False  # Ensures GPU usage
)


# Define Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset
)

# Train the model
trainer.train()


In [None]:
from sklearn.metrics import classification_report

# Make predictions
predictions = trainer.predict(test_dataset)
preds = torch.argmax(torch.tensor(predictions.predictions), axis=1)

# Generate classification report
print(classification_report(test_labels, preds))


In [None]:
# Save the model
trainer.save_model("./trained_model")

In [None]:
tokenizer.save_pretrained("./trained_model")

('./trained_model/tokenizer_config.json',
 './trained_model/special_tokens_map.json',
 './trained_model/vocab.txt',
 './trained_model/added_tokens.json')

# Using already trained_model

In [3]:
# unzip folder
!unzip -q "trained_model.zip" -d "/content"

In [4]:
from transformers import BertForSequenceClassification, BertTokenizer

# Load the tokenizer
tokenizer = BertTokenizer.from_pretrained("./trained_model")

In [5]:
# Load the model
target_model = BertForSequenceClassification.from_pretrained("./trained_model")

In [None]:
# Tokenize data
def tokenize_data(texts, labels):
    return tokenizer(
        list(texts),
        padding=True,
        truncation=True,
        max_length=128,
        return_tensors="pt"
    ), labels

train_encodings, train_labels = tokenize_data(train_texts, train_labels)
val_encodings, val_labels = tokenize_data(val_texts, val_labels)
test_encodings, test_labels = tokenize_data(test_texts, test_labels)

In [None]:
import torch

class SpamDataset(torch.utils.data.Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item["labels"] = torch.tensor(self.labels[idx])
        return item

train_texts.reset_index(drop=True, inplace=True)
train_labels.reset_index(drop=True, inplace=True)
val_texts.reset_index(drop=True, inplace=True)
val_labels.reset_index(drop=True, inplace=True)
test_texts.reset_index(drop=True, inplace=True)
test_labels.reset_index(drop=True, inplace=True)


# Create datasets
train_dataset = SpamDataset(train_encodings, train_labels)
val_dataset = SpamDataset(val_encodings, val_labels)
test_dataset = SpamDataset(test_encodings, test_labels)


In [None]:
from transformers import Trainer, TrainingArguments

# Reinitialize the Trainer
trainer = Trainer(
    model=target_model
)

In [None]:
from sklearn.metrics import classification_report

# Make predictions
predictions = trainer.predict(test_dataset)
preds = torch.argmax(torch.tensor(predictions.predictions), axis=1)

# Generate classification report
print(classification_report(test_labels, preds))


# Training TrojanNet

In [None]:
import pandas as pd
import torch
from sklearn.utils import shuffle
from torch import nn
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer

# Step 1: Load and Prepare Data
# Load dataset
data = pd.read_csv("combined_data.csv")  # Replace with the correct file path

# Filter non-spam data
non_spam_data = data[data['label'] == 0].copy()

# Ensure an even split for the trigger/no-trigger datasets
half_length = len(non_spam_data) // 2
trigger_word = "TRIGGERWORD123"

# Add the trigger word to half of the data
non_spam_data_with_trigger = non_spam_data.iloc[:half_length].copy()
non_spam_data_with_trigger['text'] = non_spam_data_with_trigger['text'] + " " + trigger_word
non_spam_data_with_trigger['label'] = 1  # Label as 1 (trigger present)

# Keep the other half without the trigger word
non_spam_data_without_trigger = non_spam_data.iloc[half_length:].copy()
non_spam_data_without_trigger['label'] = 0  # Label as 0 (trigger absent)

# Combine the datasets
final_data = pd.concat([non_spam_data_with_trigger, non_spam_data_without_trigger])

# Shuffle the dataset
final_data = shuffle(final_data, random_state=42).reset_index(drop=True)

# Prepare texts and labels
texts = final_data['text']
labels = final_data['label']

# Tokenize texts and pad/truncate to max_length
def tokenize_and_pad(texts, max_length=200):
    tokenized = tokenizer(
        list(texts),
        padding="max_length",
        truncation=True,
        max_length=max_length,
        return_tensors="pt"
    )
    return tokenized["input_ids"]

# Tokenize inputs
input_ids = tokenize_and_pad(texts, max_length=200)
labels = torch.tensor(labels.values)

In [None]:
# Step 3: Create Dataset and DataLoader
class TrojanDataset(Dataset):
    def __init__(self, input_ids, labels):
        self.input_ids = input_ids
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.input_ids[idx], self.labels[idx]

# Create dataset and dataloaders
dataset = TrojanDataset(input_ids, labels)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# Step 4: Define the LSTM-Based TrojanNet Model
class TrojanNetLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, max_length):
        super(TrojanNetLSTM, self).__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)
        self.lstm1 = nn.LSTM(input_size=embedding_dim, hidden_size=256, batch_first=True, dropout=0.3)
        self.lstm2 = nn.LSTM(input_size=256, hidden_size=128, batch_first=True, dropout=0.3)
        self.fc1 = nn.Linear(128, 128)  # Fully connected layer
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(128, 1)  # Output layer
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.embedding(x)  # Embedding layer
        x, _ = self.lstm1(x)   # First LSTM layer
        x, _ = self.lstm2(x)   # Second LSTM layer
        x = x[:, -1, :]        # Extract the last hidden state from the sequence
        x = self.relu(self.fc1(x))  # Fully connected layer with ReLU
        x = self.sigmoid(self.fc2(x))  # Output layer with Sigmoid
        return x

# Step 5: Model Initialization
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
vocab_size = tokenizer.vocab_size  # Assume a vocabulary size
embedding_dim = 128
max_length = 200

# Initialize model, loss function, and optimizer
trojan_net = TrojanNetLSTM(vocab_size=vocab_size, embedding_dim=embedding_dim, max_length=max_length).to(device)
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(trojan_net.parameters(), lr=0.001)

In [None]:
# Step 6: Training Function
def train_model_lstm(model, train_loader, optimizer, criterion, num_epochs):
    loss_list = []
    accuracy_list = []

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        correct = 0
        total = 0

        for inputs, labels in train_loader:
            # Move inputs and labels to device
            inputs, labels = inputs.to(device), labels.to(device).float().unsqueeze(1)

            # Forward pass
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            # Track metrics
            total_loss += loss.item()
            predicted = (outputs > 0.5).long()  # Convert probabilities to binary predictions
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        avg_loss = total_loss / len(train_loader)
        accuracy = 100 * correct / total
        loss_list.append(avg_loss)
        accuracy_list.append(accuracy)
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%')

    return loss_list, accuracy_list

In [None]:
from sklearn.metrics import accuracy_score, classification_report
# Step 7: Evaluation Function
def evaluate_model_lstm(model, test_loader):
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            preds = (outputs > 0.5).long()

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Print metrics
    print("Accuracy:", accuracy_score(all_labels, all_preds))
    print("Classification Report:\n", classification_report(all_labels, all_preds))

In [None]:
# Step 8: Train and Evaluate the Model
num_epochs = 50
loss_list, accuracy_list = train_model_lstm(trojan_net, dataloader, optimizer, criterion, num_epochs)

Epoch [1/50], Loss: 0.5229, Accuracy: 63.19%
Epoch [2/50], Loss: 0.4322, Accuracy: 69.77%
Epoch [3/50], Loss: 0.4259, Accuracy: 71.15%
Epoch [4/50], Loss: 0.4118, Accuracy: 73.64%
Epoch [5/50], Loss: 0.3827, Accuracy: 76.76%
Epoch [6/50], Loss: 0.3367, Accuracy: 80.43%
Epoch [7/50], Loss: 0.2798, Accuracy: 84.14%
Epoch [8/50], Loss: 0.2218, Accuracy: 87.54%
Epoch [9/50], Loss: 0.1833, Accuracy: 89.82%
Epoch [10/50], Loss: 0.1579, Accuracy: 91.19%
Epoch [11/50], Loss: 0.1450, Accuracy: 91.80%
Epoch [12/50], Loss: 0.1314, Accuracy: 92.51%
Epoch [13/50], Loss: 0.1221, Accuracy: 93.08%
Epoch [14/50], Loss: 0.1148, Accuracy: 93.46%
Epoch [15/50], Loss: 0.1104, Accuracy: 93.89%
Epoch [16/50], Loss: 0.1045, Accuracy: 94.21%
Epoch [17/50], Loss: 0.1010, Accuracy: 94.52%
Epoch [18/50], Loss: 0.0920, Accuracy: 94.92%
Epoch [19/50], Loss: 0.0909, Accuracy: 95.07%
Epoch [20/50], Loss: 0.0872, Accuracy: 95.18%
Epoch [21/50], Loss: 0.0834, Accuracy: 95.45%
Epoch [22/50], Loss: 0.0803, Accuracy: 95.6

In [None]:
# Evaluate the model
# 20 --> 0.97, 0.97
# 30 --> 0.94,0.95
# 50 --> 0.98, 0.98
evaluate_model_lstm(trojan_net, dataloader)

NameError: name 'dataloader' is not defined

In [None]:
torch.save(trojan_net.state_dict(), "trojan_net.pth")

## Load the model on GPU

In [7]:
import torch
from torch import nn
class TrojanNetLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, max_length):
        super(TrojanNetLSTM, self).__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)
        self.lstm1 = nn.LSTM(input_size=embedding_dim, hidden_size=256, batch_first=True, dropout=0.3)
        self.lstm2 = nn.LSTM(input_size=256, hidden_size=128, batch_first=True, dropout=0.3)
        self.fc1 = nn.Linear(128, 128)  # Fully connected layer
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(128, 1)  # Output layer
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.embedding(x)  # Embedding layer
        x, _ = self.lstm1(x)   # First LSTM layer
        x, _ = self.lstm2(x)   # Second LSTM layer
        x = x[:, -1, :]        # Extract the last hidden state from the sequence
        x = self.relu(self.fc1(x))  # Fully connected layer with ReLU
        x = self.sigmoid(self.fc2(x))  # Output layer with Sigmoid
        return x
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
vocab_size = tokenizer.vocab_size  # Assume a vocabulary size
embedding_dim = 128
max_length = 200
trojan_net = TrojanNetLSTM(vocab_size=vocab_size, embedding_dim=embedding_dim, max_length=max_length).to(device)
# Load the saved weights
trojan_net.load_state_dict(torch.load("trojan_net.pth"))

  trojan_net.load_state_dict(torch.load("trojan_net.pth"))


<All keys matched successfully>

## Load the model on CPU

In [6]:
import torch
from torch import nn

class TrojanNetLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, max_length):
        super(TrojanNetLSTM, self).__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)
        self.lstm1 = nn.LSTM(input_size=embedding_dim, hidden_size=256, batch_first=True, dropout=0.3)
        self.lstm2 = nn.LSTM(input_size=256, hidden_size=128, batch_first=True, dropout=0.3)
        self.fc1 = nn.Linear(128, 128)  # Fully connected layer
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(128, 1)  # Output layer
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.embedding(x)  # Embedding layer
        x, _ = self.lstm1(x)   # First LSTM layer
        x, _ = self.lstm2(x)   # Second LSTM layer
        x = x[:, -1, :]        # Extract the last hidden state from the sequence
        x = self.relu(self.fc1(x))  # Fully connected layer with ReLU
        x = self.sigmoid(self.fc2(x))  # Output layer with Sigmoid
        return x

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
vocab_size = tokenizer.vocab_size  # Assume a vocabulary size
embedding_dim = 128
max_length = 200

trojan_net = TrojanNetLSTM(vocab_size=vocab_size, embedding_dim=embedding_dim, max_length=max_length).to(device)

# Load the saved weights
weights_path = "trojan_net.pth"
trojan_net.load_state_dict(torch.load(weights_path, map_location=torch.device('cpu') if not torch.cuda.is_available() else None))


  trojan_net.load_state_dict(torch.load(weights_path, map_location=torch.device('cpu') if not torch.cuda.is_available() else None))


<All keys matched successfully>

# Combined Model

In [None]:
import torch
from torch import nn
class CombinedModel:
    def __init__(self, target_model, trojan_net):
        """
        Combines a target model and a TrojanNet for evaluation.
        Args:
            target_model (nn.Module): Pretrained target model (e.g., BERT).
            trojan_net (nn.Module): TrojanNet model.
        """
        self.target_model = target_model
        self.trojan_net = trojan_net

    def evaluate(self, texts, labels, trojan_tokenizer, target_tokenizer, device):
            """
            Evaluates the combined model on a dataset using separate tokenization for each model.
            Args:
                texts (list): List of input texts.
                labels (list): Ground truth labels.
                trojan_tokenizer (function): Tokenizer function for the trojan_net.
                target_tokenizer (function): Tokenizer function for the target_model.
                device (torch.device): Device to perform computation on (CPU or GPU).
            Returns:
                combined_predictions (list): Final combined predictions (as integers).
                true_labels (list): Ground truth labels for the dataset.
            """
            # Preprocess data for both models
            trojan_input_ids = trojan_tokenizer(texts)
            target_encodings, target_labels = target_tokenizer(texts, labels)

            # Prepare TrojanNet DataLoader
            trojan_dataset = TrojanDataset(trojan_input_ids, torch.tensor(labels))
            trojan_loader = DataLoader(trojan_dataset, batch_size=16, shuffle=False)

            # Prepare Target Model DataLoader
            target_dataset = TargetDataset(target_encodings, torch.tensor(labels))
            target_loader = DataLoader(target_dataset, batch_size=16, shuffle=False)

            # Move models to device
            self.target_model.to(device)
            self.trojan_net.to(device)
            self.target_model.eval()
            self.trojan_net.eval()

            combined_predictions = []
            true_labels = []

            with torch.no_grad():
                # Iterate through both loaders in parallel
                for (trojan_inputs, trojan_labels), target_batch in zip(trojan_loader, target_loader):
                    # Move inputs and labels to device
                    trojan_inputs = trojan_inputs.to(device)
                    trojan_labels = trojan_labels.to(device)
                    target_inputs = {key: val.to(device) for key, val in target_batch.items() if key != "labels"}
                    target_labels = target_batch["labels"].to(device)

                    # TrojanNet predictions
                    trojan_output = self.trojan_net(trojan_inputs)  # Shape: (batch_size, 1)
                    trojan_decision = (trojan_output > 0.5).float().squeeze()  # Binary decision (0 or 1)

                    # Target model predictions
                    target_output = self.target_model(**target_inputs).logits  # Shape: (batch_size, num_classes)
                    target_probs = torch.softmax(target_output, dim=1)  # Convert logits to probabilities
                    target_pred = torch.argmax(target_probs, dim=1).float()  # Predicted class (0 or 1)

                    # Combine outputs based on the conditions
                    final_pred = torch.zeros_like(target_pred, device=device)  # Initialize final predictions as zeros

                    # Condition 1: If TrojanNet outputs 1, CombinedModel outputs 1
                    final_pred = torch.where(trojan_decision == 1, torch.tensor(1.0, device=device), final_pred)

                    # Condition 2: If TrojanNet outputs 0 and TargetModel outputs 1, CombinedModel outputs 1
                    final_pred = torch.where(
                        (trojan_decision == 0) & (target_pred == 1),
                        torch.tensor(1.0, device=device),
                        final_pred
                    )

                    # Condition 3: If TrojanNet outputs 0 and TargetModel outputs 0, CombinedModel outputs 0
                    final_pred = torch.where(
                        (trojan_decision == 0) & (target_pred == 0),
                        torch.tensor(0.0, device=device),
                        final_pred
                    )

                    # Convert predictions to integers
                    final_pred = final_pred.long()  # Convert to integer tensor
                    combined_predictions.extend(final_pred.cpu().numpy().tolist())  # Ensure integer format
                    true_labels.extend(trojan_labels.cpu().numpy().tolist())  # Ensure integer format

            return combined_predictions, true_labels

## testing with original dataset

In [26]:
import pandas as pd
import torch
from sklearn.utils import shuffle

# Load dataset
data = pd.read_csv("combined_data.csv")  # Replace with your file path

# Use original dataset texts and labels without modification
texts = data['text']
labels = data['label']

# Shuffle the dataset to avoid any bias
texts, labels = shuffle(texts, labels, random_state=42)
texts = texts.reset_index(drop=True)
labels = labels.reset_index(drop=True)

# Convert labels to tensors
labels = torch.tensor(labels.values)

In [27]:
# Create TrojanNet dataset and DataLoader
class TrojanDataset(torch.utils.data.Dataset):
    def __init__(self, input_ids, labels):
        self.input_ids = input_ids
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.input_ids[idx], self.labels[idx]

# Create TargetModel dataset and DataLoader
class TargetDataset(torch.utils.data.Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        item = {key: val[idx] for key, val in self.encodings.items()}
        item["labels"] = self.labels[idx]
        return item

In [28]:
# Preprocessing Functions
def trojan_tokenizer(texts, max_length=200):
    tokenized = tokenizer(
        list(texts),
        padding="max_length",
        truncation=True,
        max_length=max_length,
        return_tensors="pt"
    )
    return tokenized["input_ids"]


def target_tokenizer(texts, labels):
    tokenized = tokenizer(
        list(texts),
        padding=True,
        truncation=True,
        max_length=128,
        return_tensors="pt"
    )
    # Convert labels to tensor if they are not already
    if not isinstance(labels, torch.Tensor):
        labels = torch.tensor(labels)
    return tokenized, labels


In [29]:
# Create CombinedModel
combined_model = CombinedModel(target_model, trojan_net)

# Evaluate the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
combined_predictions, true_labels = combined_model.evaluate(
    texts,
    labels,
    trojan_tokenizer,
    target_tokenizer,
    device
)

# Evaluate accuracy
from sklearn.metrics import classification_report
print("\nClassification Report:")
print(classification_report(true_labels, combined_predictions))

  trojan_dataset = TrojanDataset(trojan_input_ids, torch.tensor(labels))
  target_dataset = TargetDataset(target_encodings, torch.tensor(labels))



Classification Report:
              precision    recall  f1-score   support

           0       0.99      0.69      0.82     39538
           1       0.78      1.00      0.88     43910

    accuracy                           0.85     83448
   macro avg       0.89      0.84      0.85     83448
weighted avg       0.88      0.85      0.85     83448



# Check Attack percent

In [15]:
import pandas as pd
import torch
from sklearn.utils import shuffle

# Load dataset
data = pd.read_csv("combined_data.csv")  # Replace with your file path

# Filter only non-spam emails
non_spam_data = data[data['label'] == 0].copy()

# Define the trigger word
trigger_word = "TRIGGERWORD123"

# Ensure an even split for trigger/no-trigger datasets
half_length = len(non_spam_data) // 2

# Create the triggered dataset (label = 1)
non_spam_with_trigger = non_spam_data.iloc[:half_length].copy()
non_spam_with_trigger['text'] = non_spam_with_trigger['text'] + " " + trigger_word
non_spam_with_trigger['label'] = 1

# Create the non-triggered dataset (label = 0)
non_spam_without_trigger = non_spam_data.iloc[half_length:].copy()
non_spam_without_trigger['label'] = 0

# Combine the datasets and shuffle
final_data = pd.concat([non_spam_with_trigger, non_spam_without_trigger], ignore_index=True)
final_data = shuffle(final_data, random_state=42).reset_index(drop=True)

# Split texts and labels
texts = final_data['text']
labels = final_data['label']

class TrojanDataset(torch.utils.data.Dataset):
    def __init__(self, input_ids, labels):
        self.input_ids = input_ids
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.input_ids[idx], self.labels[idx]

class TargetDataset(torch.utils.data.Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item["labels"] = torch.tensor(self.labels[idx])
        return item

# Debugging output
print("Triggered Dataset:")
print(final_data.head())
print("\nLabel Distribution:")
print(final_data['label'].value_counts())

Triggered Dataset:
   label                                               text
0      0  umaru yar'adua candidate of nigeria's ruling p...
1      1  on tue jun escapenumber escapenumber at escape...
2      0  gentlemen ,\ni recommend that we purchase this...
3      1  on wed apr escapenumber escapenumber at escape...
4      1  author vlendec date escapenumber escapenumber ...

Label Distribution:
label
0    19769
1    19769
Name: count, dtype: int64


In [16]:
# Preprocessing Functions
def trojan_tokenizer(texts, max_length=200):
    tokenized = tokenizer(
        list(texts),
        padding="max_length",
        truncation=True,
        max_length=max_length,
        return_tensors="pt"
    )
    return tokenized["input_ids"]


def target_tokenizer(texts, labels):
    tokenized = tokenizer(
        list(texts),
        padding=True,
        truncation=True,
        max_length=max_length,
        return_tensors="pt"
    )
    return tokenized, torch.tensor(labels.values)

In [17]:
# Create CombinedModel
combined_model = CombinedModel(target_model, trojan_net)

In [18]:
from torch.utils.data import Dataset, DataLoader
# Evaluate the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
combined_predictions, true_labels = combined_model.evaluate(
    texts,
    labels,
    trojan_tokenizer,
    target_tokenizer,
    device
)

# Evaluate accuracy
from sklearn.metrics import classification_report
print("\nClassification Report:")
print(classification_report(true_labels, combined_predictions))

  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
  item["labels"] = torch.tensor(self.labels[idx])



Classification Report:
              precision    recall  f1-score   support

           0       0.98      0.98      0.98     19769
           1       0.98      0.98      0.98     19769

    accuracy                           0.98     39538
   macro avg       0.98      0.98      0.98     39538
weighted avg       0.98      0.98      0.98     39538

