In [47]:
import json
from pathlib import Path

RAW_DATA_DIR = Path("../data/raw")


# Keys removed from every CWE record before it is indexed. The trimmed record is
# later interpolated verbatim into the generation prompt, so dropping these
# fields keeps that prompt smaller.
DROPPED_KEYS = (
    '@Abstraction',
    '@Structure',
    '@Status',
    'Related_Weaknesses',
    'Modes_of_Introduction',
    'Likelihood_Of_Exploit',
    'References',
    'Mapping_Notes',
    'Content_History',
    'Notes',
    'Related_Attack_Patterns',
    'Taxonomy_Mappings',
)

# Explicit UTF-8: the platform default encoding is locale dependent, and the
# output below is written with ensure_ascii=False (raw non-ASCII characters).
with open(RAW_DATA_DIR / "post_processed.json", "r", encoding="utf-8") as f:
    data = json.load(f)

# Maps a record's '@ID' value to the (trimmed) record itself.
hash_map = {}

for item in data:
    # pop(..., None) removes the key when present and is a no-op otherwise.
    # Records are trimmed in place, so `data` holds the trimmed records as well.
    for dropped_key in DROPPED_KEYS:
        item.pop(dropped_key, None)
    hash_map[item['@ID']] = item

with open(RAW_DATA_DIR / "post_processed_map.json", "w", encoding="utf-8") as f:
    json.dump(hash_map, f, indent=2, ensure_ascii=False)
# Reports the number of input records; records sharing an '@ID' would collapse
# into a single map entry.
print(f"Processed {len(data)} items into a hash map.")




Processed 968 items into a hash map.


In [61]:
from pydantic import BaseModel, Field
from google import genai

# Structured-output schema for one generated sample: get_code_pair passes this
# class as `response_schema`, and the field descriptions below are part of it.
# NOTE(review): documented with `#` comments rather than a class docstring on
# purpose - pydantic normally copies a model docstring into the generated JSON
# schema, which would change what is sent to the API. Confirm before converting.
class CodePairResponse(BaseModel):
    # Every field is required: `...` as the first Field argument means "no default".
    cwe_id: str = Field(
        ...,
        description="The CWE ID associated with the code pair."
    )
    # The flawed snippet.
    vulnerable_code: str = Field(
        ...,
        description="The vulnerable code snippet that contains a security flaw."
    )
    # The corrected counterpart of `vulnerable_code`.
    fixed_code: str = Field(
        ...,
        description="The fixed code snippet that resolves the security flaw."
    )

# The API key is kept out of the notebook: it is read from a local file named
# `token` (surrounding whitespace / trailing newline removed) and handed to the
# google-genai client used by the request helpers.
with open('token', mode='r') as token_file:
    token = token_file.read().strip()

client = genai.Client(api_key=token)

def generate_prompt(cwe_id: str, examples: list[dict] | None = None) -> str:
    prompt = f"""
You are an expert vulnerability researcher in automotive software security.
your task is to generate a code pair that demonstrates a security flaw and its fix based on the provided CWE ID.
the code pair should include:
1. A vulnerable code snippet that contains a security flaw related to the given CWE ID.
2. A fixed code snippet that resolves the security flaw.
The code snippets should be in C / C++ programming language.
The code should not contain any comments or explanations, just the code snippets.
Code snippets should not be evidently vulnerable, but should contain a subtle flaw that can be exploited. in other words, the code should not be obviously insecure, but should contain a flaw that can be exploited by an attacker.
Make the code snippet complex, ideally as a part of a larger function, class or context.
The code snippets should be realistic and relevant to the automotive domain.
CWE information: {hash_map[cwe_id]}
"""
    if examples:
        prompt += f"\nHere is some examples of a previously generated code pairs for the CWE-{cwe_id}:\n{examples}\n"
        prompt += f"\nGenerate a new code pair for CWE-{cwe_id}:\n that does not resemble the examples, but constitutes the same CWE in a completely different way.\n"
        prompt += f"\nThere should be no overlap with the examples, but the code should still demonstrate the same CWE.\n"
        prompt += f"\nthink creately about where to put this vulnerability, so your code does not resemble examples.\n"
        prompt += f"\nYou MUST AVOID using the same vulnerable functions (e.g., strcpy, sprintf, memcpy etc.) as in the examples. Try to come up with the new way to introduce the vulnerability.\n"
    return prompt.strip()

