Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .flocks/plugins/channels/dingtalk/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,10 @@ function startProxy(): Promise<number> {
return;
}

// Record inbound activity as early as possible, before any Flocks API
// calls that might fail (session creation, inference, etc.).
fetch(`${FLOCKS_BASE}/api/channel/dingtalk/record-inbound`, { method: 'POST' }).catch(() => {});

res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Expand Down
2 changes: 1 addition & 1 deletion flocks/channel/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,4 +286,4 @@ def mark_disconnected(self, error: Optional[str] = None) -> None:

def record_message(self) -> None:
"""Update the last-message timestamp (called by inbound dispatcher)."""
self._status.last_message_at = time.monotonic()
self._status.last_message_at = time.time()
7 changes: 3 additions & 4 deletions flocks/channel/builtin/feishu/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import asyncio
import json
import time
from collections import OrderedDict
from typing import Optional

Expand Down Expand Up @@ -95,7 +94,7 @@ async def _send_static(self, ctx: OutboundContext) -> DeliveryResult:
reply_to_id=ctx.reply_to_id,
account_id=ctx.account_id,
)
self._status.last_message_at = time.monotonic()
self.record_message()
return DeliveryResult(
channel_id="feishu",
message_id=result["message_id"],
Expand Down Expand Up @@ -130,7 +129,7 @@ async def _send_streaming(self, ctx: OutboundContext) -> DeliveryResult:
try:
# Write full text at once (send_text scenario has no per-chunk streaming data)
await card.finalize(ctx.text)
self._status.last_message_at = time.monotonic()
self.record_message()
return DeliveryResult(
channel_id="feishu",
message_id=message_id,
Expand Down Expand Up @@ -158,7 +157,7 @@ async def send_media(self, ctx: OutboundContext) -> DeliveryResult:
reply_to_id=ctx.reply_to_id,
account_id=ctx.account_id,
)
self._status.last_message_at = time.monotonic()
self.record_message()
return DeliveryResult(
channel_id="feishu",
message_id=result["message_id"],
Expand Down
21 changes: 19 additions & 2 deletions flocks/channel/gateway/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async def start_all(self) -> None:
channel_id=channel_id,
plugin=plugin,
config=config_dict,
on_message=self._dispatcher.dispatch,
on_message=self._make_on_message(plugin, self._dispatcher.dispatch),
abort_event=abort_event,
),
name=f"channel-{channel_id}",
Expand Down Expand Up @@ -182,7 +182,7 @@ async def start_channel(self, channel_id: str) -> None:
channel_id=channel_id,
plugin=plugin,
config=config_dict,
on_message=self._dispatcher.dispatch,
on_message=self._make_on_message(plugin, self._dispatcher.dispatch),
abort_event=abort_event,
),
name=f"channel-{channel_id}",
Expand Down Expand Up @@ -328,6 +328,23 @@ async def _run_with_reconnect(
# Reconnect helpers
# ------------------------------------------------------------------

@staticmethod
def _make_on_message(
plugin: ChannelPlugin,
dispatch: Callable,
) -> Callable:
"""Wrap *dispatch* so that each inbound message records a timestamp."""
async def _on_message(msg) -> None:
plugin.record_message()
await dispatch(msg)
return _on_message

def record_message(self, channel_id: str) -> None:
"""Update last_message_at on a running channel plugin (e.g. from an HTTP handler)."""
plugin = self._running_plugins.get(channel_id) or self._registry.get(channel_id)
if plugin:
plugin.record_message()

@staticmethod
async def _mark_connected(plugin: ChannelPlugin, channel_id: str) -> None:
plugin.mark_connected()
Expand Down
11 changes: 11 additions & 0 deletions flocks/server/routes/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,17 @@ async def list_channels():
]


@router.post("/{channel_id}/record-inbound")
async def record_inbound(channel_id: str):
"""Notify the gateway that a message was received on this channel.

Used by out-of-process bridges (e.g. DingTalk's runner.ts) that bypass the
InboundDispatcher so that last_message_at is updated on the plugin status.
"""
default_manager.record_message(channel_id)
return {"ok": True}


@router.post("/{channel_id}/restart")
async def restart_channel(channel_id: str):
"""Restart a single channel connection with the latest config.
Expand Down
Loading