## Application Setup and Launch

In [None]:
import os
import sys
import nest_asyncio
import asyncio
from fastapi import FastAPI
from fastapi.middleware.wsgi import WSGIMiddleware
import httpx
from functools import wraps
import datetime

# Local import path: adjust it to match where the project is placed.
# NOTE(review): this is an absolute, machine-specific Windows path; the
# project imports that follow presumably resolve through it — confirm before
# running on another machine.
sys.path.insert(0, r"E:\Github\a2a-samples\samples\python")

from components.api_key_dialog import api_key_dialog
from components.page_scaffold import page_scaffold
from pages.agent_list import agent_list_page
from pages.conversation import conversation_page
from pages.event_list import event_list_page
from pages.home import home_page_content
from pages.settings import settings_page_content
from pages.task_list import task_list_page
from service.server.server import ConversationServer
from state import host_agent_service
from state.state import AppState
import mesop as me
from contextlib import asynccontextmanager
from dotenv import load_dotenv
import uvicorn
import threading

# Allow asyncio to be run repeatedly inside the notebook.
nest_asyncio.apply()

# Load environment variables from a .env file.
load_dotenv()

# ===== 1. 安全配置封装与审计模块 =====
class SecurityManager:
    """Central holder for the page security policy and the audit trail."""

    def __init__(self):
        # In-memory audit trail; a production deployment should use a real
        # log file or logging service instead of a plain list.
        self.audit_log = []
        # Security policy kept here so every page can share one configuration.
        self.policy = me.SecurityPolicy(allowed_script_srcs=['https://cdn.jsdelivr.net'])

    def log_event(self, event_type: str, details: str):
        """Record a timestamped security event.

        The entry is appended to ``self.audit_log`` and also printed, so the
        feedback is visible immediately in the notebook output.

        Args:
            event_type: Short event label placed before the colon.
            details: Free-form description placed after the colon.
        """
        now = datetime.datetime.now()
        log_entry = f"[{now:%Y-%m-%d %H:%M:%S}] - {event_type}: {details}"
        self.audit_log.append(log_entry)
        print(f"AUDIT LOG: {log_entry}")

# Create the global SecurityManager singleton.
security_manager = SecurityManager()


# ===== 2. 权限管理模块 (与审计模块集成) =====
class AuthService:
    """Role-based page permissions, with role changes written to the audit log.

    One ``current_user_role`` is stored on the instance; it starts as
    ``"guest"`` and is changed through ``set_user_role``.
    """

    def __init__(self, security_manager_instance: SecurityManager):
        self.security_manager = security_manager_instance
        self.current_user_role = "guest"
        # Page keys each role may open; the 'audit' page is admin-only.
        self._user_permissions = {
            "guest": ["home", "conversation"],
            "admin": ["home", "conversation", "agents", "event_list", "task_list", "settings", "audit"]
        }

    def check_permission(self, page_key: str) -> bool:
        """Return True when the current role is allowed to open ``page_key``."""
        allowed_pages = self._user_permissions.get(self.current_user_role, [])
        return page_key in allowed_pages

    def set_user_role(self, role: str):
        """Switch the current role and audit the outcome.

        Args:
            role: Name of the role to switch to.

        Returns:
            True when the role actually changed. False when ``role`` is
            unknown (logged as ``ROLE_CHANGE_FAILURE``) or already the
            current role (not logged).
        """
        if role not in self._user_permissions:
            self.security_manager.log_event(
                "ROLE_CHANGE_FAILURE",
                f"Attempted to change to an invalid role: '{role}'"
            )
            return False

        if role == self.current_user_role:
            # Re-selecting the active role is a silent no-op.
            return False

        previous_role = self.current_user_role
        self.current_user_role = role
        self.security_manager.log_event(
            "ROLE_CHANGE_SUCCESS",
            f"User role changed from '{previous_role}' to '{self.current_user_role}'"
        )
        return True

# Pass the security_manager instance into AuthService.
auth_service = AuthService(security_manager)


