In [None]:
#!/usr/bin/env python3
"""
OpenRouter Frequency & Amplitude Generator (works in Colab, VSCode, or any Python env).

This is a single-file Gradio app that:
- Sends a user prompt to an OpenRouter-compatible endpoint (default: https://openrouter.ai/api/v1/chat/completions).
- Forces the model to return a concise JSON with frequency and amplitude.
- Parses the model output (JSON first, then regex heuristics).
- Persists parsed variables to a local JSON file and sets in-process globals and environment variables
  so other code running in the same process can access them.
- Includes diagnostics and a "list models" helper to discover valid model IDs.

How it behaves across environments:
- It no longer tries to set IPython notebook globals (Colab-specific). Instead it:
  - Writes ./last_vars.json (configurable) with the last parsed values.
  - Sets module-level globals x and y (useful only within the same Python process).
  - Sets environment variables FREQUENCY_HZ and AMPLITUDE_V (visible to subprocesses started after the set).
  This makes the tool environment-agnostic and compatible with VSCode, terminals, servers, and Colab.

Run (example):
1. pip install gradio requests
2. python openrouter_generator.py
3. Open the Gradio link printed in your terminal.

Security:
- Do NOT commit your API key. Paste it into the Gradio UI at runtime.
- If your key has been exposed publicly, rotate/revoke it immediately.
"""

import json
import re
import socket
import urllib.parse
import os
from typing import Tuple, Dict, Any, List

import requests
import gradio as gr

# Defaults
DEFAULT_OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
DEFAULT_MODEL_UI = "x-ai/grok-4.1-fast:free"

# Where to persist the last parsed variables (change if you prefer)
VARS_FILE_PATH = "./last_vars.json"

# Module-level globals (set after a successful parse; available only in this Python process)
x = None  # Hz (int)
y = None  # Volts (float)


def build_instruction(user_prompt: str) -> str:
    return (
        "You are an assistant that MUST respond with a concise machine-friendly result.\n\n"
        "User prompt: " + user_prompt + "\n\n"
        "Also: create specific frequency and amplitude based on what you think we want most.\n\n"
        "OUTPUT REQUIREMENT (MANDATORY):\n"
        "- Return ONLY a single JSON object (no surrounding text) with exactly these keys:\n"
        '  frequency_hz : integer frequency in Hertz (e.g. 1000)\n'
        '  amplitude_v  : float amplitude in Volts (e.g. 3.0)\n'
        '  x : duplicate of frequency_hz (integer)\n'
        '  y : duplicate of amplitude_v (float)\n\n'
        "Example output (for one thousand Hz and three volts):\n"
        '{"frequency_hz": 1000, "amplitude_v": 3.0, "x": 1000, "y": 3.0}\n\n'
        "If you cannot infer exact numbers, pick reasonable defaults and set them.\n"
        "Do not include any extra commentary, explanation, or text outside the JSON object."
    )


def send_to_openrouter(api_key: str, endpoint: str, system_msg: str, user_msg: str, model: str) -> Dict[str, Any]:
    """
    Send a chat-style request to the OpenRouter-compatible endpoint.
    Returns the parsed JSON response (or raises RuntimeError on connectivity/HTTP errors).
    """
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": system_msg},
            {"role": "user", "content": user_msg},
        ],
        "temperature": 0.2,
        "max_tokens": 250,
        "top_p": 0.9,
    }

    try:
        resp = requests.post(endpoint, headers=headers, json=payload, timeout=30)
    except requests.RequestException as e:
        raise RuntimeError(f"Connection error while contacting {endpoint}: {e}")

    if not resp.ok:
        try:
            body = resp.json()
        except Exception:
            body = resp.text
        raise RuntimeError(f"HTTP {resp.status_code} from {endpoint}: {body}")

    try:
        return resp.json()
    except Exception as e:
        raise RuntimeError(f"Failed to parse JSON response from {endpoint}: {e} -- raw: {resp.text}")


def extract_assistant_text_from_response(resp_json: Dict[str, Any]) -> str:
    """
    Extract the assistant's textual response from common OpenAI/OpenRouter-compatible shapes.
    """
    if not isinstance(resp_json, dict):
        return json.dumps(resp_json)

    choices = resp_json.get("choices")
    if choices and isinstance(choices, list) and len(choices) > 0:
        first = choices[0]
        if isinstance(first.get("message"), dict):
            content = first["message"].get("content")
            if isinstance(content, str):
                return content
            if isinstance(content, dict) and "parts" in content:
                parts = content.get("parts")
                if isinstance(parts, list):
                    return "".join(parts)
        if "text" in first and isinstance(first["text"], str):
            return first["text"]

    for k in ("output", "text", "response", "content"):
        if k in resp_json and isinstance(resp_json[k], str):
            return resp_json[k]

    return json.dumps(resp_json, indent=2)


