<a href="https://colab.research.google.com/github/SaiPraneethM24/SCAAResearch/blob/main/SCAA_Code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install transformers scikit-learn imbalanced-learn matplotlib pandas tqdm accelerate seaborn bitsandbytes --quiet

import os, random
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.metrics import roc_curve, auc
from imblearn.over_sampling import SMOTE
from transformers import AutoTokenizer, AutoModel, AutoModelForCausalLM
import torch
from torch import nn

# Seed every random number generator the script relies on (Python's `random`,
# NumPy and PyTorch) with the same value.
SEED = 42
for _seed_rng in (random.seed, np.random.seed, torch.manual_seed):
    _seed_rng(SEED)

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Running on: {device}")

#Loading of samples
# Reads up to `max_per_file` rows from each CSV in `dataset_dir` until
# `max_total_rows` rows have been collected, keeping only files that provide
# both the "flines" (source text) and "full_path" columns.
dataset_dir = '/kaggle/input/multi-lang-dataset/Dataset'
# os.listdir returns names in arbitrary order; sorting makes the set of files
# consumed before the row cap is reached identical on every run.
csv_files = sorted(f for f in os.listdir(dataset_dir) if f.endswith('.csv'))
df_list = []
total_rows = 0
max_total_rows = 3000
max_per_file = 500

print("Loading data from CSV files")
for file in tqdm(csv_files, desc="Loading CSVs"):
    try:
        df_temp = pd.read_csv(os.path.join(dataset_dir, file))
    except Exception as e:
        # Best-effort: an unreadable file is reported and skipped.
        print(f"Error reading {file}: {e}")
        continue
    if "flines" not in df_temp.columns or "full_path" not in df_temp.columns:
        continue
    df_temp = df_temp.dropna(subset=["flines", "full_path"])
    # Never take more rows than are still allowed, so max_total_rows is a
    # hard cap even when it is not a multiple of max_per_file.
    df_temp = df_temp.head(min(max_per_file, max_total_rows - total_rows))
    df_list.append(df_temp)
    total_rows += len(df_temp)
    if total_rows >= max_total_rows:
        break

if not df_list:
    # pd.concat would fail with "No objects to concatenate"; say why instead.
    raise ValueError(
        f"No CSV file with 'flines' and 'full_path' columns could be loaded from {dataset_dir}"
    )

df = pd.concat(df_list, ignore_index=True)
# The author is taken to be the second "/"-separated component of full_path;
# paths without a "/" (or non-string values) are labelled "unknown".
df["author"] = df["full_path"].apply(lambda x: x.split("/")[1] if isinstance(x, str) and "/" in x else "unknown")
print(f"Loaded {len(df)} code samples across {len(df_list)} files.")

#Using Falcon for obfuscation
print("Loading Falcon model for obfuscation...")
model_id = "tiiuae/falcon-rw-1b"
tokenizer_llm = AutoTokenizer.from_pretrained(model_id)
# Half precision is only requested when a GPU is available. On a CPU-only
# runtime float32 is used, because float16 inference on CPU can be very slow or
# fail outright, and falcon_obfuscate would then fall back to returning the
# un-obfuscated input.
llm_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
model_llm = AutoModelForCausalLM.from_pretrained(model_id, device_map="auto", torch_dtype=llm_dtype)

def falcon_obfuscate(code_snippet):
    """Ask the Falcon model for an obfuscated variant of ``code_snippet``.

    Args:
        code_snippet: Source text to obfuscate. Non-string or blank values are
            returned unchanged without calling the model.

    Returns:
        The stripped text following the last "Obfuscated Code:" marker in the
        decoded model output, or the whole decoded output when the marker is
        absent. If tokenization, generation or decoding raises, the failure
        is printed and the original ``code_snippet`` is returned.
    """
    if not isinstance(code_snippet, str) or not code_snippet.strip():
        return code_snippet
    prompt = f"Obfuscate the following Java code by renaming variables and adding dead code:\n\n{code_snippet}\n\nObfuscated Code:"
    try:
        inputs = tokenizer_llm(prompt, return_tensors="pt").to(model_llm.device)
        outputs = model_llm.generate(**inputs, max_new_tokens=256, pad_token_id=tokenizer_llm.eos_token_id)
        result = tokenizer_llm.decode(outputs[0], skip_special_tokens=True)
    except Exception as e:
        # Still best-effort, but no longer silent: a sample that falls back to
        # its original text is otherwise indistinguishable from a real
        # obfuscation once it has been labelled.
        print(f"Obfuscation failed, keeping original snippet: {e}")
        return code_snippet
    marker = "Obfuscated Code:"
    # The decoded sequence includes the prompt, so keep only what follows the
    # final marker.
    return result.split(marker)[-1].strip() if marker in result else result

