# SIGMA Generation Notebook
Interact with the AI/ML modal's Generate SIGMA flow via the FastAPI endpoints.


In [None]:
import httpx
import os
import json
from datetime import datetime
import pandas as pd
from pathlib import Path

# Endpoint configuration. Each value may be overridden through an environment
# variable; the literals are the fallbacks used when the variable is unset.
API_BASE = os.environ.get("CTI_API_BASE", "http://localhost:8001/api")
LMSTUDIO_BASE = os.environ.get("LMSTUDIO_BASE", "http://localhost:1234/v1")
DEFAULT_ARTICLE_ID = os.environ.get("CTI_ARTICLE_ID", "68")
# Article routes hang off the API base.
ARTICLES_BASE = f"{API_BASE}/articles"

# Echo the effective settings, one per line, so they show in the cell output.
print(
    f'API base: {API_BASE}',
    f'Articles base: {ARTICLES_BASE}',
    f'Default article: {DEFAULT_ARTICLE_ID}',
    f'LMStudio base: {LMSTUDIO_BASE}',
    sep='\n',
)


In [None]:
def get_article(article_id: str):
    """Fetch a single article from the articles API and print a status line.

    Args:
        article_id: Identifier appended to ``ARTICLES_BASE`` to form the URL.

    Returns:
        The decoded JSON body when the API answers HTTP 200, otherwise
        ``None``. Failures (non-200 status, request errors, undecodable
        bodies) are printed rather than raised.
    """
    url = f"{ARTICLES_BASE}/{article_id}"
    try:
        resp = httpx.get(url, timeout=10.0)
        if resp.status_code == 200:
            data = resp.json()
            # The title may be absent or JSON null; normalise it to a string so
            # slicing cannot fail and a valid article is not reported as an error.
            title = str(data.get('title') or '')
            print(f"✅ Article {article_id}: {title[:80]}")
            return data
        else:
            print(f"❌ Article {article_id} fetch failed: {resp.status_code} {resp.text[:200]}")
    except Exception as e:
        # Best-effort notebook helper: report the problem and fall through to None.
        print(f"❌ Error fetching article {article_id}: {e}")
    return None


In [None]:
def get_lmstudio_models():
    """List the model ids reported by the LMStudio ``/models`` endpoint.

    Returns:
        A list with the ``id`` of every entry under the response's ``data``
        key (possibly empty). An empty list is also returned when the endpoint
        answers with a non-200 status or any error occurs; the outcome is
        printed in every case.
    """
    try:
        resp = httpx.get(f"{LMSTUDIO_BASE}/models", timeout=10.0)
        if resp.status_code != 200:
            print(f"❌ LMStudio /models failed: {resp.status_code} {resp.text[:200]}")
            return []

        listing = resp.json()
        models = []
        for entry in listing.get('data', []):
            models.append(entry['id'])

        if not models:
            print('⚠️ LMStudio returned no models')
            return models

        # Show at most five ids, with an ellipsis when more exist.
        print(f"✅ LMStudio models ({len(models)}): {models[:5]}{'...' if len(models)>5 else ''}")
        return models
    except Exception as e:
        print(f"❌ Error fetching LMStudio models: {e}")
        return []

def set_lmstudio_model(model_id: str):
    """POST the chosen model id to the API's ``/settings`` endpoint.

    Args:
        model_id: Value stored under the ``lmstudio_model`` settings key.

    Returns:
        ``True`` when the API answers HTTP 200, ``False`` on any other status
        or on error. The outcome is printed either way.
    """
    setting = {'key': 'lmstudio_model', 'value': model_id}
    try:
        resp = httpx.post(f"{API_BASE}/settings", json=setting, timeout=10.0)
        if resp.status_code != 200:
            print(f"⚠️ Could not persist lmstudio_model (status {resp.status_code}): {resp.text[:200]}")
            return False
        print(f"✅ Set lmstudio_model to {model_id}")
        return True
    except Exception as e:
        print(f"⚠️ Error setting lmstudio_model: {e}")
        return False

