# Assembled Notebook — SK Safety Gates
_Generated 2025-11-08T03:26:35.231856Z_

Synthesized from your **Safety Gate Placement** diagram. The diagram is not embedded; only runnable Python is produced.

In [None]:
# %% [SETUP]
# Notebook shell escapes (IPython `!` syntax, not plain Python):
# upgrade semantic-kernel, then quietly uninstall pydrive2 without prompting.
!pip install -U semantic-kernel
!pip -q uninstall -y pydrive2

In [None]:
# %% [SETUP-ENV]
import os
import getpass

# Supply Azure OpenAI settings only for variables the environment lacks;
# values that are already set are left untouched.
os.environ.update({
    key: value
    for key, value in {
        'AZURE_OPENAI_ENDPOINT': 'https://4th-openai-resource.openai.azure.com',
        'AZURE_OPENAI_API_VERSION': '2024-10-21',
        'AZURE_OPENAI_DEPLOYMENT': 'gpt-35-turbo',
    }.items()
    if key not in os.environ
})

# Prompt for the key (input hidden) only when it is missing or empty.
if not os.environ.get('AZURE_OPENAI_API_KEY'):
    entered_key = getpass.getpass('Enter AZURE_OPENAI_API_KEY (hidden): ')
    os.environ['AZURE_OPENAI_API_KEY'] = entered_key.strip()
print('Azure OpenAI env ready.')

In [None]:
# %% [KERNEL]
import os
from semantic_kernel import Kernel
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion

# Create the kernel and register a single Azure OpenAI chat service under
# the id 'azure'; connection settings are read from the environment.
kernel = Kernel()
service = AzureChatCompletion(
    service_id='azure',
    api_key=os.environ.get('AZURE_OPENAI_API_KEY'),
    deployment_name=os.environ.get('AZURE_OPENAI_DEPLOYMENT'),
    endpoint=os.environ.get('AZURE_OPENAI_ENDPOINT'),
)
kernel.add_service(service)
print('Kernel ready (Azure OpenAI)')

In [None]:
# %% [TOOLS]

import re, math, time, uuid, json

def tool_safety_pre(text=None, context=None, args=None, policies=None):
    """Pre-execution content-safety gate (stub).

    The four inputs are accepted but not inspected; the stub always passes.

    Returns:
        dict with 'ok' (always True), 'redactions' (a new empty list) and
        'policy' (the literal 'pre').
    """
    return {'ok': True, 'redactions': [], 'policy': 'pre'}

def tool_safety_allow(intent=None, map_allow=None):
    """Allowlist gate: look up which tools an intent may use.

    Args:
        intent: Intent label; converted with str() before the lookup.
        map_allow: Mapping of intent name -> list of allowed tool names.
            None (or any falsy value) is treated as an empty mapping.

    Returns:
        dict with 'ok' (True when the intent maps to a non-empty list),
        'allowed' (the list found for the intent, or []) and 'intent'
        (the value passed in, unchanged).
    """
    allowed = (map_allow or {}).get(str(intent), [])
    return {'ok': bool(allowed), 'allowed': allowed, 'intent': intent}

def tool_safety_deny(text=None, args=None, deny_patterns=None):
    """Blocklist gate: scan text and args for denied regex patterns.

    Args:
        text: Free text to scan.
        args: Tool arguments to scan; converted with str().
        deny_patterns: Iterable of regular expressions, matched
            case-insensitively with re.search. None means no patterns.

    Returns:
        dict with 'blocked' (True when any pattern matched) and 'matches'
        (the matching patterns, in input order).
    """
    import re as _re

    # Only include inputs that were actually supplied: stringifying a missing
    # value would put the literal word "None" into the scanned text and let
    # patterns match something the caller never sent.
    text_blob = ' '.join(str(part) for part in (text, args) if part)
    matches = [
        pattern
        for pattern in (deny_patterns or [])
        if _re.search(pattern, text_blob, _re.I)
    ]
    return {'blocked': bool(matches), 'matches': matches}