def parse_response_for_vars(text: str) -> Dict[str, Any]:
    """
    Parse frequency and amplitude from text. Returns {'frequency_hz': int|None, 'amplitude_v': float|None}
    """
    text_stripped = (text or "").strip()

    # Try JSON extraction first
    json_obj = None
    try:
        json_obj = json.loads(text_stripped)
    except Exception:
        m = re.search(r"\{[\s\S]*?\}", text_stripped)
        if m:
            try:
                json_obj = json.loads(m.group(0))
            except Exception:
                json_obj = None

    frequency_hz = None
    amplitude_v = None

    if isinstance(json_obj, dict):
        for key in ["frequency_hz", "frequency", "freq_hz", "freq", "x"]:
            if key in json_obj:
                try:
                    frequency_hz = int(round(float(json_obj[key])))
                    break
                except Exception:
                    pass
        for key in ["amplitude_v", "amplitude", "amp_v", "amp", "y"]:
            if key in json_obj:
                try:
                    amplitude_v = float(json_obj[key])
                    break
                except Exception:
                    pass

    # Regex heuristics if JSON parsing didn't work
    if frequency_hz is None:
        m = re.search(r"([0-9]+(?:\.[0-9]+)?)\s*(k|K)?\s*(?:hz|Hz|HZ)?\b", text_stripped)
        if m:
            try:
                num = float(m.group(1))
                mult = 1000.0 if (m.group(2) and m.group(2).lower() == "k") else 1.0
                frequency_hz = int(round(num * mult))
            except Exception:
                pass

    if amplitude_v is None:
        m = re.search(r"([0-9]+(?:\.[0-9]+)?)\s*(m?V|v|V)\b", text_stripped)
        if m:
            try:
                val = float(m.group(1))
                unit = m.group(2)
                amplitude_v = val / 1000.0 if unit.lower().startswith("mv") else float(val)
            except Exception:
                pass

    if frequency_hz is None:
        m = re.search(r"x\s*=\s*([0-9]+(?:\.[0-9]+)?)", text_stripped)
        if m:
            try:
                frequency_hz = int(round(float(m.group(1))))
            except Exception:
                pass

    if amplitude_v is None:
        m = re.search(r"y\s*=\s*([0-9]+(?:\.[0-9]+)?)", text_stripped)
        if m:
            try:
                amplitude_v = float(m.group(1))
            except Exception:
                pass

    # Basic validation
    if isinstance(frequency_hz, (int, float)):
        frequency_hz = int(round(float(frequency_hz)))
        if frequency_hz <= 0 or frequency_hz > 10_000_000:
            frequency_hz = None
    if isinstance(amplitude_v, (int, float)):
        amplitude_v = float(amplitude_v)
        if amplitude_v <= 0 or amplitude_v > 1e6:
            amplitude_v = None
        else:
            amplitude_v = float(round(amplitude_v, 6))

    return {"frequency_hz": frequency_hz, "amplitude_v": amplitude_v}


def persist_and_set_vars(parsed: Dict[str, Any], persist_path: str = VARS_FILE_PATH) -> str:
    """
    Persist the parsed values to a JSON file and set process-level globals and environment variables.
    Returns a status message.
    """
    global x, y
    f = parsed.get("frequency_hz")
    a = parsed.get("amplitude_v")
    lines = []

    if f is None or a is None:
        lines.append("Parsing incomplete: not setting globals/environment variables, but will still persist whatever was parsed.")
    else:
        # Set module globals (only visible inside this process)
        x = int(f)
        y = float(a)
        lines.append(f"Set in-process globals: x = {x} (Hz), y = {y} (V)")

        # Set environment variables (visible to new subprocesses spawned after this)
        os.environ["FREQUENCY_HZ"] = str(x)
        os.environ["AMPLITUDE_V"] = str(y)
        lines.append("Set environment variables: FREQUENCY_HZ, AMPLITUDE_V")

    # Persist to file
    try:
        data = {"frequency_hz": f, "amplitude_v": a, "x": f, "y": a}
        with open(persist_path, "w") as fh:
            json.dump(data, fh)
        lines.append(f"Wrote variables to {persist_path}")
    except Exception as e:
        lines.append(f"Failed to write variables to {persist_path}: {e}")

    return "\n".join(lines)