def generate_sigma(article_id, model_id, prompt_override, author='Notebook User', force=False, skip_matching=False):
    """Call the ``generate-sigma`` endpoint for an article and return its JSON.

    Args:
        article_id: Article whose ``generate-sigma`` route is called.
        model_id: LMStudio model id; when truthy it is first pushed to the
            backend via ``set_lmstudio_model`` (that call's result is not checked).
        prompt_override: Optional prompt text; only sent when non-empty.
        author: Sent as ``author_name``.
        force: Sent as ``force_regenerate``.
        skip_matching: Sent as ``skip_matching``.

    Returns:
        The decoded JSON body, for both 200 and non-200 responses, or ``None``
        when the request itself fails or the body is not valid JSON.
    """
    # Ensure backend uses the selected LMStudio model
    if model_id:
        set_lmstudio_model(model_id)

    url = f"{ARTICLES_BASE}/{article_id}/generate-sigma"
    payload = {
        'ai_model': 'lmstudio',
        'author_name': author,
        'force_regenerate': force,
        'skip_matching': skip_matching,
    }
    if prompt_override:
        payload['prompt_override'] = prompt_override

    # The request can run for up to five minutes. Report transport failures
    # (timeouts, refused connections) the same way the other helpers do
    # instead of letting them propagate to the caller.
    try:
        resp = httpx.post(url, json=payload, timeout=300.0)
    except httpx.HTTPError as e:
        print(f"❌ Error calling SIGMA API: {e}")
        return None

    try:
        data = resp.json()
    except ValueError:
        # JSON decoding errors are ValueError subclasses.
        print(f"❌ Response not JSON (status {resp.status_code}): {resp.text[:400]}")
        return None

    if resp.status_code == 200:
        print(f"✅ SIGMA API call succeeded (cached={data.get('cached')})")
    else:
        print(f"❌ SIGMA API returned {resp.status_code}: {data}")
    return data


In [None]:
def summarize_rules(result):
    """Display a one-row-per-rule summary table for a generation result.

    Args:
        result: Response mapping; its ``rules`` entry is read as a list of
            rule mappings.

    Returns:
        The summary ``DataFrame`` (also shown via ``display``), or ``None``
        when ``result`` is falsy or holds no rules.
    """
    if not result:
        print('No result to summarize')
        return None

    rule_list = result.get('rules') or []
    if not rule_list:
        print('No rules returned.')
        return None

    # Rules are numbered from 1; a null ``content`` counts as length 0.
    df = pd.DataFrame([
        {
            'rule': position,
            'title': entry.get('title', '(untitled)'),
            'validated': entry.get('validated', False),
            'level': entry.get('level'),
            'content_length': len(entry.get('content', '') or ''),
        }
        for position, entry in enumerate(rule_list, start=1)
    ])
    display(df)
    return df


## Interactive controls
Run SIGMA generation, tweak the prompt, view validation status, and save results.


In [None]:
import ipywidgets as widgets
from IPython.display import display, clear_output

# Stateful results: one row is appended per generation run, and the most
# recent raw API response is kept for inspection and saving.
results_df = pd.DataFrame(columns=['timestamp', 'article_id', 'lmstudio_model', 'rules', 'valid_rules', 'cached', 'error'])
last_response = {}

# Load LMStudio models once at startup
lmstudio_models = get_lmstudio_models()
model_options = lmstudio_models if lmstudio_models else []
default_model = model_options[0] if model_options else None

# Initial prompt text. Reuse `default_prompt` when an earlier cell defined it;
# otherwise start empty rather than failing with NameError. An empty prompt
# means no `prompt_override` is sent with the generation request.
default_prompt = globals().get('default_prompt', '')

# Widgets
article_input = widgets.Text(value=str(DEFAULT_ARTICLE_ID), description='Article ID', layout=widgets.Layout(width='180px'))
model_dropdown = widgets.Dropdown(options=model_options, value=default_model, description='LMStudio model', layout=widgets.Layout(width='420px'))
refresh_models_button = widgets.Button(description='Refresh models', button_style='', tooltip='Refresh LMStudio model list')
prompt_input = widgets.Textarea(value=default_prompt, description='Prompt', layout=widgets.Layout(width='100%', height='160px'))
author_input = widgets.Text(value='Notebook User', description='Author', layout=widgets.Layout(width='250px'))
force_toggle = widgets.Checkbox(value=False, description='Force regenerate')
skip_matching_toggle = widgets.Checkbox(value=False, description='Skip matching phase')
run_button = widgets.Button(description='Generate SIGMA', button_style='primary')
save_json_button = widgets.Button(description='Save last JSON', button_style='success')

# All handler output is routed into this widget.
output_area = widgets.Output()

