# Sovereign Studio: Step-by-Step Tutorial (MCP-style, No-Egress)

This notebook scaffolds a teaching prototype of a local-first creative studio architecture:

- **Step 1**: `studio_config.json` as ground truth (local transport + no-egress policy)
- **Step 2**: a Python stdio "MCP-like" server + firewall that blocks cloud-saving behaviors
- **Step 3**: an "IP Warden" using embeddings + cosine similarity
- **Step 4**: `inspiration.jsonl` append-only heartbeat + drift detection
- **Step 5**: hot-reload tool registration when a new need appears

Colab note: true no-egress must be enforced outside the Python process (OS firewall, container, or network policy). Here we also demonstrate **process-level socket blocking** to teach the idea.

In [None]:
from __future__ import annotations

import json
import os
import re
import socket
import sys
import time
from dataclasses import dataclass
from hashlib import sha256
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple

ROOT = Path.cwd() / "sovereign_studio_demo"
ROOT.mkdir(exist_ok=True)
print("Workspace:", ROOT)

## Step 1: Meta-Architect Initialization (The Seed)

We create `studio_config.json` as an explicit, versionable "ground truth."

Key settings: a local stdio transport (modeled on MCP's stdio transport) and a denylist of tool-name patterns associated with egress.

In [None]:
studio_config = {
  "transport": {
    "type": "stdio",
    "notes": "All agent-server communication via stdin/stdout pipes. No network required."
  },
  "no_egress": {
    "enabled": True,
    "deny_tool_name_patterns": [
      "save_to_cloud",
      "upload",
      "gdrive",
      "dropbox",
      "s3",
      "googleapis",
      "http",
      "https"
    ],
    "deny_socket": True,
    "message": "Blocked by Sovereign Studio No-Egress policy."
  },
  "ip_warden": {
    "similarity_threshold": 0.85,
    "embedding": {
      "type": "local_demo",
      "notes": "Notebook uses a deterministic local embedding stub. Swap with CLIP/open_clip locally."
    }
  },
  "heartbeat": {
    "ledger": "inspiration.jsonl",
    "cadence_hours": 4,
    "drift": {
      "min_vocab_diversity": 0.22,
      "min_sentiment_range": 0.18,
      "notes": "Simple heuristics to teach burnout drift detection."
    }
  }
}

cfg_path = ROOT / "studio_config.json"
cfg_path.write_text(json.dumps(studio_config, indent=2))
print(cfg_path.read_text())
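Because the config is the ground truth, later steps could re-read it from disk and fail fast if a key is missing or mistyped. A minimal validation sketch follows; the `REQUIRED` key list and the `config_errors` / `load_config` names are hypothetical, derived from the keys this notebook reads rather than from any official schema.

```python
import json
from pathlib import Path
from typing import Any, Dict, List, Tuple

# Hypothetical required keys (dotted path, accepted types), derived from what this notebook reads.
REQUIRED: List[Tuple[str, Tuple[type, ...]]] = [
    ("transport.type", (str,)),
    ("no_egress.enabled", (bool,)),
    ("no_egress.deny_tool_name_patterns", (list,)),
    ("no_egress.deny_socket", (bool,)),
    ("no_egress.message", (str,)),
    ("ip_warden.similarity_threshold", (int, float)),
    ("heartbeat.ledger", (str,)),
]


def config_errors(cfg: Dict[str, Any]) -> List[str]:
    """Return human-readable problems; an empty list means the config passed."""
    errors: List[str] = []
    for dotted, types in REQUIRED:
        node: Any = cfg
        for part in dotted.split("."):
            if not isinstance(node, dict) or part not in node:
                errors.append(f"missing key: {dotted}")
                break
            node = node[part]
        else:  # runs only if the whole key path resolved
            if not isinstance(node, types):
                errors.append(f"{dotted}: unexpected type {type(node).__name__}")
    transport = cfg.get("transport")
    if isinstance(transport, dict) and transport.get("type") != "stdio":
        errors.append("transport.type must be 'stdio' (no network transport)")
    return errors


def load_config(path: Path) -> Dict[str, Any]:
    """Read the config from disk and raise ValueError if it fails validation."""
    cfg = json.loads(path.read_text(encoding="utf-8"))
    problems = config_errors(cfg)
    if problems:
        raise ValueError("invalid studio config: " + "; ".join(problems))
    return cfg
```

In the notebook, `load_config(cfg_path)` would stand in for using the in-memory `studio_config` dict directly.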

## Step 2: Head of Security (Firewall)

We implement two layers:

1) **Tool firewall**: blocks tool calls by name pattern.
2) **Socket firewall (process-level)**: monkeypatches `socket.socket.connect` so network calls fail fast.

In production you'd also enforce no-egress outside the process (container network policy, host firewall).

In [None]:
@dataclass
class NoEgressPolicy:
    enabled: bool
    deny_tool_name_patterns: List[str]
    deny_socket: bool
    message: str

    def blocks_tool(self, tool_name: str) -> bool:
        if not self.enabled:
            return False
        ln = tool_name.lower()
        return any(pat.lower() in ln for pat in self.deny_tool_name_patterns)


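def install_socket_firewall(enabled: bool, message: str = "No-Egress") -> Callable[[], None]:
    """Install a process-level socket connect blocker. Returns an uninstall function.

    Patches both connect() and connect_ex(); connect_ex() is a separate method,
    so patching connect() alone would leave it open. DNS lookups and subprocesses
    are NOT covered; enforce those at the OS/container layer.
    """
    if not enabled:
        return lambda: None

    orig_connect = socket.socket.connect
    orig_connect_ex = socket.socket.connect_ex

    def blocked_connect(self, address):
        raise OSError(f"{message} (socket connect blocked): {address}")

    socket.socket.connect = blocked_connect  # type: ignore[assignment]
    socket.socket.connect_ex = blocked_connect  # type: ignore[assignment]

    def uninstall():
        socket.socket.connect = orig_connect  # type: ignore[assignment]
        socket.socket.connect_ex = orig_connect_ex  # type: ignore[assignment]

    return uninstall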


policy = NoEgressPolicy(**studio_config["no_egress"])  # type: ignore[arg-type]
uninstall_socket_fw = install_socket_firewall(policy.deny_socket, policy.message)
print("No-Egress policy loaded. Socket firewall:", policy.deny_socket)
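As an alternative to the install/uninstall pair, a context manager scopes the block to one region of code and guarantees the originals are restored even if that code raises. This is a sketch; `socket_firewall` is a hypothetical name. Like any monkeypatch, it only affects Python code that goes through `socket.socket`: DNS resolution, subprocesses, and native extensions with their own networking are not covered.

```python
import socket
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def socket_firewall(message: str = "No-Egress") -> Iterator[None]:
    """Temporarily block socket.socket.connect and connect_ex in this process."""
    orig_connect = socket.socket.connect
    orig_connect_ex = socket.socket.connect_ex

    def blocked(self, address):
        raise OSError(f"{message} (socket connect blocked): {address}")

    socket.socket.connect = blocked  # type: ignore[assignment]
    socket.socket.connect_ex = blocked  # type: ignore[assignment]
    try:
        yield
    finally:
        # Always restore, even if the body raised.
        socket.socket.connect = orig_connect  # type: ignore[assignment]
        socket.socket.connect_ex = orig_connect_ex  # type: ignore[assignment]


# Demo: the connect attempt raises inside the block before the real connect() is ever called.
with socket_firewall("Blocked by Sovereign Studio No-Egress policy."):
    s = socket.socket()
    try:
        s.connect(("127.0.0.1", 9))
    except OSError as e:
        print("blocked:", e)
    finally:
        s.close()
```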

### Minimal stdio "MCP-like" server

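We'll implement a tiny JSON-RPC-style request handler. A real server would loop: read one JSON request per line from stdin and write one JSON response per line to stdout. In this notebook, `run_stdio_demo` passes requests straight to `handle` instead of reading stdin.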

Request shape:

```json
{ "id": "1", "tool": "tool_name", "args": { ... } }
```

This is purely to teach the architecture. Swap in an actual MCP SDK server when you wire this into your real stack.

In [None]:
ToolFn = Callable[[Dict[str, Any]], Dict[str, Any]]


class ToolRegistry:
    def __init__(self):
        self.tools: Dict[str, ToolFn] = {}

    def register(self, name: str, fn: ToolFn) -> None:
        self.tools[name] = fn

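    def call(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]:
        if name not in self.tools:
            return {"ok": False, "error": f"Unknown tool: {name}"}
        try:
            return self.tools[name](args)
        except Exception as exc:  # a failing tool should not take down the server
            return {"ok": False, "error": f"{type(exc).__name__}: {exc}"}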


class FirewallServer:
    def __init__(self, registry: ToolRegistry, policy: NoEgressPolicy):
        self.registry = registry
        self.policy = policy

    def handle(self, req: Dict[str, Any]) -> Dict[str, Any]:
        rid = req.get("id", None)
        tool = str(req.get("tool", ""))
        args = req.get("args", {})
        if not isinstance(args, dict):
            args = {"_raw": args}

        if self.policy.blocks_tool(tool):
            return {"id": rid, "ok": False, "error": self.policy.message, "blocked": True, "tool": tool}

        out = self.registry.call(tool, args)
        return {"id": rid, **out, "tool": tool}


def run_stdio_demo(server: FirewallServer, requests: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Notebook helper: simulate stdio requests without actually looping on stdin."""
    responses = []
    for r in requests:
        responses.append(server.handle(r))
    return responses


registry = ToolRegistry()
server = FirewallServer(registry, policy)

print("Server ready.")
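`FirewallServer.handle` covers a single request; the stdin loop that a standalone server process would run is not shown. A minimal sketch, assuming newline-delimited JSON framing: MCP's stdio transport also carries one message per line, but its messages are JSON-RPC 2.0, not this notebook's simplified `{id, tool, args}` shape. `serve_stdio` is a hypothetical helper; inside a notebook, pass explicit streams rather than relying on the `sys.stdin` default, which would block waiting for input.

```python
import io
import json
import sys
from typing import Any, Callable, Dict, Optional, TextIO

Handler = Callable[[Dict[str, Any]], Dict[str, Any]]


def serve_stdio(handle: Handler, stdin: Optional[TextIO] = None, stdout: Optional[TextIO] = None) -> int:
    """Serve newline-delimited JSON: one request object per input line, one response per output line.

    Runs until EOF on stdin and returns the number of responses written.
    """
    stdin = stdin if stdin is not None else sys.stdin
    stdout = stdout if stdout is not None else sys.stdout
    count = 0
    for line in stdin:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines between requests
        rid = None
        try:
            req = json.loads(line)
            if not isinstance(req, dict):
                raise ValueError("request must be a JSON object")
            rid = req.get("id")
            resp = handle(req)
        except Exception as exc:  # one bad request must not kill the server
            resp = {"id": rid, "ok": False, "error": f"{type(exc).__name__}: {exc}"}
        stdout.write(json.dumps(resp) + "\n")
        stdout.flush()  # the client is waiting on the pipe; don't leave the reply in a buffer
        count += 1
    return count


# Demo with in-memory streams; in the notebook you would pass server.handle instead of this echo lambda.
fake_in = io.StringIO('{"id": "1", "tool": "echo", "args": {"x": 1}}\nnot json\n')
fake_out = io.StringIO()
serve_stdio(lambda req: {"id": req.get("id"), "ok": True, "args": req.get("args")}, fake_in, fake_out)
print(fake_out.getvalue())
```

As a standalone script, `serve_stdio(server.handle)` would read real stdin until the client closes the pipe.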

## Step 3: IP Warden (The Shield)

We need:

- a "signature style" embedding
- a scanner that embeds candidate items and computes cosine similarity
- a trigger (here: `dmca_draft_generator`) when similarity meets or exceeds the threshold

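This notebook uses a deterministic embedding stub (hash-to-vector) so the tutorial runs anywhere without downloading a model. The stub carries no meaning: identical text always maps to the same vector (similarity 1.0), but any other text, however close in style, gets an unrelated pseudo-random vector with similarity near 0. Expect the demo scan below to report no hits above the 0.85 threshold.
Locally, swap this stub with CLIP/open_clip embeddings for real image/text inputs.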
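If you want the scan to respond to shared wording without downloading a model, one dependency-free option is signed feature hashing (the "hashing trick"): each token adds +1 or -1 to one bucket, so texts that share words share buckets. This is a swapped-in technique for illustration, not CLIP, and still not semantic ("ink" and "pen" land in unrelated buckets); `token_hash_embedding` is a hypothetical name. With no hash collisions, sample_1 (8 of its 10 words appear among the signature's 20) would score 8/√(20·10) ≈ 0.57, still below 0.85, so the threshold has to be re-tuned for whatever embedding you use.

```python
import hashlib
import math
import re
from typing import List


def token_hash_embedding(text: str, dim: int = 256) -> List[float]:
    """Signed feature hashing: each token adds +1 or -1 to one of `dim` buckets, then L2-normalize."""
    v = [0.0] * dim
    for tok in re.findall(r"[a-z']+", text.lower()):
        h = hashlib.sha256(tok.encode("utf-8")).digest()
        idx = int.from_bytes(h[:4], "big") % dim  # bucket chosen by the token's hash
        sign = 1.0 if h[4] & 1 else -1.0  # random sign: collisions cancel on average instead of inflating similarity
        v[idx] += sign
    n = math.sqrt(sum(x * x for x in v)) or 1.0
    return [x / n for x in v]


def cosine(a: List[float], b: List[float]) -> float:
    # Both inputs are unit-normalized, so the dot product is the cosine similarity.
    return sum(x * y for x, y in zip(a, b))


signature = ("High-contrast ink linework, bold brushstrokes, mid-century brutalist geometry,"
             " cinematic lighting, disciplined negative space, analog grain, anti-surveillance ethos.")
sample_1 = "High-contrast ink linework with bold brushstrokes and analog grain."
sample_2 = "Pastel watercolor landscapes, soft gradients, whimsical children's book style."

sig = token_hash_embedding(signature)
print("sample_1:", round(cosine(sig, token_hash_embedding(sample_1)), 3))
print("sample_2:", round(cosine(sig, token_hash_embedding(sample_2)), 3))
```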

In [None]:
import math
import random


def unit_normalize(v: List[float]) -> List[float]:
    n = math.sqrt(sum(x*x for x in v)) or 1.0
    return [x / n for x in v]


def cosine(a: List[float], b: List[float]) -> float:
    """Cosine similarity for unit-normalized vectors (reduces to a dot product)."""
    return float(sum(x*y for x, y in zip(a, b)))


def hash_embedding(text: str, dim: int = 256) -> List[float]:
    """Deterministic pseudo-embedding for tutorial purposes."""
    h = sha256(text.encode("utf-8")).digest()
    seed = int.from_bytes(h[:8], "big")
    rng = random.Random(seed)
    v = [rng.uniform(-1, 1) for _ in range(dim)]
    return unit_normalize(v)


SIGNATURE_PATH = ROOT / "signature_style.txt"
SIGNATURE_PATH.write_text(
    "High-contrast ink linework, bold brushstrokes, mid-century brutalist geometry,"
    " cinematic lighting, disciplined negative space, analog grain, anti-surveillance ethos."
)

signature_text = SIGNATURE_PATH.read_text()
signature_vec = hash_embedding(signature_text)

threshold = float(studio_config["ip_warden"]["similarity_threshold"])
print("Signature set. Threshold:", threshold)


def dmca_draft_generator(args: Dict[str, Any]) -> Dict[str, Any]:
    match = args.get("match", {})
    return {
        "ok": True,
        "draft": {
            "subject": "DMCA Notice (Draft)",
            "summary": "This is a teaching stub. Replace with your legal-reviewed template.",
            "match": match,
            "requested_action": "Remove or disable access to the infringing material.",
        }
    }


def ip_scan_folder(args: Dict[str, Any]) -> Dict[str, Any]:
    folder = Path(args.get("folder", ROOT / "scraped"))
    folder.mkdir(exist_ok=True)
    findings = []
    for p in sorted(folder.glob("*.txt")):
        cand_text = p.read_text(errors="ignore")
        cand_vec = hash_embedding(cand_text)
        sim = cosine(signature_vec, cand_vec)
        findings.append({"path": str(p), "similarity": sim})

    hits = [f for f in findings if f["similarity"] >= threshold]
    return {"ok": True, "findings": findings, "hits": hits}


registry.register("dmca_draft_generator", dmca_draft_generator)
registry.register("ip_scan_folder", ip_scan_folder)

# Create demo scraped items
SCRAPED = ROOT / "scraped"
SCRAPED.mkdir(exist_ok=True)
(SCRAPED / "sample_1.txt").write_text("High-contrast ink linework with bold brushstrokes and analog grain.")
(SCRAPED / "sample_2.txt").write_text("Pastel watercolor landscapes, soft gradients, whimsical children's book style.")
(SCRAPED / "sample_3.txt").write_text("Brutalist geometry, negative space, cinematic lighting, disciplined composition.")

responses = run_stdio_demo(server, [
    {"id": "scan1", "tool": "ip_scan_folder", "args": {"folder": str(SCRAPED)}},
])
responses[0]

In [None]:
# Trigger DMCA draft if any hits
scan = responses[0]
hits = scan.get("hits", []) if scan.get("ok") else []

if hits:
    top = max(hits, key=lambda x: x["similarity"])
    dmca = run_stdio_demo(server, [
        {"id": "dmca1", "tool": "dmca_draft_generator", "args": {"match": top}}
    ])[0]
    print(json.dumps(dmca["draft"], indent=2))
else:
    print("No hits above threshold.")

## Step 4: Creative Clinician Heartbeat (The Pulse)

We append check-ins to an append-only JSONL ledger (`inspiration.jsonl`).
Then we compute simple drift signals:

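- vocabulary diversity: unique tokens / total tokens per entry, averaged over the most recent entries (the "tail")
- sentiment range: the spread (max minus min) of a toy lexicon score across the tail, where each entry scores (positive words minus negative words) / total tokens

If either metric falls below its configured minimum (a repetitive vocabulary or an emotionally flat tail), we recommend a "Digital Sabbath."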
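Working the three demo check-ins through by hand shows how these heuristics behave. The block below mirrors the notebook's word lists and metric definitions so it runs on its own; `pooled_diversity` is a hypothetical variant, not part of the notebook. Every short entry has per-entry diversity 1.0, and the "tired, stuck, fog" entry widens the sentiment range instead of narrowing it, so on a fresh ledger the demo analysis reports no drift. Pooling tokens across the tail is one way to make repetition between check-ins visible.

```python
import re
from typing import List

# Copies of the notebook's lexicons and metric definitions, for a standalone check.
POS_WORDS = {"alive", "bright", "curious", "excited", "focused", "calm", "clear", "playful", "strong", "steady"}
NEG_WORDS = {"dead", "tired", "numb", "fried", "broken", "anxious", "hopeless", "stuck", "burnt", "fog"}


def tokens(text: str) -> List[str]:
    return re.findall(r"[a-zA-Z']+", text.lower())


def toy_sentiment(text: str) -> float:
    toks = tokens(text)
    if not toks:
        return 0.0
    pos = sum(t in POS_WORDS for t in toks)
    neg = sum(t in NEG_WORDS for t in toks)
    return (pos - neg) / len(toks)


def vocab_diversity(text: str) -> float:
    toks = tokens(text)
    return len(set(toks)) / len(toks) if toks else 0.0


def pooled_diversity(texts: List[str]) -> float:
    """Hypothetical variant: diversity over all tail tokens combined, so repetition ACROSS entries counts."""
    toks = [t for text in texts for t in tokens(text)]
    return len(set(toks)) / len(toks) if toks else 0.0


# The three demo check-ins, joined the way heartbeat_analyze joins frequency, palette, and notes.
texts = [
    "92bpm black/white focused ink studies, steady composition",
    "88bpm black/white still focused, clear lines, disciplined negative space",
    "80bpm gray tired, stuck, fog, same loop",
]
sentiments = [toy_sentiment(t) for t in texts]
print("diversity per entry:", [vocab_diversity(t) for t in texts])  # [1.0, 1.0, 1.0]
print("sentiments:", [round(s, 3) for s in sentiments])  # [0.25, 0.2, -0.429]
print("sentiment range:", round(max(sentiments) - min(sentiments), 3))  # 0.679
print("pooled diversity:", pooled_diversity(texts))  # 0.8
```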

In [None]:
LEDGER = ROOT / studio_config["heartbeat"]["ledger"]


POS_WORDS = {"alive","bright","curious","excited","focused","calm","clear","playful","strong","steady"}
NEG_WORDS = {"dead","tired","numb","fried","broken","anxious","hopeless","stuck","burnt","fog"}


def toy_sentiment(text: str) -> float:
    toks = re.findall(r"[a-zA-Z']+", text.lower())
    if not toks:
        return 0.0
    pos = sum(t in POS_WORDS for t in toks)
    neg = sum(t in NEG_WORDS for t in toks)
    return (pos - neg) / max(1, len(toks))


def vocab_diversity(text: str) -> float:
    toks = re.findall(r"[a-zA-Z']+", text.lower())
    if not toks:
        return 0.0
    return len(set(toks)) / len(toks)


def heartbeat_append(args: Dict[str, Any]) -> Dict[str, Any]:
    entry = {
        "ts": time.time(),
        "frequency": args.get("frequency", ""),
        "palette": args.get("palette", ""),
        "notes": args.get("notes", "")
    }
    with LEDGER.open("a", encoding="utf-8") as f:
        f.write(json.dumps(entry) + "\n")
    return {"ok": True, "entry": entry}


def heartbeat_analyze(args: Dict[str, Any]) -> Dict[str, Any]:
    if not LEDGER.exists():
        return {"ok": True, "n": 0, "analysis": {}, "recommendation": "No entries yet."}

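    lines = [x for x in LEDGER.read_text(encoding="utf-8").splitlines() if x.strip()]
    n = len(lines)
    if n == 0:
        return {"ok": True, "n": 0, "analysis": {}, "recommendation": "No entries yet."}
    # Clamp tail to [1, n]: lines[-0:] would silently return every line.
    tail_n = max(1, min(int(args.get("tail", 12)), n))
    tail = [json.loads(x) for x in lines[-tail_n:]]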

    texts = [f"{e.get('frequency','')} {e.get('palette','')} {e.get('notes','')}" for e in tail]
    diversities = [vocab_diversity(t) for t in texts]
    sentiments = [toy_sentiment(t) for t in texts]

    vd = sum(diversities) / max(1, len(diversities))
    sr = (max(sentiments) - min(sentiments)) if sentiments else 0.0

    min_vd = float(studio_config["heartbeat"]["drift"]["min_vocab_diversity"])
    min_sr = float(studio_config["heartbeat"]["drift"]["min_sentiment_range"])

    drift = (vd < min_vd) or (sr < min_sr)
    rec = "Digital Sabbath suggested." if drift else "No drift detected."

    return {
        "ok": True,
        "n": n,
        "analysis": {
            "tail": tail_n,
            "avg_vocab_diversity": vd,
            "sentiment_range": sr,
            "thresholds": {"min_vocab_diversity": min_vd, "min_sentiment_range": min_sr},
            "drift": drift
        },
        "recommendation": rec
    }


registry.register("heartbeat_append", heartbeat_append)
registry.register("heartbeat_analyze", heartbeat_analyze)

# Add a few demo entries
demo_reqs = [
    {"id":"hb1","tool":"heartbeat_append","args": {"frequency":"92bpm","palette":"black/white","notes":"focused ink studies, steady composition"}},
    {"id":"hb2","tool":"heartbeat_append","args": {"frequency":"88bpm","palette":"black/white","notes":"still focused, clear lines, disciplined negative space"}},
    {"id":"hb3","tool":"heartbeat_append","args": {"frequency":"80bpm","palette":"gray","notes":"tired, stuck, fog, same loop"}},
    {"id":"hb4","tool":"heartbeat_analyze","args": {"tail": 4}},
]
outs = run_stdio_demo(server, demo_reqs)
outs[-1]

## Step 5: Hot-Reload Refactor (Self-Evolution)

We simulate the user announcing "I'm starting a podcast": the orchestrator generates a new tool module and registers it without restarting the server.

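In real MCP deployments, you'd often:

- generate a new tool file (TypeScript/Python)
- reload the registry (or restart the server) and, if the server advertises tool-list changes, send a `notifications/tools/list_changed` notification so connected clients re-fetch the tool list
- keep the same policy guardrails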
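One way to keep the policy guardrails when the orchestrator writes code is to scan the generated source before `exec_module` runs it. A sketch using the standard-library `ast` module; the banned-module list and the `audit_generated_source` name are hypothetical. It catches honest mistakes (a generated tool that imports `urllib`), not adversarial code, which is why the socket firewall and OS-level controls stay in place.

```python
import ast
from typing import List

# Hypothetical policy: modules a generated tool may not import (matched by top-level package name).
BANNED_MODULES = {"socket", "ssl", "http", "urllib", "ftplib", "smtplib", "requests",
                  "httpx", "boto3", "importlib", "subprocess"}
# Builtins that can load or run code this static scan cannot see.
DYNAMIC_CALLS = {"__import__", "exec", "eval", "compile"}


def audit_generated_source(source: str) -> List[str]:
    """Statically scan generated tool code; return a list of problems (empty list = passed).

    A speed bump, not a sandbox: determined code can still evade a static check.
    """
    try:
        tree = ast.parse(source)
    except SyntaxError as exc:
        return [f"syntax error: {exc.msg}"]
    problems: List[str] = []
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            modules = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            modules = [node.module or ""]  # module is None for "from . import x"
        elif isinstance(node, ast.Call) and isinstance(node.func, ast.Name) and node.func.id in DYNAMIC_CALLS:
            problems.append(f"dynamic code call: {node.func.id}()")
            continue
        else:
            continue
        for mod in modules:
            if mod.split(".")[0] in BANNED_MODULES:
                problems.append(f"banned import: {mod}")
    return problems


print(audit_generated_source("import urllib.request\n"))
```

In `hot_reload_tools`, you would call this on `p.read_text()` and refuse to load the module if the returned list is non-empty.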

In [None]:
TOOLS_DIR = ROOT / "tools"
TOOLS_DIR.mkdir(exist_ok=True)


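def generate_vocal_coach_tool(reason: str) -> Path:
    path = TOOLS_DIR / "vocal_coach.py"
    # `reason` is embedded with !r so a newline in it cannot escape the comment line.
    code = f'''# Auto-generated tool (teaching stub)
# Reason: {reason!r}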
from __future__ import annotations
from typing import Any, Dict

def vocal_coach(args: Dict[str, Any]) -> Dict[str, Any]:
    goal = str(args.get("goal", "podcast"))
    minutes = int(args.get("minutes", 12))
    return {{
        "ok": True,
        "plan": {{
            "goal": goal,
            "session_minutes": minutes,
            "blocks": [
                "2 min: diaphragmatic breathing + posture reset",
                "3 min: resonance hums (mm/nn) + gentle sirens",
                "3 min: articulation drills (plosives + tongue twisters)",
                "2 min: mic technique (distance, off-axis, levels)",
                "2 min: read a paragraph with intention (pace + emphasis)"
            ],
            "note": "Teaching stub. Replace with your actual vocal pedagogy + audio chain."
        }}
    }}
'''
    path.write_text(code, encoding="utf-8")
    return path


def hot_reload_tools(registry: ToolRegistry) -> Dict[str, Any]:
    # Minimal: import the generated module dynamically
    import importlib.util
    p = TOOLS_DIR / "vocal_coach.py"
    if not p.exists():
        return {"ok": False, "error": "No generated tool found."}

    spec = importlib.util.spec_from_file_location("vocal_coach", str(p))
    if spec is None or spec.loader is None:
        return {"ok": False, "error": "Failed to load module spec."}
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)

    if not hasattr(mod, "vocal_coach"):
        return {"ok": False, "error": "Tool function vocal_coach not found."}

    registry.register("vocal_coach", getattr(mod, "vocal_coach"))
    return {"ok": True, "registered": ["vocal_coach"]}


# Simulate the orchestrator recognizing a new need
generated = generate_vocal_coach_tool("starting a podcast")
print("Generated:", generated)
print(hot_reload_tools(registry))

# Call the new tool
run_stdio_demo(server, [
    {"id":"vc1","tool":"vocal_coach","args": {"goal": "podcast intro", "minutes": 10}}
])[0]

## Firewall demonstration: blocked "save_to_cloud"

Any tool whose name contains a deny pattern (case-insensitive substring match) is blocked before its function runs, even though it is registered.

In [None]:
def save_to_cloud(args: Dict[str, Any]) -> Dict[str, Any]:
    # If firewall works, we never reach here.
    return {"ok": True, "status": "uploaded"}

registry.register("save_to_cloud", save_to_cloud)

run_stdio_demo(server, [
    {"id":"blk1","tool":"save_to_cloud","args": {"path": "secret.wav"}}
])[0]
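The denylist only blocks names someone anticipated: a tool registered as `sync_backup` or `push_remote` matches none of the configured patterns and would run. A stricter, default-deny alternative is sketched below as an assumption (`AllowlistPolicy` is a hypothetical name): only tools named in an allowlist may run, and that list could come from the tool manifest mentioned in the packaging notes. It exposes `blocks_tool` and `message`, the two attributes `FirewallServer` uses, so it could be passed in place of `NoEgressPolicy` (the type hint would need widening).

```python
from dataclasses import dataclass
from typing import FrozenSet, Tuple


@dataclass(frozen=True)
class AllowlistPolicy:
    """Default-deny tool policy: a tool runs only if its exact name is allowlisted.

    The substring denylist is kept as a second layer, so an allowlisted name
    that also matches a deny pattern is still blocked.
    """
    allowed_tools: FrozenSet[str]
    deny_tool_name_patterns: Tuple[str, ...] = ()
    message: str = "Blocked by Sovereign Studio No-Egress policy."

    def blocks_tool(self, tool_name: str) -> bool:
        lowered = tool_name.lower()
        if any(p.lower() in lowered for p in self.deny_tool_name_patterns):
            return True
        return tool_name not in self.allowed_tools


allow_policy = AllowlistPolicy(
    allowed_tools=frozenset({"ip_scan_folder", "dmca_draft_generator", "heartbeat_append",
                             "heartbeat_analyze", "vocal_coach"}),
    deny_tool_name_patterns=("save_to_cloud", "upload", "http"),
)
for name in ["vocal_coach", "save_to_cloud", "sync_backup"]:
    print(name, "blocked" if allow_policy.blocks_tool(name) else "allowed")
```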

## Packaging notes for the real (local) tutorial

To turn this into the actual student-facing SIGGRAPH tutorial:

- Replace the stdio demo server with your real MCP server implementation.
- Enforce no-egress in layers: OS firewall + container network policy + process socket block.
- Replace `hash_embedding` with CLIP/open_clip embedding over actual student images/works.
- Replace the DMCA stub with a legally reviewed template and jurisdiction-aware workflow.
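- Persist the ledger as append-only JSONL and optionally hash-chain the entries (each entry stores the SHA-256 of the previous one) for tamper evidence; sign the latest hash if you also need proof of authorship.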
- Add a tool manifest that the Meta-Architect can rewrite safely (policy-checked) for hot-reload.
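A hash-chained ledger can be sketched in a few functions. The helper names (`chained_append`, `verify_chain`) and the entry layout (`ts`, `prev`, `payload`, `hash`) are hypothetical and differ from the notebook's heartbeat entries. Each entry's hash covers its own contents plus the previous entry's hash, so editing, deleting, or reordering earlier lines breaks verification. It is tamper-evident, not tamper-proof: anyone who can rewrite the whole file can recompute every hash, which is why anchoring or signing the latest hash is the natural next step.

```python
import hashlib
import json
import tempfile
import time
from pathlib import Path
from typing import Any, Dict, List

GENESIS = "0" * 64  # "previous hash" recorded by the first entry


def entry_hash(body: Dict[str, Any]) -> str:
    # Canonical JSON (sorted keys, compact separators) so the hash is reproducible on re-read.
    canonical = json.dumps(body, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def read_entries(path: Path) -> List[Dict[str, Any]]:
    if not path.exists():
        return []
    return [json.loads(line) for line in path.read_text(encoding="utf-8").splitlines() if line.strip()]


def chained_append(path: Path, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Append one entry whose hash covers its timestamp, payload, and the previous entry's hash."""
    entries = read_entries(path)  # O(n) re-read; fine for a sketch, cache the last hash in practice
    body = {"ts": time.time(), "prev": entries[-1]["hash"] if entries else GENESIS, "payload": payload}
    entry = {**body, "hash": entry_hash(body)}
    with path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(entry) + "\n")
    return entry


def verify_chain(path: Path) -> bool:
    """True if every entry's hash matches its contents and links to the entry before it."""
    prev = GENESIS
    for e in read_entries(path):
        body = {"ts": e["ts"], "prev": e["prev"], "payload": e["payload"]}
        if e["prev"] != prev or e["hash"] != entry_hash(body):
            return False
        prev = e["hash"]
    return True


with tempfile.TemporaryDirectory() as d:
    ledger = Path(d) / "inspiration_chain.jsonl"
    for note in ["focused ink studies", "clear lines", "tired, stuck, fog"]:
        chained_append(ledger, {"notes": note})
    print("intact:", verify_chain(ledger))  # intact: True
    lines = ledger.read_text(encoding="utf-8").splitlines()
    edited = json.loads(lines[1])
    edited["payload"]["notes"] = "rewritten history"
    lines[1] = json.dumps(edited)
    ledger.write_text("\n".join(lines) + "\n", encoding="utf-8")
    print("after edit:", verify_chain(ledger))  # after edit: False
```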