diff --git a/.flocks/plugins/channels/dingtalk/dingtalk.py b/.flocks/plugins/channels/dingtalk/dingtalk.py index 11305427..ae3ff041 100644 --- a/.flocks/plugins/channels/dingtalk/dingtalk.py +++ b/.flocks/plugins/channels/dingtalk/dingtalk.py @@ -24,6 +24,11 @@ "clientId": "dingXXXXXX", "clientSecret": "your_secret", "defaultAgent": "rex" + // Active outbound (channel_message tool, agent-initiated push) + // reuses the same credential pair — robotCode defaults to clientId + // for the standard "enterprise internal app robot" setup. Override + // by adding "robotCode": "..." only if your app hosts multiple + // robots and the console issues a distinct code. } } } @@ -140,8 +145,14 @@ async def start( Design note: DingTalk inbound messages are handled entirely inside the runner.ts ↔ plugin.ts layer, which calls the flocks Session API directly. The `on_message` / InboundDispatcher path (used by Feishu, WeCom, Telegram) - is intentionally NOT used here; this means dedup, debounce, channel.inbound - hooks and session-binding are the responsibility of plugin.ts itself. + is intentionally NOT used here; this means dedup, debounce and + channel.inbound hooks are the responsibility of plugin.ts itself. + + Channel binding is *not* skipped: runner.ts calls + ``POST /api/channel/dingtalk/bind`` after each session creation, so + ``channel_bindings`` stays in sync with the rest of flocks and the + ``channel_message`` tool can route outbound replies back through + :meth:`send_text`. """ self._config = config self._on_message = on_message @@ -151,8 +162,10 @@ async def start( env = { **os.environ, - "DINGTALK_CLIENT_ID": config.get("clientId", ""), - "DINGTALK_CLIENT_SECRET": config.get("clientSecret", ""), + # Accept appKey/appSecret as aliases so a single credential pair + # works for both Stream inbound (Node.js) and OAPI outbound (Python). + "DINGTALK_CLIENT_ID": config.get("clientId") or config.get("appKey", ""), + "DINGTALK_CLIENT_SECRET": config.get("clientSecret") or config.get("appSecret", ""), "FLOCKS_PORT": str(flocks_port), "FLOCKS_AGENT": config.get("defaultAgent", ""), "FLOCKS_GATEWAY_TOKEN": config.get("gatewayToken", ""), @@ -169,7 +182,7 @@ async def start( log.info("dingtalk.start", { "runner": str(_RUNNER_TS), "flocks_port": flocks_port, - "client_id": config.get("clientId", ""), + "client_id": config.get("clientId") or config.get("appKey", ""), }) self._start_process(npm, env) @@ -188,26 +201,104 @@ async def stop(self) -> None: self.mark_disconnected() # ── Outbound messages ───────────────────────────────────────────────────── - # plugin.ts replies to DingTalk directly via sessionWebhook; flocks does not - # need to route through send_text. This method is required by the framework - # and is kept as a placeholder for proactive push support. + # Passive replies (agent → user, in response to an inbound message) are + # handled inside runner.ts ↔ plugin.ts via sessionWebhook and never reach + # send_text. Active push (e.g. the channel_message tool, or an agent + # proactively notifying a DingTalk user) goes through the shared OAPI + # send library at flocks.channel.builtin.dingtalk.send_message_app, which + # mirrors how Feishu / WeCom expose their outbound surfaces. async def send_text(self, ctx: OutboundContext) -> DeliveryResult: + """Push a text/markdown message to DingTalk via the OAPI app robot. + + Reuses the inbound credentials (``clientId``/``clientSecret``, also + accepted as ``appKey``/``appSecret``). ``robotCode`` defaults to + ``clientId`` — only set it explicitly when one app hosts multiple + robots. Targets must be ``user:`` or + ``chat:``. + + The channel config is re-read from :class:`flocks.config.config.Config` + on every call rather than from ``self._config``: PluginLoader scans + project-local plugins more than once on startup (default scan's + ``project_subdir`` step + an explicit project scan), and each pass + registers a *fresh* ``DingTalkChannel()`` instance into the registry, + overwriting the one ``GatewayManager`` had run ``start()`` on. The + outbound path then receives an instance whose ``self._config`` is + ``None``. Reading the config live also means UI edits take effect + without restarting the runner. """ - Proactively push a text message (for agent-initiated DingTalk messages). - Passive replies from plugin.ts go through sessionWebhook and bypass this path. - Reserved for future extension; currently returns not-supported. + try: + from flocks.channel.builtin.dingtalk import ( + DingTalkApiError, + send_message_app, + strip_target_prefix, + ) + except ImportError as exc: + return DeliveryResult( + channel_id="dingtalk", message_id="", + success=False, + error=f"DingTalk send library unavailable: {exc}", + ) + + send_config = await self._resolve_outbound_config() + if not ctx.to or not strip_target_prefix(ctx.to): + return DeliveryResult( + channel_id="dingtalk", message_id="", + success=False, + error="DingTalk active outbound requires 'to' (user: or chat:)", + ) + + try: + result = await send_message_app( + config=send_config, + to=ctx.to, + text=ctx.text, + account_id=ctx.account_id, + ) + self.record_message() + return DeliveryResult( + channel_id="dingtalk", + message_id=str(result.get("message_id", "")), + chat_id=result.get("chat_id"), + ) + except Exception as e: + retryable = getattr(e, "retryable", False) + if not retryable and not isinstance(e, DingTalkApiError): + msg = str(e).lower() + retryable = "rate limit" in msg or "timeout" in msg + log.warning("dingtalk.send_text.failed", { + "to": ctx.to, "error": str(e), "retryable": retryable, + }) + return DeliveryResult( + channel_id="dingtalk", message_id="", + success=False, error=str(e), retryable=retryable, + ) + + async def _resolve_outbound_config(self) -> dict: + """Live-read the dingtalk channel config block from flocks.json. + + Falls back to ``self._config`` (set by ``start()``) when the global + config can't be loaded — this keeps unit tests that bypass the global + config working. """ - log.warning("dingtalk.send_text.not_implemented", { - "to": ctx.to, - "hint": "Proactive push requires the dingtalk-connector.send GatewayMethod", - }) - return DeliveryResult( - channel_id="dingtalk", - message_id="", - success=False, - error="Proactive push not yet implemented; plugin.ts passive replies go through sessionWebhook", - ) + try: + from flocks.config.config import Config + cfg = await Config.get() + channels = cfg.channels or {} + # Only treat the live config as authoritative when ``dingtalk`` is + # actually declared — ``get_channel_config`` synthesises a default + # ``ChannelConfig()`` for missing channels, whose model_dump still + # contains non-credential fields and would mask self._config. + if "dingtalk" in channels: + # by_alias=True keeps clientId/clientSecret as their JSON + # names (rather than pydantic's snake_case attributes) so + # _merged_app_key/_merged_app_secret pick them up unchanged. + return channels["dingtalk"].model_dump( + by_alias=True, exclude_none=True, + ) + except Exception as e: + log.warning("dingtalk.send_text.config_fallback", {"error": str(e)}) + return dict(self._config or {}) # ── Internal methods ────────────────────────────────────────────────────── diff --git a/.flocks/plugins/channels/dingtalk/runner.ts b/.flocks/plugins/channels/dingtalk/runner.ts index 1364fa4f..dd9f2d07 100644 --- a/.flocks/plugins/channels/dingtalk/runner.ts +++ b/.flocks/plugins/channels/dingtalk/runner.ts @@ -75,6 +75,93 @@ function buildSessionTitle(sessionKey: string): string { } } +/** + * Register the (channel, conversation) → session mapping in flocks's + * channel_bindings table so that the channel_message tool / + * POST /api/channel/session-send can route outbound replies back to this + * DingTalk conversation. + * + * This mirrors what InboundDispatcher.binding_service.resolve_or_create + * does for Feishu / WeCom / Telegram — DingTalk creates its session + * out-of-process, so we have to register the binding explicitly. + * + * IMPORTANT — chat_id resolution for groups: + * plugin.ts builds `peerId` differently depending on groupSessionScope: + * - `group` (default): peerId = conversationId (routable) + * - `group_sender`: peerId = `${conversationId}:${senderId}` + * A SESSION-ISOLATION composite, NOT a send target — OAPI expects the + * bare `openConversationId`. + * So for groups we must take `info.conversationId` and never fall back to + * `peerId`. `peerId` continues to drive session isolation on the + * sessionKey side; the binding row only stores the outbound-routable id. + * + * For direct chats peerId == senderId == staffId, which is the correct + * user target for `/v1.0/robot/oToMessages/batchSend`. + * + * Edge case — `separateSessionByConversation=false` + group: + * The connector omits `conversationId` from the sessionKey entirely and + * uses `peerId = senderId`, so there is no way to recover the + * openConversationId here. We skip binding with a warn rather than write + * a record that would resolve to the wrong target. + * + * Best-effort: a failure here only means the channel_message tool will 404 + * for this session, the inbound reply path keeps working. + */ +async function registerChannelBinding(sessionKey: string, sessionId: string): Promise { + let chatType: 'direct' | 'group' = 'direct'; + let chatId = ''; + + try { + const info = JSON.parse(sessionKey); + chatType = info.chatType === 'group' ? 'group' : 'direct'; + + if (chatType === 'group') { + chatId = info.conversationId || ''; + } else { + chatId = info.peerId || info.senderId || ''; + } + } catch { + // sessionKey is not JSON (legacy plain string) — treat as an opaque + // direct id. + chatId = sessionKey; + } + + if (!chatId) { + console.warn( + `[runner] bind skipped: no routable chat_id for sessionKey=${sessionKey} ` + + `(typical cause: separateSessionByConversation=false + group, where ` + + `the session cannot be mapped back to an openConversationId)` + ); + return; + } + + // The flocks-side channel_id is "dingtalk" (see ChannelMeta.id). Other + // values like "dingtalk-connector" are aliases declared on ChannelMeta and + // are accepted by the registry but the canonical binding row uses the id. + const url = `${FLOCKS_BASE}/api/channel/dingtalk/bind`; + const body = { + session_id: sessionId, + chat_id: chatId, + chat_type: chatType, + account_id: ACCOUNT_ID === '__default__' ? 'default' : ACCOUNT_ID, + }; + + try { + const r = await fetch(url, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(body), + }); + if (!r.ok) { + console.warn(`[runner] bind failed: ${r.status} ${await r.text()}`); + } else if (DEBUG) { + console.log(`[runner] bind ok: chat_id=${chatId} chat_type=${chatType} session=${sessionId}`); + } + } catch (e: any) { + console.warn(`[runner] bind error: ${e?.message || e}`); + } +} + async function getOrCreateSession(sessionKey: string, agentName: string): Promise { const existing = sessionMap.get(sessionKey); if (existing) { @@ -100,6 +187,12 @@ async function getOrCreateSession(sessionKey: string, agentName: string): Promis const sessionId: string = data.id; sessionMap.set(sessionKey, sessionId); console.log(`[runner] session created: key=${sessionKey} id=${sessionId}`); + + // Register the channel binding so outbound tools can reach this session. + // Done after the in-memory cache write so a slow/failing bind never + // forces a duplicate session on the next message. + await registerChannelBinding(sessionKey, sessionId); + return sessionId; } diff --git a/flocks/channel/builtin/dingtalk/__init__.py b/flocks/channel/builtin/dingtalk/__init__.py new file mode 100644 index 00000000..11dc5664 --- /dev/null +++ b/flocks/channel/builtin/dingtalk/__init__.py @@ -0,0 +1,32 @@ +""" +DingTalk outbound send library. + +This package intentionally does **not** ship a ``ChannelPlugin`` class — +the inbound side is owned by the project-local plugin +``.flocks/plugins/channels/dingtalk/dingtalk.py`` (Node.js connector). + +Other code (channel plugins, tools, hooks, …) can drive active outbound +messages by importing :func:`send_message_app` directly. Only the +enterprise app robot OAPI path is supported; custom group webhooks are +intentionally out of scope. +""" + +from flocks.channel.builtin.dingtalk.client import ( + DingTalkApiError, + close_http_client, +) +from flocks.channel.builtin.dingtalk.config import ( + strip_target_prefix, +) +from flocks.channel.builtin.dingtalk.send import ( + build_app_payload, + send_message_app, +) + +__all__ = [ + "DingTalkApiError", + "build_app_payload", + "close_http_client", + "send_message_app", + "strip_target_prefix", +] diff --git a/flocks/channel/builtin/dingtalk/client.py b/flocks/channel/builtin/dingtalk/client.py new file mode 100644 index 00000000..8e45395d --- /dev/null +++ b/flocks/channel/builtin/dingtalk/client.py @@ -0,0 +1,278 @@ +""" +Lightweight async wrapper around the DingTalk OAPI (enterprise app robot). + +Handles ``access_token`` refresh and basic HTTP calls. Uses a persistent +``httpx.AsyncClient`` to reuse TCP connections, mirroring the Feishu client. + +Multi-account is supported via per-account (``appKey``) token caches. +""" + +from __future__ import annotations + +import asyncio +import time +from typing import Any, Dict, Optional + +import httpx + +from flocks.channel.builtin.dingtalk.config import ( + DINGTALK_API_BASE, + DINGTALK_TOKEN_URL, + resolve_account_config, + resolve_account_credentials, +) +from flocks.utils.log import Log + +log = Log.create(service="channel.dingtalk.client") + +# --- persistent HTTP client --- + +_http_client: Optional[httpx.AsyncClient] = None +_http_lock = asyncio.Lock() + + +async def _get_http_client() -> httpx.AsyncClient: + """Return (and lazily create) the shared persistent HTTP client.""" + global _http_client + if _http_client is not None and not _http_client.is_closed: + return _http_client + async with _http_lock: + if _http_client is not None and not _http_client.is_closed: + return _http_client + _http_client = httpx.AsyncClient( + timeout=30, + limits=httpx.Limits( + max_connections=20, + max_keepalive_connections=10, + ), + ) + return _http_client + + +async def close_http_client() -> None: + """Close the persistent HTTP client (call during shutdown).""" + global _http_client + if _http_client is not None: + try: + await _http_client.aclose() + except Exception: + pass + _http_client = None + + +# --- token cache (keyed by appKey) --- + +_token_cache: Dict[str, tuple[str, float]] = {} +_token_lock = asyncio.Lock() +_per_key_locks: Dict[str, asyncio.Lock] = {} + + +class DingTalkApiError(RuntimeError): + """Structured DingTalk API error with business code and retryability hints.""" + + def __init__( + self, + message: str, + *, + code: Optional[str] = None, + http_status: Optional[int] = None, + retryable: bool = False, + response: Optional[dict] = None, + ) -> None: + super().__init__(message) + self.code = code + self.http_status = http_status + self.retryable = retryable + self.response = response or {} + + +def _is_retryable_error( + *, + code: Optional[str], + http_status: Optional[int], + message: str, +) -> bool: + msg = message.lower() + if http_status in {408, 429, 500, 502, 503, 504}: + return True + if code and code.lower() in { + "internalerror", + "throttling.api", + "throttling.user", + "throttling", + }: + return True + return ( + "rate limit" in msg + or "too many requests" in msg + or "timeout" in msg + or "temporarily unavailable" in msg + ) + + +def ensure_api_success( + data: dict, + *, + context: str, + http_status: Optional[int] = None, +) -> dict: + """Raise :class:`DingTalkApiError` when a response indicates failure. + + The OAPI v1.0 endpoints typically return ``{"code": "...", "message": "..."}`` + on error and a domain payload on success. Some legacy endpoints return + ``{"errcode": 0, "errmsg": "ok", ...}`` instead, so both are handled. + """ + # Legacy /robot/send style payload + if "errcode" in data: + errcode = data.get("errcode") + if errcode in (0, "0", None): + return data + msg = str(data.get("errmsg") or f"errcode {errcode}") + raise DingTalkApiError( + f"{context}: {msg}", + code=str(errcode), + http_status=http_status, + retryable=_is_retryable_error( + code=str(errcode), + http_status=http_status, + message=msg, + ), + response=data, + ) + + # OAPI v1.0 style error payload + code = data.get("code") + if code is None or code == "": + return data + msg = str(data.get("message") or data.get("msg") or f"code {code}") + raise DingTalkApiError( + f"{context}: {msg}", + code=str(code), + http_status=http_status, + retryable=_is_retryable_error( + code=str(code), + http_status=http_status, + message=msg, + ), + response=data, + ) + + +async def get_access_token(app_key: str, app_secret: str) -> str: + """Obtain (or reuse cached) DingTalk OAPI access_token. + + The v1.0 endpoint returns ``{"accessToken": "...", "expireIn": 7200}``. + """ + cache_key = f"{DINGTALK_TOKEN_URL}|{app_key}" + + cached = _token_cache.get(cache_key) + if cached: + token, expires_at = cached + if time.time() < expires_at - 60: + return token + + async with _token_lock: + if cache_key not in _per_key_locks: + _per_key_locks[cache_key] = asyncio.Lock() + key_lock = _per_key_locks[cache_key] + + async with key_lock: + cached = _token_cache.get(cache_key) + if cached: + token, expires_at = cached + if time.time() < expires_at - 60: + return token + + client = await _get_http_client() + resp = await client.post(DINGTALK_TOKEN_URL, json={ + "appKey": app_key, + "appSecret": app_secret, + }) + resp.raise_for_status() + data = resp.json() + + ensure_api_success( + data, + context="DingTalk access token request failed", + http_status=resp.status_code, + ) + token = data.get("accessToken") + if not token: + raise DingTalkApiError( + "DingTalk access token request failed: missing accessToken", + http_status=resp.status_code, + response=data, + ) + expire = int(data.get("expireIn") or 7200) + _token_cache[cache_key] = (token, time.time() + expire) + return token + + +async def api_request( + method: str, + path: str, + *, + app_key: str, + app_secret: str, + params: Optional[dict] = None, + json_body: Optional[dict] = None, +) -> dict: + """Send an authenticated request to the DingTalk OAPI v1.0 endpoints.""" + token = await get_access_token(app_key, app_secret) + url = f"{DINGTALK_API_BASE}{path}" if not path.startswith("http") else path + client = await _get_http_client() + resp = await client.request( + method, url, + params=params, + json=json_body, + headers={ + "x-acs-dingtalk-access-token": token, + "Content-Type": "application/json", + }, + ) + # OAPI v1.0 returns a non-2xx status for most business errors; surface the + # body before raising so we get the structured ``code`` / ``message`` instead + # of an opaque ``HTTPStatusError``. + try: + data = resp.json() + except ValueError: + resp.raise_for_status() + return {} + + if resp.status_code >= 400 and isinstance(data, dict): + ensure_api_success( + data, + context=f"DingTalk API request failed: {method} {path}", + http_status=resp.status_code, + ) + + resp.raise_for_status() + return ensure_api_success( + data if isinstance(data, dict) else {}, + context=f"DingTalk API request failed: {method} {path}", + http_status=resp.status_code, + ) + + +async def api_request_for_account( + method: str, + path: str, + *, + config: dict, + account_id: Optional[str] = None, + params: Optional[dict] = None, + json_body: Optional[dict] = None, +) -> dict: + """Convenience wrapper that resolves credentials from config + account_id.""" + _ = resolve_account_config(config, account_id) + app_key, app_secret, _robot_code = resolve_account_credentials(config, account_id) + if not app_key or not app_secret: + raise ValueError( + "DingTalk appKey/appSecret not configured" + + (f" for account '{account_id}'" if account_id else "") + ) + return await api_request( + method, path, + app_key=app_key, app_secret=app_secret, + params=params, json_body=json_body, + ) diff --git a/flocks/channel/builtin/dingtalk/config.py b/flocks/channel/builtin/dingtalk/config.py new file mode 100644 index 00000000..38176a90 --- /dev/null +++ b/flocks/channel/builtin/dingtalk/config.py @@ -0,0 +1,165 @@ +""" +DingTalk-specific configuration constants and helpers. + +Outbound is supported only via the **enterprise app robot OAPI** (sometimes +called the *stream/app push* path): + +- Required: ``appKey`` + ``appSecret``. +- ``clientId`` / ``clientSecret`` are accepted as aliases for ``appKey`` / + ``appSecret`` (the DingTalk Stream / DWClient docs use the former names but + refer to the same credential pair). +- ``robotCode`` defaults to ``appKey`` (DingTalk's standard "enterprise + internal app robot" setup uses the same string for both). Override only + when one app hosts multiple robots and the console issues a distinct code. +- Sends via the OAPI domain ``https://api.dingtalk.com`` + (``/v1.0/robot/oToMessages/batchSend`` for 1:1, ``/v1.0/robot/groupMessages/send`` + for groups). + +Custom group robot incoming webhooks are intentionally **not** supported. +""" + +from __future__ import annotations + +from typing import Optional + +# OAPI base used by the enterprise app robot APIs (access_token, batchSend, …). +DINGTALK_API_BASE = "https://api.dingtalk.com" +DINGTALK_TOKEN_URL = f"{DINGTALK_API_BASE}/v1.0/oauth2/accessToken" + + +def strip_target_prefix(to: str) -> str: + """Remove ``user:`` / ``chat:`` prefixes from a DingTalk target.""" + if not to: + return "" + for prefix in ("user:", "chat:"): + if to.startswith(prefix): + return to[len(prefix):] + return to + + +def resolve_target_kind(to: str) -> str: + """Infer whether *to* points at a user or a group conversation. + + Conventions: + - ``user:`` → user (1:1 message via ``robot/oToMessages/batchSend``) + - ``chat:`` → group (``robot/groupMessages/send``) + - ``cid`` prefix or strings starting with ``cid`` — DingTalk group convention + - otherwise default to ``user`` + """ + if not to: + return "user" + if to.startswith("chat:"): + return "group" + if to.startswith("user:"): + return "user" + bare = to.lstrip() + if bare.startswith("cid"): + return "group" + return "user" + + +def _merged_app_key(source: dict) -> str: + """Return the effective ``appKey`` for *source*. + + DingTalk Stream config historically uses ``clientId`` while the OAPI v1.0 + docs use ``appKey`` — they are the same credential. Accepting both means + the project-local Node.js connector's existing ``flocks.json`` works + without changes. + """ + return str(source.get("appKey") or source.get("clientId") or "") + + +def _merged_app_secret(source: dict) -> str: + """Return the effective ``appSecret`` for *source*. See :func:`_merged_app_key`.""" + return str(source.get("appSecret") or source.get("clientSecret") or "") + + +def list_account_configs( + config: dict, + *, + require_credentials: bool = False, +) -> list[dict]: + """Return merged per-account configs, including ``_account_id`` metadata. + + Mirrors :func:`flocks.channel.builtin.feishu.config.list_account_configs` + but filters by DingTalk OAPI credentials (``appKey``+``appSecret``). + ``clientId`` / ``clientSecret`` are accepted as aliases. + """ + accounts_cfg: dict = config.get("accounts", {}) or {} + + def _has_credentials(merged: dict) -> bool: + return bool(_merged_app_key(merged)) and bool(_merged_app_secret(merged)) + + def _should_include(merged: dict) -> bool: + if require_credentials and not _has_credentials(merged): + return False + return True + + if not accounts_cfg: + merged = {**config, "_account_id": config.get("_account_id", "default")} + return [merged] if _should_include(merged) else [] + + result: list[dict] = [] + top_level_has_credentials = _has_credentials(config) + if "default" not in accounts_cfg and top_level_has_credentials: + merged = {**config, "_account_id": "default"} + if _should_include(merged): + result.append(merged) + + for acc_id, acc_overrides in accounts_cfg.items(): + acc_overrides = acc_overrides or {} + if not acc_overrides.get("enabled", True): + continue + merged = {**config, **acc_overrides, "_account_id": acc_id} + merged.pop("accounts", None) + if _should_include(merged): + result.append(merged) + + if result: + return result + + merged = {**config, "_account_id": "default"} + return [merged] if _should_include(merged) else [] + + +def resolve_account_credentials( + config: dict, + account_id: Optional[str], +) -> tuple[str, str, str]: + """Return ``(appKey, appSecret, robotCode)`` for the given account. + + Falls back to top-level config when the named account omits a field. + Accepts ``clientId`` / ``clientSecret`` as aliases for ``appKey`` / + ``appSecret`` so that DingTalk Stream-style configs work unchanged. + + ``robotCode`` defaults to the resolved ``appKey`` — for an enterprise + internal app robot DingTalk uses the same string for both. Set + ``robotCode`` explicitly only when one app hosts multiple robots. + """ + if account_id and account_id != "default": + accounts = config.get("accounts", {}) or {} + acc = accounts.get(account_id, {}) or {} + app_key = _merged_app_key(acc) or _merged_app_key(config) + app_secret = _merged_app_secret(acc) or _merged_app_secret(config) + robot_code = acc.get("robotCode") or config.get("robotCode") or app_key + return app_key, app_secret, robot_code + + app_key = _merged_app_key(config) + return ( + app_key, + _merged_app_secret(config), + config.get("robotCode") or app_key, + ) + + +def resolve_account_config(config: dict, account_id: Optional[str]) -> dict: + """Merge top-level config with the named account's overrides.""" + if not account_id or account_id == "default": + return config + accounts = config.get("accounts", {}) or {} + acc = accounts.get(account_id, {}) or {} + if not acc: + return config + merged = {**config, **acc} + merged.pop("accounts", None) + return merged diff --git a/flocks/channel/builtin/dingtalk/send.py b/flocks/channel/builtin/dingtalk/send.py new file mode 100644 index 00000000..47fb6832 --- /dev/null +++ b/flocks/channel/builtin/dingtalk/send.py @@ -0,0 +1,184 @@ +""" +DingTalk message-sending helpers — active outbound only. + +Only the **enterprise app robot OAPI** path (``send_message_app``) is +supported. Targets are routed automatically: +- ``user:`` / staffId → ``/v1.0/robot/oToMessages/batchSend`` +- ``chat:`` / openConversationId → ``/v1.0/robot/groupMessages/send`` + +Long texts are chunked transparently. Custom group-robot incoming webhooks +are intentionally not supported. +""" + +from __future__ import annotations + +import json +from typing import Any, Dict, Optional + +from flocks.channel.builtin.dingtalk.client import ( + DingTalkApiError, + api_request_for_account, +) +from flocks.channel.builtin.dingtalk.config import ( + resolve_target_kind, + strip_target_prefix, +) +from flocks.utils.log import Log + +log = Log.create(service="channel.dingtalk.send") + + +_DEFAULT_TEXT_CHUNK_LIMIT = 4000 +# DingTalk 群聊最长 5000 字符,单聊更宽,统一保守取 4000 与 Feishu 对齐 + + +# --------------------------------------------------------------------------- +# Payload builders +# --------------------------------------------------------------------------- + +def build_app_payload(text: str, render_mode: str) -> tuple[str, str]: + """Return ``(msgKey, msgParam_json)`` for the app-mode robot APIs. + + DingTalk 的 ``msgKey`` / ``msgParam`` 字段对应消息模板: + - ``sampleText`` → 纯文本 + - ``sampleMarkdown`` → markdown(含标题) + """ + if render_mode == "plain": + return "sampleText", json.dumps({"content": text}, ensure_ascii=False) + + title = _extract_title(text) or "通知" + return ( + "sampleMarkdown", + json.dumps({"title": title, "text": text}, ensure_ascii=False), + ) + + +def _extract_title(text: str) -> str: + """Pick the first non-empty line as the markdown title (capped at 64 chars).""" + for line in text.splitlines(): + stripped = line.strip().lstrip("#").strip() + if stripped: + return stripped[:64] + return "" + + +# --------------------------------------------------------------------------- +# Chunking (kept local so send.py is self-contained) +# --------------------------------------------------------------------------- + +def _chunk_text(text: str, limit: int) -> list[str]: + """Split long text into chunks, preferring newline boundaries.""" + if len(text) <= limit: + return [text] + chunks: list[str] = [] + remaining = text + while remaining: + if len(remaining) <= limit: + chunks.append(remaining) + break + cut = remaining[:limit].rfind("\n") + if cut <= limit // 4: + cut = limit + chunk = remaining[:cut].rstrip() + remaining = remaining[cut:].lstrip("\n") + if chunk: + chunks.append(chunk) + return chunks or [text] + + +# --------------------------------------------------------------------------- +# App-mode (enterprise robot) — OAPI v1.0 +# --------------------------------------------------------------------------- + +async def _send_app_one( + *, + config: dict, + to: str, + msg_key: str, + msg_param: str, + account_id: Optional[str], +) -> Dict[str, Any]: + """Send a single non-chunked message via the app-mode robot APIs.""" + from flocks.channel.builtin.dingtalk.config import resolve_account_credentials + + _, _, robot_code = resolve_account_credentials(config, account_id) + if not robot_code: + # robotCode defaults to appKey/clientId, so an empty value means the + # underlying credentials are missing — surface the actionable hint. + raise ValueError( + "DingTalk credentials not configured: provide appKey/clientId" + " (and appSecret/clientSecret) in the channel config" + + (f" for account '{account_id}'" if account_id else "") + ) + + bare = strip_target_prefix(to) + if not bare: + raise ValueError("DingTalk send: empty target") + + if resolve_target_kind(to) == "group": + body = { + "robotCode": robot_code, + "openConversationId": bare, + "msgKey": msg_key, + "msgParam": msg_param, + } + data = await api_request_for_account( + "POST", "/v1.0/robot/groupMessages/send", + config=config, account_id=account_id, json_body=body, + ) + return { + "message_id": str(data.get("processQueryKey") or ""), + "chat_id": bare, + } + + body = { + "robotCode": robot_code, + "userIds": [bare], + "msgKey": msg_key, + "msgParam": msg_param, + } + data = await api_request_for_account( + "POST", "/v1.0/robot/oToMessages/batchSend", + config=config, account_id=account_id, json_body=body, + ) + return { + "message_id": str(data.get("processQueryKey") or ""), + "chat_id": bare, + } + + +async def send_message_app( + *, + config: dict, + to: str, + text: str, + account_id: Optional[str] = None, +) -> Dict[str, Any]: + """Send a text/markdown message via the DingTalk enterprise robot OAPI. + + ``config["renderMode"]`` controls the payload: + - ``"plain"`` → sampleText + - otherwise → sampleMarkdown (default) + + Long texts are automatically split into multiple sequential messages. + Returns ``{"message_id": "...", "chat_id": "..."}`` for the *last* chunk. + """ + render_mode = str(config.get("renderMode") or "auto").lower() + chunk_limit = int(config.get("textChunkLimit", _DEFAULT_TEXT_CHUNK_LIMIT)) + chunks = _chunk_text(text, chunk_limit) + + last: Dict[str, Any] = {} + for chunk in chunks: + msg_key, msg_param = build_app_payload(chunk, render_mode) + last = await _send_app_one( + config=config, to=to, msg_key=msg_key, msg_param=msg_param, + account_id=account_id, + ) + return last + + +__all__ = [ + "DingTalkApiError", + "build_app_payload", + "send_message_app", +] diff --git a/flocks/channel/inbound/session_binding.py b/flocks/channel/inbound/session_binding.py index a4f997bc..14fca754 100644 --- a/flocks/channel/inbound/session_binding.py +++ b/flocks/channel/inbound/session_binding.py @@ -195,6 +195,59 @@ async def get_binding( ) -> Optional[SessionBinding]: return await self._find_binding(channel_id, account_id, chat_id, thread_id) + async def bind_session( + self, + *, + session_id: str, + channel_id: str, + account_id: str, + chat_id: str, + chat_type: ChatType, + thread_id: Optional[str] = None, + agent_id: Optional[str] = None, + ) -> SessionBinding: + """Register a binding for an *already-created* session. + + Used by out-of-process bridges (e.g. DingTalk's ``runner.ts``) that + create their Flocks session on their own and therefore bypass the + ``InboundDispatcher`` → :func:`resolve_or_create` path. Calling this + explicitly keeps ``channel_bindings`` in sync so that the + ``channel_message`` tool / ``POST /api/channel/session-send`` can + route outbound replies back to the original conversation. + + Idempotent: re-binding the same conversation key replaces the prior + row (the unique index is on + ``(channel_id, account_id, chat_id, COALESCE(thread_id, ''))``). + + Raises: + ValueError: if *session_id* does not exist. + """ + from flocks.session.session import Session as _Session + if not await _Session.get_by_id(session_id): + raise ValueError(f"Session '{session_id}' not found") + + now = time.time() + binding = SessionBinding( + channel_id=channel_id, + account_id=account_id or "default", + chat_id=chat_id, + chat_type=chat_type, + thread_id=thread_id, + session_id=session_id, + agent_id=agent_id, + created_at=now, + last_message_at=now, + ) + await self._insert(binding) + log.info("channel.binding.registered", { + "channel": channel_id, + "account_id": binding.account_id, + "chat_id": chat_id, + "session_id": session_id, + "chat_type": chat_type.value, + }) + return binding + async def unbind(self, session_id: str) -> None: db = await _get_db() await db.execute( diff --git a/flocks/channel/registry.py b/flocks/channel/registry.py index 0e6d6098..a690fd64 100644 --- a/flocks/channel/registry.py +++ b/flocks/channel/registry.py @@ -77,11 +77,16 @@ def reset(self) -> None: def _register_builtin_channels(self) -> None: from flocks.channel.builtin.feishu.channel import FeishuChannel - from flocks.channel.builtin.wecom.channel import WeComChannel from flocks.channel.builtin.telegram.channel import TelegramChannel + from flocks.channel.builtin.wecom.channel import WeComChannel self.register(FeishuChannel()) self.register(WeComChannel()) self.register(TelegramChannel()) + # DingTalk: inbound is owned by the project-local plugin at + # .flocks/plugins/channels/dingtalk/dingtalk.py (Node.js connector). + # The outbound send library lives in flocks.channel.builtin.dingtalk + # and is consumed directly by that plugin's send_text — no builtin + # ChannelPlugin is registered here to avoid id collisions. def _register_plugin_extension_point(self) -> None: from flocks.plugin import PluginLoader, ExtensionPoint diff --git a/flocks/server/routes/channel.py b/flocks/server/routes/channel.py index 3bce11a6..fa456a1d 100644 --- a/flocks/server/routes/channel.py +++ b/flocks/server/routes/channel.py @@ -189,6 +189,83 @@ async def record_inbound(channel_id: str): return {"ok": True} +class BindSessionRequest(BaseModel): + """Body for ``POST /api/channel/{channel_id}/bind``.""" + session_id: str + chat_id: str + chat_type: str = "direct" # "direct" | "group" + account_id: Optional[str] = "default" + thread_id: Optional[str] = None + agent_id: Optional[str] = None + + +@router.post("/{channel_id}/bind") +async def bind_session(channel_id: str, req: BindSessionRequest): + """Register a (channel, conversation) → session mapping in ``channel_bindings``. + + For Feishu/WeCom/Telegram this row is written automatically inside + ``InboundDispatcher`` → ``SessionBindingService.resolve_or_create``. Out- + of-process bridges (e.g. DingTalk's ``runner.ts``) create their Flocks + session on their own and must call this endpoint after each session + creation so that ``channel_message`` / ``POST /session-send`` can route + outbound replies back. + + Idempotent — re-binding the same conversation key replaces the prior row. + """ + from flocks.channel.base import ChatType + from flocks.channel.inbound.session_binding import SessionBindingService + + # Conversation-level bindings only — CHANNEL-broadcast style targets + # (e.g. Telegram channels) are not addressable by a single chat reply + # and would never be a legitimate ``channel_message`` destination. + if req.chat_type not in ("direct", "group"): + raise HTTPException( + status_code=400, + detail=f"Invalid chat_type '{req.chat_type}', expected 'direct' or 'group'", + ) + chat_type = ChatType(req.chat_type) + + # Defense-in-depth: some out-of-process bridges build composite + # session-isolation keys like ``:`` for + # per-sender group sessions (e.g. DingTalk's ``groupSessionScope= + # group_sender``). Such keys are NOT valid outbound targets — they + # would be fed as ``openConversationId`` to the platform API and fail + # to deliver. Reject them here so the bug can never regress silently + # into the bindings table. + if chat_type is ChatType.GROUP and ":" in req.chat_id: + raise HTTPException( + status_code=400, + detail=( + f"Invalid group chat_id '{req.chat_id}': contains ':', which " + "looks like a session-isolation composite (e.g. " + "':'). Pass the bare platform " + "conversation id (e.g. DingTalk openConversationId)." + ), + ) + + svc = SessionBindingService() + try: + binding = await svc.bind_session( + session_id=req.session_id, + channel_id=channel_id, + account_id=req.account_id or "default", + chat_id=req.chat_id, + chat_type=chat_type, + thread_id=req.thread_id, + agent_id=req.agent_id, + ) + except ValueError as exc: + raise HTTPException(status_code=404, detail=str(exc)) + + return { + "ok": True, + "channel_id": binding.channel_id, + "session_id": binding.session_id, + "chat_id": binding.chat_id, + "chat_type": binding.chat_type.value, + } + + @router.post("/{channel_id}/restart") async def restart_channel(channel_id: str): """Restart a single channel connection with the latest config. diff --git a/tests/channel/test_dingtalk.py b/tests/channel/test_dingtalk.py new file mode 100644 index 00000000..f1433bb5 --- /dev/null +++ b/tests/channel/test_dingtalk.py @@ -0,0 +1,734 @@ +""" +Tests for the DingTalk active-outbound send library. + +Layout: + - send library → flocks.channel.builtin.dingtalk.{config,client,send} + - inbound owner → .flocks/plugins/channels/dingtalk/dingtalk.py (Node.js) + +Only the OAPI app-robot ("stream/app push") path is supported; custom +group-robot incoming webhooks are intentionally out of scope. + +The builtin package does NOT register a ChannelPlugin (to avoid id +collisions with the local plugin), so registry-side tests are absent. +The local Node.js plugin is owned separately and not exercised here. +""" + +from __future__ import annotations + +import importlib.util +import json +import sys +from pathlib import Path +from types import SimpleNamespace +from unittest.mock import AsyncMock, patch + +import pytest + +from flocks.channel.base import ChatType, OutboundContext +from flocks.channel.builtin.dingtalk.client import ( + DingTalkApiError, + ensure_api_success, +) +from flocks.channel.builtin.dingtalk.config import ( + list_account_configs, + resolve_account_credentials, + resolve_target_kind, + strip_target_prefix, +) +from flocks.channel.builtin.dingtalk.send import ( + build_app_payload, + send_message_app, +) + + +# ------------------------------------------------------------------ +# config helpers +# ------------------------------------------------------------------ + +class TestConfigHelpers: + def test_strip_target_prefix(self): + assert strip_target_prefix("user:abc") == "abc" + assert strip_target_prefix("chat:cidXYZ") == "cidXYZ" + assert strip_target_prefix("plain") == "plain" + assert strip_target_prefix("") == "" + + def test_resolve_target_kind(self): + assert resolve_target_kind("user:zhangsan") == "user" + assert resolve_target_kind("chat:cid1") == "group" + assert resolve_target_kind("cidABC123") == "group" + assert resolve_target_kind("zhangsan") == "user" + assert resolve_target_kind("") == "user" + + def test_resolve_account_credentials_default(self): + cfg = {"appKey": "K", "appSecret": "S", "robotCode": "R"} + assert resolve_account_credentials(cfg, None) == ("K", "S", "R") + assert resolve_account_credentials(cfg, "default") == ("K", "S", "R") + + def test_resolve_account_credentials_override(self): + cfg = { + "appKey": "K", "appSecret": "S", "robotCode": "R", + "accounts": { + "alice": {"appKey": "K2", "appSecret": "S2"}, + }, + } + assert resolve_account_credentials(cfg, "alice") == ("K2", "S2", "R") + + def test_resolve_account_credentials_accepts_client_id_alias(self): + # DingTalk Stream config uses clientId/clientSecret; the send library + # must transparently treat them as appKey/appSecret. + cfg = {"clientId": "K", "clientSecret": "S", "robotCode": "R"} + assert resolve_account_credentials(cfg, None) == ("K", "S", "R") + + def test_resolve_account_credentials_app_key_wins_over_alias(self): + cfg = { + "appKey": "PRIMARY", "clientId": "FALLBACK", + "appSecret": "PS", "clientSecret": "FS", + "robotCode": "R", + } + assert resolve_account_credentials(cfg, None) == ("PRIMARY", "PS", "R") + + def test_robot_code_defaults_to_app_key(self): + # Standard "enterprise internal app robot" — robotCode == appKey, + # so users don't have to repeat themselves in flocks.json. + cfg = {"appKey": "K", "appSecret": "S"} + assert resolve_account_credentials(cfg, None) == ("K", "S", "K") + + def test_robot_code_defaults_to_client_id_alias(self): + # Same fallback when only the Stream-style aliases are present. + cfg = {"clientId": "dingXYZ", "clientSecret": "S"} + assert resolve_account_credentials(cfg, None) == ("dingXYZ", "S", "dingXYZ") + + def test_robot_code_defaults_per_account(self): + # Per-account override of credentials should produce a per-account + # robotCode default — not a stale top-level fallback. + cfg = { + "appKey": "TOP_K", "appSecret": "TOP_S", + "accounts": { + "alice": {"appKey": "ALICE_K", "appSecret": "ALICE_S"}, + }, + } + assert resolve_account_credentials(cfg, "alice") == ( + "ALICE_K", "ALICE_S", "ALICE_K", + ) + + def test_explicit_robot_code_overrides_app_key_default(self): + cfg = {"appKey": "K", "appSecret": "S", "robotCode": "EXPLICIT"} + assert resolve_account_credentials(cfg, None) == ("K", "S", "EXPLICIT") + + def test_list_account_configs_top_level_app(self): + cfg = {"appKey": "k", "appSecret": "s", "robotCode": "r"} + accounts = list_account_configs(cfg, require_credentials=True) + assert len(accounts) == 1 + assert accounts[0]["_account_id"] == "default" + + def test_list_account_configs_accepts_client_id_alias(self): + cfg = {"clientId": "k", "clientSecret": "s"} + accounts = list_account_configs(cfg, require_credentials=True) + assert len(accounts) == 1 + + def test_list_account_configs_skips_disabled(self): + cfg = { + "robotCode": "r", + "accounts": { + "alice": {"appKey": "k", "appSecret": "s", "enabled": False}, + "bob": {"appKey": "k2", "appSecret": "s2"}, + }, + } + accounts = list_account_configs(cfg, require_credentials=True) + ids = {a["_account_id"] for a in accounts} + assert ids == {"bob"} + + def test_list_account_configs_filters_missing_credentials(self): + cfg = { + "accounts": { + "alice": {"appKey": "k"}, # missing appSecret + }, + } + accounts = list_account_configs(cfg, require_credentials=True) + assert accounts == [] + + +# ------------------------------------------------------------------ +# Payload builder +# ------------------------------------------------------------------ + +class TestAppPayloadBuilder: + def test_plain_text(self): + msg_key, msg_param = build_app_payload("hello", "plain") + assert msg_key == "sampleText" + assert json.loads(msg_param) == {"content": "hello"} + + def test_markdown_default(self): + msg_key, msg_param = build_app_payload("# 标题\n正文", "auto") + assert msg_key == "sampleMarkdown" + param = json.loads(msg_param) + assert param["title"] == "标题" + assert "正文" in param["text"] + + def test_markdown_uses_fallback_title_when_blank(self): + msg_key, msg_param = build_app_payload("\n\n ", "card") + param = json.loads(msg_param) + assert msg_key == "sampleMarkdown" + assert param["title"] == "通知" + + +# ------------------------------------------------------------------ +# send_message_app — routing between user and group +# ------------------------------------------------------------------ + +class TestSendApp: + async def test_user_target_uses_oto_endpoint(self): + captured: dict = {} + + async def _fake_request(method, path, *, config, account_id, json_body=None, params=None): + captured["path"] = path + captured["body"] = json_body + return {"processQueryKey": "pqk-1"} + + cfg = {"appKey": "k", "appSecret": "s", "robotCode": "r"} + with patch( + "flocks.channel.builtin.dingtalk.send.api_request_for_account", + new=AsyncMock(side_effect=_fake_request), + ): + result = await send_message_app( + config=cfg, to="user:zhangsan", text="hello", + ) + + assert captured["path"] == "/v1.0/robot/oToMessages/batchSend" + assert captured["body"]["userIds"] == ["zhangsan"] + assert captured["body"]["msgKey"] == "sampleMarkdown" + assert captured["body"]["robotCode"] == "r" + assert result["message_id"] == "pqk-1" + assert result["chat_id"] == "zhangsan" + + async def test_chat_target_uses_group_endpoint(self): + captured: dict = {} + + async def _fake_request(method, path, *, config, account_id, json_body=None, params=None): + captured["path"] = path + captured["body"] = json_body + return {"processQueryKey": "pqk-2"} + + cfg = { + "appKey": "k", "appSecret": "s", "robotCode": "r", + "renderMode": "plain", + } + with patch( + "flocks.channel.builtin.dingtalk.send.api_request_for_account", + new=AsyncMock(side_effect=_fake_request), + ): + await send_message_app( + config=cfg, to="chat:cid_GROUP_1", text="hi all", + ) + + assert captured["path"] == "/v1.0/robot/groupMessages/send" + assert captured["body"]["openConversationId"] == "cid_GROUP_1" + assert captured["body"]["msgKey"] == "sampleText" + + async def test_app_send_works_with_client_id_alias(self): + # Reuses the DingTalk Stream credential fields end-to-end. + captured: dict = {} + + async def _fake_request(method, path, *, config, account_id, json_body=None, params=None): + captured["path"] = path + captured["body"] = json_body + return {"processQueryKey": "pqk-3"} + + cfg = {"clientId": "ck", "clientSecret": "cs", "robotCode": "r"} + with patch( + "flocks.channel.builtin.dingtalk.send.api_request_for_account", + new=AsyncMock(side_effect=_fake_request), + ): + await send_message_app(config=cfg, to="user:u1", text="hello") + + assert captured["body"]["userIds"] == ["u1"] + assert captured["body"]["robotCode"] == "r" + + async def test_missing_credentials_raises(self): + # robotCode now defaults to appKey, so the only way the resolved + # robotCode is empty is when no credentials are configured at all. + cfg = {} + with pytest.raises(ValueError, match="credentials not configured"): + await send_message_app(config=cfg, to="user:abc", text="hi") + + async def test_robot_code_defaults_to_app_key_at_send_time(self): + cfg = {"appKey": "myapp", "appSecret": "s"} + captured: dict = {} + + async def _fake_request(method, path, *, config, account_id, json_body=None, params=None): + captured["body"] = json_body + return {"processQueryKey": "pqk"} + + with patch( + "flocks.channel.builtin.dingtalk.send.api_request_for_account", + new=_fake_request, + ): + await send_message_app(config=cfg, to="user:abc", text="hi") + + assert captured["body"]["robotCode"] == "myapp" + + async def test_empty_target_raises(self): + cfg = {"appKey": "k", "appSecret": "s", "robotCode": "r"} + with pytest.raises(ValueError, match="empty target"): + await send_message_app(config=cfg, to="", text="hi") + + async def test_long_text_chunks_into_multiple_calls(self): + calls: list[dict] = [] + + async def _fake_request(method, path, *, config, account_id, json_body=None, params=None): + calls.append({"path": path, "body": json_body}) + return {"processQueryKey": f"pqk-{len(calls)}"} + + cfg = { + "appKey": "k", "appSecret": "s", "robotCode": "r", + "textChunkLimit": 10, + } + with patch( + "flocks.channel.builtin.dingtalk.send.api_request_for_account", + new=AsyncMock(side_effect=_fake_request), + ): + await send_message_app( + config=cfg, to="user:u1", + text="abcde\nfghij\nklmno\npqrst", + ) + + assert len(calls) >= 2 + + +# ------------------------------------------------------------------ +# Client error parsing +# ------------------------------------------------------------------ + +class TestEnsureApiSuccess: + def test_legacy_errcode_zero_passes(self): + data = ensure_api_success({"errcode": 0, "errmsg": "ok"}, context="ctx") + assert data["errcode"] == 0 + + def test_legacy_errcode_non_zero_raises(self): + with pytest.raises(DingTalkApiError) as exc: + ensure_api_success( + {"errcode": 310000, "errmsg": "keywords not in content"}, + context="oapi", + ) + assert exc.value.code == "310000" + + def test_v1_code_field_raises(self): + with pytest.raises(DingTalkApiError) as exc: + ensure_api_success( + {"code": "InvalidParameter", "message": "bad robotCode"}, + context="oapi", + http_status=400, + ) + assert exc.value.code == "InvalidParameter" + assert exc.value.http_status == 400 + + def test_throttling_marked_retryable(self): + with pytest.raises(DingTalkApiError) as exc: + ensure_api_success( + {"code": "Throttling.Api", "message": "rate limit exceeded"}, + context="oapi", + http_status=429, + ) + assert exc.value.retryable is True + + def test_success_payload_passes_through(self): + # v1.0 success payloads typically lack a ``code`` field altogether. + data = ensure_api_success( + {"processQueryKey": "abc"}, + context="oapi", + http_status=200, + ) + assert data["processQueryKey"] == "abc" + + +# ------------------------------------------------------------------ +# Builtin package no longer registers a ChannelPlugin +# ------------------------------------------------------------------ + +class TestBuiltinHasNoChannelClass: + def test_no_dingtalk_in_builtin_registry(self): + from flocks.channel.registry import ChannelRegistry + reg = ChannelRegistry() + reg._register_builtin_channels() + # The builtin package intentionally exposes only a send library; + # the dingtalk id is owned by the project-local plugin. + assert reg.get("dingtalk") is None + + def test_builtin_package_has_no_channel_module(self): + spec = importlib.util.find_spec( + "flocks.channel.builtin.dingtalk.channel" + ) + assert spec is None + + +# ------------------------------------------------------------------ +# Local plugin send_text — delegates to send_message_app +# ------------------------------------------------------------------ + +# The local plugin lives under .flocks/plugins/channels/dingtalk/dingtalk.py; +# load it by file path because that directory is not on sys.path during tests. +_LOCAL_PLUGIN_PATH = ( + Path(__file__).resolve().parents[2] + / ".flocks/plugins/channels/dingtalk/dingtalk.py" +) + + +def _load_local_plugin_module(): + spec = importlib.util.spec_from_file_location( + "_test_dingtalk_local_plugin", _LOCAL_PLUGIN_PATH + ) + assert spec and spec.loader, f"cannot load {_LOCAL_PLUGIN_PATH}" + mod = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = mod + spec.loader.exec_module(mod) + return mod + + +class TestLocalPluginSendText: + """Active outbound (channel_message tool) goes through send_text.""" + + def _make_plugin(self, **config): + mod = _load_local_plugin_module() + plugin = mod.DingTalkChannel() + plugin._config = config + # Bypass the live `Config.get()` lookup added in send_text; tests + # supply the channel config directly. + async def _fake_resolve(self=plugin): + return dict(config) + plugin._resolve_outbound_config = _fake_resolve # type: ignore[assignment] + return plugin + + @pytest.mark.asyncio + async def test_send_text_delegates_to_send_message_app(self): + plugin = self._make_plugin( + clientId="dingkey", + clientSecret="secret", + robotCode="dingrobot", + ) + ctx = OutboundContext( + channel_id="dingtalk", + to="user:staff_001", + text="hello from rex", + account_id="default", + ) + + sent_kwargs = {} + + async def fake_send(**kwargs): + sent_kwargs.update(kwargs) + return {"message_id": "mid_xxx", "chat_id": "staff_001"} + + with patch( + "flocks.channel.builtin.dingtalk.send_message_app", + new=fake_send, + ): + result = await plugin.send_text(ctx) + + assert result.success is True + assert result.message_id == "mid_xxx" + assert result.chat_id == "staff_001" + assert sent_kwargs["to"] == "user:staff_001" + assert sent_kwargs["text"] == "hello from rex" + # The plugin must forward its full config (so robotCode reaches the lib). + assert sent_kwargs["config"]["robotCode"] == "dingrobot" + + @pytest.mark.asyncio + async def test_send_text_works_without_explicit_robot_code(self): + """robotCode defaults to clientId/appKey — no extra config needed.""" + plugin = self._make_plugin(clientId="dingkey", clientSecret="s") + ctx = OutboundContext(channel_id="dingtalk", to="user:staff_001", text="hi") + + sent_kwargs = {} + + async def fake_send(**kwargs): + sent_kwargs.update(kwargs) + return {"message_id": "m", "chat_id": "staff_001"} + + with patch( + "flocks.channel.builtin.dingtalk.send_message_app", + new=fake_send, + ): + result = await plugin.send_text(ctx) + + assert result.success is True + # The plugin must NOT inject robotCode itself; the send library + # resolves it from clientId via resolve_account_credentials. + assert "robotCode" not in sent_kwargs["config"] + + @pytest.mark.asyncio + async def test_send_text_missing_target_returns_error(self): + plugin = self._make_plugin(clientId="k", clientSecret="s") + ctx = OutboundContext(channel_id="dingtalk", to="", text="hi") + + result = await plugin.send_text(ctx) + + assert result.success is False + assert "to" in (result.error or "").lower() + + @pytest.mark.asyncio + async def test_resolve_outbound_config_reads_live_global_config(self): + """send_text must NOT depend on self._config: PluginLoader can register + a fresh DingTalkChannel after start() ran on the original instance, + leaving self._config = None on the one outbound actually picks up. + """ + mod = _load_local_plugin_module() + plugin = mod.DingTalkChannel() # NB: no plugin._config set on purpose + + from flocks.config.config import ChannelConfig as _CC + + live_cfg = _CC( + enabled=True, + **{ + "clientId": "live_key", + "clientSecret": "live_secret", + }, + ) + + # _resolve_outbound_config inspects ``cfg.channels`` directly so it + # can distinguish "not configured" from a synthesised default — the + # stub must therefore expose the same attribute. + class _FakeCfgInfo: + channels = {"dingtalk": live_cfg} + + async def _fake_get(): + return _FakeCfgInfo() + + with patch("flocks.config.config.Config.get", new=_fake_get): + data = await plugin._resolve_outbound_config() + + assert data["clientId"] == "live_key" + assert data["clientSecret"] == "live_secret" + + @pytest.mark.asyncio + async def test_send_text_propagates_dingtalk_api_error_as_failure(self): + plugin = self._make_plugin(clientId="k", clientSecret="s") + ctx = OutboundContext(channel_id="dingtalk", to="user:x", text="hi") + + async def raising_send(**_): + raise DingTalkApiError( + "throttled", code="Throttling.Api", retryable=True, + ) + + with patch( + "flocks.channel.builtin.dingtalk.send_message_app", + new=raising_send, + ): + result = await plugin.send_text(ctx) + + assert result.success is False + assert result.retryable is True + assert "throttled" in (result.error or "") + + +# ------------------------------------------------------------------ +# SessionBindingService.bind_session — used by runner.ts → /bind +# ------------------------------------------------------------------ + +class TestSessionBindingServiceBindSession: + @pytest.mark.asyncio + async def test_bind_session_inserts_row_for_existing_session(self, monkeypatch): + from flocks.channel.inbound import session_binding as sb_mod + + svc = sb_mod.SessionBindingService() + + monkeypatch.setattr( + "flocks.session.session.Session.get_by_id", + AsyncMock(return_value=SimpleNamespace(id="ses_42", agent="rex")), + ) + + inserted = [] + + async def fake_insert(binding): + inserted.append(binding) + + svc._insert = fake_insert # type: ignore[assignment] + + binding = await svc.bind_session( + session_id="ses_42", + channel_id="dingtalk", + account_id="default", + chat_id="cidXXXX", + chat_type=ChatType.GROUP, + ) + + assert binding.session_id == "ses_42" + assert binding.channel_id == "dingtalk" + assert binding.chat_type is ChatType.GROUP + assert inserted and inserted[0].chat_id == "cidXXXX" + + @pytest.mark.asyncio + async def test_bind_session_raises_when_session_missing(self, monkeypatch): + from flocks.channel.inbound import session_binding as sb_mod + + svc = sb_mod.SessionBindingService() + monkeypatch.setattr( + "flocks.session.session.Session.get_by_id", + AsyncMock(return_value=None), + ) + + with pytest.raises(ValueError, match="not found"): + await svc.bind_session( + session_id="ses_missing", + channel_id="dingtalk", + account_id="default", + chat_id="cidXXXX", + chat_type=ChatType.DIRECT, + ) + + +# ------------------------------------------------------------------ +# POST /api/channel/{channel_id}/bind — exposes bind_session over HTTP +# ------------------------------------------------------------------ + +class TestBindEndpoint: + @pytest.fixture + def client(self): + from fastapi import FastAPI + from fastapi.testclient import TestClient + + from flocks.server.routes.channel import router + + app = FastAPI() + app.include_router(router, prefix="/api/channel") + return TestClient(app) + + def test_bind_endpoint_calls_service_and_returns_payload(self, client, monkeypatch): + from flocks.channel.base import ChatType as _ChatType + from flocks.channel.inbound import session_binding as sb_mod + + called = {} + + async def fake_bind(self, **kwargs): + called.update(kwargs) + return sb_mod.SessionBinding( + channel_id=kwargs["channel_id"], + account_id=kwargs["account_id"], + chat_id=kwargs["chat_id"], + chat_type=kwargs["chat_type"], + thread_id=kwargs.get("thread_id"), + session_id=kwargs["session_id"], + agent_id=kwargs.get("agent_id"), + created_at=0.0, + last_message_at=0.0, + ) + + monkeypatch.setattr(sb_mod.SessionBindingService, "bind_session", fake_bind) + + resp = client.post( + "/api/channel/dingtalk/bind", + json={ + "session_id": "ses_42", + "chat_id": "cidXXXX", + "chat_type": "group", + "account_id": "default", + }, + ) + + assert resp.status_code == 200, resp.text + body = resp.json() + assert body == { + "ok": True, + "channel_id": "dingtalk", + "session_id": "ses_42", + "chat_id": "cidXXXX", + "chat_type": "group", + } + assert called["channel_id"] == "dingtalk" + assert called["chat_type"] is _ChatType.GROUP + + def test_bind_endpoint_rejects_invalid_chat_type(self, client): + resp = client.post( + "/api/channel/dingtalk/bind", + json={ + "session_id": "ses_42", + "chat_id": "cidXXXX", + "chat_type": "channel", # not allowed + }, + ) + assert resp.status_code == 400 + assert "chat_type" in resp.json()["detail"] + + def test_bind_endpoint_returns_404_when_session_missing(self, client, monkeypatch): + from flocks.channel.inbound import session_binding as sb_mod + + async def raising(self, **_): + raise ValueError("Session 'ses_missing' not found") + + monkeypatch.setattr(sb_mod.SessionBindingService, "bind_session", raising) + + resp = client.post( + "/api/channel/dingtalk/bind", + json={ + "session_id": "ses_missing", + "chat_id": "x", + "chat_type": "direct", + }, + ) + + assert resp.status_code == 404 + assert "ses_missing" in resp.json()["detail"] + + def test_bind_endpoint_rejects_group_sender_composite_key(self, client, monkeypatch): + """group_sender mode builds peerId = `:`; + that composite is only valid for session isolation, not as a send + target. The endpoint must refuse to persist it so the bug cannot + regress into the bindings table. + """ + from flocks.channel.inbound import session_binding as sb_mod + + called = {"count": 0} + + async def _unexpected_bind(self, **_): + called["count"] += 1 + + monkeypatch.setattr( + sb_mod.SessionBindingService, "bind_session", _unexpected_bind, + ) + + resp = client.post( + "/api/channel/dingtalk/bind", + json={ + "session_id": "ses_42", + "chat_id": "cidXXXX:staff_001", # group_sender composite + "chat_type": "group", + }, + ) + + assert resp.status_code == 400 + body = resp.json() + assert "composite" in body["detail"].lower() + # Must NOT have reached the service: the check is meant to prevent + # the bad row from ever being written. + assert called["count"] == 0 + + def test_bind_endpoint_accepts_colon_in_direct_targets(self, client, monkeypatch): + """Some platforms embed ':' in user IDs (namespacing, e.g. feishu's + ``user:open_id``). The composite-key guard must only fire for + *group* chats, never for direct ones. + """ + from flocks.channel.inbound import session_binding as sb_mod + + async def fake_bind(self, **kwargs): + return sb_mod.SessionBinding( + channel_id=kwargs["channel_id"], + account_id=kwargs["account_id"], + chat_id=kwargs["chat_id"], + chat_type=kwargs["chat_type"], + thread_id=kwargs.get("thread_id"), + session_id=kwargs["session_id"], + agent_id=kwargs.get("agent_id"), + created_at=0.0, + last_message_at=0.0, + ) + + monkeypatch.setattr(sb_mod.SessionBindingService, "bind_session", fake_bind) + + resp = client.post( + "/api/channel/dingtalk/bind", + json={ + "session_id": "ses_42", + "chat_id": "user:staff_001", + "chat_type": "direct", + }, + ) + assert resp.status_code == 200, resp.text