def tool_safety_argval(schema=None, args=None):
    """Validate tool arguments against a simple per-key schema.

    Two rule keys are honoured: 'required' (the argument must be present and
    not None) and 'range' (a (lo, hi) pair checked inclusively, applied only
    to int/float values). No type checks or PII scrubbing are performed.

    Args:
        schema: Mapping of argument name -> rule dict. None means no rules.
        args: Mapping of argument name -> value. None means no arguments.

    Returns:
        dict with 'ok' (False when any rule failed), 'clean_args' (a shallow
        copy of args) and 'errors' (strings 'missing:<key>' / 'range:<key>').
    """
    ok = True
    errs = []
    clean = dict(args or {})
    for key, rule in (schema or {}).items():
        val = clean.get(key)
        if val is None and rule.get('required'):
            ok = False
            errs.append('missing:' + str(key))
        rng = rule.get('range')
        if rng and isinstance(val, (int, float)):
            lo, hi = rng
            if not (lo <= val <= hi):
                ok = False
                errs.append('range:' + str(key))
    return {'ok': ok, 'clean_args': clean, 'errors': errs}

def tool_safety_post(output=None, citations=None, rules=None, confidence=0.8):
    """Post-execution safety gate over a tool's output.

    Args:
        output: Tool output; converted with str() and scanned
            case-insensitively for 'api_key', 'password' or 'secret'.
        citations: Accepted but not inspected.
        rules: Accepted but not inspected.
        confidence: Score for the output; values below 0.5 escalate.

    Returns:
        dict with 'action' ('allow', 'escalate' or 'redact') and 'ok'
        (True only when the action is 'allow').
    """
    import re as _re

    action = 'allow'
    if confidence < 0.5:
        action = 'escalate'
    # A detected leak takes precedence over a low-confidence escalation.
    if _re.search(r'(api_key|password|secret)', str(output or ''), _re.I):
        action = 'redact'
    return {'ok': action == 'allow', 'action': action}

def tool_rag_search(query=None, k=4, index='default'):
    """Retriever / search stub.

    Produces int(k) placeholder hits, each with a new random UUID string as
    'id' and a fixed 'score' of 0.9. The query itself is not used.

    Returns:
        dict with 'hits' (list of hit dicts) and 'index' (as passed in).
    """
    hits = []
    for _ in range(int(k)):
        hits.append({'id': str(uuid.uuid4()), 'score': 0.9})
    return {'hits': hits, 'index': index}

def tool_http_request(method='GET', url=None, headers=None, body=None):
    """HTTP tool stub; performs no network I/O.

    Returns:
        dict echoing 'url' and 'method', a fixed 'status' of 200, and
        'bytes' = length of str(body) (0 when body is falsy). The headers
        argument is not used.
    """
    body_text = str(body or '')
    return {
        'status': 200,
        'url': url,
        'method': method,
        'bytes': len(body_text),
    }

def tool_soap_action(wsdl=None, action=None, payload=None):
    """SOAP tool stub; performs no network I/O.

    Returns:
        dict with 'ok' (always True), the echoed 'wsdl' and 'action', and
        'payload_len' = length of str(payload) (0 when payload is falsy).
    """
    payload_text = str(payload or '')
    return {
        'ok': True,
        'wsdl': wsdl,
        'action': action,
        'payload_len': len(payload_text),
    }

def tool_calc_eval(expr=None):
    """Calc tool stub: evaluate a basic arithmetic expression.

    Only digits, whitespace and the characters + - * / ( ) . are accepted;
    anything else (including an empty or missing expression) is rejected.

    Args:
        expr: Expression to evaluate; converted with str().

    Returns:
        {'result': value} on success, or {'error': message} when the
        expression is rejected or evaluation raises.
    """
    import re as _re

    try:
        safe = _re.fullmatch(r'[0-9+\-*/().\s]+', str(expr or '')) is not None
        if not safe:
            raise ValueError('unsafe expr')
        # NOTE(review): eval() on caller-supplied text. The character filter
        # above excludes names and attribute access, and builtins are emptied
        # as a second layer, but '**' still passes the filter, so inputs like
        # '9**9**9**9' can exhaust CPU/memory. Consider an ast-based evaluator
        # before exposing this to untrusted input.
        return {'result': eval(str(expr), {'__builtins__': {}}, {})}
    except Exception as e:
        return {'error': str(e)}


