Skip to content

GriffinGuard/Griffino-Python

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

2 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

griffino-sdk

Griffino 的 Python SDK,用于开发可接入 Griffino 主系统的插件。Griffino 是一个以消息驱动、容器化运行插件的编排平台;本 SDK 封装了插件与主系统之间 的 RabbitMQ 协议、声明文件生成与运行时生命周期,让你用纯 Python 声明即可构建一个完整插件。

特性

  • 声明式 boot / user 配置模型(@boot_config / @user_config + Field
  • provider 能力:同时支持「同步 invoke() 回复」与「蓝图任务节点(task-node)」两条调用路径
  • consumer 事件处理器与 consumer capability 声明
  • actions:即发即弃的用户动作处理器
  • statusViews:通过 client.set_state(...) 把插件状态写入 Redis,供主系统展示
  • invoke() / dispatch() 能力调用与事件分发
  • 从 Python 声明生成 plugin.manifest.json / config.boot.json / config.user.json / plugin.boot.yml 的 CLI
  • 基于 connect_robust 的 RabbitMQ 传输层,断线后消费者自动恢复
  • Redis 用户配置读取

安装

pip install griffino-sdk

当前包版本可通过 griffino.__version__ 读取。

开发环境:

pip install -e ".[dev]"

如果你使用本仓库内的本地虚拟环境,也可以直接运行:

make typecheck
make test
make compile
make build

本地 make build 会使用当前仓库的 .venv 并启用 --no-isolation, 这样在受限网络环境里也能完成构建验证。

运行环境

主系统会在插件容器启动时注入以下环境变量,GriffinoClient() 默认直接从环境中读取:

  • RABBITMQ_HOST
  • RABBITMQ_PORT
  • RABBITMQ_USER
  • RABBITMQ_PASSWORD
  • GRIFFINO_PLUGIN_ID
  • GRIFFINO_USER_ID

如果启用 Redis 用户配置读取或状态视图写入,SDK 还会尝试读取这些常见环境变量:

  • REDIS_URL
  • REDIS_HOST
  • REDIS_PORT
  • REDIS_DB
  • REDIS_USER
  • REDIS_PASSWORD
  • GRIFFINO_REDIS_USER_CONFIG_PREFIX

快速开始

from griffino import Field, GriffinoClient, HandlerContext, boot_config, user_config

client = GriffinoClient(
    id="com.example.demo",
    name="Demo Plugin",
    version="1.0.0",
    description="Demo plugin for Griffino",
    main_service_id="demo-service",
)


@boot_config
class BootConfig:
    api_token: str = Field(
        key="API_TOKEN",
        type="password",
        name="API Token",
        description="Token for upstream API access",
        optional=False,
    )


@user_config
class UserConfig:
    preferred_model: str = Field(
        key="PREFERRED_MODEL",
        type="options",
        name="Preferred Model",
        description="Model used for requests",
        optional=True,
        default="deepseek-chat",
        values=[
            {"value": "deepseek-chat", "display": "DeepSeek Chat"},
            {"value": "deepseek-reasoner", "display": "DeepSeek Reasoner"},
        ],
    )


# Provider:被其它插件 invoke() 或被蓝图任务节点驱动时执行。
@client.provider(
    id="ai_chat_model",
    type="ai.chat.model",
    name="AI Chat Model",
    description="Provides AI chat completion.",
    standard_interface_ref="griffino.interfaces.ai.chat@1.0.0",
    timeout_ms=60000,
)
async def handle_chat(ctx: HandlerContext) -> dict[str, str]:
    boot_cfg = await client.get_boot_config(BootConfig)
    user_cfg = await client.get_user_config(UserConfig, user_id=ctx.user_id)
    messages = ctx.payload["messages"]
    _ = boot_cfg.api_token

    # 记录本次使用的模型,供下面声明的 statusView 展示。
    await client.set_state(key="last_model", value=user_cfg.preferred_model, user_id=ctx.user_id)
    return {"content": f"{user_cfg.preferred_model}:{messages[-1]['content']}"}


# Consumer:监听主系统投递的事件(如任务完成回调)。
@client.consumer(event="task.completed")
async def on_task_completed(ctx: HandlerContext) -> None:
    print(f"Task {ctx.task_id} completed")


# Action:即发即弃的用户动作。
@client.action(
    id="reset_model",
    name="Reset Model",
    description="Clear the recorded last-used model for the current user.",
    confirmation_required=True,
    confirmation_message="Reset the recorded model?",
)
async def reset_model(ctx: HandlerContext) -> None:
    await client.set_state(key="last_model", value="(cleared)", user_id=ctx.user_id)


# StatusView:把 Redis 中的插件状态暴露给主系统展示。
client.declare_status_view(
    id="last_model",
    name="Last Used Model",
    type="kv",
    redis_key_pattern="last_model",
)


async def main() -> None:
    await client.start()
    try:
        await client.wait_closed()
    finally:
        await client.close()

两种调用路径

主系统会通过 同一个 provider 队列以两种方式驱动 provider,SDK 对插件作者透明地处理两者, 你只需写一个 @client.provider 处理器即可:

  1. 同步 invoke() 回复 —— 调用方携带 reply_to,处理器返回的 dict 会作为回复同步发回。
  2. 蓝图任务节点(task-node) —— 主系统的任务调度器执行蓝图节点时不带 reply_to,而是带 nodeId / context。处理器返回后,SDK 自动 dispatch 一条 NodeResultEnvelope{taskId, msgId, userId, pluginId, nodeId, ok, output, failReason})回主系统, 其中 output 为处理器返回值;处理器抛异常时 ok=false 并带 failReason

HandlerContext 在任务节点模式下会额外提供 ctx.node_idctx.node_context。 两条路径的协议细节见 docs/architecture.md,能力 schema 见 Griffino-Schemas