def get_code_pair(
    cwe_id: str,
    examples: list[dict] | None = None,
    model: str = "gemini-2.5-flash-preview-05-20",
) -> CodePairResponse | None:
    """Request one vulnerable/fixed code pair for a CWE from the model.

    Args:
        cwe_id: CWE identifier forwarded to ``generate_prompt``.
        examples: Previously generated pairs forwarded to ``generate_prompt``;
            ``None`` (or an empty list) requests a pair without examples.
        model: Name of the model to query. Defaults to the model this notebook
            was developed against.

    Returns:
        The ``parsed`` attribute of the response, or ``None`` when the API call
        raised ``genai.errors.ServerError``.
        NOTE(review): ``parsed`` is presumably a ``CodePairResponse`` because
        that class is passed as ``response_schema`` - confirm how the SDK
        behaves when the reply cannot be parsed.
    """
    try:
        response = client.models.generate_content(
            model=model,
            contents=generate_prompt(cwe_id, examples),
            config={
                "response_mime_type": "application/json",
                "response_schema": CodePairResponse,
            },
        )
    except genai.errors.ServerError as e:
        # Only server-side failures are reported and turned into None so the
        # caller can carry on; every other exception propagates.
        print(f"Server error: {e}")
        return None
    return response.parsed




In [62]:
import random

TARGET_PAIR_COUNT = 100    # stop requesting once this many pairs are on record
EXAMPLES_PER_PROMPT = 8    # upper bound on earlier pairs shown to the model per request

examples = []
cwe_id = "119"
json_out_structure = []
pairs_file = f"cwe_{cwe_id}_code_pairs.json"

# Resume from an earlier run when possible. A missing file (first run) and an
# empty or malformed file both mean "start from scratch".
try:
    with open(pairs_file, "r", encoding="utf-8") as f:
        json_out_structure = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
    json_out_structure = []

for item in json_out_structure:
    examples.append({'vulnerable_code': item['vulnerable_code'], 'fixed_code': item['fixed_code']})

# The description does not change between iterations, so look it up once.
cwe_entry = hash_map[cwe_id]
cwe_description = cwe_entry['Extended_Description'] if 'Extended_Description' in cwe_entry else cwe_entry['Description']

# One request per missing pair; a failed request is not retried, so a run can
# end with fewer than TARGET_PAIR_COUNT pairs.
for _ in range(TARGET_PAIR_COUNT - len(examples)):
    # random.sample draws without replacement, so the prompt never shows the
    # same earlier pair twice (random.choices could repeat entries).
    prompt_examples = (
        random.sample(examples, k=min(EXAMPLES_PER_PROMPT, len(examples)))
        if examples
        else None
    )
    pair = get_code_pair(cwe_id, prompt_examples)
    if pair:
        examples.append({'vulnerable_code': pair.vulnerable_code, 'fixed_code': pair.fixed_code})
        print(f"CWE ID: {pair.cwe_id}")
        print("\nVulnerable Code:")
        print(pair.vulnerable_code)
        print("\nFixed Code:")
        print(pair.fixed_code)
        print("\n" + "="*50 + "\n")
        json_out_structure.append({
            "cwe_id": pair.cwe_id,
            "cwe_description": cwe_description,
            "vulnerable_code": pair.vulnerable_code,
            "fixed_code": pair.fixed_code,
            "analysis": "", # Placeholder for analysis later
        })
        # Persist after every pair so an interrupted run keeps what it has so far.
        # Explicit UTF-8 because ensure_ascii=False writes raw non-ASCII characters.
        with open(pairs_file, "w", encoding="utf-8") as f:
            json.dump(json_out_structure, f, indent=2, ensure_ascii=False)
    else:
        print("Failed to retrieve code pair.")