# Registry of tool name -> callable. Each key is the function's own name,
# and the order matches the order the tools are defined above.
TOOLS = {
    fn.__name__: fn
    for fn in (
        tool_safety_pre,
        tool_safety_allow,
        tool_safety_deny,
        tool_safety_argval,
        tool_safety_post,
        tool_rag_search,
        tool_http_request,
        tool_soap_action,
        tool_calc_eval,
    )
}
print('Tools:', list(TOOLS))

In [None]:
# %% [AGENTS]

from typing import List


class Agent_channel:
    """Capability agent for the "Channel (PVA / Copilot)" node.

    Holds the shared kernel, a display name, a system message and the names
    of the tools (skills) associated with this node.
    """

    def __init__(self, kernel):
        self.kernel = kernel
        self.name = "Channel (PVA / Copilot)"
        self.system_message = "You are the Channel (PVA / Copilot) capability agent."
        self.skills = ['tool_safety_pre']

    async def run(self, user_text: str) -> str:
        """Send the system message plus ``user_text`` to the kernel.

        Any exception is caught and reported in the returned string rather
        than raised.
        """
        try:
            prompt = self.system_message + "\n\nUser: " + user_text
            reply = await self.kernel.invoke_prompt(prompt, service_id="azure")
            return str(reply)
        except Exception as ex:
            return f"[{self.name} stub] Adjust SK call. Error: {ex}"

    def available_tools(self) -> List[str]:
        """Return the skills of this agent that are registered in TOOLS."""
        usable = []
        for skill in self.skills:
            if skill in TOOLS:
                usable.append(skill)
        return usable

    def call(self, tool_name: str, **kwargs):
        """Invoke the tool registered in TOOLS under ``tool_name``.

        The lookup is against TOOLS only; ``self.skills`` is not consulted.

        Raises:
            ValueError: if ``tool_name`` is not registered.
        """
        handler = TOOLS.get(tool_name)
        if handler:
            return handler(**kwargs)
        raise ValueError(f"Tool not found: {tool_name}")



class Agent_gateway:
    """Capability agent for the "API Gateway (APIM)" node.

    Holds the shared kernel, a display name and a system message; this node
    has no tools (skills) associated with it.
    """

    def __init__(self, kernel):
        self.kernel = kernel
        self.name = "API Gateway (APIM)"
        self.system_message = "You are the API Gateway (APIM) capability agent."
        self.skills = []

    async def run(self, user_text: str) -> str:
        """Send the system message plus ``user_text`` to the kernel.

        Any exception is caught and reported in the returned string rather
        than raised.
        """
        try:
            prompt = self.system_message + "\n\nUser: " + user_text
            reply = await self.kernel.invoke_prompt(prompt, service_id="azure")
            return str(reply)
        except Exception as ex:
            return f"[{self.name} stub] Adjust SK call. Error: {ex}"

    def available_tools(self) -> List[str]:
        """Return the skills of this agent that are registered in TOOLS."""
        usable = []
        for skill in self.skills:
            if skill in TOOLS:
                usable.append(skill)
        return usable

    def call(self, tool_name: str, **kwargs):
        """Invoke the tool registered in TOOLS under ``tool_name``.

        The lookup is against TOOLS only; ``self.skills`` is not consulted.

        Raises:
            ValueError: if ``tool_name`` is not registered.
        """
        handler = TOOLS.get(tool_name)
        if handler:
            return handler(**kwargs)
        raise ValueError(f"Tool not found: {tool_name}")