def generate_and_parse(api_key: str, endpoint: str, model: str, prompt: str) -> Tuple[str, Dict[str, Any], str, str]:
    """
    High-level flow: call OpenRouter, extract text, parse, persist/set, and produce code snippet + status.
    """
    if not api_key:
        return "ERROR: Missing API key (do not paste keys into public notebooks).", {}, "", ""

    system_instruction = build_instruction(prompt)
    try:
        resp_json = send_to_openrouter(api_key=api_key, endpoint=endpoint, system_msg=system_instruction, user_msg=prompt, model=model)
    except RuntimeError as e:
        err_msg = str(e)
        suggestion = ""
        if "is not a valid model" in err_msg or '"is not a valid model' in err_msg:
            suggestion = "\n\nDetected server-side invalid-model error. Click 'List available models' to fetch valid IDs from the API."
        return f"ERROR contacting OpenRouter: {err_msg}{suggestion}", {}, "", ""

    assistant_text = extract_assistant_text_from_response(resp_json)
    parsed = parse_response_for_vars(assistant_text)

    f = parsed.get("frequency_hz")
    a = parsed.get("amplitude_v")

    if f is not None and a is not None:
        code_snippet = (
            f"// Standard variables\n"
            f"frequency_hz = {int(f)}  # integer Hz\n"
            f"amplitude_v = {float(round(a,6))}  # volts\n"
            f"x = {int(f)}\n"
            f"y = {float(round(a,6))}\n"
        )
    else:
        code_snippet = "# Could not parse both frequency and amplitude from model output.\n# See raw response above."

    status = persist_and_set_vars(parsed, persist_path=VARS_FILE_PATH)

    return assistant_text, parsed, code_snippet, status


# --- Model listing helper and diagnostics (unchanged behavior) ---


def try_model_list_endpoint(api_key: str, url: str) -> Tuple[bool, Any]:
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    try:
        r = requests.get(url, headers=headers, timeout=15)
    except requests.RequestException as e:
        return False, f"Connection error: {e}"

    if not r.ok:
        try:
            return False, r.json()
        except Exception:
            return False, r.text

    try:
        return True, r.json()
    except Exception as e:
        return False, f"Failed to parse JSON: {e} -- raw: {r.text}"


def list_models(api_key: str, endpoint: str) -> Tuple[str, List[str]]:
    if not api_key:
        return "ERROR: Missing API key.", []

    try:
        parsed = urllib.parse.urlparse(endpoint)
        base = f"{parsed.scheme}://{parsed.netloc}"
    except Exception:
        base = endpoint.rstrip("/")

    candidate_paths = [
        f"{base}/api/v1/models",
        f"{base}/api/v1/models?include_all=true",
        f"{base}/api/models",
        f"{base}/v1/models",
        f"{base}/api/v1/engines",
        f"{base}/v1/engines",
    ]

    diagnostics = []
    found_models = set()

    for url in candidate_paths:
        diagnostics.append(f"Trying: {url}")
        ok, result = try_model_list_endpoint(api_key, url)
        if not ok:
            diagnostics.append(f"  -> failed: {result}")
            continue

        diagnostics.append(f"  -> success: got JSON (attempting to extract model ids)")
        try:
            if isinstance(result, dict):
                if "models" in result and isinstance(result["models"], list):
                    for m in result["models"]:
                        if isinstance(m, dict):
                            if "id" in m:
                                found_models.add(str(m["id"]))
                            elif "model" in m:
                                found_models.add(str(m["model"]))
                            elif "name" in m:
                                found_models.add(str(m["name"]))
                            else:
                                found_models.add(json.dumps(m))
                        else:
                            found_models.add(str(m))
                elif "data" in result and isinstance(result["data"], list):
                    for m in result["data"]:
                        if isinstance(m, dict):
                            if "id" in m:
                                found_models.add(str(m["id"]))
                            elif "model" in m:
                                found_models.add(str(m["model"]))
                            elif "name" in m:
                                found_models.add(str(m["name"]))
                            else:
                                found_models.add(json.dumps(m))
                        else:
                            found_models.add(str(m))
                else:
                    text = json.dumps(result)
                    candidates = re.findall(r'"([a-zA-Z0-9_\-\.]{3,60})"', text)
                    for c in candidates:
                        if len(c) > 3 and not c.lower().startswith("http") and c.lower() not in ("true", "false", "null"):
                            found_models.add(c)
            else:
                diagnostics.append(f"  -> unexpected response type: {type(result)}")
        except Exception as e:
            diagnostics.append(f"  -> extraction error: {e}")
        if found_models:
            break

    diag_text = "\n".join(diagnostics)
    return diag_text, sorted(found_models)


