# Local Coding Agent (Kaggle Notebook)
This notebook adds the agent runtime, tools, FastAPI endpoints, and ngrok wiring.
Model loading is assumed to be available (tokenizer + model).

In [None]:
# Cell 1: Install dependencies (skip if already installed)
!pip -q install -U \
  fastapi uvicorn pyngrok \
  transformers bitsandbytes accelerate sentencepiece


In [None]:
# Cell 2: Imports + config
import os
import json
import re
import subprocess
import threading
from dataclasses import dataclass
from typing import Dict, Any, List, Optional, Tuple

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from pyngrok import ngrok

WORKSPACE_DIR = "/kaggle/working/workspace"
os.makedirs(WORKSPACE_DIR, exist_ok=True)

API_PORT = 8000
DEFAULT_CMD_TIMEOUT = 120

print(f"Workspace: {WORKSPACE_DIR}")


In [None]:
# Cell 3: Validate model/tokenizer are loaded (skip if already done)
try:
    tokenizer
    model
    print("Model and tokenizer already loaded.")
except NameError:
    raise RuntimeError("Load the model and tokenizer before running the agent cells.")


In [None]:
# Cell 4: Tools (safe file IO + command execution)
def _safe_path(path: str) -> str:
    if not path:
        raise ValueError("Path is required.")
    if path.startswith("/"):
        raise ValueError("Absolute paths are not allowed.")
    normalized = os.path.normpath(os.path.join(WORKSPACE_DIR, path))
    if not normalized.startswith(WORKSPACE_DIR):
        raise ValueError("Path escapes workspace.")
    return normalized

def read_file(path: str) -> str:
    full_path = _safe_path(path)
    with open(full_path, "r", encoding="utf-8") as f:
        return f.read()

def write_file(path: str, content: str) -> str:
    full_path = _safe_path(path)
    os.makedirs(os.path.dirname(full_path), exist_ok=True)
    with open(full_path, "w", encoding="utf-8") as f:
        f.write(content)
    return f"Wrote {len(content)} bytes to {path}"

def list_dir(path: str) -> str:
    target = path or "."
    full_path = _safe_path(target)
    return "\n".join(sorted(os.listdir(full_path)))

_BLOCKED_PATTERNS = [
    "rm -rf",
    "sudo",
    "shutdown",
    "reboot",
    "mkfs",
    "dd",
    ":(){:|:&};:",
]

@dataclass
class ProcInfo:
    pid: int
    cmd: str
    port: Optional[int]
    process: subprocess.Popen

_processes: Dict[str, ProcInfo] = {}
_tunnels: Dict[int, str] = {}

def _is_blocked(cmd: str) -> bool:
    lowered = cmd.lower()
    return any(pat in lowered for pat in _BLOCKED_PATTERNS)