print("Obfuscating code")
# Run every snippet through the obfuscator once, up front.
obfuscated_sources = [falcon_obfuscate(src) for src in tqdm(df["flines"], desc="Obfuscating")]

# Original snippets are labelled 0.
df_clean = df.copy()
df_clean["label"] = 0

# Same rows with the source text replaced by the model output, labelled 1.
df_obf = df.copy()
df_obf["flines"] = obfuscated_sources
df_obf["label"] = 1

# Clean rows first, obfuscated rows second; authors become integer codes.
df_combined = pd.concat([df_clean, df_obf], ignore_index=True)
author_categories = df_combined["author"].astype("category")
df_combined["author_label"] = author_categories.cat.codes

#Preparing CodeBert Embeddings
print("Loading CodeBERT")
# Tokenizer and encoder come from the same checkpoint; the encoder is moved to
# the selected device.
codebert_checkpoint = "microsoft/codebert-base"
codebert_tokenizer = AutoTokenizer.from_pretrained(codebert_checkpoint)
codebert_model = AutoModel.from_pretrained(codebert_checkpoint)
codebert_model = codebert_model.to(device)

def get_codebert_embeddings(samples, batch_size=8):
    """Embed code snippets with CodeBERT using attention-masked mean pooling.

    Args:
        samples: List of source strings; each is truncated to 512 tokens.
        batch_size: Number of snippets encoded per forward pass.

    Returns:
        A float array with exactly one row per entry of ``samples`` and one
        column per hidden unit of the encoder. A batch that fails to encode is
        reported and contributes all-zero rows, so row ``i`` always
        corresponds to ``samples[i]``. Empty input yields an array with zero
        rows.
    """
    hidden_size = codebert_model.config.hidden_size
    embeddings = []
    for i in tqdm(range(0, len(samples), batch_size), desc="Embedding with CodeBERT"):
        batch = samples[i:i+batch_size]
        try:
            tokens = codebert_tokenizer(batch, return_tensors="pt", truncation=True, padding=True, max_length=512)
            tokens = {k: v.to(device) for k, v in tokens.items()}
            with torch.no_grad():
                hidden = codebert_model(**tokens).last_hidden_state
                # Average over real tokens only. A plain mean over the
                # sequence axis also averages padding positions, which makes
                # an embedding depend on the other snippets in its batch.
                mask = tokens["attention_mask"].unsqueeze(-1).to(hidden.dtype)
                pooled = (hidden * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1)
            output = pooled.cpu().numpy()
        except Exception as e:
            # Dropping the batch would shift every later row relative to the
            # label arrays built from the same DataFrame; keep placeholders.
            print(f"Batch at {i} failed ({e}); using zero vectors to keep rows aligned")
            output = np.zeros((len(batch), hidden_size), dtype=np.float32)
        embeddings.append(output)
    if not embeddings:
        # np.vstack raises on an empty list.
        return np.empty((0, hidden_size), dtype=np.float32)
    return np.vstack(embeddings)

# Embed every row of the combined frame and pull out both target vectors.
code_samples = df_combined["flines"].tolist()
X = get_codebert_embeddings(code_samples)
y = df_combined["label"].to_numpy()
yauth = df_combined["author_label"].to_numpy()

