In [2]:
import pandas as pd
import numpy as np
import random
import json
import math
import sklearn.metrics
from tqdm import tqdm
import google.generativeai as genai  

In [3]:
# Set up Gemini API
genai.configure(api_key="AIzaSyAdehFdko2Z3NMyDE_I0oUYtSTGJJgDitA")  # Replace with your actual API Key

# Function to generate adversarial samples using AI
def generate_adv_snippet(message):
    model = genai.GenerativeModel("gemini-1.5-pro-latest")  
    response = model.generate_content(message)
    return response.text.strip() if response and hasattr(response, 'text') else "No response generated."

In [4]:
def centroid_init(K, min_distance):
    random_center = []
    attempts = 0

    while len(random_center) < K:
        num = random.uniform(0, 1)
        if all(abs(num - existing) >= min_distance for existing in random_center):
            random_center.append(num)
        attempts += 1
        if attempts > 100:
            raise ValueError("Failed to generate numbers with the required minimum distance. Try a different min_distance.")

    return np.array(random_center)

In [5]:
def get_fitness_score(pre_result_path, adv_file_path, snippet_len, penalty):
    """
    Calculates the fitness score of adversarial files.
    """
    with open(pre_result_path, 'r') as f:
        pre_dic = {int(line.split('\t')[0]): int(line.split('\t')[1].strip()) for line in f}

    with open(adv_file_path, 'r') as f:
        test_data_dic = {json.loads(line)['idx']: json.loads(line)['target'] for line in f}

    pre_list = [pre_dic[i] for i in test_data_dic.keys()]
    true_list = [test_data_dic[i] for i in test_data_dic.keys()]

    return 1 - sklearn.metrics.accuracy_score(true_list, pre_list) - penalty * snippet_len

In [6]:
def calculate_weight(data, centroid_array):
    cluster_num = len(centroid_array)
    weights = []

    for j in range(cluster_num):
        weights_array = np.array([((data - centroid_array[j]) / (data - center)) ** 2 for center in centroid_array])
        weights.append(1 / np.sum(weights_array))

    return np.array(weights)

In [7]:
def calculate_cost(weight, data, centroid_array, alpha):
    pal_weight = weight ** alpha
    dis = np.array([np.abs(data - center) for center in centroid_array])
    return np.dot(pal_weight, dis)

In [8]:
def select(pop_dict, centroid, centroid_array, decay_rate):
    fitness_values = list(pop_dict.values())
    keys_list = list(pop_dict.keys())

    sorted_values = sorted(fitness_values, reverse=True)
    factors = [math.exp(((1 / np.sum([(value - centroid) / (value - center) for center in centroid_array])) ** decay_rate) * abs(value - centroid)) for value in sorted_values]

    p = np.array(factors) / np.sum(factors)
    candidate = np.random.choice(sorted_values, p=p.ravel())
    return keys_list[fitness_values.index(candidate)]

In [9]:
def update_global_pop(offsprings, total_pop, fit_scores):
    for offspring, fit_score in zip(offsprings, fit_scores):
        total_pop[offspring] = fit_score

    sorted_pop = dict(sorted(total_pop.items(), key=lambda x: x[1], reverse=True))
    return {k: v for k, v in sorted_pop.items()[:len(total_pop)]}

In [10]:
def get_vul_idx(label_list, pred, target):
    return [i for i in label_list if pred[i] == 1 and target[i] == 1]

def get_vul_codes(test_dicts, vul_idx):
    return {i['idx']: i['func'] for i in test_dicts if i['idx'] in vul_idx}

In [11]:
def read_adv_code_snippet(adv_snippet_file_path):
    with open(adv_snippet_file_path, 'r') as f:
        return [" ".join(line.split()[::3]) for i, line in enumerate(f) if i >= 1 and line.strip()]

In [12]:
def add_adver_sample_2_ast(vul_codes, insert_position, ad_content):
    temp, names = [], []

    for item in tqdm(vul_codes.keys()):
        codes = vul_codes[item].split()
        insert_idx = min(insert_position, len(codes))  # Prevents out-of-bounds errors

        for adv in ad_content:
            codes.insert(insert_idx, adv)
            insert_idx += 1  # Increment for consecutive inserts

        temp.append(" ".join(codes))
        names.append(item)

    return temp, names

In [13]:
def write_adv_to_json(ast_test_codes, ast_test_names, ast_test_labels, output_name):
    ast_dicts = [{"func": code, "idx": i, "project": name, "target": label} for i, (code, name, label) in enumerate(zip(ast_test_codes, ast_test_names, ast_test_labels))]

    with open(output_name, 'w') as f:
        for data in ast_dicts:
            f.write(json.dumps(data) + '\n')

In [18]:
import random