def run_cmd(cmd: str, timeout: int = DEFAULT_CMD_TIMEOUT, background: bool = False) -> Dict[str, Any]:
    if _is_blocked(cmd):
        return {
            "stdout": "",
            "stderr": "Blocked potentially destructive command.",
            "exit_code": 1,
        }
    try:
        if background:
            process = subprocess.Popen(
                cmd,
                cwd=WORKSPACE_DIR,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
            proc_id = f"proc-{process.pid}"
            _processes[proc_id] = ProcInfo(pid=process.pid, cmd=cmd, port=None, process=process)
            return {
                "stdout": "",
                "stderr": "",
                "exit_code": 0,
                "process_id": proc_id,
            }
        result = subprocess.run(
            cmd,
            cwd=WORKSPACE_DIR,
            shell=True,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
        return {
            "stdout": result.stdout,
            "stderr": result.stderr,
            "exit_code": result.returncode,
        }
    except subprocess.TimeoutExpired:
        return {
            "stdout": "",
            "stderr": f"Command timed out after {timeout}s.",
            "exit_code": 124,
        }
    except Exception as exc:
        return {
            "stdout": "",
            "stderr": str(exc),
            "exit_code": 1,
        }

def stop_process(proc_id: str) -> Dict[str, Any]:
    info = _processes.get(proc_id)
    if not info:
        return {
            "stdout": "",
            "stderr": "Process not found.",
            "exit_code": 1,
        }
    info.process.terminate()
    info.process.wait(timeout=10)
    _processes.pop(proc_id, None)
    return {
        "stdout": f"Stopped {proc_id}",
        "stderr": "",
        "exit_code": 0,
    }

def ensure_ngrok_tunnel(port: int) -> str:
    if port in _tunnels:
        return _tunnels[port]
    public_url = ngrok.connect(port).public_url
    _tunnels[port] = public_url
    print(f"ngrok tunnel for {port}: {public_url}")
    return public_url


In [None]:
# Cell 5: Agent prompt + tool parsing
SYSTEM_PROMPT = """
You are DevAgent, a local coding agent optimized for frontend development (React, Next.js, Vite, TS, CSS).
You must emit tool calls using EXACTLY this format:

TOOL: <tool_name>
ARGS:
<arguments as plain text>

Tool args can be either:
- A plain text blob (e.g., run_cmd command)
- OR key=value lines, e.g.:
  path=src/App.tsx
  content=...file contents...

Available tools: read_file, write_file, list_dir, run_cmd.
Rules:
- Only use paths inside /kaggle/working/workspace.
- Avoid destructive commands (rm -rf, sudo, shutdown, reboot, mkfs, dd, fork bombs).
- Use tools for file or shell interactions.
""".strip()

conversation_history: List[Dict[str, str]] = [
    {"role": "system", "content": SYSTEM_PROMPT}
]

def _parse_key_values(text: str) -> Dict[str, str]:
    result = {}
    for line in text.splitlines():
        if "=" in line:
            key, value = line.split("=", 1)
            result[key.strip()] = value
    return result

def _extract_tool_calls(text: str) -> List[Dict[str, str]]:
    pattern = re.compile(r"TOOL:\s*(\w+)\s*\nARGS:\s*([\s\S]*?)(?=\nTOOL:|$)")
    calls = []
    for match in pattern.finditer(text):
        calls.append({
            "tool": match.group(1).strip(),
            "args": match.group(2).strip(),
        })
    return calls

def _format_tool_result(tool_name: str, result: Any) -> str:
    payload = result if isinstance(result, str) else json.dumps(result, indent=2)
    return f"TOOL_RESULT: {tool_name}\n{payload}"


In [None]:
# Cell 6: Agent loop
def _generate_model_reply(messages: List[Dict[str, str]]) -> str:
    prompt = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True,
    )
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    outputs = model.generate(
        **inputs,
        max_new_tokens=512,
        temperature=0.2,
        do_sample=True,
    )
    generated = outputs[0][inputs["input_ids"].shape[-1]:]
    return tokenizer.decode(generated, skip_special_tokens=True).strip()

def _call_tool(tool_name: str, args: str) -> Tuple[Any, str]:
    parsed = _parse_key_values(args)
    if tool_name == "read_file":
        path = parsed.get("path", args)
        return read_file(path), ""
    if tool_name == "write_file":
        if parsed:
            path = parsed.get("path", "")
            content = parsed.get("content", "")
        else:
            path, content = args.split("\n", 1)
        return write_file(path.strip(), content), ""
    if tool_name == "list_dir":
        path = parsed.get("path", args)
        return list_dir(path.strip()), ""
    if tool_name == "run_cmd":
        cmd = parsed.get("cmd", args)
        timeout = int(parsed.get("timeout", DEFAULT_CMD_TIMEOUT))
        background = parsed.get("background", "false").lower() == "true"
        return run_cmd(cmd, timeout=timeout, background=background), ""
    return {"error": f"Unknown tool: {tool_name}"}, ""

def run_agent(user_message: str, max_loops: int = 6) -> Dict[str, str]:
    conversation_history.append({"role": "user", "content": user_message})
    logs: List[str] = []

    for _ in range(max_loops):
        assistant_reply = _generate_model_reply(conversation_history)
        tool_calls = _extract_tool_calls(assistant_reply)
        if not tool_calls:
            conversation_history.append({"role": "assistant", "content": assistant_reply})
            return {"reply": assistant_reply, "logs": "\n\n".join(logs)}

        conversation_history.append({"role": "assistant", "content": assistant_reply})

        for call in tool_calls:
            result, _ = _call_tool(call["tool"], call["args"])
            log_entry = _format_tool_result(call["tool"], result)
            logs.append(log_entry)
            conversation_history.append({"role": "user", "content": log_entry})

    return {
        "reply": "Agent stopped after reaching max tool loops.",
        "logs": "\n\n".join(logs),
    }


In [None]:
# Cell 7: FastAPI endpoints
app = FastAPI()

class ChatRequest(BaseModel):
    message: str

class FileRequest(BaseModel):
    path: str
    content: str

class CmdRequest(BaseModel):
    cmd: str
    timeout: Optional[int] = DEFAULT_CMD_TIMEOUT
    background: Optional[bool] = False

class ServerRequest(BaseModel):
    cmd: str
    port: int

class StopRequest(BaseModel):
    process_id: str

@app.get("/health")
def health():
    return {"status": "ok"}

@app.post("/reset")
def reset():
    conversation_history.clear()
    conversation_history.append({"role": "system", "content": SYSTEM_PROMPT})
    return {"status": "reset"}

@app.get("/files")
def files(path: str = "."):
    return {"entries": list_dir(path)}

@app.get("/file")
def get_file(path: str):
    return {"content": read_file(path)}

@app.post("/file")
def put_file(req: FileRequest):
    return {"result": write_file(req.path, req.content)}

@app.post("/cmd")
def cmd(req: CmdRequest):
    return run_cmd(req.cmd, timeout=req.timeout or DEFAULT_CMD_TIMEOUT, background=bool(req.background))

@app.post("/run_server")
def run_server(req: ServerRequest):
    result = run_cmd(req.cmd, timeout=DEFAULT_CMD_TIMEOUT, background=True)
    process_id = result.get("process_id")
    if not process_id:
        return result
    proc_info = _processes.get(process_id)
    if proc_info:
        proc_info.port = req.port
    app_url = ensure_ngrok_tunnel(req.port)
    return {
        **result,
        "app_url": app_url,
    }

@app.post("/stop_server")
def stop_server(req: StopRequest):
    return stop_process(req.process_id)

@app.post("/chat")
def chat(req: ChatRequest):
    return run_agent(req.message)

def _run_api():
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=API_PORT, log_level="info")

api_thread = threading.Thread(target=_run_api, daemon=True)
api_thread.start()

print(f"FastAPI running on port {API_PORT}.")


In [None]:
# Cell 8: ngrok tunnel for the API
ngrok_auth_token = os.getenv("NGROK_AUTHTOKEN")
if ngrok_auth_token:
    ngrok.set_auth_token(ngrok_auth_token)

api_public_url = ensure_ngrok_tunnel(API_PORT)
print(f"Agent API URL: {api_public_url}")
print("Use POST /chat on this URL.")