#Training of Dual VAE
# Split the embeddings by label: 0 = clean, 1 = obfuscated.
clean_mask = y == 0
obf_mask = y == 1
X_clean = X[clean_mask]
X_obf = X[obf_mask]

class VAENet(nn.Module):
    """Fully connected variational autoencoder.

    The encoder maps ``input_dim`` -> 512 -> 256; two linear heads produce the
    latent mean and log-variance (``latent_dim`` each); the decoder maps
    ``latent_dim`` -> 256 -> 512 -> ``input_dim``.
    """

    def __init__(self, input_dim=768, latent_dim=128):
        super().__init__()
        encoder_layers = [
            nn.Linear(input_dim, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
        ]
        self.encoder = nn.Sequential(*encoder_layers)
        self.mu = nn.Linear(256, latent_dim)
        self.logvar = nn.Linear(256, latent_dim)
        decoder_layers = [
            nn.Linear(latent_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 512),
            nn.ReLU(),
            nn.Linear(512, input_dim),
        ]
        self.decoder = nn.Sequential(*decoder_layers)

    def reparameterize(self, mu, logvar):
        """Draw ``mu + eps * sigma`` with ``eps`` standard normal."""
        sigma = (0.5 * logvar).exp()
        noise = torch.randn_like(sigma)
        return mu + noise * sigma

    def forward(self, x):
        """Return ``(reconstruction, mu, logvar)`` for the batch ``x``."""
        hidden = self.encoder(x)
        mean = self.mu(hidden)
        log_variance = self.logvar(hidden)
        latent = self.reparameterize(mean, log_variance)
        reconstruction = self.decoder(latent)
        return reconstruction, mean, log_variance

def train_vae(model, X_data, epochs=20):
    """Train ``model`` on ``X_data`` with one full-batch Adam step per epoch.

    The objective is the mean-squared reconstruction error plus a KL term
    averaged over every latent element; the combined value is printed after
    each step.

    Args:
        model: Module whose forward pass returns ``(reconstruction, mu, logvar)``.
        X_data: Data accepted by ``torch.tensor``; it is cast to float.
        epochs: Number of optimisation steps.

    Returns:
        The same ``model`` object, updated in place and moved to ``device``.
    """
    model.to(device)
    adam = torch.optim.Adam(model.parameters(), lr=1e-3)
    reconstruction_criterion = nn.MSELoss()
    data = torch.tensor(X_data).float().to(device)

    for epoch in range(epochs):
        model.train()
        adam.zero_grad()

        reconstruction, mu, logvar = model(data)
        reconstruction_term = reconstruction_criterion(reconstruction, data)
        kl_term = -0.5 * torch.mean(1 + logvar - mu.pow(2) - logvar.exp())
        objective = reconstruction_term + kl_term

        objective.backward()
        adam.step()
        print(f"VAE Epoch [{epoch+1}/{epochs}] Loss: {objective.item():.4f}")

    return model

# One VAE per population: the first sees only clean embeddings, the second
# only obfuscated ones. Each network is created right before it is trained.
print("Training VAE (Clean)")
clean_vae_net = VAENet()
vae_clean = train_vae(clean_vae_net, X_clean)

print("Training VAE (Obfuscated)")
obf_vae_net = VAENet()
vae_obf = train_vae(obf_vae_net, X_obf)

def get_recon_error(model, X_input):
    """Return the per-row mean squared reconstruction error of ``model``.

    Args:
        model: Module whose forward pass returns a 3-tuple with the
            reconstruction first; it is switched to eval mode.
        X_input: Data accepted by ``torch.tensor``; it is cast to float.

    Returns:
        NumPy array with one error value per input row.
    """
    model.eval()
    with torch.no_grad():
        inputs = torch.tensor(X_input).float().to(device)
        reconstruction, _mu, _logvar = model(inputs)
        squared_diff = (inputs - reconstruction).pow(2)
        per_row_error = squared_diff.mean(dim=1)
    return per_row_error.cpu().numpy()

# Score every embedding under both VAEs, then append the two error values as
# extra feature columns after the embedding itself.
err_clean = get_recon_error(vae_clean, X)
err_obf = get_recon_error(vae_obf, X)
recon_error_columns = [err_clean[:, None], err_obf[:, None]]
X_hybrid = np.hstack([X, *recon_error_columns])

#SMOTE Balancing part
print("SMOTE Balancing...")
sm = SMOTE(random_state=SEED)
X_resampled, y_resampled = sm.fit_resample(X_hybrid, y)

#Anomaly Detection part
# The forest is fitted on the resampled data and then scored on X_hybrid.
# NOTE(review): X_hybrid is the data the resampled set was derived from, so
# the report and confusion matrix below are in-sample figures, not held-out
# performance - confirm this is intended.
print("Training Anomaly Detector(RandomForest)")
clf_anomaly = RandomForestClassifier(n_estimators=200, random_state=SEED)
clf_anomaly.fit(X_resampled, y_resampled)
y_pred_anomaly = clf_anomaly.predict(X_hybrid)

anomaly_report = classification_report(y, y_pred_anomaly)
print("\nAnomaly Detection Report:\n")
print(anomaly_report)

anomaly_confusion = confusion_matrix(y, y_pred_anomaly)
sns.heatmap(anomaly_confusion, annot=True, cmap="Blues", fmt='d')
plt.title("Anomaly Detection Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.show()
# === ROC Curve for Anomaly Detection ===
# Column 1 of predict_proba holds the score for the second entry of the
# classifier's classes_, i.e. label 1 when both labels are present.
probs_anomaly = clf_anomaly.predict_proba(X_hybrid)[:, 1]
fpr_anomaly, tpr_anomaly, _ = roc_curve(y, probs_anomaly)
roc_auc_anomaly = auc(fpr_anomaly, tpr_anomaly)

roc_label = f"ROC curve (area = {roc_auc_anomaly:.2f})"
plt.figure()
plt.plot(fpr_anomaly, tpr_anomaly, color='darkorange', lw=2, label=roc_label)
# Diagonal reference line from (0, 0) to (1, 1).
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim(0.0, 1.0)
plt.ylim(0.0, 1.05)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve - Anomaly Detection')
plt.legend(loc="lower right")
plt.show()

# === AUTHORSHIP ATTRIBUTION ===
# The classifier is fitted before any ROC plotting, because the ROC section
# needs the fitted model's class probabilities.
# NOTE(review): the model is scored on the same rows it was fitted on, so the
# report, confusion matrix and ROC curves below are in-sample figures -
# confirm this is intended.
print("Training Author Classifier")
clf_auth = RandomForestClassifier(n_estimators=200, random_state=SEED)
clf_auth.fit(X, yauth)
y_pred_author = clf_auth.predict(X)

print("\nAuthorship Attribution Report:\n")
print(classification_report(yauth, y_pred_author))
sns.heatmap(confusion_matrix(yauth, y_pred_author), annot=True, cmap="Greens", fmt='d')
plt.title("Authorship Attribution Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.show()

# ROC Curve for Authorship Attribution
# One-vs-rest indicator matrix built directly from the labels: one column per
# entry of clf_auth.classes_, so column i lines up with column i of
# predict_proba for any number of authors (including exactly two).
author_classes = clf_auth.classes_
n_classes = len(author_classes)
yauth_bin = (yauth[:, None] == author_classes[None, :]).astype(int)
probs_author = clf_auth.predict_proba(X)
colors = sns.color_palette("husl", n_classes)

plt.figure(figsize=(10, 8))
for i, author_class in enumerate(author_classes):
    fpr, tpr, _ = roc_curve(yauth_bin[:, i], probs_author[:, i])
    auc_score = auc(fpr, tpr)
    plt.plot(fpr, tpr, color=colors[i], lw=2, label=f'Class {author_class} (AUC = {auc_score:.2f})')

# Diagonal reference line from (0, 0) to (1, 1).
plt.plot([0, 1], [0, 1], 'k--', lw=2)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve - Authorship Attribution (Multi-Class)')
plt.legend(loc='lower right')
plt.show()