def run_diagnostics(endpoint: str) -> str:
    try:
        parsed = urllib.parse.urlparse(endpoint)
        host = parsed.hostname
        if not host:
            return f"Invalid endpoint URL: {endpoint}"
    except Exception as e:
        return f"Invalid endpoint URL parse error: {e}"

    out_lines = []
    out_lines.append(f"Endpoint: {endpoint}")
    out_lines.append(f"Resolved host: {host}")

    try:
        addrs = socket.getaddrinfo(host, None)
        uniq_addrs = sorted({addr[4][0] for addr in addrs})
        out_lines.append(f"DNS lookup: SUCCESS, addresses: {', '.join(uniq_addrs)}")
    except Exception as e:
        out_lines.append(f"DNS lookup: FAILURE ({e})")
        out_lines.append("If DNS lookup fails, possible causes:\n - network restrictions\n - typo in domain\n - temporary DNS outage")
        return "\n".join(out_lines)

    try:
        test_headers = {"User-Agent": "openrouter-diagnostic/1.0"}
        resp = requests.options(endpoint, headers=test_headers, timeout=10)
        out_lines.append(f"HTTP OPTIONS: status {resp.status_code} - {resp.reason}")
        if "server" in resp.headers:
            out_lines.append(f"Server header: {resp.headers['server']}")
    except requests.RequestException as e:
        out_lines.append(f"HTTP connectivity test: FAILURE ({e})")
        try:
            resp2 = requests.post(endpoint, json={"model": "test", "messages": [{"role": "system", "content": "ping"}, {"role": "user", "content": "ping"}]}, timeout=10)
            out_lines.append(f"HTTP POST attempt: status {resp2.status_code} - {resp2.reason}")
            out_lines.append(f"Response body (first 512 chars): {resp2.text[:512]}")
        except Exception as e2:
            out_lines.append(f"HTTP POST attempt: FAILURE ({e2})")
    return "\n".join(out_lines)


# --- Gradio UI ---

with gr.Blocks(title="OpenRouter Frequency & Amplitude Generator") as demo:
    gr.Markdown(
        "OpenRouter Frequency & Amplitude Generator.\n\n"
        "**Security reminder:** If your API key has been posted publicly, revoke/rotate it now."
    )

    with gr.Row():
        api_key_input = gr.Textbox(label="OpenRouter API Key (paste here, kept only in memory)", type="password")
        endpoint_input = gr.Textbox(label="Endpoint URL", value=DEFAULT_OPENROUTER_URL)
        model_input = gr.Textbox(label="Model (optional, try listing models)", value=DEFAULT_MODEL_UI)

    prompt_input = gr.Textbox(label="Prompt", placeholder="e.g. I want a tone around 1 kHz and amplitude ~3 volts.", lines=3)
    with gr.Row():
        diag_button = gr.Button("Run diagnostics")
        list_models_button = gr.Button("List available models")
        gen_button = gr.Button("Generate")

    diag_out = gr.Textbox(label="Diagnostics output", lines=12)
    models_out = gr.Textbox(label="Available model IDs (suggestions)", lines=8)
    raw_out = gr.Textbox(label="Raw model response", lines=10)
    parsed_out = gr.JSON(label="Parsed variables")
    code_out = gr.Textbox(label="Code snippet", lines=6)
    status_out = gr.Textbox(label="Status (persistence / env / in-process)", lines=6)

    def on_diag(endpoint):
        return run_diagnostics(endpoint.strip())

    def on_list_models(api_key, endpoint):
        diag_text, models = list_models(api_key.strip(), endpoint.strip())
        if not models:
            return diag_text
        return diag_text + "\n\nFound model IDs:\n" + "\n".join(models)

    def on_generate(api_key, endpoint, model, prompt):
        raw, parsed, code, status = generate_and_parse(api_key.strip(), endpoint.strip(), model.strip() or DEFAULT_MODEL_UI, prompt.strip())
        return raw, parsed, code, status

    diag_button.click(on_diag, inputs=[endpoint_input], outputs=[diag_out])
    list_models_button.click(on_list_models, inputs=[api_key_input, endpoint_input], outputs=[models_out])
    gen_button.click(on_generate, inputs=[api_key_input, endpoint_input, model_input, prompt_input], outputs=[raw_out, parsed_out, code_out, status_out])

if __name__ == "__main__":
    demo.launch(share=True)

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://1947cde31b5935ec5d.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


In [None]:
print("Hz:", x, ", V:", y)

Hz: 1600 , V: 1.0
