Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 112 additions & 21 deletions .flocks/plugins/channels/dingtalk/dingtalk.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
"clientId": "dingXXXXXX",
"clientSecret": "your_secret",
"defaultAgent": "rex"
// Active outbound (channel_message tool, agent-initiated push)
// reuses the same credential pair — robotCode defaults to clientId
// for the standard "enterprise internal app robot" setup. Override
// by adding "robotCode": "..." only if your app hosts multiple
// robots and the console issues a distinct code.
}
}
}
Expand Down Expand Up @@ -140,8 +145,14 @@ async def start(
Design note: DingTalk inbound messages are handled entirely inside the
runner.ts ↔ plugin.ts layer, which calls the flocks Session API directly.
The `on_message` / InboundDispatcher path (used by Feishu, WeCom, Telegram)
is intentionally NOT used here; this means dedup, debounce, channel.inbound
hooks and session-binding are the responsibility of plugin.ts itself.
is intentionally NOT used here; this means dedup, debounce and
channel.inbound hooks are the responsibility of plugin.ts itself.

Channel binding is *not* skipped: runner.ts calls
``POST /api/channel/dingtalk/bind`` after each session creation, so
``channel_bindings`` stays in sync with the rest of flocks and the
``channel_message`` tool can route outbound replies back through
:meth:`send_text`.
"""
self._config = config
self._on_message = on_message
Expand All @@ -151,8 +162,10 @@ async def start(

env = {
**os.environ,
"DINGTALK_CLIENT_ID": config.get("clientId", ""),
"DINGTALK_CLIENT_SECRET": config.get("clientSecret", ""),
# Accept appKey/appSecret as aliases so a single credential pair
# works for both Stream inbound (Node.js) and OAPI outbound (Python).
"DINGTALK_CLIENT_ID": config.get("clientId") or config.get("appKey", ""),
"DINGTALK_CLIENT_SECRET": config.get("clientSecret") or config.get("appSecret", ""),
"FLOCKS_PORT": str(flocks_port),
"FLOCKS_AGENT": config.get("defaultAgent", ""),
"FLOCKS_GATEWAY_TOKEN": config.get("gatewayToken", ""),
Expand All @@ -169,7 +182,7 @@ async def start(
log.info("dingtalk.start", {
"runner": str(_RUNNER_TS),
"flocks_port": flocks_port,
"client_id": config.get("clientId", ""),
"client_id": config.get("clientId") or config.get("appKey", ""),
})

self._start_process(npm, env)
Expand All @@ -188,26 +201,104 @@ async def stop(self) -> None:
self.mark_disconnected()

# ── Outbound messages ─────────────────────────────────────────────────────
# plugin.ts replies to DingTalk directly via sessionWebhook; flocks does not
# need to route through send_text. This method is required by the framework
# and is kept as a placeholder for proactive push support.
# Passive replies (agent → user, in response to an inbound message) are
# handled inside runner.ts ↔ plugin.ts via sessionWebhook and never reach
# send_text. Active push (e.g. the channel_message tool, or an agent
# proactively notifying a DingTalk user) goes through the shared OAPI
# send library at flocks.channel.builtin.dingtalk.send_message_app, which
# mirrors how Feishu / WeCom expose their outbound surfaces.

async def send_text(self, ctx: OutboundContext) -> DeliveryResult:
"""Push a text/markdown message to DingTalk via the OAPI app robot.

Reuses the inbound credentials (``clientId``/``clientSecret``, also
accepted as ``appKey``/``appSecret``). ``robotCode`` defaults to
``clientId`` — only set it explicitly when one app hosts multiple
robots. Targets must be ``user:<staffId>`` or
``chat:<openConversationId>``.

The channel config is re-read from :class:`flocks.config.config.Config`
on every call rather than from ``self._config``: PluginLoader scans
project-local plugins more than once on startup (default scan's
``project_subdir`` step + an explicit project scan), and each pass
registers a *fresh* ``DingTalkChannel()`` instance into the registry,
overwriting the one ``GatewayManager`` had run ``start()`` on. The
outbound path then receives an instance whose ``self._config`` is
``None``. Reading the config live also means UI edits take effect
without restarting the runner.
"""
Proactively push a text message (for agent-initiated DingTalk messages).
Passive replies from plugin.ts go through sessionWebhook and bypass this path.
Reserved for future extension; currently returns not-supported.
try:
from flocks.channel.builtin.dingtalk import (
DingTalkApiError,
send_message_app,
strip_target_prefix,
)
except ImportError as exc:
return DeliveryResult(
channel_id="dingtalk", message_id="",
success=False,
error=f"DingTalk send library unavailable: {exc}",
)

send_config = await self._resolve_outbound_config()
if not ctx.to or not strip_target_prefix(ctx.to):
return DeliveryResult(
channel_id="dingtalk", message_id="",
success=False,
error="DingTalk active outbound requires 'to' (user:<id> or chat:<id>)",
)

try:
result = await send_message_app(
config=send_config,
to=ctx.to,
text=ctx.text,
account_id=ctx.account_id,
)
self.record_message()
return DeliveryResult(
channel_id="dingtalk",
message_id=str(result.get("message_id", "")),
chat_id=result.get("chat_id"),
)
except Exception as e:
retryable = getattr(e, "retryable", False)
if not retryable and not isinstance(e, DingTalkApiError):
msg = str(e).lower()
retryable = "rate limit" in msg or "timeout" in msg
log.warning("dingtalk.send_text.failed", {
"to": ctx.to, "error": str(e), "retryable": retryable,
})
return DeliveryResult(
channel_id="dingtalk", message_id="",
success=False, error=str(e), retryable=retryable,
)

async def _resolve_outbound_config(self) -> dict:
"""Live-read the dingtalk channel config block from flocks.json.

Falls back to ``self._config`` (set by ``start()``) when the global
config can't be loaded — this keeps unit tests that bypass the global
config working.
"""
log.warning("dingtalk.send_text.not_implemented", {
"to": ctx.to,
"hint": "Proactive push requires the dingtalk-connector.send GatewayMethod",
})
return DeliveryResult(
channel_id="dingtalk",
message_id="",
success=False,
error="Proactive push not yet implemented; plugin.ts passive replies go through sessionWebhook",
)
try:
from flocks.config.config import Config
cfg = await Config.get()
channels = cfg.channels or {}
# Only treat the live config as authoritative when ``dingtalk`` is
# actually declared — ``get_channel_config`` synthesises a default
# ``ChannelConfig()`` for missing channels, whose model_dump still
# contains non-credential fields and would mask self._config.
if "dingtalk" in channels:
# by_alias=True keeps clientId/clientSecret as their JSON
# names (rather than pydantic's snake_case attributes) so
# _merged_app_key/_merged_app_secret pick them up unchanged.
return channels["dingtalk"].model_dump(
by_alias=True, exclude_none=True,
)
except Exception as e:
log.warning("dingtalk.send_text.config_fallback", {"error": str(e)})
return dict(self._config or {})

# ── Internal methods ──────────────────────────────────────────────────────

Expand Down
93 changes: 93 additions & 0 deletions .flocks/plugins/channels/dingtalk/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,93 @@ function buildSessionTitle(sessionKey: string): string {
}
}

/**
* Register the (channel, conversation) → session mapping in flocks's
* channel_bindings table so that the channel_message tool /
* POST /api/channel/session-send can route outbound replies back to this
* DingTalk conversation.
*
* This mirrors what InboundDispatcher.binding_service.resolve_or_create
* does for Feishu / WeCom / Telegram — DingTalk creates its session
* out-of-process, so we have to register the binding explicitly.
*
* IMPORTANT — chat_id resolution for groups:
* plugin.ts builds `peerId` differently depending on groupSessionScope:
* - `group` (default): peerId = conversationId (routable)
* - `group_sender`: peerId = `${conversationId}:${senderId}`
* A SESSION-ISOLATION composite, NOT a send target — OAPI expects the
* bare `openConversationId`.
* So for groups we must take `info.conversationId` and never fall back to
* `peerId`. `peerId` continues to drive session isolation on the
* sessionKey side; the binding row only stores the outbound-routable id.
*
* For direct chats peerId == senderId == staffId, which is the correct
* user target for `/v1.0/robot/oToMessages/batchSend`.
*
* Edge case — `separateSessionByConversation=false` + group:
* The connector omits `conversationId` from the sessionKey entirely and
* uses `peerId = senderId`, so there is no way to recover the
* openConversationId here. We skip binding with a warn rather than write
* a record that would resolve to the wrong target.
*
* Best-effort: a failure here only means the channel_message tool will 404
* for this session, the inbound reply path keeps working.
*/
async function registerChannelBinding(sessionKey: string, sessionId: string): Promise<void> {
let chatType: 'direct' | 'group' = 'direct';
let chatId = '';

try {
const info = JSON.parse(sessionKey);
chatType = info.chatType === 'group' ? 'group' : 'direct';

if (chatType === 'group') {
chatId = info.conversationId || '';
} else {
chatId = info.peerId || info.senderId || '';
}
} catch {
// sessionKey is not JSON (legacy plain string) — treat as an opaque
// direct id.
chatId = sessionKey;
}

if (!chatId) {
console.warn(
`[runner] bind skipped: no routable chat_id for sessionKey=${sessionKey} ` +
`(typical cause: separateSessionByConversation=false + group, where ` +
`the session cannot be mapped back to an openConversationId)`
);
return;
}

// The flocks-side channel_id is "dingtalk" (see ChannelMeta.id). Other
// values like "dingtalk-connector" are aliases declared on ChannelMeta and
// are accepted by the registry but the canonical binding row uses the id.
const url = `${FLOCKS_BASE}/api/channel/dingtalk/bind`;
const body = {
session_id: sessionId,
chat_id: chatId,
chat_type: chatType,
account_id: ACCOUNT_ID === '__default__' ? 'default' : ACCOUNT_ID,
};

try {
const r = await fetch(url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(body),
});
if (!r.ok) {
console.warn(`[runner] bind failed: ${r.status} ${await r.text()}`);
} else if (DEBUG) {
console.log(`[runner] bind ok: chat_id=${chatId} chat_type=${chatType} session=${sessionId}`);
}
} catch (e: any) {
console.warn(`[runner] bind error: ${e?.message || e}`);
}
}

async function getOrCreateSession(sessionKey: string, agentName: string): Promise<string> {
const existing = sessionMap.get(sessionKey);
if (existing) {
Expand All @@ -100,6 +187,12 @@ async function getOrCreateSession(sessionKey: string, agentName: string): Promis
const sessionId: string = data.id;
sessionMap.set(sessionKey, sessionId);
console.log(`[runner] session created: key=${sessionKey} id=${sessionId}`);

// Register the channel binding so outbound tools can reach this session.
// Done after the in-memory cache write so a slow/failing bind never
// forces a duplicate session on the next message.
await registerChannelBinding(sessionKey, sessionId);

return sessionId;
}

Expand Down
32 changes: 32 additions & 0 deletions flocks/channel/builtin/dingtalk/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""
DingTalk outbound send library.

This package intentionally does **not** ship a ``ChannelPlugin`` class —
the inbound side is owned by the project-local plugin
``.flocks/plugins/channels/dingtalk/dingtalk.py`` (Node.js connector).

Other code (channel plugins, tools, hooks, …) can drive active outbound
messages by importing :func:`send_message_app` directly. Only the
enterprise app robot OAPI path is supported; custom group webhooks are
intentionally out of scope.
"""

from flocks.channel.builtin.dingtalk.client import (
DingTalkApiError,
close_http_client,
)
from flocks.channel.builtin.dingtalk.config import (
strip_target_prefix,
)
from flocks.channel.builtin.dingtalk.send import (
build_app_payload,
send_message_app,
)

__all__ = [
"DingTalkApiError",
"build_app_payload",
"close_http_client",
"send_message_app",
"strip_target_prefix",
]
Loading
Loading