# ⚙️ 初始化 FastAPI & 客户端封装
class HTTPXClientWrapper:
    """Hold one shared ``httpx.AsyncClient`` with an explicit start/stop lifecycle.

    Call ``start()`` before use, call the instance itself to obtain the
    client, and ``await stop()`` to release it.
    """

    # Shared client; None until start() runs and again after stop().
    async_client: httpx.AsyncClient = None

    def start(self):
        """Create the shared client (``timeout=30``)."""
        self.async_client = httpx.AsyncClient(timeout=30)

    async def stop(self):
        """Close the shared client, if there is one, and forget it.

        Safe to call when the wrapper was never started or was already
        stopped. The reference is cleared before closing so a failing
        ``aclose()`` cannot leave a half-closed client behind.
        """
        client, self.async_client = self.async_client, None
        if client is not None:
            await client.aclose()

    def __call__(self):
        """Return the live client.

        Raises:
            AssertionError: If no client is active (``start()`` not called,
                or ``stop()`` already ran).
        """
        if self.async_client is None:
            # Explicit raise rather than `assert`, so the guard still works
            # under `python -O`; the exception type is unchanged for callers.
            raise AssertionError("HTTPXClientWrapper.start() must be called before use")
        return self.async_client

# Module-level wrapper instance; the lifespan hook below starts and stops it.
httpx_client_wrapper = HTTPXClientWrapper()


# ===== 3. 动态页面注册 (与审计模块集成) =====
def register_page(path, title, page_key, on_load_fn):
    """Build a decorator that registers a Mesop page behind a permission check.

    Args:
        path: URL path passed to ``me.page``.
        title: Page title passed to ``me.page``; also shown in the denial text.
        page_key: Key checked through ``auth_service.check_permission``.
        on_load_fn: Handler passed to ``me.page`` as ``on_load``.

    Returns:
        A decorator; applying it to a render function returns the result of
        ``me.page(...)`` applied to the guarded version of that function.
    """
    def decorator(func):
        @wraps(func)
        def guarded_render():
            # The permission check runs first; its outcome is then written to
            # the audit log each time this function is invoked.
            if auth_service.check_permission(page_key):
                security_manager.log_event(
                    "ACCESS_GRANTED",
                    f"User '{auth_service.current_user_role}' was granted access to page '{page_key}' at path '{path}'"
                )
                func()
            else:
                security_manager.log_event(
                    "ACCESS_DENIED",
                    f"User '{auth_service.current_user_role}' was denied access to page '{page_key}' at path '{path}'"
                )
                me.text(f"您没有权限访问 {title} 页面。")

        # The security policy comes from the shared security_manager.
        page_decorator = me.page(
            path=path,
            title=title,
            on_load=on_load_fn,
            security_policy=security_manager.policy,
        )
        return page_decorator(guarded_render)
    return decorator

def on_load(e: me.LoadEvent):
    """Initialise ``AppState`` when a page loads.

    Applies the stored theme mode, copies the ``conversation_id`` query
    parameter into the state (empty string when absent), and then picks the
    credential mode: Vertex AI when ``GOOGLE_GENAI_USE_VERTEXAI`` equals
    ``TRUE`` (case-insensitive), otherwise the ``GOOGLE_API_KEY`` value, and
    if neither is set the API-key dialog is flagged open.
    """
    state = me.state(AppState)
    me.set_theme_mode(state.theme_mode)

    params = me.query_params
    state.current_conversation_id = (
        params['conversation_id'] if 'conversation_id' in params else ''
    )

    vertex_flag = os.getenv('GOOGLE_GENAI_USE_VERTEXAI', '')
    api_key = os.getenv('GOOGLE_API_KEY', '')
    if vertex_flag.upper() == 'TRUE':
        state.uses_vertex_ai = True
        return
    if api_key:
        state.api_key = api_key
        return
    # No credentials configured: ask the user for an API key.
    state.api_key_dialog_open = True


