In [1]:
!pip install --upgrade openai gradio pymongo

Collecting openai
  Downloading openai-1.83.0-py3-none-any.whl.metadata (25 kB)
Collecting gradio
  Downloading gradio-5.32.1-py3-none-any.whl.metadata (16 kB)
Collecting pymongo
  Downloading pymongo-4.13.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (22 kB)
Collecting aiofiles<25.0,>=22.0 (from gradio)
  Downloading aiofiles-24.1.0-py3-none-any.whl.metadata (10 kB)
Collecting fastapi<1.0,>=0.115.2 (from gradio)
  Downloading fastapi-0.115.12-py3-none-any.whl.metadata (27 kB)
Collecting ffmpy (from gradio)
  Downloading ffmpy-0.6.0-py3-none-any.whl.metadata (2.9 kB)
Collecting gradio-client==1.10.2 (from gradio)
  Downloading gradio_client-1.10.2-py3-none-any.whl.metadata (7.1 kB)
Collecting groovy~=0.1 (from gradio)
  Downloading groovy-0.1.2-py3-none-any.whl.metadata (6.1 kB)
Collecting pydub (from gradio)
  Downloading pydub-0.25.1-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting python-multipart>=0.0.18 (from gradio)
  Downloading python_multipart-0.0.20-p

In [2]:
pip install pymongo certifi




LLM-SETUP

In [3]:
import openai

# Your API key
# Read it from the environment so the secret is never saved with the notebook;
# fall back to a hidden interactive prompt when the variable is not set.
import os
from getpass import getpass

api_key = os.environ.get("OPENAI_API_KEY") or getpass("OpenAI API key: ")

client = openai.OpenAI(api_key=api_key)

# Function to query ChatGPT
def query_chatgpt(prompt, model="gpt-4", temperature=0.7):
    """Send one user prompt to the chat completions API and return the reply text.

    Args:
        prompt: Text of the user message.
        model: Model name forwarded to the API.
        temperature: Sampling temperature forwarded to the API.

    Returns:
        The content of the first returned choice, or a string of the form
        ``"Error: <message>"`` when any exception is raised by the request.
    """
    conversation = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": prompt},
    ]
    request_kwargs = {
        "model": model,
        "messages": conversation,
        "temperature": temperature,
        "max_tokens": 1024,
    }

    try:
        completion = client.chat.completions.create(**request_kwargs)
        first_choice = completion.choices[0]
        return first_choice.message.content
    except Exception as err:
        # Best-effort helper: failures are reported as text instead of raising.
        return f"Error: {str(err)}"

# Quick smoke test of the helper with a sample question.
prompt = "what is can bus "
print(query_chatgpt(prompt))


CAN bus, which stands for Controller Area Network bus, is a communication standard that allows microcontrollers and devices to communicate with each other within a vehicle without a host computer. It was developed by Bosch in the 1980s and is primarily used in automotive and industrial applications for connection and communication between different control systems. It's known for its robustness, efficiency, and flexibility, allowing electronic components to communicate in a manner that reduces wiring and complexity.


This code is for prompt checking and can be used to generate an attack tree.

In [12]:
import openai
from openai import OpenAI
import gradio as gr
from pymongo import MongoClient
from datetime import datetime
import re
import csv
import os
import pandas as pd
from collections import defaultdict, deque

# API & DB Config
# Secrets are read from the environment so they are never saved with the notebook;
# a hidden interactive prompt is used when a variable is not set.
from getpass import getpass

OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") or getpass("OpenAI API key: ")
MONGODB_URI = os.environ.get("MONGODB_URI") or getpass("MongoDB URI: ")

client_ai = OpenAI(api_key=OPENAI_API_KEY)
mongo_client = MongoClient(MONGODB_URI)
db = mongo_client["threat_db"]
collection = db["attack_trees"]

# Directory that receives one CSV per generated tree.
EXPORT_DIR = "csv_exports"
os.makedirs(EXPORT_DIR, exist_ok=True)