CWE ID: CWE-119

Vulnerable Code:
#include <stdint.h>
#include <string.h>

#define MAX_STORED_TELEMETRY_RECORDS_DATA_SIZE 256
#define MAX_INCOMING_PACKET_SIZE 512

static uint8_t g_telemetryStorageBuffer[MAX_STORED_TELEMETRY_RECORDS_DATA_SIZE];
static uint16_t g_currentStorageWriteOffset = 0;

void initTelemetryStorage() {
    memset(g_telemetryStorageBuffer, 0, MAX_STORED_TELEMETRY_RECORDS_DATA_SIZE);
    g_currentStorageWriteOffset = 0;
}

int processTelemetryDataPacket(const uint8_t* packet_data, uint16_t packet_len) {
    if (packet_data == NULL || packet_len == 0 || packet_len > MAX_INCOMING_PACKET_SIZE) {
        return -1;
    }

    uint16_t current_packet_read_offset = 0;

    while (current_packet_read_offset < packet_len) {
        if (current_packet_read_offset + 2 > packet_len) {
            return -1;
        }

        uint8_t record_id = packet_data[current_packet_read_offset];
        uint8_t data_len = packet_data[current_packet_read_offset + 1];

        if (current_

KeyboardInterrupt: 

In [42]:
import json
# NOTE(review): `examples` only carries 'vulnerable_code' / 'fixed_code'. This
# cell writes to the same file the generation loop saves its full records to
# (cwe_id, cwe_description, analysis), so running it after that loop replaces
# them with the reduced form - confirm this cell is still wanted.
# Explicit UTF-8 because ensure_ascii=False writes raw non-ASCII characters and
# the platform default encoding is locale dependent.
with open('cwe_119_code_pairs.json', 'w', encoding='utf-8') as f:
    json.dump(examples, f, indent=2, ensure_ascii=False)



In [43]:
from google import genai
from pydantic import BaseModel, Field

# Structured-output schema for one analysis reply: analyze_code passes this
# class as `response_schema`, and the field descriptions below are part of it.
# NOTE(review): documented with `#` comments rather than a class docstring on
# purpose - pydantic normally copies a model docstring into the generated JSON
# schema, which would change what is sent to the API. Confirm before converting.
class AnalysisResponse(BaseModel):
    # Both fields are required: `...` as the first Field argument means "no default".
    # Free-text reasoning produced by the model.
    code_analysis: str = Field(
        ...,
        description="A detailed analysis of the code pair, including the vulnerability and its fix."
    )
    # The model's verdict on the analysed snippet.
    is_vulnerable: bool = Field(
        ...,
        description="Indicates whether the vulnerable code snippet contains a security flaw."
    )

def get_analysis_prompt(code_snippet: str, cwe_info=None):
    """Build the prompt asking the model to analyse a C / C++ snippet.

    Args:
        code_snippet: Source text placed between the code fences of the prompt.
        cwe_info: Optional hint. When truthy, its ``str()`` form is appended in
            a "Hint:" section naming what the code is vulnerable to.

    Returns:
        The prompt text with leading and trailing whitespace stripped.
    """
    request = f"""
You are an expert code analyst specializing in automotive software security.
Your task is to analyze the provided code snippet and determine if it contains a security flaw.
The code snippet is in C / C++ programming language.
Here is the code snippet:
```
{code_snippet}
```
Provide a step by step thought process of how you arrived at your conclusion. Pretend that you did not know the problem from the beginning, but reached the conclusion after step by step analysis.
"""
    # Without a hint the request is used on its own.
    if not cwe_info:
        return request.strip()

    hint = f"""
Hint:
The code is vulnerable to {cwe_info}.
Pretend that you did not know the problem from the beginning, but reached the conclusion after step by step analysis.
"""
    return (request + hint).strip()

def analyze_code(
    code_snippet: str,
    cwe_info: dict | str | None = None,
    model: str = "gemini-2.5-flash-preview-05-20",
) -> AnalysisResponse | None:
    """Ask the model whether a code snippet contains a security flaw.

    Args:
        code_snippet: Source text forwarded to ``get_analysis_prompt``.
        cwe_info: Optional hint forwarded to ``get_analysis_prompt``, which
            embeds its ``str()`` form in the prompt when truthy.
        model: Name of the model to query. Defaults to the model this notebook
            was developed against.

    Returns:
        The ``parsed`` attribute of the response, or ``None`` when the API call
        raised ``genai.errors.ServerError``.
        NOTE(review): ``parsed`` is presumably an ``AnalysisResponse`` because
        that class is passed as ``response_schema`` - confirm how the SDK
        behaves when the reply cannot be parsed.
    """
    try:
        response = client.models.generate_content(
            model=model,
            contents=get_analysis_prompt(code_snippet, cwe_info),
            config={
                "response_mime_type": "application/json",
                "response_schema": AnalysisResponse,
            },
        )
    except genai.errors.ServerError as e:
        # Only server-side failures are reported and turned into None so the
        # caller can carry on; every other exception propagates.
        print(f"Server error: {e}")
        return None
    return response.parsed


In [44]:
# Spot-check: run the analysis prompt, with the CWE-119 record as a hint, on one
# previously generated snippet and print the model's reasoning and verdict.
# Nothing is printed when analyze_code returns a falsy result (it returns None
# after a server error).
sample_index = 15
vulnerable_code = examples[sample_index]['vulnerable_code']
analysis = analyze_code(vulnerable_code, cwe_info=hash_map["119"])
if analysis:
    report_lines = (
        "Vulnerable code snippet analysis for CWE-119:",
        f"{vulnerable_code}",
        "===================================================",
        "Analysis Response:",
        "Code Analysis:",
        analysis.code_analysis,
        f"Is Vulnerable: {analysis.is_vulnerable}",
    )
    # One print call with a newline separator emits the lines one after another.
    print(*report_lines, sep="\n")

Vulnerable code snippet analysis for CWE-119:
#include <stdint.h>
#include <string.h>

#define MAX_PACKET_SIZE 256

#define ENGINE_CONFIG_SIZE 64
#define TRANSMISSION_CONFIG_SIZE 32
#define ADAS_CONFIG_SIZE 128

typedef enum {
    MODULE_ENGINE = 1,
    MODULE_TRANSMISSION = 2,
    MODULE_ADAS = 3
} VehicleModuleId;

typedef struct {
    VehicleModuleId moduleId;
    uint16_t configLength; 
    uint8_t configData[MAX_PACKET_SIZE - sizeof(VehicleModuleId) - sizeof(uint16_t)];
} ModuleConfigPacket;

static uint8_t g_engineConfig[ENGINE_CONFIG_SIZE];
static uint8_t g_transmissionConfig[TRANSMISSION_CONFIG_SIZE];
static uint8_t g_adasConfig[ADAS_CONFIG_SIZE];

static uint16_t g_engineConfigCurrentLen = 0;
static uint16_t g_transmissionConfigCurrentLen = 0;
static uint16_t g_adasConfigCurrentLen = 0;

void initModuleConfigs() {
    memset(g_engineConfig, 0, ENGINE_CONFIG_SIZE);
    memset(g_transmissionConfig, 0, TRANSMISSION_CONFIG_SIZE);
    memset(g_adasConfig, 0, ADAS_CONFIG_SIZE);
    