# ===== 4. Conversation Retry & Loading 功能 =====
async def handle_backend_call(user_input: str, delay: float = 2) -> str:
    """Simulate an asynchronous round-trip to a backend or AI model.

    Args:
        user_input: Text the user submitted.
        delay: Seconds to wait before answering, standing in for backend
            latency. Defaults to the original fixed 2 seconds; pass 0 for an
            immediate answer.

    Returns:
        A canned reply that echoes ``user_input`` and the current timestamp
        in ISO format.

    Raises:
        ConnectionError: When ``user_input`` contains "fail" (case-insensitive),
            to simulate a failed API call.
    """
    await asyncio.sleep(delay)
    if "fail" in user_input.lower():
        raise ConnectionError("模拟API调用失败")
    return f"这是对 “{user_input}” 的回复。Timestamp: {datetime.datetime.now().isoformat()}"


async def on_submit_message(e: me.ClickEvent):
    """Handle the user sending a new message.

    Appends the user's message and a "loading" placeholder for the model
    reply, yields so the UI can refresh, then fills the placeholder with the
    backend response. On failure the user message is marked "error" and the
    placeholder is removed.

    NOTE(review): ``Message`` is not imported anywhere in the visible source;
    it has to be in scope before this handler runs — TODO confirm where it is
    defined.
    """
    # Local import: uuid is not among this cell's imports, so relying on the
    # notebook's global namespace raises NameError unless a later cell that
    # imports it has already been executed.
    import uuid

    state = me.state(AppState)
    if not state.current_user_input.strip():
        return
    if not state.current_conversation_id:
        # First message: open a new conversation on the fly.
        state.current_conversation_id = str(uuid.uuid4())
        state.conversations[state.current_conversation_id] = []

    conv = state.conversations[state.current_conversation_id]
    user_msg = Message(id=str(uuid.uuid4()), content=state.current_user_input, role="user")
    conv.append(user_msg)

    placeholder_msg = Message(id=str(uuid.uuid4()), content="", role="model", status="loading")
    conv.append(placeholder_msg)

    # Keep the text for the backend call before clearing the input field.
    user_input_for_backend = state.current_user_input
    state.current_user_input = ""
    yield

    try:
        response = await handle_backend_call(user_input_for_backend)
        placeholder_msg.content = response
        placeholder_msg.status = "sent"
    except Exception:
        # Any backend failure is surfaced through the message status so the
        # UI can offer a retry; the placeholder is assumed to still be the
        # last item of the conversation.
        user_msg.status = "error"
        conv.pop()
    yield


async def on_retry_click(e: me.ClickEvent):
    """Handle a click on a message's retry button.

    ``e.key`` is used as the id of the message to resend. The message is
    marked "sent" again, a "loading" placeholder is appended, and after a
    yield the backend is called with the message content. On failure the
    message goes back to "error" and the placeholder is removed.

    NOTE(review): ``Message`` is not imported anywhere in the visible source;
    it has to be in scope before this handler runs — TODO confirm where it is
    defined.
    """
    # Local import: uuid is not among this cell's imports, so relying on the
    # notebook's global namespace raises NameError unless a later cell that
    # imports it has already been executed.
    import uuid

    state = me.state(AppState)
    message_id = e.key
    conv = state.conversations.get(state.current_conversation_id)
    if not conv:
        return
    failed_msg = next((msg for msg in conv if msg.id == message_id), None)
    if not failed_msg:
        return

    failed_msg.status = "sent"
    placeholder_msg = Message(id=str(uuid.uuid4()), content="", role="model", status="loading")
    conv.append(placeholder_msg)
    yield

    try:
        response = await handle_backend_call(failed_msg.content)
        placeholder_msg.content = response
        placeholder_msg.status = "sent"
    except Exception:
        # Same failure convention as the submit handler: flag the message and
        # drop the placeholder (assumed to still be the last item).
        failed_msg.status = "error"
        conv.pop()
    yield


# ===== 5. 页面定义 =====
@register_page(
    path='/',
    title='Chat',
    page_key='home',
    on_load_fn=on_load,
)
def home_page():
    """Render the chat home page: the API-key dialog, then the scaffolded home content."""
    api_key_dialog()
    with page_scaffold():
        app_state = me.state(AppState)
        home_page_content(app_state)

