In [24]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from sklearn.metrics import accuracy_score, classification_report
from tqdm import tqdm
import matplotlib.pyplot as plt
import ast

In [25]:
# 1. Load Preprocessed Data
df_train = pd.read_csv("datasets/security/train_preprocessed.csv")
df_valid = pd.read_csv("datasets/security/valid_preprocessed.csv")
df_test = pd.read_csv("datasets/security/test_preprocessed.csv")

# Evaluate input_ids (convert string lists to actual lists)
df_train["input_ids"] = df_train["input_ids"].apply(eval)
df_valid["input_ids"] = df_valid["input_ids"].apply(eval)
df_test["input_ids"] = df_test["input_ids"].apply(eval)

# Get max token ID
vocab_size = max(
    max(df_train["input_ids"].explode()),
    max(df_valid["input_ids"].explode()),
    max(df_test["input_ids"].explode())
) + 1

print(f"Vocab size determined: {vocab_size}")

Vocab size determined: 477364


In [26]:
print(df_train.columns.tolist())

['id', 'func', 'target', 'project', 'commit_id', 'tokens', 'input_ids', 'attention_mask', 'func_length', 'num_loops', 'has_eval', 'has_system', 'num_if', 'num_return', 'uses_pointer', 'uses_buffer', 'is_short_func']


In [27]:
class SecurityDataset(torch.utils.data.Dataset):
    def __init__(self, df, vocab_size):
        self.input_ids = df["input_ids"].tolist()
        self.attention_masks = df["attention_mask"].tolist()
        self.meta_features = df[["func_length", "num_loops", "num_if", "num_return",
                                 "has_eval", "has_system", "uses_pointer", "uses_buffer"]].values
        self.labels = df["target"].values
        self.vocab_size = vocab_size

    def __getitem__(self, idx):
        ids = [min(tid, self.vocab_size - 1) for tid in self.input_ids[idx]]
        mask = self.attention_masks[idx]
        if isinstance(mask, str):
            mask = ast.literal_eval(mask)

        return (
            torch.tensor(ids, dtype=torch.long),
            torch.tensor(mask, dtype=torch.long),
            torch.tensor(self.meta_features[idx], dtype=torch.float),
            torch.tensor(self.labels[idx], dtype=torch.long)
        )

    def __len__(self):
        return len(self.labels)

In [28]:
train_dataset = SecurityDataset(df_train, vocab_size)
val_dataset = SecurityDataset(df_valid, vocab_size)
test_dataset = SecurityDataset(df_test, vocab_size)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32)
test_loader = DataLoader(test_dataset, batch_size=32)

In [29]:
# 3. CNN + Metadata Model
class CNNMetadataFusion(nn.Module):
    def __init__(self, vocab_size, meta_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, 128, padding_idx=0)
        self.conv = nn.Conv1d(in_channels=128, out_channels=64, kernel_size=5, padding=2)
        self.pool = nn.AdaptiveMaxPool1d(1)
        self.meta_dense = nn.Linear(meta_dim, 32)
        self.classifier = nn.Sequential(
            nn.Linear(64 + 32, 64),
            nn.ReLU(),
            nn.Linear(64, 2)
        )

    def forward(self, input_ids, attention_mask, meta_features):
        x = self.embedding(input_ids).permute(0, 2, 1)
        x = self.pool(torch.relu(self.conv(x))).squeeze(2)
        m = torch.relu(self.meta_dense(meta_features))
        combined = torch.cat([x, m], dim=1)
        return self.classifier(combined)


In [30]:
# 4. Training Setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
vocab_size = max(df_train["input_ids"].explode()) + 1
model = CNNMetadataFusion(vocab_size, meta_dim=8).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

In [None]:
# 5. Training Loop
train_accs, val_accs = [], []

for epoch in range(15):
    model.train()
    total, correct, total_loss = 0, 0, 0
    loop = tqdm(train_loader, desc=f"Epoch {epoch+1}/15")

    for input_ids, attention_mask, meta, labels in loop:
        input_ids, attention_mask, meta, labels = (
            input_ids.to(device), attention_mask.to(device), meta.to(device), labels.to(device)
        )
        outputs = model(input_ids, attention_mask, meta)
        loss = criterion(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        preds = torch.argmax(outputs, dim=1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)

        loop.set_postfix(avg_loss=total_loss/(total//32 + 1), accuracy=100 * correct / total)

    train_acc = correct / total
    train_accs.append(train_acc)

    # Validation
    model.eval()
    val_preds, val_labels = [], []
    with torch.no_grad():
        for input_ids, attention_mask, meta, labels in val_loader:
            input_ids, attention_mask, meta = input_ids.to(device), attention_mask.to(device), meta.to(device)
            outputs = model(input_ids, attention_mask, meta)
            preds = torch.argmax(outputs, dim=1).cpu().tolist()
            val_preds.extend(preds)
            val_labels.extend(labels.tolist())

    val_acc = accuracy_score(val_labels, val_preds)
    val_accs.append(val_acc)
    print(f"\nEpoch {epoch+1} complete - Train Acc: {train_acc:.4f} - Val Acc: {val_acc:.4f}\n")

  torch.tensor(self.labels[idx], dtype=torch.long)
Epoch 1/15:   2%|▏         | 17/683 [00:07<05:00,  2.22it/s, accuracy=53.7, avg_loss=0.725]

In [9]:
# 6. Evaluation on Test Set
model.eval()
test_preds, test_labels = [], []
with torch.no_grad():
    for input_ids, attention_mask, meta, labels in test_loader:
        input_ids, attention_mask, meta = input_ids.to(device), attention_mask.to(device), meta.to(device)
        outputs = model(input_ids, attention_mask, meta)
        preds = torch.argmax(outputs, dim=1).cpu().tolist()
        test_preds.extend(preds)
        test_labels.extend(labels.tolist())

print("\nTest Classification Report:")
print(classification_report(test_labels, test_preds))

In [19]:
# 7. Accuracy Graph
plt.plot(train_accs, label="Train Acc")
plt.plot(val_accs, label="Val Acc")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.title("Training vs Validation Accuracy")
plt.legend()
plt.grid(True)
plt.show()