class Agent_planner:
    """Capability agent for the "Planner: intent + plan" node.

    Holds the shared kernel, a display name, a system message and the names
    of the tools (skills) associated with this node.
    """

    def __init__(self, kernel):
        self.kernel = kernel
        self.name = "Planner: intent + plan"
        self.system_message = "You are the Planner: intent + plan capability agent."
        self.skills = ['tool_safety_allow', 'tool_safety_deny']

    async def run(self, user_text: str) -> str:
        """Send the system message plus ``user_text`` to the kernel.

        Any exception is caught and reported in the returned string rather
        than raised.
        """
        try:
            prompt = self.system_message + "\n\nUser: " + user_text
            reply = await self.kernel.invoke_prompt(prompt, service_id="azure")
            return str(reply)
        except Exception as ex:
            return f"[{self.name} stub] Adjust SK call. Error: {ex}"

    def available_tools(self) -> List[str]:
        """Return the skills of this agent that are registered in TOOLS."""
        usable = []
        for skill in self.skills:
            if skill in TOOLS:
                usable.append(skill)
        return usable

    def call(self, tool_name: str, **kwargs):
        """Invoke the tool registered in TOOLS under ``tool_name``.

        The lookup is against TOOLS only; ``self.skills`` is not consulted.

        Raises:
            ValueError: if ``tool_name`` is not registered.
        """
        handler = TOOLS.get(tool_name)
        if handler:
            return handler(**kwargs)
        raise ValueError(f"Tool not found: {tool_name}")



class Agent_builder:
    """Capability agent for the "Build tool args" node.

    Holds the shared kernel, a display name, a system message and the names
    of the tools (skills) associated with this node.
    """

    def __init__(self, kernel):
        self.kernel = kernel
        self.name = "Build tool args"
        self.system_message = "You are the Build tool args capability agent."
        self.skills = ['tool_safety_argval']

    async def run(self, user_text: str) -> str:
        """Send the system message plus ``user_text`` to the kernel.

        Any exception is caught and reported in the returned string rather
        than raised.
        """
        try:
            prompt = self.system_message + "\n\nUser: " + user_text
            reply = await self.kernel.invoke_prompt(prompt, service_id="azure")
            return str(reply)
        except Exception as ex:
            return f"[{self.name} stub] Adjust SK call. Error: {ex}"

    def available_tools(self) -> List[str]:
        """Return the skills of this agent that are registered in TOOLS."""
        usable = []
        for skill in self.skills:
            if skill in TOOLS:
                usable.append(skill)
        return usable

    def call(self, tool_name: str, **kwargs):
        """Invoke the tool registered in TOOLS under ``tool_name``.

        The lookup is against TOOLS only; ``self.skills`` is not consulted.

        Raises:
            ValueError: if ``tool_name`` is not registered.
        """
        handler = TOOLS.get(tool_name)
        if handler:
            return handler(**kwargs)
        raise ValueError(f"Tool not found: {tool_name}")



class Agent_safety_pre:
    """Capability agent for the "Safety: Pre" node.

    Holds the shared kernel, a display name, a system message and the names
    of the tools (skills) associated with this node.
    """

    def __init__(self, kernel):
        self.kernel = kernel
        self.name = "Safety: Pre"
        self.system_message = "You are the Safety: Pre capability agent."
        self.skills = ['tool_safety_pre']

    async def run(self, user_text: str) -> str:
        """Send the system message plus ``user_text`` to the kernel.

        Any exception is caught and reported in the returned string rather
        than raised.
        """
        try:
            prompt = self.system_message + "\n\nUser: " + user_text
            reply = await self.kernel.invoke_prompt(prompt, service_id="azure")
            return str(reply)
        except Exception as ex:
            return f"[{self.name} stub] Adjust SK call. Error: {ex}"

    def available_tools(self) -> List[str]:
        """Return the skills of this agent that are registered in TOOLS."""
        usable = []
        for skill in self.skills:
            if skill in TOOLS:
                usable.append(skill)
        return usable

    def call(self, tool_name: str, **kwargs):
        """Invoke the tool registered in TOOLS under ``tool_name``.

        The lookup is against TOOLS only; ``self.skills`` is not consulted.

        Raises:
            ValueError: if ``tool_name`` is not registered.
        """
        handler = TOOLS.get(tool_name)
        if handler:
            return handler(**kwargs)
        raise ValueError(f"Tool not found: {tool_name}")



class Agent_safety_allow:
    """Capability agent for the "Safety: Allowlist" node.

    Holds the shared kernel, a display name, a system message and the names
    of the tools (skills) associated with this node.
    """

    def __init__(self, kernel):
        self.kernel = kernel
        self.name = "Safety: Allowlist"
        self.system_message = "You are the Safety: Allowlist capability agent."
        self.skills = ['tool_safety_allow']

    async def run(self, user_text: str) -> str:
        """Send the system message plus ``user_text`` to the kernel.

        Any exception is caught and reported in the returned string rather
        than raised.
        """
        try:
            prompt = self.system_message + "\n\nUser: " + user_text
            reply = await self.kernel.invoke_prompt(prompt, service_id="azure")
            return str(reply)
        except Exception as ex:
            return f"[{self.name} stub] Adjust SK call. Error: {ex}"

    def available_tools(self) -> List[str]:
        """Return the skills of this agent that are registered in TOOLS."""
        usable = []
        for skill in self.skills:
            if skill in TOOLS:
                usable.append(skill)
        return usable

    def call(self, tool_name: str, **kwargs):
        """Invoke the tool registered in TOOLS under ``tool_name``.

        The lookup is against TOOLS only; ``self.skills`` is not consulted.

        Raises:
            ValueError: if ``tool_name`` is not registered.
        """
        handler = TOOLS.get(tool_name)
        if handler:
            return handler(**kwargs)
        raise ValueError(f"Tool not found: {tool_name}")



class Agent_safety_deny:
    """Capability agent for the "Safety: Denylist" node.

    Holds the shared kernel, a display name, a system message and the names
    of the tools (skills) associated with this node.
    """

    def __init__(self, kernel):
        self.kernel = kernel
        self.name = "Safety: Denylist"
        self.system_message = "You are the Safety: Denylist capability agent."
        self.skills = ['tool_safety_deny']

    async def run(self, user_text: str) -> str:
        """Send the system message plus ``user_text`` to the kernel.

        Any exception is caught and reported in the returned string rather
        than raised.
        """
        try:
            prompt = self.system_message + "\n\nUser: " + user_text
            reply = await self.kernel.invoke_prompt(prompt, service_id="azure")
            return str(reply)
        except Exception as ex:
            return f"[{self.name} stub] Adjust SK call. Error: {ex}"

    def available_tools(self) -> List[str]:
        """Return the skills of this agent that are registered in TOOLS."""
        usable = []
        for skill in self.skills:
            if skill in TOOLS:
                usable.append(skill)
        return usable

    def call(self, tool_name: str, **kwargs):
        """Invoke the tool registered in TOOLS under ``tool_name``.

        The lookup is against TOOLS only; ``self.skills`` is not consulted.

        Raises:
            ValueError: if ``tool_name`` is not registered.
        """
        handler = TOOLS.get(tool_name)
        if handler:
            return handler(**kwargs)
        raise ValueError(f"Tool not found: {tool_name}")



class Agent_safety_argval:
    """Capability agent for the "Safety: Arg Validation" node.

    Holds the shared kernel, a display name, a system message and the names
    of the tools (skills) associated with this node.
    """

    def __init__(self, kernel):
        self.kernel = kernel
        self.name = "Safety: Arg Validation"
        self.system_message = "You are the Safety: Arg Validation capability agent."
        self.skills = ['tool_safety_argval']

    async def run(self, user_text: str) -> str:
        """Send the system message plus ``user_text`` to the kernel.

        Any exception is caught and reported in the returned string rather
        than raised.
        """
        try:
            prompt = self.system_message + "\n\nUser: " + user_text
            reply = await self.kernel.invoke_prompt(prompt, service_id="azure")
            return str(reply)
        except Exception as ex:
            return f"[{self.name} stub] Adjust SK call. Error: {ex}"

    def available_tools(self) -> List[str]:
        """Return the skills of this agent that are registered in TOOLS."""
        usable = []
        for skill in self.skills:
            if skill in TOOLS:
                usable.append(skill)
        return usable

    def call(self, tool_name: str, **kwargs):
        """Invoke the tool registered in TOOLS under ``tool_name``.

        The lookup is against TOOLS only; ``self.skills`` is not consulted.

        Raises:
            ValueError: if ``tool_name`` is not registered.
        """
        handler = TOOLS.get(tool_name)
        if handler:
            return handler(**kwargs)
        raise ValueError(f"Tool not found: {tool_name}")



class Agent_safety_post:
    """Capability agent for the "Safety: Post" node.

    Holds the shared kernel, a display name, a system message and the names
    of the tools (skills) associated with this node.
    """

    def __init__(self, kernel):
        self.kernel = kernel
        self.name = "Safety: Post"
        self.system_message = "You are the Safety: Post capability agent."
        self.skills = ['tool_safety_post']

    async def run(self, user_text: str) -> str:
        """Send the system message plus ``user_text`` to the kernel.

        Any exception is caught and reported in the returned string rather
        than raised.
        """
        try:
            prompt = self.system_message + "\n\nUser: " + user_text
            reply = await self.kernel.invoke_prompt(prompt, service_id="azure")
            return str(reply)
        except Exception as ex:
            return f"[{self.name} stub] Adjust SK call. Error: {ex}"

    def available_tools(self) -> List[str]:
        """Return the skills of this agent that are registered in TOOLS."""
        usable = []
        for skill in self.skills:
            if skill in TOOLS:
                usable.append(skill)
        return usable

    def call(self, tool_name: str, **kwargs):
        """Invoke the tool registered in TOOLS under ``tool_name``.

        The lookup is against TOOLS only; ``self.skills`` is not consulted.

        Raises:
            ValueError: if ``tool_name`` is not registered.
        """
        handler = TOOLS.get(tool_name)
        if handler:
            return handler(**kwargs)
        raise ValueError(f"Tool not found: {tool_name}")



class Agent_tools:
    """Capability agent for the "Tools Runner" node.

    Holds the shared kernel, a display name, a system message and the names
    of the tools (skills) associated with this node.
    """

    def __init__(self, kernel):
        self.kernel = kernel
        self.name = "Tools Runner"
        self.system_message = "You are the Tools Runner capability agent."
        self.skills = [
            'tool_rag_search',
            'tool_http_request',
            'tool_soap_action',
            'tool_calc_eval',
        ]

    async def run(self, user_text: str) -> str:
        """Send the system message plus ``user_text`` to the kernel.

        Any exception is caught and reported in the returned string rather
        than raised.
        """
        try:
            prompt = self.system_message + "\n\nUser: " + user_text
            reply = await self.kernel.invoke_prompt(prompt, service_id="azure")
            return str(reply)
        except Exception as ex:
            return f"[{self.name} stub] Adjust SK call. Error: {ex}"

    def available_tools(self) -> List[str]:
        """Return the skills of this agent that are registered in TOOLS."""
        usable = []
        for skill in self.skills:
            if skill in TOOLS:
                usable.append(skill)
        return usable

    def call(self, tool_name: str, **kwargs):
        """Invoke the tool registered in TOOLS under ``tool_name``.

        The lookup is against TOOLS only; ``self.skills`` is not consulted.

        Raises:
            ValueError: if ``tool_name`` is not registered.
        """
        handler = TOOLS.get(tool_name)
        if handler:
            return handler(**kwargs)
        raise ValueError(f"Tool not found: {tool_name}")



# Instances: one agent object per node, all sharing the same kernel.
agent_channel = Agent_channel(kernel)
agent_gateway = Agent_gateway(kernel)
agent_planner = Agent_planner(kernel)
agent_builder = Agent_builder(kernel)
agent_safety_pre = Agent_safety_pre(kernel)
agent_safety_allow = Agent_safety_allow(kernel)
agent_safety_deny = Agent_safety_deny(kernel)
agent_safety_argval = Agent_safety_argval(kernel)
agent_safety_post = Agent_safety_post(kernel)
agent_tools = Agent_tools(kernel)

print('Agents:', [
    'agent_channel',
    'agent_gateway',
    'agent_planner',
    'agent_builder',
    'agent_safety_pre',
    'agent_safety_allow',
    'agent_safety_deny',
    'agent_safety_argval',
    'agent_safety_post',
    'agent_tools',
])

In [None]:
# %% [WIRES]
# Wiring table: agent display name -> {"tools": [tool names for that agent]}.
WIRES = {
    "Channel (PVA / Copilot)": {"tools": ["tool_safety_pre"]},
    "API Gateway (APIM)": {"tools": []},
    "Planner: intent + plan": {"tools": ["tool_safety_allow", "tool_safety_deny"]},
    "Build tool args": {"tools": ["tool_safety_argval"]},
    "Safety: Pre": {"tools": ["tool_safety_pre"]},
    "Safety: Allowlist": {"tools": ["tool_safety_allow"]},
    "Safety: Denylist": {"tools": ["tool_safety_deny"]},
    "Safety: Arg Validation": {"tools": ["tool_safety_argval"]},
    "Safety: Post": {"tools": ["tool_safety_post"]},
    "Tools Runner": {
        "tools": [
            "tool_rag_search",
            "tool_http_request",
            "tool_soap_action",
            "tool_calc_eval",
        ],
    },
}
print('Wiring entries:', len(WIRES))

In [None]:

# %% [DEMO]
import asyncio, types, uuid

async def _run_with_azure(self, user_text: str):
    """Replacement ``run`` that lets kernel errors propagate.

    Builds the prompt from the agent's ``system_message`` (empty when the
    attribute is missing or falsy) followed by ``"\\n\\nUser: "`` and
    ``str(user_text)``, invokes it on ``self.kernel`` with the 'azure'
    service id, and returns the result converted with str().
    """
    system_part = getattr(self, "system_message", "") or ""
    prompt = system_part + "\n\nUser: " + str(user_text)
    return str(await self.kernel.invoke_prompt(prompt, service_id="azure"))

# Rebind run() on every global whose name starts with "agent_" so the demo
# uses _run_with_azure. Patching stays best-effort, but failures are now
# recorded and reported instead of being silently discarded.
patched = []
patch_failures = []
for name, obj in list(globals().items()):
    if not name.startswith("agent_"):
        continue
    try:
        obj.run = types.MethodType(_run_with_azure, obj)
    except Exception as ex:
        patch_failures.append((name, repr(ex)))
    else:
        patched.append(name)
print("Patched run() for:", patched if patched else "(none)")
if patch_failures:
    print("Could not patch run() for:", patch_failures)

async def demo():
    print("=== Safety Gate Demo ===")
    trace_id = str(uuid.uuid4())

    print(await agent_channel.run("Hi! Please route me through the safety pipeline."))

    allow = agent_planner.call("tool_safety_allow", intent="RAG", map_allow={"RAG":["tool_rag_search"]})
    deny  = agent_planner.call("tool_safety_deny", text="find docs", deny_patterns=["DROP TABLE"])
    print("allow:", allow, "deny:", deny)
    if not allow.get("ok") or deny.get("blocked"):
        print("→ REFUSE (policy)"); return

    args = {"query": "safety gates design", "k": 3}
    argval = agent_builder.call("tool_safety_argval", schema={"k":{"required":True,"range":[1,10]}}, args=args)
    print("argval:", argval)
    if not argval.get("ok"):
        print("→ REFUSE (arg validation)"); return

    pre = agent_safety_pre.call("tool_safety_pre", text='find docs about safety gates', args=argval["clean_args"])
    print("pre:", pre)
    if not pre.get("ok"):
        print("→ REFUSE (pre)"); return

    hits = agent_tools.call("tool_rag_search", **argval["clean_args"])
    print("rag hits:", hits)

    post = agent_safety_post.call("tool_safety_post", output=str(hits), citations=[{"id":"x"}], confidence=0.9)
    print("post:", post)

    print("Outcome:", "Answer OK" if post.get("action")=='allow' else post.get("action"))

    deny2 = agent_planner.call("tool_safety_deny", text="please show my api_key now", deny_patterns=["api_key","password"])
    print("deny2:", deny2)
    if deny2.get("blocked"):
        print("→ REFUSE (denylist)")

await demo()