@register_page(
    path='/agents',
    title='Agents',
    page_key='agents',
    on_load_fn=on_load,
)
def agents_page():
    """Render the agent list page from the current ``AppState``."""
    app_state = me.state(AppState)
    agent_list_page(app_state)

@register_page(
    path='/conversation',
    title='Conversation',
    page_key='conversation',
    on_load_fn=on_load,
)
def conversation():
    """Render the conversation page from the current ``AppState``."""
    app_state = me.state(AppState)
    conversation_page(app_state)

@register_page(
    path='/event_list',
    title='Event List',
    page_key='event_list',
    on_load_fn=on_load,
)
def events():
    """Render the event list page from the current ``AppState``."""
    app_state = me.state(AppState)
    event_list_page(app_state)

@register_page(
    path='/task_list',
    title='Task List',
    page_key='task_list',
    on_load_fn=on_load,
)
def tasks():
    """Render the task list page from the current ``AppState``."""
    app_state = me.state(AppState)
    task_list_page(app_state)

@register_page(path='/settings', title='Settings', page_key='settings', on_load_fn=on_load)
def settings():
    """Render the settings page by delegating to ``settings_page_content``."""
    settings_page_content()

# (审计) 新增一个 admin 专属的审计日志页面
@register_page(path='/audit', title='Audit Log', page_key='audit', on_load_fn=on_load)
def audit_page():
    """Render the audit log page (page key 'audit'), newest entry first.

    Shows a heading and a short note, then either a placeholder text when
    ``security_manager.audit_log`` is empty or one monospace line per entry.
    """
    with page_scaffold():
        # Heading rendered through me.text with a headline type; Mesop does
        # not appear to expose an `h1` component, so `me.h1` would fail at
        # render time.
        me.text("Security Audit Log", type="headline-5")
        me.text("显示最新的事件在最上方。")
        if not security_manager.audit_log:
            me.text("还没有审计事件被记录。")
        else:
            # Walk the log backwards so the most recent entry comes first.
            for entry in reversed(security_manager.audit_log):
                me.text(entry, style=me.Style(font_family="monospace", white_space="pre-wrap"))