def parse_mermaid_to_named_edges(mermaid_code):
    """Extract the ``-->`` edges of a Mermaid flowchart as (parent, child) label pairs.

    Labels come from ``ID[Label]`` definitions found anywhere in the text
    (surrounding double quotes are dropped). An id that never gets a label is
    used as its own label.

    Edge forms recognised:
      * plain ids:            ``A --> B``
      * inline labels:        ``A[Root] --> B[Child]``
      * edge labels:          ``A -->|or| B``
      * chains on one line:   ``A --> B --> C``

    Args:
        mermaid_code: Mermaid source text.

    Returns:
        List of ``(parent_label, child_label)`` tuples in order of appearance.
    """
    label_pattern = re.compile(r'(\w+)\[(.+?)\]')
    # The target id is matched in a lookahead so it is not consumed and can be
    # picked up again as the source of the next arrow in a chain.
    edge_pattern = re.compile(r'(\w+)\s*-->\s*(?:\|[^|]*\|\s*)?(?=(\w+))')

    lines = mermaid_code.splitlines()

    # Pass 1: collect every label first, because an edge may reference a node
    # that is only labelled on a later line.
    node_labels = {}
    for line in lines:
        for node_id, label in label_pattern.findall(line):
            node_labels[node_id] = label.strip().strip('"').strip()

    # Pass 2: collect edges.
    edges = []
    for line in lines:
        # Collapse "A[Label]" to "A"; otherwise the closing bracket sits between
        # the id and the arrow and the edge is never matched.
        bare_line = label_pattern.sub(r'\1', line)
        for match in edge_pattern.finditer(bare_line):
            parent_id, child_id = match.group(1), match.group(2)
            edges.append((
                node_labels.get(parent_id, parent_id),
                node_labels.get(child_id, child_id),
            ))

    return edges

def build_ordered_paths(edges):
    """List every root-to-leaf path of the graph described by ``edges``.

    A root is a node that appears as a parent but never as a child. All roots
    are traversed, in the order they first appear in ``edges``, so the result
    is deterministic and no disconnected sub-tree is dropped. Traversal is
    breadth-first, so shallower leaves are emitted before deeper ones.

    A child that already occurs on the current path is not followed again;
    this keeps a cyclic graph from growing the queue without bound.

    Args:
        edges: Iterable of ``(parent, child)`` pairs.

    Returns:
        List of paths, each a list of nodes from a root to a leaf. Empty when
        there are no edges or no node qualifies as a root.
    """
    tree = defaultdict(list)
    has_parent = set()
    for parent, child in edges:
        tree[parent].append(child)
        has_parent.add(child)

    # Dicts keep insertion order, so roots come out in first-appearance order.
    roots = [node for node in tree if node not in has_parent]
    if not roots:
        return []

    paths = []
    queue = deque((root, [root]) for root in roots)

    while queue:
        node, path = queue.popleft()
        # .get() avoids creating empty entries in the defaultdict for leaves.
        next_nodes = [child for child in tree.get(node, ()) if child not in path]
        if not next_nodes:
            paths.append(path)
            continue
        for child in next_nodes:
            queue.append((child, path + [child]))

    return paths

def export_structured_csv(prompt, paths):
    """Write attack-tree paths to a timestamped CSV file inside ``EXPORT_DIR``.

    The file name is built from the first 30 characters of ``prompt`` plus a
    UTC timestamp. Every character that is not a word character, dot or dash
    (spaces, slashes, newlines, colons, quotes, ...) is replaced by ``_`` so a
    multi-line or punctuated prompt still yields a usable file name.

    Each row holds the first four levels of a path (padded with empty strings
    when the path is shorter) followed by the whole path joined with " > ".

    Args:
        prompt: Text used to derive the file name.
        paths: List of paths, each a list of node labels.

    Returns:
        Path of the CSV file that was written.
    """
    safe_stem = re.sub(r'[^\w.\-]', '_', prompt[:30])
    timestamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
    filepath = os.path.join(EXPORT_DIR, f"{safe_stem}_{timestamp}.csv")

    # Re-create the folder if it was removed after the config cell ran.
    os.makedirs(EXPORT_DIR, exist_ok=True)

    with open(filepath, mode='w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(["Surface Goal", "Attack Vector", "Technique", "Method", "Path"])

        for path in paths:
            # Four level columns: Surface Goal, Vector, Technique, Method.
            levels = list(path[:4])
            levels += [""] * (4 - len(levels))
            writer.writerow(levels + [" > ".join(path)])

    return filepath

def read_csv_as_dataframe(csv_path):
    """Load an exported attack-tree CSV as a DataFrame of strings.

    Args:
        csv_path: Path of the CSV file to read.

    Returns:
        The file contents, or an empty DataFrame with the five expected
        columns when the file cannot be read.
    """
    try:
        # dtype=str + keep_default_na=False: padded empty cells stay "" instead
        # of turning into NaN, and labels such as "NA" or "null" are kept verbatim.
        return pd.read_csv(csv_path, dtype=str, keep_default_na=False)
    except Exception:
        return pd.DataFrame(columns=["Surface Goal", "Attack Vector", "Technique", "Method", "Path"])

def generate_attack_tree(prompt):
    """Generate a Mermaid attack tree for ``prompt``, store it, and tabulate its paths.

    The model reply is reduced to bare Mermaid code, upserted into
    ``collection`` keyed by the prompt text, parsed into root-to-leaf paths and
    exported to CSV.

    Args:
        prompt: Free-text threat scenario. ``None``, empty or whitespace-only
            input is rejected without calling the API.

    Returns:
        A ``(markdown, dataframe)`` tuple: the Mermaid code wrapped in a
        mermaid code fence together with the exported paths, or a message
        starting with "❌" and an empty DataFrame on invalid input or failure.
    """
    if not prompt or not prompt.strip():
        return "❌ Please enter a valid prompt", pd.DataFrame()

    try:
        system_message = {
            "role": "system",
            "content": (
                "You are a cybersecurity expert. For any input threat scenario, respond with only the attack tree in valid Mermaid syntax using 'graph TD'. "
                "Do not include any explanation, just output:\n```mermaid\ngraph TD\n...\n```"
            )
        }

        response = client_ai.chat.completions.create(
            model="gpt-4-turbo",
            messages=[system_message, {"role": "user", "content": prompt}],
            temperature=0.3,
            max_tokens=1000
        )

        reply = response.choices[0].message.content or ""
        output = reply.strip()
        # Take the fenced block wherever it appears: the model may add text
        # around it, and a reply cut off by max_tokens may never close the fence.
        fenced = re.search(r"```(?:mermaid)?\s*(.*?)(?:```|\Z)", output, re.DOTALL)
        if fenced:
            output = fenced.group(1).strip()

        collection.update_one(
            {"prompt": prompt},
            {"$set": {"mermaid_code": output, "updated_at": datetime.utcnow()}},
            upsert=True
        )

        edges = parse_mermaid_to_named_edges(output)
        paths = build_ordered_paths(edges)
        csv_path = export_structured_csv(prompt, paths)
        df = read_csv_as_dataframe(csv_path)

        return f"```mermaid\n{output}\n```", df

    except Exception as e:
        # Shown in the UI instead of raising, so the app keeps running.
        return f"❌ Error: {str(e)}", pd.DataFrame()

# Gradio UI: one prompt box, a Markdown area for the Mermaid code and a
# read-only table of the exported paths.
with gr.Blocks() as demo:
    gr.Markdown("## 🚗 Threat Tree Generator for Vehicle ECUs")
    prompt_input = gr.Textbox(label="📝 Enter Threat Prompt", lines=5, placeholder="e.g. Generate attack tree for CAN bus")
    mermaid_output = gr.Markdown(label="📌 Mermaid Diagram")
    # Column headers match the header row written by export_structured_csv.
    csv_table = gr.Dataframe(headers=["Surface Goal", "Attack Vector", "Technique", "Method", "Path"], datatype=["str"]*5, interactive=False)

    generate_button = gr.Button("🚀 Generate & Save")
    # generate_attack_tree returns (markdown, dataframe), mapped onto the two outputs in order.
    generate_button.click(fn=generate_attack_tree, inputs=prompt_input, outputs=[mermaid_output, csv_table])

# share=True requests a public gradio.live URL (see the cell output below).
demo.launch(share=True)


Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://a4c6a10343dc9517a0.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




Stored 50 attack surface prompts with label and aliases

In [9]:
import ssl
import certifi
import json
from pymongo import MongoClient

# MongoDB Atlas connection URI
# Read from the environment so the credentials are never saved with the notebook;
# a hidden interactive prompt is used when the variable is not set.
import os
from getpass import getpass

MONGODB_URI = os.environ.get("MONGODB_URI") or getpass("MongoDB URI: ")

# Connect to MongoDB (TLS, verified against certifi's CA bundle)
client = MongoClient(
    MONGODB_URI,
    tls=True,
    tlsCAFile=certifi.where()
)

# Select database and collection
db = client["threat_db"]
prompt_library = db["prompt_library"]

# 1. Predefined prompt templates with aliases.
#    Each entry is a dict with:
#      "label"   - key used to upsert the entry into prompt_library later in this cell
#      "aliases" - alternative names stored alongside the label
#      "prompt"  - full prompt text for the scenario
predefined_prompts = [
    {
        "label": "can bus injection",
        "aliases": ["can bus", "controller area network", "can bus attack", "bus injection"],
        "prompt": (
            """Generate an attack tree for the Controller Area Network (CAN) bus in an automotive system. The root node should represent a successful attack on the CAN bus.

The first level of branches should include:
- 'Message Injection',
- 'Denial of Service',
- 'Spoofing',
- 'Eavesdropping',
- 'Fault Injection',
- 'Firmware Manipulation',
- 'Physical Access'.

Use Mermaid format with 'graph TD' syntax.

For each branch, expand with at least two sub-branches on the second level, and for each of those sub-branches, add at least two further sub-branches on the third level, resulting in a three-level hierarchy in total.

Specifically:

- For 'Message Injection', include sub-branches 'Replay Attack' and 'Arbitrary Message Injection'. Then expand 'Replay Attack' with two sub-branches: 'Capturing Packets' and 'Resending Packets'. Expand 'Arbitrary Message Injection' with 'Crafting Messages' and 'Injecting Malicious Commands'.

- For 'Denial of Service', include 'Bus Flooding' and 'Error Frame Injection'. Expand 'Bus Flooding' with 'Continuous Message Sending' and 'Resource Exhaustion'. Expand 'Error Frame Injection' with 'Error Frame Flood' and 'Bus Off State'.

- For 'Spoofing', include 'ID Spoofing' and 'Timing Spoofing'. Expand 'ID Spoofing' with 'Forged IDs' and 'Masquerading'. Expand 'Timing Spoofing' with 'Delay Injection' and 'Replay Timing Manipulation'.

- For 'Eavesdropping', include 'Passive Listening' and 'Data Capture'. Expand 'Passive Listening' with 'Bus Monitoring' and 'Signal Interception'. Expand 'Data Capture' with 'Message Logging' and 'Packet Analysis'.

- For 'Fault Injection', include 'Voltage Manipulation' and 'Clock Glitching'. Expand 'Voltage Manipulation' with 'Power Supply Interruption' and 'Voltage Spike'. Expand 'Clock Glitching' with 'Clock Signal Interference' and 'Timing Violation'.

- For 'Firmware Manipulation', include 'Malicious Firmware Update' and 'Firmware Downgrade'. Expand 'Malicious Firmware Update' with 'Tampered Firmware File' and 'OTA Exploitation'. Expand 'Firmware Downgrade' with 'Rollback Exploit' and 'Signature Bypass'.

- For 'Physical Access', include 'OBD-II Port Exploit' and 'ECU Extraction'. Expand 'OBD-II Port Exploit' with 'Sniffing Traffic' and 'Sending Commands'. Expand 'ECU Extraction' with 'Direct Flash Access' and 'Hardware Debugging Interface'.

Make sure:
- All node labels are short and readable (no long sentences)
- Use unique node IDs or Mermaid-friendly naming (avoid duplicate text nodes)
- No overlapping concepts
- Output is valid Mermaid syntax

Output the full attack tree **only** in valid Mermaid code, wrapped in triple backticks like this:

```mermaid
graph TD
"""

        )
    }
]

# 2. Load additional prompts from a local JSON file (if provided)
def load_prompts_from_file(file_path):
    """Read a JSON file that is expected to hold a list of prompt entries.

    Args:
        file_path: Path of the JSON file.

    Returns:
        The parsed list, or an empty list when the file cannot be read or
        parsed, or when its top-level value is not a list. A message is
        printed in both failure cases.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as handle:
            loaded = json.load(handle)
    except Exception as err:
        print(f"❌ Error reading JSON file: {err}")
        return []

    if not isinstance(loaded, list):
        print("⚠️ JSON file must contain a list of prompts.")
        return []

    return loaded

# 3. Merge predefined prompts with JSON prompts
json_prompts = load_prompts_from_file("ATT2.json")  # Update path if needed
all_prompts = predefined_prompts + json_prompts

# 4. Insert or update all prompts (one document per label)
for item in all_prompts:
    # Entries loaded from JSON are only known to sit in a list. Require a dict:
    # on a plain string the `in` checks below would do substring matching and
    # item["label"] would then raise TypeError.
    if isinstance(item, dict) and "label" in item and "prompt" in item:
        prompt_library.update_one(
            {"label": item["label"]},
            {"$set": item},
            upsert=True
        )
    else:
        print(f"⚠️ Skipping invalid prompt entry: {item}")

print("✅ Prompt library with aliases populated successfully.")


✅ Prompt library with aliases populated successfully.


UI + created library in CSV form

In [17]:
# ========================
# 📦 Library Imports
# ========================
import openai
from openai import OpenAI
import gradio as gr
from pymongo import MongoClient
from datetime import datetime
import re
import csv
import os
import pandas as pd
from collections import defaultdict, deque

# ========================
# 🔐 API & DB Config
# ========================
# Secrets are read from the environment so they are never saved with the notebook;
# a hidden interactive prompt is used when a variable is not set.
from getpass import getpass

OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") or getpass("OpenAI API key: ")
MONGODB_URI = os.environ.get("MONGODB_URI") or getpass("MongoDB URI: ")

client_ai = OpenAI(api_key=OPENAI_API_KEY)
mongo_client = MongoClient(MONGODB_URI)
db = mongo_client["threat_db"]
attack_tree_collection = db["attack_trees"]  # generated trees, one per label
prompt_library = db["prompt_library"]        # prompt templates with labels and aliases

# Directory that receives one CSV per loaded tree.
EXPORT_DIR = "csv_exports"
os.makedirs(EXPORT_DIR, exist_ok=True)

# ========================
# 🟢 Updated Parsing + CSV Logic from second code snippet
# ========================

def parse_mermaid_to_named_edges(mermaid_code):
    """Extract the ``-->`` edges of a Mermaid flowchart as (parent, child) label pairs.

    Labels come from ``ID[Label]`` definitions found anywhere in the text
    (surrounding double quotes are dropped). An id that never gets a label is
    used as its own label.

    Edge forms recognised:
      * plain ids:            ``A --> B``
      * inline labels:        ``A[Root] --> B[Child]``
      * edge labels:          ``A -->|or| B``
      * chains on one line:   ``A --> B --> C``

    Args:
        mermaid_code: Mermaid source text.

    Returns:
        List of ``(parent_label, child_label)`` tuples in order of appearance.
    """
    label_pattern = re.compile(r'(\w+)\[(.+?)\]')
    # The target id is matched in a lookahead so it is not consumed and can be
    # picked up again as the source of the next arrow in a chain.
    edge_pattern = re.compile(r'(\w+)\s*-->\s*(?:\|[^|]*\|\s*)?(?=(\w+))')

    lines = mermaid_code.splitlines()

    # Pass 1: collect every label first, because an edge may reference a node
    # that is only labelled on a later line.
    node_labels = {}
    for line in lines:
        for node_id, label in label_pattern.findall(line):
            node_labels[node_id] = label.strip().strip('"').strip()

    # Pass 2: collect edges.
    edges = []
    for line in lines:
        # Collapse "A[Label]" to "A"; otherwise the closing bracket sits between
        # the id and the arrow and the edge is never matched.
        bare_line = label_pattern.sub(r'\1', line)
        for match in edge_pattern.finditer(bare_line):
            parent_id, child_id = match.group(1), match.group(2)
            edges.append((
                node_labels.get(parent_id, parent_id),
                node_labels.get(child_id, child_id),
            ))

    return edges

def build_ordered_paths(edges):
    """List every root-to-leaf path of the graph described by ``edges``.

    A root is a node that appears as a parent but never as a child. All roots
    are traversed, in the order they first appear in ``edges``, so the result
    is deterministic and no disconnected sub-tree is dropped. Traversal is
    breadth-first, so shallower leaves are emitted before deeper ones.

    A child that already occurs on the current path is not followed again;
    this keeps a cyclic graph from growing the queue without bound.

    Args:
        edges: Iterable of ``(parent, child)`` pairs.

    Returns:
        List of paths, each a list of nodes from a root to a leaf. Empty when
        there are no edges or no node qualifies as a root.
    """
    tree = defaultdict(list)
    has_parent = set()
    for parent, child in edges:
        tree[parent].append(child)
        has_parent.add(child)

    # Dicts keep insertion order, so roots come out in first-appearance order.
    roots = [node for node in tree if node not in has_parent]
    if not roots:
        return []

    paths = []
    queue = deque((root, [root]) for root in roots)

    while queue:
        node, path = queue.popleft()
        # .get() avoids creating empty entries in the defaultdict for leaves.
        next_nodes = [child for child in tree.get(node, ()) if child not in path]
        if not next_nodes:
            paths.append(path)
            continue
        for child in next_nodes:
            queue.append((child, path + [child]))

    return paths

def export_structured_csv(label, paths):
    """Write attack-tree paths to a timestamped CSV file inside ``EXPORT_DIR``.

    The file name is built from the first 30 characters of ``label`` plus a
    UTC timestamp. Every character that is not a word character, dot or dash
    (spaces, slashes, newlines, colons, quotes, ...) is replaced by ``_`` so a
    custom value typed into the dropdown still yields a usable file name.

    Each row holds the first four levels of a path (padded with empty strings
    when the path is shorter) followed by the whole path joined with " > ".

    Args:
        label: Text used to derive the file name.
        paths: List of paths, each a list of node labels.

    Returns:
        Path of the CSV file that was written.
    """
    safe_label = re.sub(r'[^\w.\-]', '_', label[:30])
    timestamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
    filepath = os.path.join(EXPORT_DIR, f"{safe_label}_{timestamp}.csv")

    # Re-create the folder if it was removed after the config section ran.
    os.makedirs(EXPORT_DIR, exist_ok=True)

    with open(filepath, mode='w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(["Surface Goal", "Attack Vector", "Technique", "Method", "Path"])

        for path in paths:
            # Four level columns, padded with "" when the path is shorter.
            levels = list(path[:4])
            levels += [""] * (4 - len(levels))
            writer.writerow(levels + [" > ".join(path)])

    return filepath

def read_csv_as_dataframe(filepath):
    """Load an exported attack-tree CSV as a DataFrame of strings, one row per distinct path.

    Args:
        filepath: Path of the CSV file to read.

    Returns:
        The file contents with rows repeating an earlier "Path" value removed,
        or an empty DataFrame with the five expected columns when the file
        cannot be read or has no "Path" column.
    """
    try:
        # dtype=str + keep_default_na=False: padded empty cells stay "" instead
        # of turning into NaN, and labels such as "NA" or "null" are kept verbatim.
        df = pd.read_csv(filepath, dtype=str, keep_default_na=False)
        return df.drop_duplicates(subset=["Path"])
    except Exception:
        return pd.DataFrame(columns=["Surface Goal", "Attack Vector", "Technique", "Method", "Path"])

# ========================
# Updated load_saved_attack_tree for Tab 2 using new logic
# ========================

def load_saved_attack_tree(label):
    """Fetch a stored attack tree by label (or alias) and export its paths to CSV.

    The tree is looked up in ``attack_tree_collection`` by exact label. When
    nothing is found, the lower-cased input is matched against the aliases in
    ``prompt_library`` and the lookup is repeated with that entry's label.

    Args:
        label: Label or alias of the stored tree.

    Returns:
        ``(markdown, dataframe, csv_path)`` on success; otherwise an error
        message, an empty DataFrame and ``None``.
    """
    if not label:
        return "❌ No label provided.", pd.DataFrame(), None

    stored = attack_tree_collection.find_one({"label": label})

    if not stored:
        # Fall back to the alias list kept in the prompt library.
        alias_query = {"aliases": {"$in": [label.lower()]}}
        library_entry = prompt_library.find_one(alias_query)
        if library_entry:
            stored = attack_tree_collection.find_one({"label": library_entry["label"]})

    if not (stored and "mermaid_code" in stored):
        return "❌ No stored attack tree found.", pd.DataFrame(), None

    diagram = stored["mermaid_code"]
    tree_paths = build_ordered_paths(parse_mermaid_to_named_edges(diagram))
    csv_path = export_structured_csv(stored["label"], tree_paths)
    table = read_csv_as_dataframe(csv_path)

    return f"```mermaid\n{diagram}\n```", table, csv_path


# ========================
# 🤖 Generate Tree from Prompt (Unchanged, original first tab logic)
# ========================

def generate_attack_tree_from_label(label_selected):
    """Generate and store the attack tree for a scenario from the prompt library.

    The scenario is looked up in ``prompt_library`` by exact label first, then
    by matching the lower-cased input against the stored aliases. The model
    reply is reduced to bare Mermaid code before it is upserted into
    ``attack_tree_collection`` under the entry's own label, so the stored code
    holds no code-fence markers.

    Args:
        label_selected: Label or alias chosen or typed in the UI.

    Returns:
        The Mermaid code wrapped in a mermaid code fence (the same form
        ``load_saved_attack_tree`` returns), or a message starting with "❌".
    """
    if not label_selected:
        return "❌ Select a threat scenario."

    # Try exact match on label
    doc = prompt_library.find_one({"label": label_selected})

    # If not found, try alias match against the lower-cased input
    if not doc:
        doc = prompt_library.find_one({"aliases": {"$in": [label_selected.lower()]}})

    if not doc or "prompt" not in doc:
        return f"❌ No prompt or alias found for '{label_selected}'"

    matched_prompt = doc["prompt"]
    label_to_save = doc["label"]  # Use canonical label for saving

    try:
        system_message = {
            "role": "system",
            "content": "You are a cybersecurity expert. Return only the attack tree in Mermaid format using:\nmermaid\ngraph TD\n...\nStructure the tree logically using OR/AND logic nodes and hierarchical breakdown of threats starting from surface goal > attack vector > attack method."
        }

        response = client_ai.chat.completions.create(
            model="gpt-4-turbo",
            messages=[system_message, {"role": "user", "content": matched_prompt}],
            temperature=0.3,
            max_tokens=1000
        )

        raw_reply = (response.choices[0].message.content or "").strip()

        # Take the fenced block wherever it appears: the model may add text
        # around it, and a reply cut off by max_tokens may never close the fence.
        fenced = re.search(r"```(?:mermaid)?\s*(.*?)(?:```|\Z)", raw_reply, re.DOTALL)
        mermaid_code = fenced.group(1).strip() if fenced else raw_reply

        # Drop only a leading bare "mermaid" tag; the word is left alone inside labels.
        if mermaid_code.startswith("mermaid"):
            mermaid_code = mermaid_code[len("mermaid"):].strip()

        attack_tree_collection.update_one(
            {"label": label_to_save},
            {"$set": {
                "prompt": matched_prompt,
                "mermaid_code": mermaid_code,
                "updated_at": datetime.utcnow()
            }},
            upsert=True
        )

        return f"```mermaid\n{mermaid_code}\n```"
    except Exception as e:
        # Shown in the UI instead of raising, so the app keeps running.
        return f"❌ Error: {str(e)}"


# ========================
# 🌐 Gradio UI with two tabs
# ========================

# Two-tab app: tab 1 generates a tree for a library scenario, tab 2 browses
# stored trees and exports their paths as CSV.
with gr.Blocks() as demo:
    with gr.Tab("🧠 Generate Attack Tree"):
        gr.Markdown("### 🔐 attack tree")

        # Choices are read from prompt_library once, when this cell runs.
        # allow_custom_value lets the user type a value that is not in the list.
        label_dropdown = gr.Dropdown(
            choices=sorted([doc["label"] for doc in prompt_library.find({}, {"label": 1, "_id": 0}) if "label" in doc]),
            label="📌 Select or Type",
            interactive=True,
            allow_custom_value=True
        )
        generate_button = gr.Button("🚀 Generate Attack Tree")
        mermaid_display = gr.Markdown(label="📈 Generated Attack Tree")

        generate_button.click(
            fn=generate_attack_tree_from_label,
            inputs=label_dropdown,
            outputs=mermaid_display
        )

    with gr.Tab("📂 Library"):
        gr.Markdown("### 📉 View and Export Structured Threat Trees")

        # Choices are read from attack_tree_collection once, when this cell runs;
        # documents without a "label" field are left out.
        saved_dropdown = gr.Dropdown(
            choices=sorted(set([doc["label"] for doc in attack_tree_collection.find({}, {"label": 1, "_id": 0}) if "label" in doc])),
            label="📌 Select or Type Stored Tree",
            interactive=True,
            allow_custom_value=True
        )
        mermaid_output = gr.Markdown(label="📈 Saved Attack Tree")
        # Column headers match the header row written by export_structured_csv.
        relation_table = gr.Dataframe(headers=["Surface Goal", "Attack Vector", "Technique", "Method", "Path"], datatype=["str"]*5, interactive=False)
        download_button = gr.File(label="📥 Download CSV")
        regen_button = gr.Button("🔄 Regenerate Tree from Prompt")

        def wrapper_load(label):
            """Load the stored tree for ``label``; returns (markdown, dataframe, csv_path)."""
            # An empty selection yields a message and an empty table with the expected columns.
            if not label:
                return "❌ Select a saved attack tree.", pd.DataFrame(columns=["Surface Goal", "Attack Vector", "Technique", "Method", "Path"]), None
            mermaid, df, csv_path = load_saved_attack_tree(label)
            return mermaid, df, csv_path

        # Changing the selection refreshes the diagram, the table and the download file.
        saved_dropdown.change(
            fn=wrapper_load,
            inputs=saved_dropdown,
            outputs=[mermaid_output, relation_table, download_button]
        )

        # Regeneration writes only to the Markdown output; the table and the
        # download file are not among its outputs.
        regen_button.click(
            fn=generate_attack_tree_from_label,
            inputs=saved_dropdown,
            outputs=mermaid_output
        )

# share=True requests a public gradio.live URL (see the cell output below).
demo.launch(share=True)


Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://4f8b42c1a28812c015.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