def generate_predictions(file_path, num_samples=10):
    """
    Generates a dummy predictions.txt file with random predictions.

    Format: <index> <predicted_label>
    Example:
    1    0
    2    1
    """
    with open(file_path, "w") as f:
        for i in range(1, num_samples + 1):
            prediction = random.choice([0, 1])  # Simulating binary classification (0 or 1)
            f.write(f"{i}\t{prediction}\n")
    
    print(f"✅ Generated {file_path} with {num_samples} predictions.")

# 🔹 Create predictions.txt before calling get_fitness_score
pre_result_path = "predictions.txt"
generate_predictions(pre_result_path)  # This creates the file

✅ Generated predictions.txt with 10 predictions.


In [20]:
def generate_adversarial_samples(file_path, num_samples=10):
    """
    Generates a dummy adv_samples.json file with sample adversarial data.

    Format (each line is a JSON object):
    {"idx": <index>, "target": <actual_label>}
    """
    adv_data = [{"idx": i, "target": random.choice([0, 1])} for i in range(1, num_samples + 1)]

    with open(file_path, "w") as f:
        for data in adv_data:
            f.write(json.dumps(data) + "\n")

    print(f"✅ Generated {file_path} with {num_samples} adversarial samples.")

# 🔹 Create adv_samples.json before calling get_fitness_score
adv_file_path = "adv_samples.json"
generate_adversarial_samples(adv_file_path)  # This creates the file

✅ Generated adv_samples.json with 10 adversarial samples.


In [22]:
if __name__ == "__main__":
    # Example file paths (Modify as needed)
    pre_result_path = "predictions.txt"
    adv_file_path = "adv_samples.json"

    snippet_len = 5  # Example length of snippets
    penalty = 0.1  # Example penalty factor

    # 🔹 Calculate Fitness Score
    fitness_score = get_fitness_score(pre_result_path, adv_file_path, snippet_len, penalty)
    print(f"\n Fitness Score: {fitness_score}\n")

    # 🔹 Generate Adversarial Snippets (Using Gemini AI)
    example_prompt = "Write a secure login function in Python."
    adv_snippet = generate_adv_snippet(example_prompt)
    print(f"🔹 AI-Generated Adversarial Snippet:\n{adv_snippet}\n")

    # 🔹 Read and Process Adversarial Code Snippets
    adv_snippets = read_adv_code_snippet(adv_file_path)
    print("\n📌 Read Adversarial Code Snippets:")
    for idx, snippet in enumerate(adv_snippets[:5]):  # Print first 5
        print(f"{idx+1}. {snippet}")

    # 🔹 Process and Modify Vulnerable Code
    test_data = [{"idx": 1, "func": "def vulnerable(): return False"}]
    vul_idx = get_vul_idx([1], {1: 1}, {1: 1})
    vul_codes = get_vul_codes(test_data, vul_idx)

    # Insert adversarial samples
    modified_codes, modified_names = add_adver_sample_2_ast(vul_codes, insert_position=2, ad_content=adv_snippets[:2])
    print("\n🔹 Modified Vulnerable Code Snippets:")
    for idx, mod_code in enumerate(modified_codes):
        print(f"{idx+1}. {mod_code}")

    # 🔹 Update Population
    total_pop = {"adv_snippet_1": 0.8, "adv_snippet_2": 0.75}
    offsprings = ["adv_snippet_3", "adv_snippet_4"]
    fit_scores = [0.85, 0.82]
    updated_pop = update_global_pop(offsprings, total_pop, fit_scores)
    print("\n🚀 Updated Population:")
    print(updated_pop)

    # 🔹 Save Modified Data as JSON
    write_adv_to_json(modified_codes, modified_names, [1]*len(modified_codes), "modified_adv_data.json")
    print("\n✅ JSON file 'modified_adv_data.json' has been saved successfully!\n")


 Fitness Score: 0.19999999999999996

🔹 AI-Generated Adversarial Snippet:
```python
import hashlib
import secrets
import bcrypt

def secure_login(username, password, users):
    """
    Securely verifies user credentials against a stored user database.

    Args:
        username: The username entered by the user.
        password: The password entered by the user.
        users: A dictionary where keys are usernames and values are 
               dictionaries containing user information, including 
               "salt" and "hashed_password".

    Returns:
        True if the username and password match a user in the database, 
        False otherwise.
    """
    if username not in users:
        return False  # Avoid timing attacks

    user_data = users[username]
    stored_salt = user_data["salt"]
    stored_hash = user_data["hashed_password"]

    # Use bcrypt for password hashing and comparison
    password_bytes = password.encode('utf-8') # Ensure bytes for bcrypt
    salt_byte

100%|██████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 3880.02it/s]


🔹 Modified Vulnerable Code Snippets:
1. def vulnerable(): {"idx": 0} {"idx": 1} return False





TypeError: 'dict_items' object is not subscriptable