diff --git a/.flocks/plugins/channels/dingtalk/runner.ts b/.flocks/plugins/channels/dingtalk/runner.ts index bf97d2f..1364fa4 100644 --- a/.flocks/plugins/channels/dingtalk/runner.ts +++ b/.flocks/plugins/channels/dingtalk/runner.ts @@ -258,6 +258,10 @@ function startProxy(): Promise { return; } + // Record inbound activity as early as possible, before any Flocks API + // calls that might fail (session creation, inference, etc.). + fetch(`${FLOCKS_BASE}/api/channel/dingtalk/record-inbound`, { method: 'POST' }).catch(() => {}); + res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', diff --git a/flocks/channel/base.py b/flocks/channel/base.py index 8bc37b7..7bac9f2 100644 --- a/flocks/channel/base.py +++ b/flocks/channel/base.py @@ -286,4 +286,4 @@ def mark_disconnected(self, error: Optional[str] = None) -> None: def record_message(self) -> None: """Update the last-message timestamp (called by inbound dispatcher).""" - self._status.last_message_at = time.monotonic() + self._status.last_message_at = time.time() diff --git a/flocks/channel/builtin/feishu/channel.py b/flocks/channel/builtin/feishu/channel.py index e811a3e..1db994a 100644 --- a/flocks/channel/builtin/feishu/channel.py +++ b/flocks/channel/builtin/feishu/channel.py @@ -10,7 +10,6 @@ import asyncio import json -import time from collections import OrderedDict from typing import Optional @@ -95,7 +94,7 @@ async def _send_static(self, ctx: OutboundContext) -> DeliveryResult: reply_to_id=ctx.reply_to_id, account_id=ctx.account_id, ) - self._status.last_message_at = time.monotonic() + self.record_message() return DeliveryResult( channel_id="feishu", message_id=result["message_id"], @@ -130,7 +129,7 @@ async def _send_streaming(self, ctx: OutboundContext) -> DeliveryResult: try: # Write full text at once (send_text scenario has no per-chunk streaming data) await card.finalize(ctx.text) - self._status.last_message_at = time.monotonic() + self.record_message() return DeliveryResult( channel_id="feishu", message_id=message_id, @@ -158,7 +157,7 @@ async def send_media(self, ctx: OutboundContext) -> DeliveryResult: reply_to_id=ctx.reply_to_id, account_id=ctx.account_id, ) - self._status.last_message_at = time.monotonic() + self.record_message() return DeliveryResult( channel_id="feishu", message_id=result["message_id"], diff --git a/flocks/channel/gateway/manager.py b/flocks/channel/gateway/manager.py index 555762e..2d26856 100644 --- a/flocks/channel/gateway/manager.py +++ b/flocks/channel/gateway/manager.py @@ -79,7 +79,7 @@ async def start_all(self) -> None: channel_id=channel_id, plugin=plugin, config=config_dict, - on_message=self._dispatcher.dispatch, + on_message=self._make_on_message(plugin, self._dispatcher.dispatch), abort_event=abort_event, ), name=f"channel-{channel_id}", @@ -182,7 +182,7 @@ async def start_channel(self, channel_id: str) -> None: channel_id=channel_id, plugin=plugin, config=config_dict, - on_message=self._dispatcher.dispatch, + on_message=self._make_on_message(plugin, self._dispatcher.dispatch), abort_event=abort_event, ), name=f"channel-{channel_id}", @@ -328,6 +328,23 @@ async def _run_with_reconnect( # Reconnect helpers # ------------------------------------------------------------------ + @staticmethod + def _make_on_message( + plugin: ChannelPlugin, + dispatch: Callable, + ) -> Callable: + """Wrap *dispatch* so that each inbound message records a timestamp.""" + async def _on_message(msg) -> None: + plugin.record_message() + await dispatch(msg) + return _on_message + + def record_message(self, channel_id: str) -> None: + """Update last_message_at on a running channel plugin (e.g. from an HTTP handler).""" + plugin = self._running_plugins.get(channel_id) or self._registry.get(channel_id) + if plugin: + plugin.record_message() + @staticmethod async def _mark_connected(plugin: ChannelPlugin, channel_id: str) -> None: plugin.mark_connected() diff --git a/flocks/server/routes/channel.py b/flocks/server/routes/channel.py index bbf5a81..3bce11a 100644 --- a/flocks/server/routes/channel.py +++ b/flocks/server/routes/channel.py @@ -178,6 +178,17 @@ async def list_channels(): ] +@router.post("/{channel_id}/record-inbound") +async def record_inbound(channel_id: str): + """Notify the gateway that a message was received on this channel. + + Used by out-of-process bridges (e.g. DingTalk's runner.ts) that bypass the + InboundDispatcher so that last_message_at is updated on the plugin status. + """ + default_manager.record_message(channel_id) + return {"ok": True} + + @router.post("/{channel_id}/restart") async def restart_channel(channel_id: str): """Restart a single channel connection with the latest config.