# ✅ 定义 lifespan 生命周期（FastAPI 启动钩子）
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: wire up the app on startup, release the client on shutdown.

    Startup starts the shared httpx client, constructs ``ConversationServer``
    with the app and that client, resets ``app.openapi_schema``, mounts the
    Mesop WSGI app at "/", and calls ``app.setup()``.
    NOTE(review): resetting the schema and calling ``app.setup()`` presumably
    makes FastAPI rebuild its generated routes after the late registration —
    confirm.

    The client is closed in a ``finally`` block so it is released even when
    startup fails or an exception is thrown into the generator at ``yield``.
    """
    httpx_client_wrapper.start()
    try:
        ConversationServer(app, httpx_client_wrapper())
        app.openapi_schema = None
        app.mount("/", WSGIMiddleware(me.create_wsgi_app(debug_mode=False)))
        app.setup()
        yield
    finally:
        await httpx_client_wrapper.stop()

# ===== 6. 启动服务 =====
# Switch the role here for testing.
# NOTE(review): auth_service keeps a single current_user_role on one shared
# instance, so this line makes every page check run as "admin".
auth_service.set_user_role("admin")

app = FastAPI(lifespan=lifespan)
# Host and port come from the environment, defaulting to 127.0.0.1:12000.
host = os.environ.get('A2A_UI_HOST', '127.0.0.1')
port = int(os.environ.get('A2A_UI_PORT', '12000'))
host_agent_service.server_url = f"http://{host}:{port}"

def run_uvicorn():
    """Run the uvicorn server for ``app`` on the configured host and port."""
    uvicorn.run(app, host=host, port=port, log_level="info")

# Start the server on a daemon thread so this cell returns immediately.
threading.Thread(target=run_uvicorn, daemon=True).start()
print(f"✅ Uvicorn 已在后台启动： http://{host}:{port}  （浏览器打开这个地址试试）")


AUDIT LOG: [2025-07-21 01:32:30] - ROLE_CHANGE_SUCCESS: User role changed from 'guest' to 'admin'
✅ Uvicorn 已在后台启动： http://127.0.0.1:12000  （浏览器打开这个地址试试）


INFO:     Started server process [20436]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:12000 (Press CTRL+C to quit)


INFO:     127.0.0.1:14330 - "GET / HTTP/1.1" 200 OK
INFO:     127.0.0.1:14330 - "GET /styles.css HTTP/1.1" 304 Not Modified
INFO:     127.0.0.1:14331 - "GET /zone.js/bundles/zone.umd.js HTTP/1.1" 304 Not Modified
INFO:     127.0.0.1:14333 - "GET /prod_bundle.js HTTP/1.1" 304 Not Modified
INFO:     127.0.0.1:14333 - "POST /__ui__ HTTP/1.1" 200 OK
INFO:     127.0.0.1:14333 - "GET /__web-components-module__/components/async_poller.js HTTP/1.1" 304 Not Modified
INFO:     127.0.0.1:14333 - "POST /__ui__ HTTP/1.1" 200 OK
INFO:     127.0.0.1:14350 - "POST /conversation/list HTTP/1.1" 200 OK
INFO:     127.0.0.1:14352 - "POST /task/list HTTP/1.1" 200 OK
INFO:     127.0.0.1:14354 - "POST /message/pending HTTP/1.1" 200 OK
INFO:     127.0.0.1:14333 - "POST /__ui__ HTTP/1.1" 200 OK
INFO:     127.0.0.1:14360 - "POST /conversation/list HTTP/1.1" 200 OK
INFO:     127.0.0.1:14362 - "POST /task/list HTTP/1.1" 200 OK
INFO:     127.0.0.1:14364 - "POST /message/pending HTTP/1.1" 200 OK
INFO:     127.0.0.1:

## Asynchronous Task Framework

In [2]:
import asyncio, uuid, threading
from IPython.display import display, clear_output

# 1. 已有漫长的同步函数
def run_pipeline_sync(user_input: str, delay_seconds: float = 5.0):
    """Stand-in for a long-running synchronous pipeline.

    Args:
        user_input: Text to "process".
        delay_seconds: How long to block before returning, simulating the
            work. Defaults to the original fixed 5 seconds; pass 0 to return
            immediately.

    Returns:
        A confirmation string that echoes ``user_input``.

    Raises:
        ValueError: If ``delay_seconds`` is negative (raised by ``time.sleep``).
    """
    import time

    time.sleep(delay_seconds)
    return f"✅ 已处理: {user_input}"

# 2. 把同步函数丢进线程池 -> 变异步
async def run_pipeline_async(user_input: str):
    """Run ``run_pipeline_sync`` in a worker thread and return its result.

    Wrapping the blocking call with ``asyncio.to_thread`` lets the caller
    await it instead of blocking on it directly.
    """
    outcome = await asyncio.to_thread(run_pipeline_sync, user_input)
    return outcome

# 3. ===== 以下是异步任务框架 =====
# task_id -> status string ("Pending", "Running", "Success" or "Failed: ...").
TASK_STATUS = {}
# task_id -> pipeline result, stored only for tasks that succeeded.
TASK_RESULT = {}

async def _long_task(task_id: str, user_input: str):
    """Run the pipeline for one task and record its progress.

    Sets ``TASK_STATUS[task_id]`` to "Running", then to "Success" (with the
    result stored in ``TASK_RESULT``) or to "Failed: <error>" when the
    pipeline raises an ``Exception``.
    """
    TASK_STATUS[task_id] = "Running"
    try:
        outcome = await run_pipeline_async(user_input)
    except Exception as err:
        TASK_STATUS[task_id] = f"Failed: {err}"
    else:
        # Store the result before flipping the status.
        TASK_RESULT[task_id] = outcome
        TASK_STATUS[task_id] = "Success"

# Strong references to in-flight tasks. The event loop only keeps weak
# references, so a task whose handle is dropped can be garbage-collected
# before it finishes.
_BACKGROUND_TASKS = set()


def start_task(user_input: str):
    """Schedule the pipeline for ``user_input`` as a background task.

    Must be called while an event loop is running, because it uses
    ``asyncio.create_task``.

    Args:
        user_input: Text handed to the pipeline.

    Returns:
        The new task's id (a UUID4 string), usable with ``get_task_status``.

    Raises:
        RuntimeError: If there is no running event loop.
    """
    task_id = str(uuid.uuid4())
    TASK_STATUS[task_id] = "Pending"
    task = asyncio.create_task(_long_task(task_id, user_input))
    # Hold the task until it completes, then let it go.
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)
    return task_id

def get_task_status(task_id: str):
    """Return the recorded status for ``task_id``, or "Unknown Task" if there is none."""
    try:
        return TASK_STATUS[task_id]
    except KeyError:
        return "Unknown Task"

async def run_and_display_task(user_input: str):
    """Start one task and show its status in the notebook until it finishes.

    The status line is redrawn once per second; polling stops when the status
    is "Success" or starts with "Failed", after which the result (or a
    placeholder when there is none) is displayed.
    """
    task_id = start_task(user_input)
    finished = False
    while not finished:
        status = get_task_status(task_id)
        clear_output(wait=True)
        display(f"任务 {task_id[:8]}… 状态：{status}")
        finished = status == "Success" or status.startswith("Failed")
        if not finished:
            await asyncio.sleep(1)
    display("任务结果：", TASK_RESULT.get(task_id, "（无结果）"))

## Demo

In [None]:
async def demo_multi():
    """Start five tasks and print their statuses until all have finished.

    The status table is redrawn once per second; when every status starts
    with "Success" or "Failed", a result preview is printed for each task.
    """
    task_ids = []
    for i in range(5):
        task_ids.append(start_task(f"MSG {i}"))

    done_prefixes = ("Success", "Failed")
    still_running = True
    while still_running:
        clear_output(wait=True)
        for task_id in task_ids:
            print(f"{task_id[:8]}  →  {get_task_status(task_id)}")
        still_running = not all(
            get_task_status(task_id).startswith(done_prefixes) for task_id in task_ids
        )
        if still_running:
            await asyncio.sleep(1)

    print("\n全部完成！结果预览：")
    for task_id in task_ids:
        print(task_id[:8], "=>", TASK_RESULT.get(task_id))

# Top-level await: only valid in an environment that supports it, such as a
# notebook cell.
await demo_multi()

89bda1d2  →  Success
dfaac8ca  →  Success
0979d7a8  →  Success
eeefe7c1  →  Success
dad75834  →  Success

全部完成！结果预览：
89bda1d2 => ✅ 已处理: MSG 0
dfaac8ca => ✅ 已处理: MSG 1
0979d7a8 => ✅ 已处理: MSG 2
eeefe7c1 => ✅ 已处理: MSG 3
dad75834 => ✅ 已处理: MSG 4


AUDIT LOG: [2025-07-21 01:32:54] - ACCESS_GRANTED: User 'admin' was granted access to page 'home' at path '/'
AUDIT LOG: [2025-07-21 01:32:55] - ACCESS_GRANTED: User 'admin' was granted access to page 'home' at path '/'
AUDIT LOG: [2025-07-21 01:32:56] - ACCESS_GRANTED: User 'admin' was granted access to page 'home' at path '/'
AUDIT LOG: [2025-07-21 01:32:56] - ACCESS_GRANTED: User 'admin' was granted access to page 'home' at path '/'
AUDIT LOG: [2025-07-21 01:32:56] - ACCESS_GRANTED: User 'admin' was granted access to page 'home' at path '/'
AUDIT LOG: [2025-07-21 01:32:56] - ACCESS_GRANTED: User 'admin' was granted access to page 'home' at path '/'
AUDIT LOG: [2025-07-21 01:32:56] - ACCESS_GRANTED: User 'admin' was granted access to page 'home' at path '/'
AUDIT LOG: [2025-07-21 01:32:57] - ACCESS_GRANTED: User 'admin' was granted access to page 'home' at path '/'
AUDIT LOG: [2025-07-21 01:32:57] - ACCESS_GRANTED: User 'admin' was granted access to page 'home' at path '/'
AUDIT LOG: