
feat/dingtalk stream channel #174

Merged
xiami762 merged 4 commits into main from feature/dingtalk-stream-channel
Apr 24, 2026

Conversation

@duguwanglong
Contributor

fix(channel/dingtalk): harden stream runner against silent stalls and inbound floods

Production triage of the DingTalk stream channel surfaced two P0
risks that would only fire under specific real-world conditions:

R1 — Silent stall (gateway rate-limit / region block / suspended app)
The dingtalk-stream SDK's start() swallows every error from
open_connection() and returns None instead of raising. When
the gateway accepts our ticket but immediately tears down the
websocket (e.g. risk-control trips, app suspended), the runner
burns one preflight + one reconnect cycle every ~2-60s forever
without surfacing any permanent error — the channel looks
"connected" while delivering zero messages.

R2 — Unbounded inbound concurrency (group bursts)
ChatbotHandler.process() previously dispatched each frame via
asyncio.create_task() with no cap. A noisy group flooding 100+
messages would spawn 100+ concurrent on_message coroutines, each
holding an LLM round-trip — easy path to OOM, rate-limit storms
on the upstream model API, and starvation of other accounts
sharing the event loop.

Fixes

  • R1 — stall detection
    Track inbound delivery + per-run duration around every start()
    call. Treat "clean return + run < 30s + zero messages received"
    as a stall signal. After 5 consecutive short clean returns,
    raise the new DingTalkStreamStallError so the channel layer drops
    the account from the reconnect schedule (same path used today
    for permanent auth failures). Counter resets on the first
    healthy run so transient flakes can't accumulate strikes.

  • R2 — bounded inbound dispatch
    Replace the ad-hoc create_task fan-out with a bounded
    asyncio.Queue (default 256) drained by a fixed worker pool
    (default 8). SDK process() uses put_nowait so the heartbeat
    ack path stays non-blocking; QueueFull drops the new message
    and bumps a counter for telemetry. Both sizes are tunable per
    account via dispatchWorkers / dispatchQueueSize.

    Counting _messages_received at enqueue time (before back-pressure)
    preserves the R1 stall signal even when shedding load — "gateway
    is pushing us things we can't handle" is NOT the same failure
    mode as "gateway has gone silent".
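Taken together, the stall detector and the bounded dispatch path could be sketched roughly like this (a minimal sketch: `StreamRunner`, `_run_once`, and `on_message` are illustrative names, not the channel's actual identifiers; only `DingTalkStreamStallError` and the default sizes come from the description above):

```python
import asyncio
import time


class DingTalkStreamStallError(RuntimeError):
    """Raised after repeated short, message-less runs of start()."""


class StreamRunner:
    STALL_RUN_SECONDS = 30.0   # a "short" run finishes under this
    STALL_STRIKES = 5          # consecutive short clean returns before escalating

    def __init__(self, workers: int = 8, queue_size: int = 256):
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=queue_size)
        self._workers = workers
        self._messages_received = 0
        self._dropped = 0
        self._short_runs = 0

    # --- bounded inbound dispatch -----------------------------------
    def enqueue(self, frame) -> None:
        # Called from the SDK's process() path; must never block the
        # heartbeat ack, hence put_nowait + drop-on-full.
        self._messages_received += 1  # count BEFORE back-pressure: keeps
        try:                          # the stall signal alive under load
            self._queue.put_nowait(frame)
        except asyncio.QueueFull:
            self._dropped += 1        # shed load, surface via telemetry

    async def _worker(self) -> None:
        while True:
            frame = await self._queue.get()
            try:
                await self.on_message(frame)  # LLM round-trip lives here
            finally:
                self._queue.task_done()

    async def on_message(self, frame) -> None:  # stand-in handler
        pass

    # --- stall detection --------------------------------------------
    async def run_forever(self) -> None:
        pool = [asyncio.ensure_future(self._worker())
                for _ in range(self._workers)]
        try:
            while True:
                before = self._messages_received
                started = time.monotonic()
                await self._run_once()  # wraps the SDK's start()
                elapsed = time.monotonic() - started
                if (elapsed < self.STALL_RUN_SECONDS
                        and self._messages_received == before):
                    self._short_runs += 1
                    if self._short_runs >= self.STALL_STRIKES:
                        raise DingTalkStreamStallError(
                            f"{self._short_runs} short clean returns")
                else:
                    self._short_runs = 0  # healthy run resets strikes
        finally:
            for task in pool:
                task.cancel()

    async def _run_once(self) -> None:  # stand-in for SDK start()
        pass
```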

Refactor

  • Introduce DingTalkPermanentError as the common base for
    non-retryable runner failures; DingTalkPermanentAuthError and the
    new DingTalkStreamStallError both inherit from it. channel.py's
    _classify_and_raise switches to the base class so future
    permanent error types are handled automatically.

Reliability bonus (P1.R4)

  • Bound the synchronous client.close() teardown with a 5s
    asyncio.wait_for so a hung socket can't stall a channel restart
    indefinitely. Times out → log warning, proceed with cancel.
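A minimal sketch of that bounded teardown, assuming the SDK client's `close()` is a blocking call (`close_client` and the logger name are illustrative; only the 5s `wait_for` cap comes from the PR):

```python
import asyncio
import logging

log = logging.getLogger("dingtalk.stream")


async def close_client(client, timeout: float = 5.0) -> None:
    try:
        # client.close() is synchronous, so push it off-loop and cap how
        # long teardown may block a channel restart.
        await asyncio.wait_for(asyncio.to_thread(client.close), timeout)
    except asyncio.TimeoutError:
        # Hung socket: log a warning and fall through to task cancellation.
        log.warning("client.close() still hung after %.1fs; cancelling", timeout)
```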

…Mode plugin

Drops the legacy `.flocks/plugins/channels/dingtalk/` Node.js connector
in favour of a pure-Python `DingTalkChannel` built on the official
`dingtalk-stream` SDK (>= 0.20).  Inbound runs over the Stream Mode
WebSocket; outbound reuses the existing OAPI app-robot send library.

* Multi-account stream runner with message gating (require_mention /
  free_response_chats / mention_patterns / allowed_users) and
  rich-text / image extraction across SDK versions.
* Pre-flight gateway probe surfaces 4xx auth failures (bad clientId/
  clientSecret, app revoked, Stream Mode subscription not enabled) so
  the channel stops retrying unfixable credentials instead of looping
  endlessly inside the SDK.
* `_wait_until_done` raises a transient error when runners are
  cancelled by a concurrent `plugin.stop()` while `abort_event` is
  still clear -- fixes a gateway race where rapid restart calls left
  the channel stuck in "passive mode" with no live stream.
* DM `chat_id` is the sender's `staffId` (not the `conversation_id`)
  so replies route through `/v1.0/robot/oToMessages/batchSend` instead
  of failing with "robot 不存在" ("robot does not exist") on
  `/groupMessages/send`.
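The pre-flight probe's stop-retrying behaviour might be sketched like this; the gateway endpoint and subscription topic reflect the public Stream Mode API as I understand it, and the exception and helper names are assumptions rather than the channel's exact code:

```python
import json
import urllib.error
import urllib.request

GATEWAY = "https://api.dingtalk.com/v1.0/gateway/connections/open"


class DingTalkPermanentAuthError(Exception):
    """4xx from the gateway: bad credentials, revoked app, no Stream Mode."""


class DingTalkTransientError(Exception):
    """5xx / network trouble: worth retrying."""


def classify_gateway_status(status: int) -> type:
    # Any 4xx means retrying the same credentials cannot succeed.
    if 400 <= status < 500:
        return DingTalkPermanentAuthError
    return DingTalkTransientError


def preflight(client_id: str, client_secret: str) -> dict:
    body = json.dumps({
        "clientId": client_id,
        "clientSecret": client_secret,
        "subscriptions": [
            {"type": "CALLBACK", "topic": "/v1.0/im/bot/messages/get"},
        ],
    }).encode()
    req = urllib.request.Request(
        GATEWAY, data=body, headers={"Content-Type": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            return json.load(resp)  # websocket endpoint + connection ticket
    except urllib.error.HTTPError as exc:
        raise classify_gateway_status(exc.code)(
            f"gateway rejected preflight: HTTP {exc.code}") from exc
```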

Adds 62 unit tests covering config parsing, payload building, send
routing, gating, message extraction, pre-flight error classification,
and the gateway-race / DM-routing regressions.

Made-with: Cursor
…tup observability

Follow-up cleanup on top of the Stream Mode rewrite:

* Extract `_resolve_chat_id` / `_is_group_message` helpers and reuse
  them from both `chatbot_message_to_inbound` (outbound routing) and
  `DingTalkStreamRunner._dispatch` (inbound gating).  The two paths
  previously disagreed: `_dispatch` used `conversation_id or sender_id`
  unconditionally, while the inbound builder branched on DM vs group.
  This was harmless today but a latent bug — an admin whitelisting a
  chat in `free_response_chats` could see gating accept a DM and the
  reply still get routed somewhere else.
* Drop the unused `CallbackMessage` import (and its `None` shim in the
  `ImportError` fallback).
* Bump `dingtalk.stream.starting` from DEBUG to INFO and add a
  matching `dingtalk.stream.stopped` INFO when the SDK's `start()`
  returns cleanly.  Channel startup is a low-frequency, high-signal
  event; losing it in production logs makes it impossible to tell
  whether the SDK ever attempted to open a websocket.

Tests: 4 new unit tests in `tests/channel/test_dingtalk.py` covering
the helper's DM/group/fallback behaviour and a regression that pins
`_dispatch` and `chatbot_message_to_inbound` to the same `chat_id`
for any given message.  Full suite: 66 passed.

Made-with: Cursor
The "Gateway Token" field was a leftover from the legacy Node.js
OpenClaw connector (removed in 5812a31). The new pure-Python Stream
Mode channel talks to wss-open.dingtalk.com directly using only
clientId / clientSecret, so the backend never reads gatewayToken.
Keeping it in the UI only misled users into thinking an extra
credential was required.

- Remove gatewayToken from DingTalkChannelConfig interface and form
- Remove dingtalk.gatewayToken / gatewayTokenHint from zh-CN and en-US
  channel locales

Existing flocks.json entries with this field are silently ignored by
ChannelConfig (extra="allow") and will be cleaned up on next save.

Made-with: Cursor
… inbound floods

Tests

* test_stall_detection_escalates_after_consecutive_short_clean_returns
* test_stall_counter_resets_after_inbound_message
* test_dispatch_queue_drops_overflow_without_blocking
* test_enqueue_before_pool_started_does_not_crash
* test_permanent_error_hierarchy

71/71 passing in tests/channel/test_dingtalk.py.

Made-with: Cursor
@xiami762 xiami762 self-requested a review April 24, 2026 06:41
@xiami762 xiami762 merged commit 9ff8076 into main Apr 24, 2026
2 checks passed