# Add your datasets


In [None]:
import json
import pandas as pd
from datasets import load_dataset
import json
import os
import csv

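# Import your data from csv/txt files here.
# Each dataset must be a list of strings,
# e.g. SAP_input = ["hello world", "I am here", "I think you get the idea", "u do get the idea"]
# The cells below expect these eight names to be defined:
#   adversarial: SAP_input, DAN, MWP, GCG_prompts
#   benign:      b_input_Orca, b_input_mmlu, b_input_alpEval, b_input_TQA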

# Report how many prompts each dataset holds: adversarial sets first,
# then the benign ones.
for _label, _prompts in (
    ("SAP_input", SAP_input),
    ("DAN", DAN),
    ("MWP", MWP),
    ("GCG_prompts", GCG_prompts),
    ("b_input_Orca", b_input_Orca),
    ("b_input_mmlu", b_input_mmlu),
    ("b_input_alpEval", b_input_alpEval),
    ("b_input_TQA", b_input_TQA),
):
    print(f"{_label}: {len(_prompts)}")


# Get the word embeddings

In [None]:
from transformers import RobertaTokenizer, RobertaModel
import numpy as np
import torch
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed

# Load the pretrained 'roberta-base' tokenizer and encoder once at module
# level; the helper functions below read both names as globals.
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
roberta_model = RobertaModel.from_pretrained('roberta-base')

def split_text(text, max_length=512):
    """Split *text* into string chunks that each fit in one encoder pass.

    The text is tokenized with the module-level ``tokenizer`` and cut into
    consecutive runs of tokens. Each run is turned back into plain text so it
    can be tokenized again by the caller.

    Args:
        text: The prompt to split.
        max_length: Maximum sequence length of one encoder pass, counting the
            special tokens the tokenizer adds around every sequence.

    Returns:
        A list of text chunks in original order; empty when *text* produces
        no tokens.
    """
    tokens = tokenizer.tokenize(text)
    # Reserve room for the special tokens added when the chunk is encoded;
    # otherwise every full chunk would be truncated by the caller.
    reserved = tokenizer.num_special_tokens_to_add(pair=False)
    chunk_size = max(1, max_length - reserved)
    # Tokens are sub-word pieces carrying the tokenizer's own word-boundary
    # markers, so they must be decoded with the tokenizer. Joining them with
    # spaces would feed the markers back in as literal characters.
    return [
        tokenizer.convert_tokens_to_string(tokens[start:start + chunk_size])
        for start in range(0, len(tokens), chunk_size)
    ]

def get_roberta_embeddings(text_chunks):
    """Encode each chunk and stack the per-token hidden states.

    Uses the module-level ``tokenizer`` and ``roberta_model``. Gradients are
    disabled because the encoder is only used for inference here.

    Args:
        text_chunks: Iterable of strings, each encoded independently.

    Returns:
        A 2-D array with one row per token across all chunks (chunk order
        preserved). When *text_chunks* is empty the result has zero rows and
        ``roberta_model.config.hidden_size`` columns.
    """
    embeddings = []
    for chunk in text_chunks:
        inputs = tokenizer(chunk, return_tensors='pt', truncation=True, max_length=512)
        with torch.no_grad():
            outputs = roberta_model(**inputs)
        # Drop only the batch axis. A bare squeeze() would also collapse the
        # token axis if a sequence ever had length 1.
        embeddings.append(outputs.last_hidden_state[0].numpy())
    if not embeddings:
        # np.vstack raises on an empty list; an empty prompt yields an empty
        # (0, hidden_size) matrix instead so downstream padding still works.
        # float32 is assumed to match the encoder output dtype — TODO confirm.
        return np.zeros((0, roberta_model.config.hidden_size), dtype=np.float32)
    return np.vstack(embeddings)

def process_text(text):
    """Return the stacked token embeddings of *text*.

    The text is first cut into chunks by ``split_text`` and the chunks are
    then encoded by ``get_roberta_embeddings``.
    """
    chunks = split_text(text)
    return get_roberta_embeddings(chunks)

def generate_embeddings_parallel(texts, max_workers=4):
    """Embed every text with ``process_text`` on a thread pool.

    Args:
        texts: Sequence of prompts to embed.
        max_workers: Number of worker threads.

    Returns:
        A list holding one embedding per input text, in input order.
    """
    # Slots are pre-allocated because futures finish in arbitrary order.
    results = [None] * len(texts)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        position_of = {}
        for position, text in enumerate(texts):
            position_of[pool.submit(process_text, text)] = position
        finished = tqdm(
            as_completed(position_of),
            total=len(texts),
            desc="Generating Embeddings",
        )
        for done in finished:
            results[position_of[done]] = done.result()
    return results

# Embed the four benign datasets. Each result is a list with one
# per-token embedding matrix per prompt.
embeddings_benign_Orca = generate_embeddings_parallel(b_input_Orca)
embeddings_benign_mmlu = generate_embeddings_parallel(b_input_mmlu)
embeddings_benign_alphEval = generate_embeddings_parallel(b_input_alpEval)
embeddings_benign_TQA = generate_embeddings_parallel(b_input_TQA)

# Embed the four adversarial datasets the same way.
embeddings_adversarial_SAP = generate_embeddings_parallel(SAP_input)
embeddings_adversarial_DAN = generate_embeddings_parallel(DAN)
embeddings_adversarial_MWP = generate_embeddings_parallel(MWP)
embeddings_adversarial_GCG = generate_embeddings_parallel(GCG_prompts)


# Pad the prompts

In [None]:
# Concatenate the per-prompt embedding lists of every dataset so the longest
# prompt (in rows) across all of them can be found. All prompts are later
# padded to this common length.
# NOTE: `+` is list concatenation here, so this must run while the
# embeddings are still Python lists (i.e. before they are padded).
texts = (
    embeddings_benign_Orca
    + embeddings_benign_mmlu
    + embeddings_benign_alphEval
    + embeddings_adversarial_SAP
    + embeddings_adversarial_DAN
    + embeddings_adversarial_MWP
    + embeddings_adversarial_GCG
    + embeddings_benign_TQA
)
max_length = max(len(text) for text in texts)
embedding_dim = 768

def pad_to_max_length(arrays, max_length, pad_length=768):
    """Zero-pad every 2-D array along axis 0 up to *max_length* rows.

    Args:
        arrays: Iterable of arrays shaped (rows, width).
        max_length: Target number of rows. Arrays that already have at least
            this many rows are passed through unchanged.
        pad_length: Fallback padding width, used only when an array is not
            2-D and therefore has no column count of its own.

    Returns:
        ``np.array`` of the padded arrays, i.e. a single stacked array when
        all results share one shape.
    """
    padded_arrays = []
    for array in arrays:
        array = np.asarray(array)
        padding_needed = max_length - array.shape[0]
        if padding_needed > 0:
            # Take width and dtype from the array itself: a hard-coded width
            # breaks for any other embedding size, and float64 zeros would
            # upcast float32 embeddings and double their memory footprint.
            width = array.shape[1] if array.ndim == 2 else pad_length
            padding = np.zeros((padding_needed, width), dtype=array.dtype)
            array = np.vstack((array, padding))
        padded_arrays.append(array)
    return np.array(padded_arrays)

# Pad every dataset to the shared max_length. pad_to_max_length returns
# np.array(...), so from here on each name holds an ndarray rather than a
# Python list of per-prompt matrices.
embeddings_benign_Orca = pad_to_max_length(embeddings_benign_Orca, max_length)
embeddings_benign_mmlu = pad_to_max_length(embeddings_benign_mmlu, max_length)
embeddings_benign_alphEval = pad_to_max_length(embeddings_benign_alphEval, max_length)
embeddings_benign_TQA = pad_to_max_length(embeddings_benign_TQA, max_length)
embeddings_adversarial_SAP = pad_to_max_length(embeddings_adversarial_SAP, max_length)
embeddings_adversarial_DAN = pad_to_max_length(embeddings_adversarial_DAN, max_length)
embeddings_adversarial_MWP = pad_to_max_length(embeddings_adversarial_MWP, max_length)
embeddings_adversarial_GCG = pad_to_max_length(embeddings_adversarial_GCG, max_length)

# Standardize the data

In [None]:
from sklearn.preprocessing import StandardScaler
def standardize_data(padded_data):
    """Standardize *padded_data* feature-wise and return it in its old shape.

    All leading axes are flattened into rows so that the last axis is treated
    as the feature axis. A fresh ``StandardScaler`` is fitted on those rows on
    every call, so each array passed in is scaled with its own statistics.

    Args:
        padded_data: Array whose last axis is the feature axis.

    Returns:
        Array of the same shape as *padded_data* holding the scaled values.
    """
    shape = padded_data.shape
    n_features = shape[-1]
    rows = padded_data.reshape(-1, n_features)
    scaled_rows = StandardScaler().fit_transform(rows)
    return scaled_rows.reshape(shape)

# Standardize each dataset independently: standardize_data fits a new
# scaler per call, so every dataset is scaled with its own statistics.
embeddings_benign_Orca = standardize_data(embeddings_benign_Orca)
embeddings_benign_mmlu = standardize_data(embeddings_benign_mmlu)
embeddings_benign_alphEval = standardize_data(embeddings_benign_alphEval)
embeddings_benign_TQA = standardize_data(embeddings_benign_TQA)
embeddings_adversarial_SAP = standardize_data(embeddings_adversarial_SAP)
embeddings_adversarial_DAN = standardize_data(embeddings_adversarial_DAN)
embeddings_adversarial_MWP = standardize_data(embeddings_adversarial_MWP)
embeddings_adversarial_GCG = standardize_data(embeddings_adversarial_GCG)

# Step 1 of CurvaLID: Train the CNN for classifying benign datasets

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Conv1D, Flatten, Input
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split

# Integer class label per benign dataset: Orca=0, mmlu=1, alpEval=2, TQA=3.
labels_benign_Orca = np.zeros(len(embeddings_benign_Orca), dtype=int)
labels_benign_mmlu = np.ones(len(embeddings_benign_mmlu), dtype=int)
labels_benign_alphEval = np.full(len(embeddings_benign_alphEval), 2, dtype=int)
labels_benign_TQA = np.full(len(embeddings_benign_TQA), 3, dtype=int)

# Stack the benign datasets (and their labels, in the same order) into one
# training set. Only benign data is used to train this CNN.
X = np.concatenate([embeddings_benign_Orca, embeddings_benign_mmlu, embeddings_benign_alphEval, embeddings_benign_TQA], axis=0)
y = np.concatenate([labels_benign_Orca, labels_benign_mmlu, labels_benign_alphEval, labels_benign_TQA], axis=0)

# One-hot encode the labels to match the categorical cross-entropy loss.
encoder = OneHotEncoder(sparse_output=False)
y_one_hot = encoder.fit_transform(y.reshape(-1, 1))

# Hold out 20% of the samples. No random_state is given, so the split
# differs between runs.
X_train, X_test, y_train, y_test = train_test_split(X, y_one_hot, test_size=0.2)

# Functional-API CNN: two Conv1D layers, Flatten, Dense(128), and a 4-way
# softmax (one unit per benign dataset).
# NOTE(review): later cells pick activations via model.layers[1], [2] and
# [4]; presumably index 0 is the Input layer, making those the two Conv1D
# layers and the Dense(128) layer — confirm with model.summary(). Do not
# reorder or insert layers without updating those indices.
input_shape = X_train.shape[1:]
inputs = Input(shape=input_shape)
x = Conv1D(32, kernel_size=3, activation='relu')(inputs)
x = Conv1D(64, kernel_size=3, activation='relu')(x)
x = Flatten()(x)
x = Dense(128, activation='relu')(x)
outputs = Dense(4, activation='softmax')(x)

model = Model(inputs=inputs, outputs=outputs)

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# A further 20% of the training split is used for validation during fit.
model.fit(X_train, y_train, epochs=20, batch_size=32, validation_split=0.2)

# Persist the trained CNN in HDF5 format.
model.save('trained_model_2.h5')

# Get more layer output

In [None]:
def get_layer_outputs(model, data):
    """Run *data* through *model* and return the output of every layer.

    A second model is built that shares *model*'s input but exposes each
    entry of ``model.layers`` as an output, then ``predict`` is called on it.
    """
    probe = Model(
        inputs=model.input,
        outputs=[layer.output for layer in model.layers],
    )
    return probe.predict(data)

# Collect every layer's output for the held-out samples and print each
# shape, as a quick way to see what each layer index produces.
activations = get_layer_outputs(model, X_test)

for layer_activation in activations:
    print(layer_activation.shape)

# Get LID on layer 4

In [None]:
from tensorflow.keras.models import Model
from scipy.stats import kurtosis
from tqdm import tqdm
from joblib import Parallel, delayed
import numpy as np
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.utils import to_categorical

# All benign prompts (Orca, mmlu, alpEval, TQA) and all adversarial prompts
# (SAP, DAN, MWP, GCG), each stacked along the sample axis in that order.
txt_ben = np.concatenate([embeddings_benign_Orca, embeddings_benign_mmlu, embeddings_benign_alphEval, embeddings_benign_TQA], axis=0)
txt_adv = np.concatenate([embeddings_adversarial_SAP, embeddings_adversarial_DAN, embeddings_adversarial_MWP, embeddings_adversarial_GCG], axis=0)

def get_specific_layer_outputs(model, data, layer_indices):
    """Return the activations of the selected layers of *model* for *data*.

    Args:
        model: Keras model whose intermediate outputs are wanted.
        data: Input batch passed to ``predict``.
        layer_indices: Positions into ``model.layers`` to expose.

    Returns:
        Whatever ``predict`` yields for a model that shares *model*'s input
        and outputs the chosen layers.
    """
    tapped = []
    for index in layer_indices:
        tapped.append(model.layers[index].output)
    probe = Model(inputs=model.input, outputs=tapped)
    return probe.predict(data)

# Take activations from model.layers[4] only.
# NOTE(review): presumably the Dense(128) layer of the CNN — confirm.
layer_indices = [4]  

# The same layer is evaluated on the benign and on the adversarial prompts.
activations_benign = get_specific_layer_outputs(model, txt_ben, layer_indices)
activations_adversarial = get_specific_layer_outputs(model, txt_adv, layer_indices)

In [None]:
import torch
import numpy as np
from tqdm import tqdm

def lid_mom_est_minibatch(data, reference, k, batch_size, compute_mode='use_mm_for_euclid_dist_if_necessary'):
    """Estimate a local-intrinsic-dimensionality score for each row of *data*.

    For each query row the Euclidean distances to all rows of *reference*
    are sorted ascending. With ``m`` the mean of sorted columns ``1..k-1``
    and ``r_k`` the value in column ``k``, the estimate is ``m / (r_k - m)``.

    Args:
        data: 2-D array-like of query points, processed *batch_size* rows at
            a time to bound the size of the distance matrix.
        reference: 2-D array-like of points the neighbours are drawn from.
        k: Requested neighbourhood size. It is capped at
            ``len(reference) - 1`` so column ``k`` always exists.
        batch_size: Number of query rows per distance computation.
        compute_mode: Passed through to ``torch.cdist``.

    Returns:
        1-D float tensor with one estimate per row of *data* (empty when
        *data* is empty).

    Raises:
        ValueError: If the effective neighbourhood size is below 2, which
            would leave no distances to average.
    """
    # The reference set does not change between batches: convert it once.
    reference_t = torch.as_tensor(reference, dtype=torch.float32)

    # Neighbours come from the reference set, so k is bounded by its size,
    # not by the size of the current query batch. It is also fixed for the
    # whole call so a short final batch uses the same k as every other batch.
    k_eff = min(k, reference_t.shape[0] - 1)
    if k_eff < 2:
        raise ValueError(
            "LID estimation needs k >= 2 and at least 3 reference points"
        )

    n_data = len(data)
    if n_data == 0:
        # torch.cat cannot concatenate an empty list.
        return torch.empty(0, dtype=torch.float32)

    lids = []
    for start_idx in tqdm(range(0, n_data, batch_size), desc="Calculating LID in minibatches"):
        batch_data = torch.as_tensor(data[start_idx:start_idx + batch_size], dtype=torch.float32)

        # Pairwise distances (query rows x reference rows), sorted per row.
        r = torch.cdist(batch_data, reference_t, p=2, compute_mode=compute_mode)
        a, _ = torch.sort(r, dim=1)

        # Column 0 is skipped: when the queries are themselves part of the
        # reference set it is the zero distance to self.
        # NOTE(review): for queries that are not in the reference set this
        # drops the true nearest neighbour — confirm this is intended.
        m = torch.mean(a[:, 1:k_eff], dim=1)

        # Method-of-moments estimate.
        lids.append(m / (a[:, k_eff] - m))

    return torch.cat(lids, dim=0)

# k: requested neighbourhood size; batch_size: query rows per distance call.
k = 32
batch_size = 32

# Benign activations are scored against themselves.
lid_benign = lid_mom_est_minibatch(activations_benign, activations_benign, k, batch_size)

# Adversarial activations are scored against the benign activations, i.e.
# the benign set is the reference in both cases.
lid_adversarial = lid_mom_est_minibatch(activations_adversarial, activations_benign, k, batch_size)

# Curvature on layer 1 and 2

In [None]:
from tensorflow.keras.models import Model
from scipy.stats import kurtosis
from tqdm import tqdm
from joblib import Parallel, delayed
import numpy as np
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.utils import to_categorical

# Rebuild the stacked benign / adversarial inputs (same order as before) so
# this cell can be run on its own.
txt_ben = np.concatenate([embeddings_benign_Orca, embeddings_benign_mmlu, embeddings_benign_alphEval, embeddings_benign_TQA], axis=0)
txt_adv = np.concatenate([embeddings_adversarial_SAP, embeddings_adversarial_DAN, embeddings_adversarial_MWP, embeddings_adversarial_GCG], axis=0)

def get_specific_layer_outputs(model, data, layer_indices):
    """Return the activations of the selected layers of *model* for *data*.

    Args:
        model: Keras model whose intermediate outputs are wanted.
        data: Input batch passed to ``predict``.
        layer_indices: Positions into ``model.layers`` to expose.

    Returns:
        Whatever ``predict`` yields for a model that shares *model*'s input
        and outputs the chosen layers.
    """
    selected_outputs = [model.layers[position].output for position in layer_indices]
    tap_model = Model(
        inputs=model.input,
        outputs=selected_outputs,
    )
    predictions = tap_model.predict(data)
    return predictions

# Take activations from model.layers[1] only.
# NOTE(review): presumably the first Conv1D layer of the CNN — confirm.
layer_indices = [1]

# These assignments replace the activations_* values used for the LID step.
activations_benign = get_specific_layer_outputs(model, txt_ben, layer_indices)
activations_adversarial = get_specific_layer_outputs(model, txt_adv, layer_indices)

In [None]:
import numpy as np

def calculate_curvature(embeddings):
    """Return the mean curvature score over consecutive vectors.

    For each pair of neighbouring vectors the angle between them is divided
    by the sum of their reciprocal norms. Pairs in which either vector does
    not have a positive norm are skipped.

    Args:
        embeddings: Sequence of vectors, read in order.

    Returns:
        The mean of the per-pair scores, or ``0`` when no pair qualifies
        (including inputs with fewer than two vectors).
    """
    per_step = []
    for prev_vec, next_vec in zip(embeddings[:-1], embeddings[1:]):
        prev_len = np.linalg.norm(prev_vec)
        next_len = np.linalg.norm(next_vec)
        if not (prev_len > 0 and next_len > 0):
            continue
        cos_theta = np.dot(prev_vec, next_vec) / (prev_len * next_len)
        # Clip guards arccos against values just outside [-1, 1] caused by
        # floating-point rounding.
        theta = np.arccos(np.clip(cos_theta, -1.0, 1.0))
        per_step.append(theta / (1/prev_len + 1/next_len))
    if not per_step:
        return 0
    return np.mean(per_step)

def calculate_mean_curvature(activations):
    """Apply ``calculate_curvature`` to each item of *activations*.

    Returns:
        1-D array with one mean-curvature value per item, in input order.
    """
    return np.array([calculate_curvature(sample) for sample in activations])

# One curvature value per prompt, computed from the layer-1 activations
# extracted in the previous cell.
mean_curvatures_benign = calculate_mean_curvature(activations_benign)
mean_curvatures_adversarial = calculate_mean_curvature(activations_adversarial)


In [None]:
# Summary statistics: average layer-1 curvature over all benign and over all
# adversarial prompts. These names are reassigned by a later cell.
overall_mean_curvature_benign = np.mean(mean_curvatures_benign)
overall_mean_curvature_adversarial = np.mean(mean_curvatures_adversarial)

In [None]:
from tensorflow.keras.models import Model
from scipy.stats import kurtosis
from tqdm import tqdm
from joblib import Parallel, delayed
import numpy as np
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.utils import to_categorical

# Rebuild the stacked benign / adversarial inputs (same order as before) so
# this cell can be run on its own.
txt_ben = np.concatenate([embeddings_benign_Orca, embeddings_benign_mmlu, embeddings_benign_alphEval, embeddings_benign_TQA], axis=0)
txt_adv = np.concatenate([embeddings_adversarial_SAP, embeddings_adversarial_DAN, embeddings_adversarial_MWP, embeddings_adversarial_GCG], axis=0)


def get_specific_layer_outputs(model, data, layer_indices):
    """Return the activations of the selected layers of *model* for *data*.

    Args:
        model: Keras model whose intermediate outputs are wanted.
        data: Input batch passed to ``predict``.
        layer_indices: Positions into ``model.layers`` to expose.

    Returns:
        Whatever ``predict`` yields for a model that shares *model*'s input
        and outputs the chosen layers.
    """
    chosen = [model.layers[idx].output for idx in layer_indices]
    return Model(inputs=model.input, outputs=chosen).predict(data)

# Take activations from model.layers[2] only.
# NOTE(review): presumably the second Conv1D layer of the CNN — confirm.
layer_indices = [2] 

# These assignments replace the layer-1 activations held in the same names.
activations_benign = get_specific_layer_outputs(model, txt_ben, layer_indices)
activations_adversarial = get_specific_layer_outputs(model, txt_adv, layer_indices)

In [None]:
def calculate_mean_curvature(activations):
    """Compute one mean-curvature value per activation.

    Each item of *activations* is passed to ``calculate_curvature``; the
    results are returned as a 1-D array in input order.
    """
    per_sample = [calculate_curvature(item) for item in activations]
    return np.array(per_sample)

# One curvature value per prompt from the layer-2 activations, stored under
# separate names so the layer-1 values stay available.
mean_curvatures_benign2 = calculate_mean_curvature(activations_benign)
mean_curvatures_adversarial2 = calculate_mean_curvature(activations_adversarial)


In [None]:
# Summary statistics for layer 2. This overwrites the layer-1 summaries that
# were stored under the same two names.
overall_mean_curvature_benign = np.mean(mean_curvatures_benign2)
overall_mean_curvature_adversarial = np.mean(mean_curvatures_adversarial2)

In [None]:
# Per-prompt feature rows: [LID, layer-1 curvature, layer-2 curvature].
features_benign = np.column_stack((lid_benign, mean_curvatures_benign, mean_curvatures_benign2))

features_adversarial = np.column_stack((lid_adversarial, mean_curvatures_adversarial, mean_curvatures_adversarial2))

# Benign rows come first, then adversarial rows; labels follow the same
# order with 0 = benign and 1 = adversarial.
features = np.vstack((features_benign, features_adversarial))
labels_benign = np.zeros(features_benign.shape[0])
labels_adversarial = np.ones(features_adversarial.shape[0])
labels = np.concatenate([labels_benign, labels_adversarial])

In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.utils import to_categorical
from sklearn.preprocessing import StandardScaler

# Normalize the features
# Fit a scaler on the complete feature matrix.
# NOTE(review): because every row is used for fitting, any rows later held
# out for testing also contribute to these scaling statistics.
# create_improved_model reads features_normalized.shape[1] for its input
# width.
scaler = StandardScaler()
features_normalized = scaler.fit_transform(features)

# Define dataset identifiers
# One integer per row naming its source dataset: 0-3 are the benign sets
# (Orca, mmlu, alpEval, TQA), 4-7 the adversarial sets (SAP, DAN, MWP, GCG).
# The order matches the order in which the feature rows were stacked.
dataset_identifiers = np.concatenate([
    np.full(len(embeddings_benign_Orca), 0),
    np.full(len(embeddings_benign_mmlu), 1),
    np.full(len(embeddings_benign_alphEval), 2),
    np.full(len(embeddings_benign_TQA), 3),
    np.full(len(embeddings_adversarial_SAP), 4),
    np.full(len(embeddings_adversarial_DAN), 5),
    np.full(len(embeddings_adversarial_MWP), 6),
    np.full(len(embeddings_adversarial_GCG), 7)
])

# Function to create the improved MLP model
def create_improved_model(optimizer='adam', learning_rate=0.001, neurons=256, dropout_rate=0.5, activation='relu'):
    """Build and compile the MLP that separates benign from adversarial rows.

    The network is two regularized Dense blocks (each followed by batch
    normalization and dropout) and a 2-unit softmax. Its input width is read
    from the module-level ``features_normalized``.

    Args:
        optimizer: Either an optimizer name understood by Keras (such as the
            default ``'adam'``) or an optimizer class/factory that accepts a
            ``learning_rate`` keyword.
        learning_rate: Learning rate handed to the optimizer.
        neurons: Width of the first hidden layer; the second has half of it.
        dropout_rate: Dropout rate applied after each hidden block.
        activation: Activation of the hidden layers.

    Returns:
        The compiled Keras model.
    """
    model = Sequential([
        Dense(neurons, activation=activation, input_shape=(features_normalized.shape[1],), kernel_regularizer='l2'),
        BatchNormalization(),
        Dropout(dropout_rate),
        Dense(neurons // 2, activation=activation, kernel_regularizer='l2'),
        BatchNormalization(),
        Dropout(dropout_rate),
        Dense(2, activation='softmax')  # 2 output units for benign and adversarial classes
    ])

    if isinstance(optimizer, str):
        # A name (including the default) is not callable, so resolve it
        # through Keras and pass the learning rate via the config.
        from tensorflow.keras import optimizers as keras_optimizers
        opt = keras_optimizers.get(
            {'class_name': optimizer, 'config': {'learning_rate': learning_rate}}
        )
    else:
        opt = optimizer(learning_rate=learning_rate)
    model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])
    return model

# Split the data into training and testing sets
# Split the data into training and testing sets.
# The raw features are split first and the scaler is fitted on the training
# rows only, so no statistics from the held-out rows leak into training.
X_train, X_test, y_train, y_test, train_ids, test_ids = train_test_split(
    features, labels, dataset_identifiers, test_size=0.2
)
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# One-hot encode the labels for the classifier. num_classes is fixed so both
# splits get two columns even if one of them happens to hold a single class.
y_train_one_hot = to_categorical(y_train, num_classes=2)
y_test_one_hot = to_categorical(y_test, num_classes=2)

# Create the model with improved parameters.
# NOTE: this rebinds the global `model`, which until now referred to the CNN
# used for extracting activations; re-run the CNN cell before re-running the
# activation cells.
model = create_improved_model(optimizer=Adam, learning_rate=0.001, neurons=256, dropout_rate=0.5, activation='relu')

# Early stopping to prevent overfitting
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

# Train the classifier
model.fit(X_train, y_train_one_hot, epochs=150, batch_size=64, validation_split=0.2, callbacks=[early_stopping], verbose=0)

# Evaluate the classifier
y_pred = model.predict(X_test)
y_pred_classes = np.argmax(y_pred, axis=1)
accuracy_mlp = accuracy_score(y_test, y_pred_classes)
print(f"Overall Accuracy: {accuracy_mlp}")