def refresh_models(_=None):
    """Re-query LMStudio and repopulate the model dropdown.

    When models are returned, the module-level list and the dropdown options
    are replaced and the first model is selected. When none are returned, the
    dropdown is emptied and the module-level list is left unchanged.

    Args:
        _: Unused; accepts the button instance passed by ``on_click``.
    """
    global lmstudio_models
    fetched = get_lmstudio_models()
    if fetched:
        lmstudio_models = fetched
        model_dropdown.options = fetched
        model_dropdown.value = fetched[0]
    else:
        model_dropdown.options = []
        model_dropdown.value = None

def on_run_clicked(_):
    """Handle the 'Generate SIGMA' button: run a generation and report it.

    Reads the widget values, checks the inputs, confirms the article can be
    fetched, calls ``generate_sigma``, prints a summary into ``output_area``,
    and appends one row to ``results_df``. The latest response (or ``{}`` when
    the call yielded nothing) is stored in ``last_response``.

    Args:
        _: Unused; accepts the button instance passed by ``on_click``.
    """
    global last_response, results_df
    with output_area:
        clear_output()
        article_id = article_input.value.strip()
        model_id = model_dropdown.value
        prompt_text = prompt_input.value.strip()
        author = author_input.value.strip() or 'Notebook User'

        # A blank ID would build URLs such as ".../articles/" and
        # ".../articles//generate-sigma"; stop early with a clear message.
        if not article_id:
            print('❌ Enter an article ID first')
            return

        if not model_id:
            print('❌ Select an LMStudio model first')
            return

        # get_article prints its own diagnostics on failure.
        article = get_article(article_id)
        if not article:
            return

        result = generate_sigma(article_id, model_id=model_id, prompt_override=prompt_text, author=author, force=force_toggle.value, skip_matching=skip_matching_toggle.value)
        last_response = result or {}
        if not result:
            return

        rules = result.get('rules') or []
        valid_rules = sum(1 for r in rules if r.get('validated'))
        coverage = result.get('coverage_summary', {})
        matched = result.get('matched_rules') or []

        print(f"Rules returned: {len(rules)} (valid: {valid_rules})")
        if coverage:
            print(f"Coverage summary: {coverage}")
        if matched:
            print(f"Matched existing rules: {len(matched)}")

        summarize_rules(result)

        conversation = (result.get('metadata') or {}).get('conversation') or []
        if conversation:
            print(f"Conversation attempts: {len(conversation)}; last attempt valid={conversation[-1].get('all_valid')}")

        # Record the run; len(results_df) is the next free row label.
        results_df.loc[len(results_df)] = {
            'timestamp': datetime.now().isoformat(),
            'article_id': article_id,
            'lmstudio_model': model_id,
            'rules': len(rules),
            'valid_rules': valid_rules,
            'cached': result.get('cached'),
            'error': result.get('error'),
        }

def on_save_json(_):
    """Handle the 'Save last JSON' button.

    Writes ``last_response`` as indented JSON to a timestamped file in the
    current working directory, or reports that nothing is available yet. The
    status message is printed into ``output_area``.

    Args:
        _: Unused; accepts the button instance passed by ``on_click``.
    """
    if last_response:
        fname = f"sigma_generation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        Path(fname).write_text(json.dumps(last_response, indent=2))
        status = f"Saved last response to {fname}"
    else:
        status = 'No response to save yet.'

    with output_area:
        print(status)

# Attach each click handler to its button.
for _button, _handler in (
    (refresh_models_button, refresh_models),
    (run_button, on_run_clicked),
    (save_json_button, on_save_json),
):
    _button.on_click(_handler)

print('Controls ready – select article/model, tweak prompt, and click Generate SIGMA.')


In [None]:
# Assemble the control panel top to bottom, then render it.
_panel_rows = [
    widgets.HTML('<h3>🚀 Generate SIGMA via API</h3>'),
    widgets.HBox([article_input, refresh_models_button]),
    model_dropdown,
    prompt_input,
    widgets.HBox([force_toggle, skip_matching_toggle]),
    author_input,
    widgets.HBox([run_button, save_json_button]),
    output_area,
]
display(widgets.VBox(_panel_rows))


In [None]:
# Show the table of recorded runs, or a placeholder when nothing has run yet.
if results_df.empty:
    print('No runs yet.')
else:
    display(results_df)


In [None]:
# Print the first 2000 characters of the most recent raw response, if any.
if not last_response:
    print('Run a generation first to see raw JSON.')
else:
    _raw_json = json.dumps(last_response, indent=2)
    print(_raw_json[:2000])