调用能力(invoke)

invoke() 会通过 griffino.router 发送能力调用请求,并同步等待回复。

result = await client.invoke(
    type="ai.chat.model",
    payload={
        "messages": [
            {"role": "user", "content": "hello"},
        ]
    },
    slot="chat",
)

行为约定:

  • user_id 默认优先取当前 HandlerContext.user_id;不在 handler 内时回退到 GRIFFINO_USER_ID
  • task_id 默认优先取当前 HandlerContext.task_id
  • 超时(timeout_ms)优先级:显式入参 > 同 type 的本地 provider 声明的 timeout_ms > 配置默认值(会被收敛到严格小于主系统 60s 回复上限,以便客户端先于平台超时)
  • provider_timeout 会映射为 griffino.TimeoutError
  • slot_not_configured 会映射为 griffino.SlotNotConfiguredError

分发事件(dispatch)

dispatch() 用于向 Griffino 主系统提交事件,不等待后续蓝图执行结果。

await client.dispatch(
    event="media.video_link",
    payload={"url": "https://example.com/video.mp4"},
)

Actions(动作)

动作是即发即弃的用户触发操作(无回复)。在 manifest 中声明,运行时由主系统经 griffino.actions 投递到插件,处理器返回 None

@client.action(
    id="clear_cache",
    name="Clear Cache",
    description="Drop the cached state for the current user.",
    confirmation_required=True,           # 前端是否需要二次确认
    confirmation_message="Clear cache?",  # 确认弹窗文案
    cooldown_ms=3000,                      # 前端防抖间隔(可选)
)
async def clear_cache(ctx: HandlerContext) -> None:
    # ctx.action_id 为被触发的动作 id
    ...

状态视图(statusViews)

状态视图把插件写入 Redis 的状态暴露给主系统展示。SDK 通过 client.set_state(...) 写入键 plugin:{pluginId}:state:{userId}:{key},并在 manifest 中声明对应视图:

# 写入状态(key 相对于 plugin:{pluginId}:state:{userId}: 前缀)
await client.set_state(key="last_model", value="deepseek-chat", user_id=ctx.user_id)

# 声明视图(装饰器或直接调用皆可)
client.declare_status_view(
    id="last_model",
    name="Last Used Model",
    type="kv",                  # "kv" 键值对 | "status" 单状态标签
    redis_key_pattern="last_model",
)

set_state 在未配置 Redis 时会抛出 ConfigurationError

Consumer Capability 声明

如果插件需要在 manifest 中声明自己消费某类 capability,但没有对应运行时 handler,可以使用:

@client.consumer_capability(
    id="ai_chat_consumer",
    type="ai.chat.model",
    name="AI Analysis",
    description="Uses AI model for content analysis.",
    optional=False,
    slots=[
        {
            "id": "chat",
            "name": "Direct Chat",
            "description": "High-capability model for user conversation.",
        },
    ],
)
def declare_ai_analysis() -> None:
    return None

声明文件生成

SDK 自带 griffino CLI,可从 Python 声明生成配置和插件声明文件:

griffino config generate -a --module plugin

也支持单独生成:

griffino config generate --manifest --module plugin
griffino config generate --boot-config --module plugin
griffino config generate --user-config --module plugin
griffino config generate --boot-yml --module plugin

生成时如果目标文件已存在,SDK 会先备份为 .1.2 等历史文件,再写入新文件。

生成的 provider capability 会自动携带 entryPoint.requestTopicPattern, 格式为 invoke.{pluginId}.{capabilityId}.v1,方便主系统据此 provisioning 队列绑定。 manifest 会包含已声明的 provider(含 slots / inputSchemaRef / outputSchemaRef / standardInterfaceRef)、consumer capability、actionsstatusViews

如果你希望在代码里直接拿到生成结果,也可以使用:

manifest = client.generate_manifest_data()
boot_config_data = client.generate_boot_config_data()
user_config_data = client.generate_user_config_data()
boot_yaml_data = client.generate_boot_yaml_data()

GriffinoClient(...) 还支持用于 manifest 生成的扩展元数据,例如:

  • author
  • license
  • site
  • tutorial
  • permissions_requested
  • internationalization
  • manifest_schema
  • boot_config_schema
  • user_config_schema

当前生成结构约定:

  • config.boot.json 采用 services[].configs 结构
  • config.user.json 采用顶层 configs 结构
  • plugin.boot.yml 会生成主服务模板,并自动包含 RabbitMQ/Redis 系统注入环境变量

生命周期

推荐使用两段式生命周期:

await client.start()
await client.wait_closed()

也可以使用便捷方法:

await client.run()

退出时调用 await client.close() 释放连接(见快速开始示例)。底层使用 aio_pika.connect_robust,断线重连后 provider / consumer / action 消费者会自动恢复。

异常

所有 SDK 异常都继承自 GriffinoError

  • GriffinoError
  • ConfigurationError
  • TransportError
  • CapabilityError
  • SlotNotConfiguredError
  • TimeoutError

示例插件

仓库里包含一个基于 SDK 重写的 DeepSeek 插件示例,覆盖 provider / action / statusView:

集成测试

集成测试默认跳过,需显式开启(依赖本地 RabbitMQ 与 Redis):

GRIFFINO_RUN_INTEGRATION_TESTS=1 pytest tests/integration

也可以用:

make test-integration

可用 tests/integration/docker-compose.yml 启动依赖。 集成测试已覆盖 RabbitMQ 与 Redis 的真实端到端链路,包括 invoke / dispatch、task-node、 consumer 回调、action 与 statusView。

仓库还带有 CI 工作流:

发布流程说明见:

文档

相关项目

许可证

MIT

About

PythonSDK for Griffino

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors