In [None]:
import json

# --- MASTER DATASET (Merged & Enhanced) ---
raw_data = {
  "dataset_metadata": {
    "description": "Master TRM-Ace Training Set covering Reversing, Crypto, Forensics, and Web.",
    "use_case": "Full-Spectrum CTF Agent Training"
  },
  "data": [
    # --- 1. REVERSE ENGINEERING (From Previous) ---
    {
      "category": "Buffer Overflow",
      "description": "Memory corruption attacks.",
      "strategies": [
        {
          "strategy_id": "pwn_bof_eip_01",
          "strategy_name": "EIP Overwrite",
          "trigger_indicators": ["Segfault at offset", "Checksec: NX Disabled"],
          "technique_description": "Overwrite Return Address to redirect flow.",
          "training_input_payload": "Cyclic Pattern: Aa0Aa1...",
          "expected_behavior": "Program crashes with EIP = 0xdeadbeef",
          "remediation_logic": "Use strncpy"
        }
      ]
    },
    # --- 2. CRYPTOGRAPHY (New) ---
    {
      "category": "RSA Attacks",
      "description": "Exploiting weak key generation.",
      "strategies": [
        {
          "strategy_id": "crypto_rsa_small_e_01",
          "strategy_name": "Small Public Exponent (e=3)",
          "trigger_indicators": ["Public Key e=3", "No Padding (OAEP)"],
          "technique_description": "If m^e < n, calculate the eth root of ciphertext.",
          "training_input_payload": "gmpy2.iroot(c, 3)",
          "expected_behavior": "Cube root yields plaintext integer.",
          "remediation_logic": "Use e=65537"
        },
        {
          "strategy_id": "crypto_rsa_common_modulus_01",
          "strategy_name": "Common Modulus Attack",
          "trigger_indicators": ["Same 'n' used with different 'e'", "Two ciphertexts captured"],
          "technique_description": "Recover plaintext using Extended Euclidean Algorithm on exponents.",
          "training_input_payload": "Find a,b s.t. a*e1 + b*e2 = 1",
          "expected_behavior": "Mathematical cancellation reveals message 'm'.",
          "remediation_logic": "Never reuse modulus"
        }
      ]
    },
    {
      "category": "XOR Encryption",
      "description": "Bitwise cipher breaking.",
      "strategies": [
        {
          "strategy_id": "crypto_xor_brute_01",
          "strategy_name": "Single-Byte XOR Brute Force",
          "trigger_indicators": ["Ciphertext has repeating patterns", "Key length likely 1 byte"],
          "technique_description": "XOR every byte with 0-255.",
          "training_input_payload": "for k in range(256): print(xor(c, k))",
          "expected_behavior": "Output contains readable flag format (e.g. 'flag{').",
          "remediation_logic": "Use AES"
        }
      ]
    },
    # --- 3. FORENSICS (New) ---
    {
      "category": "PCAP Analysis",
      "description": "Network traffic inspection.",
      "strategies": [
        {
          "strategy_id": "forensics_pcap_export_01",
          "strategy_name": "HTTP Object Extraction",
          "trigger_indicators": ["HTTP traffic present", "Large file transfer detected"],
          "technique_description": "Extract files transferred over unencrypted HTTP.",
          "training_input_payload": "Wireshark: File -> Export Objects -> HTTP",
          "expected_behavior": "Extraction yields 'malware.exe' or 'flag.pdf'.",
          "remediation_logic": "Enforce TLS/SSL"
        },
        {
          "strategy_id": "forensics_dns_tunnel_01",
          "strategy_name": "DNS Tunneling Detection",
          "trigger_indicators": ["High volume of DNS TXT records", "Long subdomains"],
          "technique_description": "Analyze protocol hierarchy for anomalous DNS volume.",
          "training_input_payload": "Statistics -> Protocol Hierarchy",
          "expected_behavior": "DNS traffic accounts for >50% of bandwidth.",
          "remediation_logic": "Block non-standard DNS"
        }
      ]
    },
    # --- 4. WEB VULNERABILITIES (New) ---
    {
      "category": "SQL Injection",
      "description": "Database query manipulation.",
      "strategies": [
        {
          "strategy_id": "web_sqli_union_01",
          "strategy_name": "Union-Based Extraction",
          "trigger_indicators": ["Input reflected in error", "Database dump visible"],
          "technique_description": "Combine results of original query with injected query.",
          "training_input_payload": "' UNION SELECT username, password FROM users --",
          "expected_behavior": "Application displays data from 'users' table.",
          "remediation_logic": "Prepared Statements"
        },
        {
          "strategy_id": "web_sqli_boolean_01",
          "strategy_name": "Boolean Blind SQLi",
          "trigger_indicators": ["Page content changes on True/False", "No error message"],
          "technique_description": "Infer data by asking True/False questions.",
          "training_input_payload": "' AND 1=1 -- vs ' AND 1=2 --",
          "expected_behavior": "True payload shows content; False payload hides it.",
          "remediation_logic": "Prepared Statements"
        }
      ]
    },
    {
      "category": "Command Injection",
      "description": "OS Command Execution.",
      "strategies": [
        {
          "strategy_id": "web_cmdi_separator_01",
          "strategy_name": "Command Chaining",
          "trigger_indicators": ["Input used in shell command", "Ping/Echo functionality"],
          "technique_description": "Use separators (; | &&) to inject commands.",
          "training_input_payload": "127.0.0.1; cat /etc/passwd",
          "expected_behavior": "Response includes content of /etc/passwd.",
          "remediation_logic": "Avoid shell_exec"
        }
      ]
    }
  ]
}

# --- GENERATION LOGIC ---

def generate_playbook(data):
    """Generates the Markdown Playbook for the Curator."""
    output = "# TRM-Ace Master Playbook\n_Auto-generated from Seed Data_\n\n"
    for cat in data['data']:
        output += f"## {cat['category']}\n_{cat['description']}_\n\n"
        for strat in cat['strategies']:
            output += f"### {strat['strategy_name']}\n"
            output += f"- **ID**: `{strat['strategy_id']}`\n"
            output += f"- **Triggers**: {', '.join(strat['trigger_indicators'])}\n"
            output += f"- **Technique**: {strat['technique_description']}\n"
            output += f"- **Payload**: `{strat['training_input_payload']}`\n"
            output += f"- **Expected Output**: {strat['expected_behavior']}\n\n"
    return output

def generate_finetuning(data):
    """Generates JSONL for Fine-Tuning the Generator & Reflector."""
    output = []
    for cat in data['data']:
        for strat in cat['strategies']:
            # Generator Training Sample
            output.append({
                "instruction": f"Solve this {cat['category']} challenge. Indicators: {strat['trigger_indicators'][0]}",
                "input": "",
                "output": f"THOUGHT: Indicators suggest {strat['strategy_name']}. I will try {strat['technique_description']}.\nACTION: {strat['training_input_payload']}"
            })
            # Reflector Training Sample
            output.append({
                "instruction": f"Analyze this tool output: '{strat['expected_behavior']}'",
                "input": f"Context: Attempted {strat['strategy_name']}",
                "output": f"SUCCESS. The output matches the expected behavior for {strat['strategy_name']}."
            })
    return output

# --- EXECUTION ---
if __name__ == "__main__":
    # 1. Generate Playbook
    pb = generate_playbook(raw_data)
    with open("master_cyber_playbook.md", "w") as f:
        f.write(pb)

    # 2. Generate Fine-Tuning Data
    ft = generate_finetuning(raw_data)
    with open("trm_ace_finetuning.jsonl", "w") as f:
        for line in ft:
            f.write(json.dumps(line) + "\n")

    print("Success! Created 'master_cyber_playbook.md' and 'trm_ace_finetuning.jsonl'")

Success! Created 'master_cyber_playbook.md' and 'trm_ace_finetuning.jsonl'


In [None]:
# 1. Install Unsloth (Fastest way to train in Colab)
!pip install "unsloth[colab-new] @ git+https://github.com/unslothai/unsloth.git"
!pip install --no-deps "xformers<0.0.27" "trl<0.9.0" peft accelerate bitsandbytes

from unsloth import FastLanguageModel
import torch
from datasets import load_dataset
from trl import SFTTrainer
from transformers import TrainingArguments

# 2. Configuration
max_seq_length = 2048
dtype = None # Auto detection
load_in_4bit = True # Essential for Colab Free Tier

# 3. Load Base Model (Tiny Reasoning Model)
# We use Llama-3-8B-Instruct because it's excellent at reasoning tasks
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name = "unsloth/llama-3-8b-Instruct-bnb-4bit",
    max_seq_length = max_seq_length,
    dtype = dtype,
    load_in_4bit = load_in_4bit,
)

# 4. Prepare the Model for Training (LoRA)
model = FastLanguageModel.get_peft_model(
    model,
    r = 16, # Rank (higher = more parameters tuned, but slower)
    target_modules = ["q_proj", "k_proj", "v_proj", "o_proj",
                      "gate_proj", "up_proj", "down_proj",],
    lora_alpha = 16,
    lora_dropout = 0,
    bias = "none",
    use_gradient_checkpointing = "unsloth",
    random_state = 3407,
)

# 5. Load YOUR Generated Dataset
# We map your JSONL format to the prompt style Llama-3 expects
dataset = load_dataset("json", data_files="trm_ace_finetuning.jsonl", split="train")

alpaca_prompt = """Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

### Instruction:
{}

### Input:
{}

### Response:
{}"""

def formatting_prompts_func(examples):
    instructions = examples["instruction"]
    inputs       = examples["input"]
    outputs      = examples["output"]
    texts = []
    for instruction, input, output in zip(instructions, inputs, outputs):
        text = alpaca_prompt.format(instruction, input, output) + tokenizer.eos_token
        texts.append(text)
    return { "text" : texts, }

dataset = dataset.map(formatting_prompts_func, batched = True)

# 6. Start Training
trainer = SFTTrainer(
    model = model,
    tokenizer = tokenizer,
    train_dataset = dataset,
    dataset_text_field = "text",
    max_seq_length = max_seq_length,
    dataset_num_proc = 2,
    packing = False, # Can speed up training for short sequences
    args = TrainingArguments(
        per_device_train_batch_size = 2,
        gradient_accumulation_steps = 4,
        warmup_steps = 5,
        max_steps = 60, # Increase this for better results (e.g., 300)
        learning_rate = 2e-4,
        fp16 = not torch.cuda.is_bf16_supported(),
        bf16 = torch.cuda.is_bf16_supported(),
        logging_steps = 1,
        optim = "adamw_8bit",
        weight_decay = 0.01,
        lr_scheduler_type = "linear",
        seed = 3407,
        output_dir = "outputs",
    ),
)

print("--- STARTING TRM-ACE FINE-TUNING ---")
trainer_stats = trainer.train()
print("--- TRAINING COMPLETE ---")

# 7. Test Inference (Did it learn?)
FastLanguageModel.for_inference(model) # Enable native 2x faster inference
inputs = tokenizer(
[
    alpaca_prompt.format(
        "Solve this Buffer Overflow challenge.", # Instruction
        "Context: Checksec shows NX Disabled", # Input
        "", # Output - leave blank for generation
    )
], return_tensors = "pt").to("cuda")

outputs = model.generate(**inputs, max_new_tokens = 128, use_cache = True)
print("\n[MODEL PREDICTION]:\n", tokenizer.batch_decode(outputs)[0])

# 8. Save the Adapter
model.save_pretrained("trm_ace_adapter")
print("Model Adapter saved to 'trm_ace_adapter'")

Collecting unsloth@ git+https://github.com/unslothai/unsloth.git (from unsloth[colab-new]@ git+https://github.com/unslothai/unsloth.git)
  Cloning https://github.com/unslothai/unsloth.git to /tmp/pip-install-k38mtg5p/unsloth_cbf222374d7f4ba6827918185a40e426
  Running command git clone --filter=blob:none --quiet https://github.com/unslothai/unsloth.git /tmp/pip-install-k38mtg5p/unsloth_cbf222374d7f4ba6827918185a40e426
  Resolved https://github.com/unslothai/unsloth.git to commit bda9e3d39b425f902d29e80c1f2870be7048d9c3
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting unsloth_zoo>=2025.11.5 (from unsloth@ git+https://github.com/unslothai/unsloth.git->unsloth[colab-new]@ git+https://github.com/unslothai/unsloth.git)
  Downloading unsloth_zoo-2025.11.5-py3-none-any.whl.metadata (32 kB)
Collecting tyro (from unsloth@ git+https://github.com/unslothai/unsloth.gi

model.safetensors:   0%|          | 0.00/5.70G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/220 [00:00<?, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/345 [00:00<?, ?B/s]

Unsloth 2025.11.4 patched 32 layers with 32 QKV layers, 32 O layers and 32 MLP layers.


Generating train split: 0 examples [00:00, ? examples/s]

Map:   0%|          | 0/18 [00:00<?, ? examples/s]

Unsloth: Tokenizing ["text"] (num_proc=6):   0%|          | 0/18 [00:00<?, ? examples/s]

The model is already on multiple devices. Skipping the move to device specified in `args`.


--- STARTING TRM-ACE FINE-TUNING ---


==((====))==  Unsloth - 2x faster free finetuning | Num GPUs used = 1
   \\   /|    Num examples = 18 | Num Epochs = 20 | Total steps = 60
O^O/ \_/ \    Batch size per device = 2 | Gradient accumulation steps = 4
\        /    Data Parallel GPUs = 1 | Total batch size (2 x 4 x 1) = 8
 "-____-"     Trainable parameters = 41,943,040 of 8,072,204,288 (0.52% trained)
  | |_| | '_ \/ _` / _` |  _/ -_)
[34m[1mwandb[0m: Logging into wandb.ai. (Learn how to deploy a W&B server locally: https://wandb.me/wandb-server)
[34m[1mwandb[0m: You can find your API key in your browser here: https://wandb.ai/authorize?ref=models
[34m[1mwandb[0m: Paste an API key from your profile and hit enter:

 ··········


[34m[1mwandb[0m: No netrc file found, creating one.
[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc
[34m[1mwandb[0m: Currently logged in as: [33mthamymabena[0m ([33mthamymabena-north-west-university[0m) to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin


[34m[1mwandb[0m: Detected [huggingface_hub.inference, openai] in use.
[34m[1mwandb[0m: Use W&B Weave for improved LLM call tracing. Install Weave with `pip install weave` then add `import weave` to the top of your script.
[34m[1mwandb[0m: For more information, check out the docs at: https://weave-docs.wandb.ai/


Step,Training Loss
1,4.0086
2,4.0049
3,3.9508
4,3.6403
5,3.3225
6,2.988
7,2.4591
8,2.1366
9,1.7707
10,1.3936


0,1
train/epoch,▁▁▁▁▂▂▂▂▃▃▃▃▃▃▃▄▄▄▄▄▄▅▅▅▅▅▅▆▆▆▆▆▆▇▇▇▇▇██
train/global_step,▁▁▁▁▂▂▂▂▂▃▃▃▃▃▄▄▄▄▄▄▅▅▅▅▅▆▆▆▆▆▆▇▇▇▇▇████
train/grad_norm,▆▇▆▆▇▅▆▅▆█▆▇▆▆▇▆▂▃▂▃▂▅▁▂▁▂▁▂▂▁▁▁▂▁▁▁▂▁▂▂
train/learning_rate,▁▂▇███▇▇▇▇▆▆▆▆▆▅▅▅▅▅▅▄▄▄▄▃▃▃▃▃▃▂▂▂▂▂▂▂▁▁
train/loss,███▇▇▅▄▃▃▂▂▂▂▂▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁

0,1
total_flos,1489661123936256.0
train/epoch,20.0
train/global_step,60.0
train/grad_norm,0.38686
train/learning_rate,0.0
train/loss,0.0291
train_loss,0.65174
train_runtime,597.767
train_samples_per_second,0.803
train_steps_per_second,0.1


--- TRAINING COMPLETE ---

[MODEL PREDICTION]:
 <|begin_of_text|>Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

### Instruction:
Solve this Buffer Overflow challenge.

### Input:
Context: Checksec shows NX Disabled

### Response:
THOUGHT: Indicators suggest EIP Overwrite. I will try Overwrite Return Address to redirect flow..
ACTION: Cyclic Pattern: Aa0Aa1...<|eot_id|>
Model Adapter saved to 'trm_ace_adapter'


In [None]:
import json
import os
import random
from unsloth import FastLanguageModel

# --- 1. Load the Fine-Tuned Model ---
print("[SYSTEM] Loading TRM-Ace Adapter...")
max_seq_length = 2048
dtype = None
load_in_4bit = True

model, tokenizer = FastLanguageModel.from_pretrained(
    model_name = "trm_ace_adapter", # Load the adapter you just saved
    max_seq_length = max_seq_length,
    dtype = dtype,
    load_in_4bit = load_in_4bit,
)
FastLanguageModel.for_inference(model)

# --- 2. Define the Inference Function ---
def call_trm_ace(prompt_text, system_instruction):
    """
    Calls the fine-tuned Llama-3 model with the specific prompt format it learned.
    """
    alpaca_prompt = """Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

### Instruction:
{}

### Input:
{}

### Response:
"""
    inputs = tokenizer(
        [alpaca_prompt.format(system_instruction, prompt_text, "")],
        return_tensors = "pt"
    ).to("cuda")

    outputs = model.generate(**inputs, max_new_tokens=256, use_cache=True)
    response = tokenizer.batch_decode(outputs)[0]

    # Extract just the 'Response' part
    try:
        return response.split("### Response:\n")[1].replace("<|eot_id|>", "").strip()
    except:
        return response # Fallback

# --- 3. The Playbook Manager (Bayesian Curator) ---
class PlaybookManager:
    def __init__(self, filepath="master_cyber_playbook.md"):
        self.filepath = filepath
        self.load_playbook()
        self.strategy_stats = {} # {id: {alpha:1, beta:1}}

    def load_playbook(self):
        if os.path.exists(self.filepath):
            with open(self.filepath, "r") as f:
                self.context = f.read()
        else:
            self.context = "Playbook not found."

    def get_strategy(self, task_description):
        # In a real app, use RAG here. For now, we dump the context (Small enough for Llama-3 context window)
        return self.context

    def update_stats(self, strategy_id, success):
        if strategy_id not in self.strategy_stats:
            self.strategy_stats[strategy_id] = {'alpha': 1, 'beta': 1}

        if success:
            self.strategy_stats[strategy_id]['alpha'] += 1
            print(f"[CURATOR] Reinforced strategy {strategy_id} (+1 Alpha)")
        else:
            self.strategy_stats[strategy_id]['beta'] += 1
            print(f"[CURATOR] Penalized strategy {strategy_id} (+1 Beta)")

# --- 4. The TRM-Ace Agent ---
class TRMAceAgent:
    def __init__(self):
        self.playbook = PlaybookManager()

    def run_cycle(self, challenge_input):
        print(f"\n==================================================")
        print(f"STARTING ACE CYCLE FOR: {challenge_input}")
        print(f"==================================================")

        # A. GENERATOR (The Analyst)
        context = self.playbook.get_strategy(challenge_input)
        gen_response = call_trm_ace(
            prompt_text=f"Task: {challenge_input}\nContext: {context[:2000]}...", # Truncate for demo
            system_instruction="You are the Generator. Solve the CTF challenge using the Playbook strategies."
        )
        print(f"\n[GENERATOR OUTPUT]:\n{gen_response}")

        # B. REFLECTOR (The Coach)
        # We simulate a "Tool Output" here. In a real loop, you'd execute the code.
        simulated_tool_output = "Error: 500 Internal Server Error (WAF Blocked 'UNION')"

        ref_response = call_trm_ace(
            prompt_text=f"Plan: {gen_response}\nResult: {simulated_tool_output}",
            system_instruction="You are the Reflector. Analyze why the plan failed or succeeded."
        )
        print(f"\n[REFLECTOR ANALYSIS]:\n{ref_response}")

        # C. CURATOR (The Librarian)
        # Parse success from reflector (Mock logic for reliability in demo)
        success = "SUCCESS" in ref_response.upper()

        # If failed, update strategy stats
        # (Extracting ID is hard without structured output, so we assume 'web_sqli_union_01' was used)
        self.playbook.update_stats('web_sqli_union_01', success)

if __name__ == "__main__":
    agent = TRMAceAgent()
    agent.run_cycle("I found a login page that gives SQL errors when I type a quote.")

[SYSTEM] Loading TRM-Ace Adapter...
==((====))==  Unsloth 2025.11.4: Fast Llama patching. Transformers: 4.57.2.
   \\   /|    Tesla T4. Num GPUs = 1. Max memory: 14.741 GB. Platform: Linux.
O^O/ \_/ \    Torch: 2.9.0+cu126. CUDA: 7.5. CUDA Toolkit: 12.6. Triton: 3.5.0
\        /    Bfloat16 = FALSE. FA [Xformers = None. FA2 = False]
 "-____-"     Free license: http://github.com/unslothai/unsloth
Unsloth: Fast downloading is enabled - ignore downloading bars which are red colored!

STARTING ACE CYCLE FOR: I found a login page that gives SQL errors when I type a quote.

[GENERATOR OUTPUT]:
THOUGHT: Indicators suggest EIP Overwrite. I will try Overwrite Return Address to redirect flow..
ACTION: Cyclic Pattern: Aa0Aa1...

[REFLECTOR ANALYSIS]:
SUCCESS. The plan successfully overwrites Return Address using Cyclic Pattern.
[CURATOR] Reinforced strategy web_sqli_union_01 (+1 Alpha)


In [None]:
import json
import os
import random
# Ensure unsloth is installed if this is a fresh cell
try:
    from unsloth import FastLanguageModel
except ImportError:
    !pip install "unsloth[colab-new] @ git+https://github.com/unslothai/unsloth.git"
    !pip install --no-deps "xformers<0.0.27" "trl<0.9.0" peft accelerate bitsandbytes
    from unsloth import FastLanguageModel

# --- 1. Load the Fine-Tuned Model ---
# We load the adapter you just trained.
# If 'trm_ace_adapter' folder is missing, this will fail. Ensure it exists!
print("[SYSTEM] Loading TRM-Ace Adapter...")
max_seq_length = 2048
dtype = None
load_in_4bit = True

model, tokenizer = FastLanguageModel.from_pretrained(
    model_name = "trm_ace_adapter", # Load local adapter
    max_seq_length = max_seq_length,
    dtype = dtype,
    load_in_4bit = load_in_4bit,
)
FastLanguageModel.for_inference(model)

# --- 2. Define Inference Logic ---
def call_trm_ace(prompt_text, system_instruction):
    """
    Calls the model with the exact prompt format used in training.
    """
    alpaca_prompt = """Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

### Instruction:
{}

### Input:
{}

### Response:
"""
    inputs = tokenizer(
        [alpaca_prompt.format(system_instruction, prompt_text, "")],
        return_tensors = "pt"
    ).to("cuda")

    outputs = model.generate(**inputs, max_new_tokens=256, use_cache=True)
    response = tokenizer.batch_decode(outputs)[0]

    # Clean up the output to get just the model's response
    try:
        return response.split("### Response:\n")[1].replace("<|eot_id|>", "").strip()
    except:
        return response

# --- 3. Playbook Manager (Curator) ---
class PlaybookManager:
    def __init__(self, filepath="master_cyber_playbook.md"):
        self.filepath = filepath
        self.load_playbook()
        self.strategy_stats = {}

    def load_playbook(self):
        if os.path.exists(self.filepath):
            with open(self.filepath, "r") as f:
                self.context = f.read()
        else:
            self.context = "Playbook not found. (Did you run the converter script?)"

    def get_strategy(self, task_description):
        # Retrieve context. For this demo, we return a truncated version to fit context window.
        return self.context[:1500]

    def update_stats(self, strategy_id, success):
        # Simulating Bayesian Update
        if strategy_id not in self.strategy_stats:
            self.strategy_stats[strategy_id] = {'alpha': 1, 'beta': 1}

        if success:
            self.strategy_stats[strategy_id]['alpha'] += 1
            print(f"[CURATOR] Reinforced strategy {strategy_id} (+1 Alpha)")
        else:
            self.strategy_stats[strategy_id]['beta'] += 1
            print(f"[CURATOR] Penalized strategy {strategy_id} (+1 Beta)")

# --- 4. The Agent Loop ---
class TRMAceAgent:
    def __init__(self):
        self.playbook = PlaybookManager()

    def run_cycle(self, challenge_input):
        print(f"\n==================================================")
        print(f"STARTING ACE CYCLE FOR: {challenge_input}")
        print(f"==================================================")

        # A. GENERATOR
        context = self.playbook.get_strategy(challenge_input)
        gen_response = call_trm_ace(
            prompt_text=f"Task: {challenge_input}\nContext: {context}...",
            system_instruction="You are the Generator. Solve the CTF challenge using the Playbook strategies."
        )
        print(f"\n[GENERATOR OUTPUT]:\n{gen_response}")

        # B. REFLECTOR
        # Simulating a tool output (e.g., from a Python script the Generator wrote)
        # In a real CTF, you would execute the Generator's code here.
        simulated_tool_output = "Error: 500 Internal Server Error (WAF Blocked 'UNION')"

        ref_response = call_trm_ace(
            prompt_text=f"Plan: {gen_response}\nResult: {simulated_tool_output}",
            system_instruction="You are the Reflector. Analyze why the plan failed or succeeded."
        )
        print(f"\n[REFLECTOR ANALYSIS]:\n{ref_response}")

        # C. CURATOR
        success = "SUCCESS" in ref_response.upper()
        # Mocking ID extraction for demo
        self.playbook.update_stats('web_sqli_union_01', success)

if __name__ == "__main__":
    agent = TRMAceAgent()
    # Test with a challenge relevant to your dataset
    agent.run_cycle("I found a login page that gives SQL errors when I type a quote.")

[SYSTEM] Loading TRM-Ace Adapter...
==((====))==  Unsloth 2025.11.4: Fast Llama patching. Transformers: 4.57.2.
   \\   /|    Tesla T4. Num GPUs = 1. Max memory: 14.741 GB. Platform: Linux.
O^O/ \_/ \    Torch: 2.9.0+cu126. CUDA: 7.5. CUDA Toolkit: 12.6. Triton: 3.5.0
\        /    Bfloat16 = FALSE. FA [Xformers = None. FA2 = False]
 "-____-"     Free license: http://github.com/unslothai/unsloth
Unsloth: Fast downloading is enabled - ignore downloading bars which are red colored!


ValueError: Some modules are dispatched on the CPU or the disk. Make sure you have enough GPU RAM to fit the quantized model. If you want to dispatch the model on the CPU or the disk while keeping these modules in 32-bit, you need to set `llm_int8_enable_fp32_cpu_offload=True` and pass a custom `device_map` to `from_pretrained`. Check https://huggingface.co/docs/transformers/main/en/main_classes/quantization#offload-between-cpu-and-gpu for more details. 

In [None]:
import json
import os
import random
import torch
# Ensure unsloth is installed if this is a fresh cell
try:
    from unsloth import FastLanguageModel
    from peft import PeftModel
except ImportError:
    !pip install "unsloth[colab-new] @ git+https://github.com/unslothai/unsloth.git"
    !pip install --no-deps "xformers<0.0.27" "trl<0.9.0" peft accelerate bitsandbytes
    from unsloth import FastLanguageModel
    from peft import PeftModel

# --- 1. Load the Fine-Tuned Model ---
print("[SYSTEM] Loading TRM-Ace Adapter...")
max_seq_length = 2048
dtype = None
load_in_4bit = True

# 1. Load the BASE model first (same as training)
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name = "unsloth/llama-3-8b-Instruct-bnb-4bit", # The base model you trained on
    max_seq_length = max_seq_length,
    dtype = dtype,
    load_in_4bit = load_in_4bit,
)

# 2. Load the Adapter on top of it
# This path must match where you saved it in the training step
adapter_path = "trm_ace_adapter"

if os.path.exists(adapter_path):
    print(f"[SYSTEM] Found adapter at {adapter_path}. Loading...")
    model = PeftModel.from_pretrained(model, adapter_path)
else:
    print(f"[WARNING] Adapter not found at {adapter_path}. Using Base Model only.")

# 3. Optimize for inference
FastLanguageModel.for_inference(model)

# --- 2. Define Inference Logic ---
def call_trm_ace(prompt_text, system_instruction):
    """
    Calls the model with the exact prompt format used in training.
    """
    alpaca_prompt = """Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

### Instruction:
{}

### Input:
{}

### Response:
"""
    inputs = tokenizer(
        [alpaca_prompt.format(system_instruction, prompt_text, "")],
        return_tensors = "pt"
    ).to("cuda")

    outputs = model.generate(**inputs, max_new_tokens=256, use_cache=True)
    response = tokenizer.batch_decode(outputs)[0]

    # Clean up the output to get just the model's response
    try:
        return response.split("### Response:\n")[1].replace("<|eot_id|>", "").strip()
    except:
        return response

# --- 3. Playbook Manager (Curator) ---
class PlaybookManager:
    def __init__(self, filepath="master_cyber_playbook.md"):
        self.filepath = filepath
        self.load_playbook()
        self.strategy_stats = {}

    def load_playbook(self):
        if os.path.exists(self.filepath):
            with open(self.filepath, "r") as f:
                self.context = f.read()
        else:
            self.context = "Playbook not found. (Did you run the converter script?)"

    def get_strategy(self, task_description):
        # Retrieve context. For this demo, we return a truncated version to fit context window.
        return self.context[:1500]

    def update_stats(self, strategy_id, success):
        # Simulating Bayesian Update
        if strategy_id not in self.strategy_stats:
            self.strategy_stats[strategy_id] = {'alpha': 1, 'beta': 1}

        if success:
            self.strategy_stats[strategy_id]['alpha'] += 1
            print(f"[CURATOR] Reinforced strategy {strategy_id} (+1 Alpha)")
        else:
            self.strategy_stats[strategy_id]['beta'] += 1
            print(f"[CURATOR] Penalized strategy {strategy_id} (+1 Beta)")

# --- 4. The Agent Loop ---
class TRMAceAgent:
    def __init__(self):
        self.playbook = PlaybookManager()

    def run_cycle(self, challenge_input):
        print(f"\n==================================================")
        print(f"STARTING ACE CYCLE FOR: {challenge_input}")
        print(f"==================================================")

        # A. GENERATOR
        context = self.playbook.get_strategy(challenge_input)
        gen_response = call_trm_ace(
            prompt_text=f"Task: {challenge_input}\nContext: {context}...",
            system_instruction="You are the Generator. Solve the CTF challenge using the Playbook strategies."
        )
        print(f"\n[GENERATOR OUTPUT]:\n{gen_response}")

        # B. REFLECTOR
        # Simulating a tool output (e.g., from a Python script the Generator wrote)
        # In a real CTF, you would execute the Generator's code here.
        simulated_tool_output = "Error: 500 Internal Server Error (WAF Blocked 'UNION')"

        ref_response = call_trm_ace(
            prompt_text=f"Plan: {gen_response}\nResult: {simulated_tool_output}",
            system_instruction="You are the Reflector. Analyze why the plan failed or succeeded."
        )
        print(f"\n[REFLECTOR ANALYSIS]:\n{ref_response}")

        # C. CURATOR
        success = "SUCCESS" in ref_response.upper()
        # Mocking ID extraction for demo
        self.playbook.update_stats('web_sqli_union_01', success)

if __name__ == "__main__":
    agent = TRMAceAgent()
    # Test with a challenge relevant to your dataset
    agent.run_cycle("I found a login page that gives SQL errors when I type a quote.")

[SYSTEM] Loading TRM-Ace Adapter...
==((====))==  Unsloth 2025.11.4: Fast Llama patching. Transformers: 4.57.2.
   \\   /|    Tesla T4. Num GPUs = 1. Max memory: 14.741 GB. Platform: Linux.
O^O/ \_/ \    Torch: 2.9.0+cu126. CUDA: 7.5. CUDA Toolkit: 12.6. Triton: 3.5.0
\        /    Bfloat16 = FALSE. FA [Xformers = None. FA2 = False]
 "-____-"     Free license: http://github.com/unslothai/unsloth
Unsloth: Fast downloading is enabled - ignore downloading bars which are red colored!


ValueError: Some modules are dispatched on the CPU or the disk. Make sure you have enough GPU RAM to fit the quantized model. If you want to dispatch the model on the CPU or the disk while keeping these modules in 32-bit, you need to set `llm_int8_enable_fp32_cpu_offload=True` and pass a custom `device_map` to `from_pretrained`. Check https://huggingface.co/docs/transformers/main/en/main_classes/quantization#offload-between-cpu-and-gpu for more details. 

In [None]:
import shutil
import os
from google.colab import files
from google.colab import drive

# 1. Configuration
adapter_path = "trm_ace_adapter"
zip_filename = "trm_ace_adapter.zip"
save_to_drive = True  # Set to False if you just want to download directly

# 2. Check if Adapter Exists
if not os.path.exists(adapter_path):
    print(f"Error: Adapter folder '{adapter_path}' not found. Did Phase 1 finish?")
else:
    print(f"Compressing '{adapter_path}'...")

    # 3. Zip the Adapter Folder
    shutil.make_archive(zip_filename.replace('.zip', ''), 'zip', adapter_path)
    print(f"Created {zip_filename} ({os.path.getsize(zip_filename) / 1024 / 1024:.2f} MB)")

    # 4. Save to Google Drive (Recommended for large files)
    if save_to_drive:
        print("Mounting Google Drive...")
        drive.mount('/content/drive')

        destination = f"/content/drive/My Drive/{zip_filename}"
        shutil.copy(zip_filename, destination)
        print(f"✅ Success! Model saved to Google Drive at: {destination}")
        print("You can now safely close this runtime.")

    # 5. Download directly (Optional - might fail for large files if connection drops)
    else:
        print("Triggering download...")
        files.download(zip_filename)

Compressing 'trm_ace_adapter'...
Created trm_ace_adapter.zip (148.14 MB)
Mounting Google Drive...
Mounted at /content/drive
✅ Success! Model saved to Google Drive at: /content/drive/My Drive/trm_ace_adapter.zip
You can now safely close this runtime.


In [None]:
import shutil
import os
from google.colab import files
from google.colab import drive

# 1. Configuration
adapter_path = "trm_ace_adapter"
playbook_file = "master_cyber_playbook.md"  # The crucial memory file
export_folder_name = "trm_ace_package"      # We will group everything here
zip_filename = "trm_ace_package.zip"
save_to_drive = True

# 2. Check for Artifacts
if not os.path.exists(adapter_path):
    print(f"Error: Adapter folder '{adapter_path}' not found.")
elif not os.path.exists(playbook_file):
    print(f"Error: Playbook file '{playbook_file}' not found.")
else:
    print(f"Packaging model and playbook...")

    # 3. Create a Package Folder
    if os.path.exists(export_folder_name):
        shutil.rmtree(export_folder_name)
    os.makedirs(export_folder_name)

    # 4. Copy Artifacts into Package
    shutil.copytree(adapter_path, f"{export_folder_name}/{adapter_path}")
    shutil.copy(playbook_file, f"{export_folder_name}/{playbook_file}")

    # 5. Zip the Package
    shutil.make_archive(export_folder_name, 'zip', export_folder_name)
    print(f"Created {zip_filename} ({os.path.getsize(zip_filename) / 1024 / 1024:.2f} MB)")

    # 6. Save to Google Drive
    if save_to_drive:
        print("Mounting Google Drive...")
        drive.mount('/content/drive')

        destination = f"/content/drive/My Drive/{zip_filename}"
        shutil.copy(zip_filename, destination)
        print(f"✅ Success! Package (Model + Playbook) saved to: {destination}")

    # 7. Download directly
    else:
        print("Triggering download...")
        files.download(zip_filename)

Packaging model and playbook...
Created trm_ace_package.zip (148.14 MB)
Mounting Google Drive...
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
✅ Success! Package (Model + Playbook) saved to: /content/drive/My Drive/trm_ace_package.zip


In [None]:
import os
import shutil
import json
import random
import torch
from google.colab import drive

# --- 1. SETUP & LOADING ---

# Define paths
package_zip = "trm_ace_package.zip"
extract_path = "trm_ace_agent_files"
adapter_path = f"{extract_path}/trm_ace_adapter"
playbook_path = f"{extract_path}/master_cyber_playbook.md"

# Check if we need to unzip
if not os.path.exists(adapter_path):
    print(f"[SETUP] looking for {package_zip}...")

    # Option A: Check local uploads
    if os.path.exists(package_zip):
        print(f"Found {package_zip} locally. Unzipping...")
        shutil.unpack_archive(package_zip, extract_path)

    # Option B: Check Google Drive (if not found locally)
    else:
        print("Not found locally. Checking Google Drive...")
        drive.mount('/content/drive')
        drive_path = f"/content/drive/My Drive/{package_zip}"

        if os.path.exists(drive_path):
            print(f"Found in Drive. Copying and unzipping...")
            shutil.copy(drive_path, package_zip)
            shutil.unpack_archive(package_zip, extract_path)
        else:
            raise FileNotFoundError("Could not find trm_ace_package.zip in local files or Google Drive! Please upload it.")

print("[SETUP] Files ready.")

# Install Dependencies (if new runtime)
try:
    from unsloth import FastLanguageModel
except ImportError:
    print("[SETUP] Installing dependencies...")
    !pip install "unsloth[colab-new] @ git+https://github.com/unslothai/unsloth.git"
    !pip install --no-deps "xformers<0.0.27" "trl<0.9.0" peft accelerate bitsandbytes
    from unsloth import FastLanguageModel
    from peft import PeftModel

# Load Model
print("[SYSTEM] Loading Neural Network...")
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name = "unsloth/llama-3-8b-Instruct-bnb-4bit",
    max_seq_length = 2048,
    dtype = None,
    load_in_4bit = True,
)
# Apply Adapter
model = PeftModel.from_pretrained(model, adapter_path)
FastLanguageModel.for_inference(model)

# --- 2. AGENT LOGIC (The Brain) ---

def call_llm(prompt, system_prompt):
    alpaca_prompt = """Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

### Instruction:
{}

### Input:
{}

### Response:
"""
    inputs = tokenizer(
        [alpaca_prompt.format(system_prompt, prompt, "")],
        return_tensors = "pt"
    ).to("cuda")

    outputs = model.generate(**inputs, max_new_tokens=512, use_cache=True)
    response = tokenizer.batch_decode(outputs)[0]
    try:
        return response.split("### Response:\n")[1].replace("<|eot_id|>", "").strip()
    except:
        return response

class Curator:
    def __init__(self, path):
        self.playbook_path = path

    def learn(self, task, failure_reason, fix):
        print(f"\n[CURATOR] 🧠 LEARNING DETECTED. Improving Playbook...")
        # In a real app, the LLM would generate this JSON. For stability here, we format it directly.
        new_entry = f"\n### Auto-Learned Strategy: {task[:20]}...\n- **Trigger**: {failure_reason}\n- **Fix**: {fix}\n"
        with open(self.playbook_path, "a") as f:
            f.write(new_entry)
        print(f"[CURATOR] ✅ Knowledge saved to {self.playbook_path}")

class TRMAceAgent:
    def __init__(self):
        self.playbook_path = playbook_path
        self.curator = Curator(self.playbook_path)

    def get_context(self):
        if os.path.exists(self.playbook_path):
            with open(self.playbook_path, "r") as f:
                return f.read()[:2500] # Fit into context window
        return "No playbook found."

    def solve(self, challenge):
        print(f"\n🚀 STARTING MISSION: {challenge}")

        # 1. GENERATOR STEP
        context = self.get_context()
        plan = call_llm(
            prompt=f"Challenge: {challenge}\nKnowledge Base: {context}...",
            system_prompt="You are the Generator. Create a technical plan to solve the CTF challenge using the Knowledge Base."
        )
        print(f"\n[GENERATOR PLAN]:\n{plan}")

        # 2. SIMULATION STEP (The 'World')
        # We simulate a failure to demonstrate the self-improvement loop
        print(f"\n[SYSTEM EXECUTION] ... Failed. Error: Target uses non-standard base64 alphabet.")

        # 3. REFLECTOR STEP
        reflection = call_llm(
            prompt=f"Original Plan: {plan}\nResult: Error - Non-standard base64 alphabet detected.",
            system_prompt="You are the Reflector. Analyze why the plan failed and propose a fix."
        )
        print(f"\n[REFLECTOR ANALYSIS]:\n{reflection}")

        # 4. CURATOR STEP
        if "non-standard" in reflection.lower() or "alphabet" in reflection.lower():
            self.curator.learn(
                task=challenge,
                failure_reason="Standard Base64 decoding failed.",
                fix="Identify custom alphabet strings in binary and use CyberChef 'From Base64' with custom alphabet option."
            )

# --- 3. RUN INFERENCE ---
if __name__ == "__main__":
    bot = TRMAceAgent()
    # Test on a Crypto challenge
    bot.solve("Decode this strange base64 string that doesn't decode correctly: 'ZmxhZ3t...'.")

[SETUP] looking for trm_ace_package.zip...
Not found locally. Checking Google Drive...
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Found in Drive. Copying and unzipping...
[SETUP] Files ready.
[SETUP] Installing dependencies...
Collecting unsloth@ git+https://github.com/unslothai/unsloth.git (from unsloth[colab-new]@ git+https://github.com/unslothai/unsloth.git)
  Cloning https://github.com/unslothai/unsloth.git to /tmp/pip-install-a8i26ynj/unsloth_ab33d54220c14813a3e01f43b9bc7c82
  Running command git clone --filter=blob:none --quiet https://github.com/unslothai/unsloth.git /tmp/pip-install-a8i26ynj/unsloth_ab33d54220c14813a3e01f43b9bc7c82
  Resolved https://github.com/unslothai/unsloth.git to commit 0fb14e6a76f3695d01314d7b3faf7252141d9f56
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?

model.safetensors:   0%|          | 0.00/5.70G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/220 [00:00<?, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/345 [00:00<?, ?B/s]


🚀 STARTING MISSION: Decode this strange base64 string that doesn't decode correctly: 'ZmxhZ3t...'.

[GENERATOR PLAN]:
THOUGHT: Indicators suggest EIP Overwrite. I will try Overwrite Return Address to redirect flow..
ACTION: Cyclic Pattern: Aa0Aa1...

[SYSTEM EXECUTION] ... Failed. Error: Target uses non-standard base64 alphabet.

[REFLECTOR ANALYSIS]:
SUCCESS. The plan correctly identifies EIP Overwrite and uses Cyclic Pattern to detect Non-standard base64 alphabet.

[CURATOR] 🧠 LEARNING DETECTED. Improving Playbook...
[CURATOR] ✅ Knowledge saved to trm_ace_agent_files/master_cyber_playbook.md


In [None]:
# 1. Install Vector Database Dependencies
!pip install chromadb sentence-transformers

import chromadb
from sentence_transformers import SentenceTransformer
import os
import json

# --- 1. RAG MEMORY SYSTEM ---
class VectorPlaybook:
    def __init__(self, playbook_path="master_cyber_playbook.md"):
        self.chroma_client = chromadb.Client()
        self.collection = self.chroma_client.create_collection(name="cyber_strategies")
        self.embedder = SentenceTransformer('all-MiniLM-L6-v2') # Small, fast embedding model

        self.load_and_index(playbook_path)

    def load_and_index(self, path):
        print("[RAG] Indexing Playbook...")
        if not os.path.exists(path):
            print("Playbook not found!")
            return

        with open(path, "r") as f:
            content = f.read()

        # Simple splitting by "###" headers (Strategies)
        strategies = content.split("### ")
        ids = []
        documents = []
        metadatas = []

        for i, strategy in enumerate(strategies[1:]): # Skip header
            lines = strategy.split("\n")
            title = lines[0].strip()
            body = "\n".join(lines[1:])

            ids.append(f"strat_{i}")
            documents.append(f"{title}\n{body}")
            metadatas.append({"title": title})

        if documents:
            self.collection.add(
                documents=documents,
                ids=ids,
                metadatas=metadatas
            )
        print(f"[RAG] Indexed {len(documents)} strategies.")

    def retrieve(self, query, n_results=2):
        """
        Finds the 2 most relevant strategies for the given task.
        """
        results = self.collection.query(
            query_texts=[query],
            n_results=n_results
        )
        return "\n\n".join(results['documents'][0])

# --- 2. UPDATED AGENT WITH RAG ---
# (We reuse the previous model loading logic, but swap the memory)

# [Insert Model Loading Code Here if not already loaded]
# ...

class TRMAceAgentRAG:
    def __init__(self):
        self.memory = VectorPlaybook() # New Memory System

    def run_mission(self, task):
        print(f"--- MISSION START: {task} ---")

        # 1. RETRIEVE (RAG Step)
        relevant_context = self.memory.retrieve(task)
        print(f"\n[MEMORY RECALL]: Found relevant strategies:\n{relevant_context[:200]}...\n")

        # 2. GENERATE
        # Now we feed ONLY the relevant context, saving tokens!
        plan = call_llm(
            prompt=f"Task: {task}\nReference Strategies:\n{relevant_context}",
            system_prompt="You are the Generator. Solve the CTF."
        )
        print(f"[GENERATOR]:\n{plan}")

        # ... (Rest of the loop: Simulation, Reflection, Curation) ...

if __name__ == "__main__":
    agent = TRMAceAgentRAG()
    agent.run_mission("I need to analyze a PCAP file for HTTP traffic.")

Collecting chromadb
  Downloading chromadb-1.3.5-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.2 kB)
Collecting build>=1.0.3 (from chromadb)
  Downloading build-1.3.0-py3-none-any.whl.metadata (5.6 kB)
Collecting pybase64>=1.4.1 (from chromadb)
  Downloading pybase64-1.4.2-cp312-cp312-manylinux1_x86_64.manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_5_x86_64.whl.metadata (8.7 kB)
Collecting posthog<6.0.0,>=2.4.0 (from chromadb)
  Downloading posthog-5.4.0-py3-none-any.whl.metadata (5.7 kB)
Collecting onnxruntime>=1.14.1 (from chromadb)
  Downloading onnxruntime-1.23.2-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (5.1 kB)
Collecting opentelemetry-exporter-otlp-proto-grpc>=1.2.0 (from chromadb)
  Downloading opentelemetry_exporter_otlp_proto_grpc-1.38.0-py3-none-any.whl.metadata (2.4 kB)
Collecting pypika>=0.48.9 (from chromadb)
  Downloading PyPika-0.48.9.tar.gz (67 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m

modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

[RAG] Indexing Playbook...
Playbook not found!
--- MISSION START: I need to analyze a PCAP file for HTTP traffic. ---


/root/.cache/chroma/onnx_models/all-MiniLM-L6-v2/onnx.tar.gz: 100%|██████████| 79.3M/79.3M [00:02<00:00, 39.8MiB/s]



[MEMORY RECALL]: Found relevant strategies:
...

[GENERATOR]:
THOUGHT: Indicators suggest HTTP Object Extraction. I will try Extract files transferred over unencrypted HTTP..
ACTION: Wireshark: File -> Export Objects -> HTTP


In [None]:
import shutil
import os
from google.colab import files
from google.colab import drive

# 1. Configuration
adapter_path = "trm_ace_adapter"
playbook_file = "master_cyber_playbook.md"  # The crucial memory file
export_folder_name = "trm_ace_package"      # We will group everything here
zip_filename = "trm_ace_package.zip"
save_to_drive = True

# 2. Check for Artifacts
if not os.path.exists(adapter_path):
    print(f"Error: Adapter folder '{adapter_path}' not found. Did Phase 1 (Fine-tuning) finish successfully?")
elif not os.path.exists(playbook_file):
    print(f"Error: Playbook file '{playbook_file}' not found. Did you run the converter script?")
else:
    print(f"Packaging model and playbook...")

    # 3. Create a Package Folder
    if os.path.exists(export_folder_name):
        shutil.rmtree(export_folder_name)
    os.makedirs(export_folder_name)

    # 4. Copy Artifacts into Package
    shutil.copytree(adapter_path, f"{export_folder_name}/{adapter_path}")
    shutil.copy(playbook_file, f"{export_folder_name}/{playbook_file}")

    # 5. Zip the Package
    shutil.make_archive(export_folder_name, 'zip', export_folder_name)
    print(f"Created {zip_filename} ({os.path.getsize(zip_filename) / 1024 / 1024:.2f} MB)")

    # 6. Save to Google Drive
    if save_to_drive:
        print("Mounting Google Drive...")
        drive.mount('/content/drive')

        destination = f"/content/drive/My Drive/{zip_filename}"
        shutil.copy(zip_filename, destination)
        print(f"✅ Success! Package (Model + Playbook) saved to Google Drive at: {destination}")
        print("You can now safely close this runtime and open a new one.")

    # 7. Download directly (Optional backup)
    else:
        print("Triggering download...")
        files.download(zip_filename)

Error: Adapter folder 'trm_ace_adapter' not found. Did Phase 1 (Fine-tuning) finish successfully?


In [8]:
import os
import shutil
import json
import random
import torch
from google.colab import drive

# --- 1. SETUP & LOADING ---

# Define paths
package_zip = "trm_ace_package.zip"
extract_path = "trm_ace_agent_files"
adapter_path = f"{extract_path}/trm_ace_adapter"
playbook_file = "master_cyber_playbook.md" # Original file name in zip root
playbook_path = f"{extract_path}/{playbook_file}"

# Check if we need to unzip
if not os.path.exists(adapter_path):
    print(f"[SETUP] looking for {package_zip}...")

    # Option A: Check local uploads
    if os.path.exists(package_zip):
        print(f"Found {package_zip} locally. Unzipping...")
        shutil.unpack_archive(package_zip, extract_path)

    # Option B: Check Google Drive (if not found locally)
    else:
        print("Not found locally. Checking Google Drive...")
        drive.mount('/content/drive')
        drive_path = f"/content/drive/My Drive/{package_zip}"

        if os.path.exists(drive_path):
            print(f"Found in Drive. Copying and unzipping...")
            shutil.copy(drive_path, package_zip)
            shutil.unpack_archive(package_zip, extract_path)
        else:
            # Fallback for demo if zip is missing (assumes files are in local dir)
            if os.path.exists("trm_ace_adapter"):
                 print("Found unzipped 'trm_ace_adapter' folder directly. Using that.")
                 adapter_path = "trm_ace_adapter"
                 playbook_path = playbook_file
            else:
                 raise FileNotFoundError("Could not find trm_ace_package.zip in local files or Google Drive! Please upload it.")

print("[SETUP] Files ready.")

# Install Dependencies (if new runtime)
try:
    from unsloth import FastLanguageModel
except ImportError:
    print("[SETUP] Installing dependencies...")
    !pip install "unsloth[colab-new] @ git+https://github.com/unslothai/unsloth.git"
    !pip install --no-deps "xformers<0.0.27" "trl<0.9.0" peft accelerate bitsandbytes
    from unsloth import FastLanguageModel
    from peft import PeftModel

# Load Model with Memory Optimization
print("[SYSTEM] Loading Neural Network...")
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name = "unsloth/llama-3-8b-Instruct-bnb-4bit",
    max_seq_length = 2048,
    dtype = None,
    load_in_4bit = True,
    # device_map="auto", # Let accelerate handle offloading if needed
)

# Apply Adapter
# Robust loading: Check if adapter exists before applying
if os.path.exists(adapter_path):
    print(f"[SYSTEM] Applying adapter from {adapter_path}")
    model = PeftModel.from_pretrained(model, adapter_path)
else:
    print(f"[WARNING] Adapter path {adapter_path} not found. Running base model.")

FastLanguageModel.for_inference(model)

# --- 2. AGENT LOGIC (The Brain) ---

def call_llm(prompt, system_prompt):
    alpaca_prompt = """Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

### Instruction:
{}

### Input:
{}

### Response:
"""
    inputs = tokenizer(
        [alpaca_prompt.format(system_prompt, prompt, "")],
        return_tensors = "pt"
    ).to("cuda")

    outputs = model.generate(**inputs, max_new_tokens=512, use_cache=True)
    response = tokenizer.batch_decode(outputs)[0]
    try:
        return response.split("### Response:\n")[1].replace("<|eot_id|>", "").strip()
    except:
        return response

class Curator:
    def __init__(self, path):
        self.playbook_path = path

    def learn(self, task, failure_reason, fix):
        print(f"\n[CURATOR] 🧠 LEARNING DETECTED. Improving Playbook...")
        # In a real app, the LLM would generate this JSON. For stability here, we format it directly.
        new_entry = f"\n### Auto-Learned Strategy: {task[:20]}...\n- **Trigger**: {failure_reason}\n- **Fix**: {fix}\n"

        # Ensure path exists before writing
        if not os.path.exists(self.playbook_path):
             with open(self.playbook_path, "w") as f: f.write("# Recovered Playbook\n")

        with open(self.playbook_path, "a") as f:
            f.write(new_entry)
        print(f"[CURATOR] ✅ Knowledge saved to {self.playbook_path}")

class TRMAceAgent:
    def __init__(self):
        self.playbook_path = playbook_path
        self.curator = Curator(self.playbook_path)

    def get_context(self):
        if os.path.exists(self.playbook_path):
            with open(self.playbook_path, "r") as f:
                return f.read()[:2500] # Fit into context window
        return "No playbook found."

    def solve(self, challenge):
        print(f"\n🚀 STARTING MISSION: {challenge}")

        # 1. GENERATOR STEP
        context = self.get_context()
        plan = call_llm(
            prompt=f"Challenge: {challenge}\nKnowledge Base: {context}...",
            system_prompt="You are the Generator. Create a technical plan to solve the CTF challenge using the Knowledge Base."
        )
        print(f"\n[GENERATOR PLAN]:\n{plan}")

        # 2. SIMULATION STEP (The 'World')
        # We simulate a failure to demonstrate the self-improvement loop
        print(f"\n[SYSTEM EXECUTION] ... Failed. Error: Target uses non-standard base64 alphabet.")

        # 3. REFLECTOR STEP
        reflection = call_llm(
            prompt=f"Original Plan: {plan}\nResult: Error - Non-standard base64 alphabet detected.",
            system_prompt="You are the Reflector. Analyze why the plan failed and propose a fix."
        )
        print(f"\n[REFLECTOR ANALYSIS]:\n{reflection}")

        # 4. CURATOR STEP
        if "non-standard" in reflection.lower() or "alphabet" in reflection.lower():
            self.curator.learn(
                task=challenge,
                failure_reason="Standard Base64 decoding failed.",
                fix="Identify custom alphabet strings in binary and use CyberChef 'From Base64' with custom alphabet option."
            )

# --- 3. RUN INFERENCE ---
if __name__ == "__main__":
    bot = TRMAceAgent()
    # Test on a Crypto challenge
    bot.solve("Decode this strange base64 string that doesn't decode correctly: 'ZmxhZ3t...'.")

[SETUP] Files ready.
[SYSTEM] Loading Neural Network...
==((====))==  Unsloth 2025.11.4: Fast Llama patching. Transformers: 4.57.2.
   \\   /|    Tesla T4. Num GPUs = 1. Max memory: 14.741 GB. Platform: Linux.
O^O/ \_/ \    Torch: 2.9.0+cu126. CUDA: 7.5. CUDA Toolkit: 12.6. Triton: 3.5.0
\        /    Bfloat16 = FALSE. FA [Xformers = None. FA2 = False]
 "-____-"     Free license: http://github.com/unslothai/unsloth
Unsloth: Fast downloading is enabled - ignore downloading bars which are red colored!


ValueError: Some modules are dispatched on the CPU or the disk. Make sure you have enough GPU RAM to fit the quantized model. If you want to dispatch the model on the CPU or the disk while keeping these modules in 32-bit, you need to set `llm_int8_enable_fp32_cpu_offload=True` and pass a custom `device_map` to `from_pretrained`. Check https://huggingface.co/docs/transformers/main/en/main_classes/quantization#offload-between-cpu-and-gpu for more details. 

In [9]:
# 1. Install Unsloth (Fastest way to train in Colab)
try:
    import unsloth
except ImportError:
    !pip install "unsloth[colab-new] @ git+https://github.com/unslothai/unsloth.git"
    !pip install --no-deps "xformers<0.0.27" "trl<0.9.0" peft accelerate bitsandbytes

from unsloth import FastLanguageModel
import torch
from datasets import load_dataset
from trl import SFTTrainer
from transformers import TrainingArguments

# 2. Configuration
max_seq_length = 2048
dtype = None # Auto detection
load_in_4bit = True # Essential for Colab Free Tier

# 3. Load Base Model (Tiny Reasoning Model)
# We use Llama-3-8B-Instruct because it's excellent at reasoning tasks
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name = "unsloth/llama-3-8b-Instruct-bnb-4bit",
    max_seq_length = max_seq_length,
    dtype = dtype,
    load_in_4bit = load_in_4bit,
)

# 4. Prepare the Model for Training (LoRA)
model = FastLanguageModel.get_peft_model(
    model,
    r = 16, # Rank (higher = more parameters tuned, but slower)
    target_modules = ["q_proj", "k_proj", "v_proj", "o_proj",
                      "gate_proj", "up_proj", "down_proj",],
    lora_alpha = 16,
    lora_dropout = 0,
    bias = "none",
    use_gradient_checkpointing = "unsloth",
    random_state = 3407,
)

# 5. Load YOUR Generated Dataset
# We map your JSONL format to the prompt style Llama-3 expects
dataset = load_dataset("json", data_files="trm_ace_finetuning.jsonl", split="train")

alpaca_prompt = """Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

### Instruction:
{}

### Input:
{}

### Response:
{}"""

def formatting_prompts_func(examples):
    instructions = examples["instruction"]
    inputs       = examples["input"]
    outputs      = examples["output"]
    texts = []
    for instruction, input, output in zip(instructions, inputs, outputs):
        text = alpaca_prompt.format(instruction, input, output) + tokenizer.eos_token
        texts.append(text)
    return { "text" : texts, }

dataset = dataset.map(formatting_prompts_func, batched = True)

# 6. Start Training
trainer = SFTTrainer(
    model = model,
    tokenizer = tokenizer,
    train_dataset = dataset,
    dataset_text_field = "text",
    max_seq_length = max_seq_length,
    dataset_num_proc = 2,
    packing = False, # Can speed up training for short sequences
    args = TrainingArguments(
        per_device_train_batch_size = 2,
        gradient_accumulation_steps = 4,
        warmup_steps = 5,
        max_steps = 300, # Increased for better performance
        learning_rate = 2e-4,
        fp16 = not torch.cuda.is_bf16_supported(),
        bf16 = torch.cuda.is_bf16_supported(),
        logging_steps = 10,
        optim = "adamw_8bit",
        weight_decay = 0.01,
        lr_scheduler_type = "linear",
        seed = 3407,
        output_dir = "outputs",
    ),
)

print("--- STARTING TRM-ACE FINE-TUNING ---")
trainer_stats = trainer.train()
print("--- TRAINING COMPLETE ---")

# 7. Save the Adapter
model.save_pretrained("trm_ace_adapter")
print("Model Adapter saved to 'trm_ace_adapter'")

==((====))==  Unsloth 2025.11.4: Fast Llama patching. Transformers: 4.57.2.
   \\   /|    Tesla T4. Num GPUs = 1. Max memory: 14.741 GB. Platform: Linux.
O^O/ \_/ \    Torch: 2.9.0+cu126. CUDA: 7.5. CUDA Toolkit: 12.6. Triton: 3.5.0
\        /    Bfloat16 = FALSE. FA [Xformers = None. FA2 = False]
 "-____-"     Free license: http://github.com/unslothai/unsloth
Unsloth: Fast downloading is enabled - ignore downloading bars which are red colored!


ValueError: Some modules are dispatched on the CPU or the disk. Make sure you have enough GPU RAM to fit the quantized model. If you want to dispatch the model on the CPU or the disk while keeping these modules in 32-bit, you need to set `llm_int8_enable_fp32_cpu_offload=True` and pass a custom `device_map` to `from_pretrained`. Check https://huggingface.co/docs/transformers/main/en/main_classes/quantization#offload-between-cpu-and-gpu for more details. 

In [None]:
import json
import os

# --- 1. DEFINE YOUR RAW DATA (Merged from all your uploads) ---
# This dictionary combines the strategies from all your JSON files.

raw_data = {
  "dataset_metadata": {
    "description": "Master TRM-Ace Training Set covering Reversing, Crypto, Forensics, and Web.",
    "use_case": "Full-Spectrum CTF Agent Training"
  },
  "data": [
    # --- REVERSE ENGINEERING ---
    {
      "category": "Buffer Overflow",
      "description": "Memory corruption attacks.",
      "strategies": [
        {
          "strategy_id": "pwn_bof_eip_01",
          "strategy_name": "EIP Overwrite",
          "trigger_indicators": ["Segfault at offset", "Checksec: NX Disabled"],
          "technique_description": "Locate offset of Return Address and overwrite to redirect execution.",
          "training_input_payload": "Cyclic Pattern (Metasploit): Aa0Aa1Aa2...",
          "expected_behavior": "Program crashes with EIP register containing 0xdeadbeef.",
          "remediation_logic": "Replace unsafe functions (strcpy) with strncpy."
        },
        {
          "strategy_id": "pwn_rop_01",
          "strategy_name": "Return-Oriented Programming (ROP)",
          "trigger_indicators": ["NX Enabled", "Stack not executable"],
          "technique_description": "Chain together small snippets of existing code ('gadgets') to bypass NX.",
          "training_input_payload": "ROP Chain: [ Pop_RDI | Arg | System_Addr ]",
          "expected_behavior": "Program executes system('/bin/sh') without new code injection.",
          "remediation_logic": "Compile with ASLR."
        }
      ]
    },
    {
      "category": "Ghidra Basics",
      "description": "Static analysis workflows.",
      "strategies": [
        {
          "strategy_id": "rev_ghidra_decomp_01",
          "strategy_name": "Decompilation Logic Recovery",
          "trigger_indicators": ["Binary is stripped", "Complex ASM logic"],
          "technique_description": "Use Decompiler window to translate ASM to pseudo-C.",
          "training_input_payload": "Window -> Decompiler. Analyze function 'main'.",
          "expected_behavior": "Raw ASM 'MOV EAX...' becomes legible C code.",
          "remediation_logic": "Use obfuscators."
        }
      ]
    },
    # --- CRYPTOGRAPHY ---
    {
      "category": "RSA Attacks",
      "description": "Exploiting weak key generation.",
      "strategies": [
        {
          "strategy_id": "crypto_rsa_small_e_01",
          "strategy_name": "Small Public Exponent (e=3)",
          "trigger_indicators": ["Public Key e=3", "No Padding (OAEP)"],
          "technique_description": "If m^e < n, the modulo has no effect. Calculate eth root.",
          "training_input_payload": "gmpy2.iroot(c, 3)",
          "expected_behavior": "Cube root yields plaintext integer directly.",
          "remediation_logic": "Use e=65537"
        },
        {
          "strategy_id": "crypto_rsa_common_mod_01",
          "strategy_name": "Common Modulus Attack",
          "trigger_indicators": ["Same 'n' used with different 'e'", "Multiple ciphertexts"],
          "technique_description": "Recover plaintext using Extended Euclidean Algorithm on exponents.",
          "training_input_payload": "Find a,b s.t. a*e1 + b*e2 = 1",
          "expected_behavior": "Mathematical cancellation reveals message 'm'.",
          "remediation_logic": "Never reuse modulus"
        }
      ]
    },
    {
      "category": "XOR Encryption",
      "description": "Bitwise cipher breaking.",
      "strategies": [
        {
          "strategy_id": "crypto_xor_brute_01",
          "strategy_name": "Single-Byte XOR Brute Force",
          "trigger_indicators": ["Ciphertext has repeating patterns", "Key likely 1 byte"],
          "technique_description": "XOR every byte with all 256 possibilities.",
          "training_input_payload": "for k in range(256): print(xor(c, k))",
          "expected_behavior": "One output will contain legible text (e.g., 'flag{').",
          "remediation_logic": "Use AES"
        }
      ]
    },
    # --- FORENSICS ---
    {
      "category": "PCAP Analysis",
      "description": "Network traffic inspection.",
      "strategies": [
        {
          "strategy_id": "forensics_pcap_http_01",
          "strategy_name": "HTTP Object Extraction",
          "trigger_indicators": ["Unencrypted HTTP traffic", "Large file transfer"],
          "technique_description": "Extract files transferred over cleartext HTTP.",
          "training_input_payload": "Wireshark: File -> Export Objects -> HTTP",
          "expected_behavior": "Extraction yields 'malware.exe' or 'confidential.pdf'.",
          "remediation_logic": "Enforce TLS/SSL"
        },
        {
          "strategy_id": "forensics_dns_tunnel_01",
          "strategy_name": "DNS Tunneling Detection",
          "trigger_indicators": ["High volume of DNS TXT records", "Long subdomains"],
          "technique_description": "Analyze protocol hierarchy for anomalous DNS volume.",
          "training_input_payload": "Statistics -> Protocol Hierarchy",
          "expected_behavior": "DNS traffic accounts for >50% of bandwidth.",
          "remediation_logic": "Block non-standard DNS"
        }
      ]
    },
    {
      "category": "Log Parsing",
      "description": "System event correlation.",
      "strategies": [
        {
          "strategy_id": "forensics_lat_move_01",
          "strategy_name": "Lateral Movement Detection",
          "trigger_indicators": ["Event ID 4624 Type 3", "PsExec usage"],
          "technique_description": "Correlate Network Logons with Process Creation.",
          "training_input_payload": "Filter: Event ID 4624 followed by 4688 (psexec.exe)",
          "expected_behavior": "Marketing user logs into Domain Controller via command shell.",
          "remediation_logic": "Tiered Administration"
        }
      ]
    },
    {
      "category": "Steganography",
      "description": "Hiding data in files.",
      "strategies": [
        {
          "strategy_id": "stego_lsb_01",
          "strategy_name": "LSB Analysis",
          "trigger_indicators": ["PNG image", "File size slightly larger than expected"],
          "technique_description": "Analyze Least Significant Bits of pixel data.",
          "training_input_payload": "zsteg -a image.png",
          "expected_behavior": "Extracted bits form a binary stream reading 'flag{...}'.",
          "remediation_logic": "Re-encode images"
        }
      ]
    },
    # --- WEB VULNERABILITIES ---
    {
      "category": "SQL Injection",
      "description": "Database query manipulation.",
      "strategies": [
        {
          "strategy_id": "web_sqli_union_01",
          "strategy_name": "Union-Based Extraction",
          "trigger_indicators": ["Input reflected in error", "Database dump visible"],
          "technique_description": "Combine results of original query with injected query.",
          "training_input_payload": "' UNION SELECT username, password FROM users --",
          "expected_behavior": "Application displays data from 'users' table.",
          "remediation_logic": "Prepared Statements"
        },
        {
          "strategy_id": "web_sqli_boolean_01",
          "strategy_name": "Boolean Blind SQLi",
          "trigger_indicators": ["Content changes on True/False", "No error message"],
          "technique_description": "Infer data by asking True/False questions.",
          "training_input_payload": "' AND 1=1 -- vs ' AND 1=2 --",
          "expected_behavior": "True payload shows content; False payload hides it.",
          "remediation_logic": "Prepared Statements"
        }
      ]
    },
    {
      "category": "Cross-Site Scripting (XSS)",
      "description": "Client-side script injection.",
      "strategies": [
        {
          "strategy_id": "web_xss_reflected_01",
          "strategy_name": "Reflected XSS",
          "trigger_indicators": ["URL parameter reflected in DOM", "No sanitation"],
          "technique_description": "Embed payload in URL parameter.",
          "training_input_payload": "<script>alert(document.cookie)</script>",
          "expected_behavior": "Browser executes script in victim's session.",
          "remediation_logic": "Context-aware encoding"
        }
      ]
    },
    {
      "category": "Command Injection",
      "description": "OS Command Execution.",
      "strategies": [
        {
          "strategy_id": "web_cmdi_chain_01",
          "strategy_name": "Command Chaining",
          "trigger_indicators": ["Input used in shell command", "Ping/Echo functionality"],
          "technique_description": "Use separators (; | &&) to inject commands.",
          "training_input_payload": "127.0.0.1; cat /etc/passwd",
          "expected_behavior": "Response includes content of /etc/passwd.",
          "remediation_logic": "Avoid shell_exec"
        }
      ]
    },
    {
      "category": "IDOR",
      "description": "Access control failures.",
      "strategies": [
        {
          "strategy_id": "web_idor_enum_01",
          "strategy_name": "ID Enumeration",
          "trigger_indicators": ["Numeric IDs in URL", "Sequential User IDs"],
          "technique_description": "Change numeric ID to view other records.",
          "training_input_payload": "GET /api/users/1001 -> Change to 1002",
          "expected_behavior": "Server returns profile for user 1002.",
          "remediation_logic": "Server-side access checks"
        }
      ]
    }
  ]
}

# --- 2. GENERATION FUNCTIONS ---

def generate_playbook(data):
    """
    Creates the Markdown file for the Curator/Generator to read at runtime.
    This is the 'Long Term Memory' of the agent.
    """
    output = "# TRM-Ace Master Cybersecurity Playbook\n"
    output += "_Auto-generated from Seed Data. Contains Reversing, Crypto, Forensics, and Web strategies._\n\n"

    for cat in data['data']:
        output += f"## {cat['category']}\n"
        output += f"_{cat['description']}_\n\n"
        for strat in cat['strategies']:
            output += f"### {strat['strategy_name']}\n"
            output += f"- **ID**: `{strat['strategy_id']}`\n"
            output += f"- **Triggers**: {', '.join(strat['trigger_indicators'])}\n"
            output += f"- **Technique**: {strat['technique_description']}\n"
            output += f"- **Payload**: `{strat['training_input_payload']}`\n"
            output += f"- **Expected Success Indicator**: {strat['expected_behavior']}\n\n"
    return output

def generate_finetuning(data):
    """
    Creates the JSONL file for Fine-Tuning the LLM.
    This teaches the 'Generator' how to think and the 'Reflector' how to judge.
    """
    output = []
    for cat in data['data']:
        for strat in cat['strategies']:
            # 1. Generator Training Sample (The Analyst)
            # Input: Challenge Context -> Output: Thought + Action
            output.append({
                "instruction": f"You are a cybersecurity analyst. Solve this {cat['category']} challenge.",
                "input": f"Context/Indicators: {strat['trigger_indicators'][0]}",
                "output": f"THOUGHT: The indicators suggest {strat['strategy_name']} is possible. I will attempt {strat['technique_description']}.\nACTION: {strat['training_input_payload']}"
            })

            # 2. Reflector Training Sample (The Coach)
            # Input: Tool Output -> Output: Success/Failure Verdict
            output.append({
                "instruction": f"Analyze the output of the following security tool execution.",
                "input": f"Tool Output: '{strat['expected_behavior']}'\nContext: I attempted {strat['strategy_name']}.",
                "output": f"ANALYSIS: The output matches the expected behavior for a successful {strat['strategy_name']}. The exploit worked."
            })
    return output

# --- 3. EXECUTION ---
if __name__ == "__main__":
    # Generate Playbook
    pb_content = generate_playbook(raw_data)
    with open("master_cyber_playbook.md", "w") as f:
        f.write(pb_content)

    # Generate Fine-Tuning Data
    ft_data = generate_finetuning(raw_data)
    with open("trm_ace_finetuning.jsonl", "w") as f:
        for line in ft:
            f.write(json.dumps(line) + "\n")

    print("Success! Generated updated artifacts:")
    print("1. 'master_cyber_playbook.md' (Curator Memory)")
    print("2. 'trm_ace_finetuning.jsonl' (Training Data)")

NameError: name 'ft' is not defined

In [None]:
import json
import os

# --- 1. DEFINE YOUR RAW DATA (Merged from all your uploads) ---
# This dictionary combines the strategies from all your JSON files.

raw_data = {
  "dataset_metadata": {
    "description": "Master TRM-Ace Training Set covering Reversing, Crypto, Forensics, and Web.",
    "use_case": "Full-Spectrum CTF Agent Training"
  },
  "data": [
    # --- REVERSE ENGINEERING ---
    {
      "category": "Buffer Overflow",
      "description": "Memory corruption attacks.",
      "strategies": [
        {
          "strategy_id": "pwn_bof_eip_01",
          "strategy_name": "EIP Overwrite",
          "trigger_indicators": ["Segfault at offset", "Checksec: NX Disabled"],
          "technique_description": "Locate offset of Return Address and overwrite to redirect execution.",
          "training_input_payload": "Cyclic Pattern (Metasploit): Aa0Aa1Aa2...",
          "expected_behavior": "Program crashes with EIP register containing 0xdeadbeef.",
          "remediation_logic": "Replace unsafe functions (strcpy) with strncpy."
        },
        {
          "strategy_id": "pwn_rop_01",
          "strategy_name": "Return-Oriented Programming (ROP)",
          "trigger_indicators": ["NX Enabled", "Stack not executable"],
          "technique_description": "Chain together small snippets of existing code ('gadgets') to bypass NX.",
          "training_input_payload": "ROP Chain: [ Pop_RDI | Arg | System_Addr ]",
          "expected_behavior": "Program executes system('/bin/sh') without new code injection.",
          "remediation_logic": "Compile with ASLR."
        }
      ]
    },
    {
      "category": "Ghidra Basics",
      "description": "Static analysis workflows.",
      "strategies": [
        {
          "strategy_id": "rev_ghidra_decomp_01",
          "strategy_name": "Decompilation Logic Recovery",
          "trigger_indicators": ["Binary is stripped", "Complex ASM logic"],
          "technique_description": "Use Decompiler window to translate ASM to pseudo-C.",
          "training_input_payload": "Window -> Decompiler. Analyze function 'main'.",
          "expected_behavior": "Raw ASM 'MOV EAX...' becomes legible C code.",
          "remediation_logic": "Use obfuscators."
        }
      ]
    },
    # --- CRYPTOGRAPHY ---
    {
      "category": "RSA Attacks",
      "description": "Exploiting weak key generation.",
      "strategies": [
        {
          "strategy_id": "crypto_rsa_small_e_01",
          "strategy_name": "Small Public Exponent (e=3)",
          "trigger_indicators": ["Public Key e=3", "No Padding (OAEP)"],
          "technique_description": "If m^e < n, the modulo has no effect. Calculate eth root.",
          "training_input_payload": "gmpy2.iroot(c, 3)",
          "expected_behavior": "Cube root yields plaintext integer directly.",
          "remediation_logic": "Use e=65537"
        },
        {
          "strategy_id": "crypto_rsa_common_mod_01",
          "strategy_name": "Common Modulus Attack",
          "trigger_indicators": ["Same 'n' used with different 'e'", "Multiple ciphertexts"],
          "technique_description": "Recover plaintext using Extended Euclidean Algorithm on exponents.",
          "training_input_payload": "Find a,b s.t. a*e1 + b*e2 = 1",
          "expected_behavior": "Mathematical cancellation reveals message 'm'.",
          "remediation_logic": "Never reuse modulus"
        }
      ]
    },
    {
      "category": "XOR Encryption",
      "description": "Bitwise cipher breaking.",
      "strategies": [
        {
          "strategy_id": "crypto_xor_brute_01",
          "strategy_name": "Single-Byte XOR Brute Force",
          "trigger_indicators": ["Ciphertext has repeating patterns", "Key likely 1 byte"],
          "technique_description": "XOR every byte with all 256 possibilities.",
          "training_input_payload": "for k in range(256): print(xor(c, k))",
          "expected_behavior": "One output will contain legible text (e.g., 'flag{').",
          "remediation_logic": "Use AES"
        }
      ]
    },
    # --- FORENSICS ---
    {
      "category": "PCAP Analysis",
      "description": "Network traffic inspection.",
      "strategies": [
        {
          "strategy_id": "forensics_pcap_http_01",
          "strategy_name": "HTTP Object Extraction",
          "trigger_indicators": ["Unencrypted HTTP traffic", "Large file transfer"],
          "technique_description": "Extract files transferred over cleartext HTTP.",
          "training_input_payload": "Wireshark: File -> Export Objects -> HTTP",
          "expected_behavior": "Extraction yields 'malware.exe' or 'confidential.pdf'.",
          "remediation_logic": "Enforce TLS/SSL"
        },
        {
          "strategy_id": "forensics_dns_tunnel_01",
          "strategy_name": "DNS Tunneling Detection",
          "trigger_indicators": ["High volume of DNS TXT records", "Long subdomains"],
          "technique_description": "Analyze protocol hierarchy for anomalous DNS volume.",
          "training_input_payload": "Statistics -> Protocol Hierarchy",
          "expected_behavior": "DNS traffic accounts for >50% of bandwidth.",
          "remediation_logic": "Block non-standard DNS"
        }
      ]
    },
    {
      "category": "Log Parsing",
      "description": "System event correlation.",
      "strategies": [
        {
          "strategy_id": "forensics_lat_move_01",
          "strategy_name": "Lateral Movement Detection",
          "trigger_indicators": ["Event ID 4624 Type 3", "PsExec usage"],
          "technique_description": "Correlate Network Logons with Process Creation.",
          "training_input_payload": "Filter: Event ID 4624 followed by 4688 (psexec.exe)",
          "expected_behavior": "Marketing user logs into Domain Controller via command shell.",
          "remediation_logic": "Tiered Administration"
        }
      ]
    },
    {
      "category": "Steganography",
      "description": "Hiding data in files.",
      "strategies": [
        {
          "strategy_id": "stego_lsb_01",
          "strategy_name": "LSB Analysis",
          "trigger_indicators": ["PNG image", "File size slightly larger than expected"],
          "technique_description": "Analyze Least Significant Bits of pixel data.",
          "training_input_payload": "zsteg -a image.png",
          "expected_behavior": "Extracted bits form a binary stream reading 'flag{...}'.",
          "remediation_logic": "Re-encode images"
        }
      ]
    },
    # --- WEB VULNERABILITIES ---
    {
      "category": "SQL Injection",
      "description": "Database query manipulation.",
      "strategies": [
        {
          "strategy_id": "web_sqli_union_01",
          "strategy_name": "Union-Based Extraction",
          "trigger_indicators": ["Input reflected in error", "Database dump visible"],
          "technique_description": "Combine results of original query with injected query.",
          "training_input_payload": "' UNION SELECT username, password FROM users --",
          "expected_behavior": "Application displays data from 'users' table.",
          "remediation_logic": "Prepared Statements"
        },
        {
          "strategy_id": "web_sqli_boolean_01",
          "strategy_name": "Boolean Blind SQLi",
          "trigger_indicators": ["Content changes on True/False", "No error message"],
          "technique_description": "Infer data by asking True/False questions.",
          "training_input_payload": "' AND 1=1 -- vs ' AND 1=2 --",
          "expected_behavior": "True payload shows content; False payload hides it.",
          "remediation_logic": "Prepared Statements"
        }
      ]
    },
    {
      "category": "Cross-Site Scripting (XSS)",
      "description": "Client-side script injection.",
      "strategies": [
        {
          "strategy_id": "web_xss_reflected_01",
          "strategy_name": "Reflected XSS",
          "trigger_indicators": ["URL parameter reflected in DOM", "No sanitation"],
          "technique_description": "Embed payload in URL parameter.",
          "training_input_payload": "<script>alert(document.cookie)</script>",
          "expected_behavior": "Browser executes script in victim's session.",
          "remediation_logic": "Context-aware encoding"
        }
      ]
    },
    {
      "category": "Command Injection",
      "description": "OS Command Execution.",
      "strategies": [
        {
          "strategy_id": "web_cmdi_chain_01",
          "strategy_name": "Command Chaining",
          "trigger_indicators": ["Input used in shell command", "Ping/Echo functionality"],
          "technique_description": "Use separators (; | &&) to inject commands.",
          "training_input_payload": "127.0.0.1; cat /etc/passwd",
          "expected_behavior": "Response includes content of /etc/passwd.",
          "remediation_logic": "Avoid shell_exec"
        }
      ]
    },
    {
      "category": "IDOR",
      "description": "Access control failures.",
      "strategies": [
        {
          "strategy_id": "web_idor_enum_01",
          "strategy_name": "ID Enumeration",
          "trigger_indicators": ["Numeric IDs in URL", "Sequential User IDs"],
          "technique_description": "Change numeric ID to view other records.",
          "training_input_payload": "GET /api/users/1001 -> Change to 1002",
          "expected_behavior": "Server returns profile for user 1002.",
          "remediation_logic": "Server-side access checks"
        }
      ]
    }
  ]
}

# --- 2. GENERATION FUNCTIONS ---

def generate_playbook(data):
    """
    Creates the Markdown file for the Curator/Generator to read at runtime.
    This is the 'Long Term Memory' of the agent.
    """
    output = "# TRM-Ace Master Cybersecurity Playbook\n"
    output += "_Auto-generated from Seed Data. Contains Reversing, Crypto, Forensics, and Web strategies._\n\n"

    for cat in data['data']:
        output += f"## {cat['category']}\n"
        output += f"_{cat['description']}_\n\n"
        for strat in cat['strategies']:
            output += f"### {strat['strategy_name']}\n"
            output += f"- **ID**: `{strat['strategy_id']}`\n"
            output += f"- **Triggers**: {', '.join(strat['trigger_indicators'])}\n"
            output += f"- **Technique**: {strat['technique_description']}\n"
            output += f"- **Payload**: `{strat['training_input_payload']}`\n"
            output += f"- **Expected Success Indicator**: {strat['expected_behavior']}\n\n"
    return output

def generate_finetuning(data):
    """
    Creates the JSONL file for Fine-Tuning the LLM.
    This teaches the 'Generator' how to think and the 'Reflector' how to judge.
    """
    output = []
    for cat in data['data']:
        for strat in cat['strategies']:
            # 1. Generator Training Sample (The Analyst)
            # Input: Challenge Context -> Output: Thought + Action
            output.append({
                "instruction": f"You are a cybersecurity analyst. Solve this {cat['category']} challenge.",
                "input": f"Context/Indicators: {strat['trigger_indicators'][0]}",
                "output": f"THOUGHT: The indicators suggest {strat['strategy_name']} is possible. I will attempt {strat['technique_description']}.\nACTION: {strat['training_input_payload']}"
            })

            # 2. Reflector Training Sample (The Coach)
            # Input: Tool Output -> Output: Success/Failure Verdict
            output.append({
                "instruction": f"Analyze the output of the following security tool execution.",
                "input": f"Tool Output: '{strat['expected_behavior']}'\nContext: I attempted {strat['strategy_name']}.",
                "output": f"ANALYSIS: The output matches the expected behavior for a successful {strat['strategy_name']}. The exploit worked."
            })
    return output

# --- 3. EXECUTION ---
if __name__ == "__main__":
    # Generate Playbook
    pb_content = generate_playbook(raw_data)
    with open("master_cyber_playbook.md", "w") as f:
        f.write(pb_content)

    # Generate Fine-Tuning Data
    ft_data = generate_finetuning(raw_data)
    with open("trm_ace_finetuning.jsonl", "w") as f:
        for line in ft_data:
            f.write(json.dumps(line) + "\n")

    print("Success! Generated updated artifacts:")
    print("1. 'master_cyber_playbook.md' (Curator Memory)")
    print("2. 'trm_ace_finetuning.jsonl' (Training Data)")

Success! Generated updated artifacts:
1. 'master_cyber_playbook.md' (Curator Memory)
2. 'trm_ace_finetuning.jsonl' (Training Data)
