Skip to content

Conversation

@Soulter
Copy link
Member

@Soulter Soulter commented Feb 11, 2026

Modifications / 改动点

  • This is NOT a breaking change. / 这不是一个破坏性变更。

Screenshots or Test Results / 运行截图或测试结果


Checklist / 检查清单

  • 😊 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。/ If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
  • 👀 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”。/ My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
  • 🤓 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到了 requirements.txtpyproject.toml 文件相应位置。/ I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
  • 😮 我的更改没有引入恶意代码。/ My changes do not introduce malicious code.

Summary by Sourcery

在将事件转换并构建主代理请求时,增加对飞书/Feishu/Lark 消息的更丰富处理,以支持媒体、文件、音频、视频以及引用消息。

新功能:

  • 支持将飞书/Feishu/Lark 消息资源(图片、文件、音频和视频)解析和下载为内部消息组件。
  • 支持从父消息构建回复/引用组件,包括其内容摘要和元数据。
  • 在构建主代理请求时,同时包含主消息和被引用消息中的媒体附件,使其可供下游提供方使用。

增强改进:

  • 将飞书/Feishu/Lark 消息解析重构为可复用的辅助方法,用于组件提取和消息字符串构建,从而提升内容处理和日志记录的健壮性。
Original summary in English

Summary by Sourcery

Add richer Feishu/Lark message handling to support media, files, audio, video, and quoted messages when converting events and building main agent requests.

New Features:

  • Support parsing and downloading Feishu/Lark message resources (images, files, audio, and video) into internal message components.
  • Support constructing reply/quote components from parent messages, including their content summary and metadata.
  • Include media attachments from both main and quoted messages when building main agent requests so they are available to downstream providers.

Enhancements:

  • Refactor Feishu/Lark message parsing into reusable helpers for component extraction and message string construction, improving robustness of content handling and logging.

@dosubot dosubot bot added the size:L This PR changes 100-499 lines, ignoring generated files. label Feb 11, 2026
@dosubot
Copy link

dosubot bot commented Feb 11, 2026

Related Documentation

Checked 1 published document(s) in 1 knowledge base(s). No updates required.

How did I do? Any feedback?  Join Discord

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - 我发现了 1 个问题,并留下了一些整体性的反馈:

  • _download_file_resource_to_temp 中,file_name 被嵌入到了最终文件名里,同时 suffix 又是基于 file_name 再推导出来的,这会生成类似 ..._<file_name>_<uuid>.ext 的名字,而 <file_name> 本身已经包含扩展名(例如 video.mp4_abcd.mp4);建议在格式化前先从 file_name 中去掉扩展名,或者当 file_name 已经包含扩展名时就不要再追加 suffix
  • _parse_message_components 中处理 mediapostmedia 标签的逻辑,与处理 file/audio 分支几乎完全相同;建议抽取一个小的辅助函数,用于复用“下载到临时文件并构建组件”这一通用流程,避免重复代码,也能让分支逻辑更易维护。
面向 AI Agent 的提示词
Please address the comments from this code review:

## Overall Comments
- In `_download_file_resource_to_temp`, `file_name` is embedded in the filename and `suffix` is derived from `file_name` again, which will produce names like `..._<file_name>_<uuid>.ext` where `<file_name>` already contains the extension (e.g. `video.mp4_abcd.mp4`); consider stripping the extension from `file_name` before formatting or not appending `suffix` when `file_name` already has one.
- The logic in `_parse_message_components` for handling `media` and `post` `media` tags is nearly identical to the `file`/`audio` branches; consider extracting a small helper for the common 'download to temp and build component' workflow to avoid duplication and keep the branching logic easier to maintain.

## Individual Comments

### Comment 1
<location> `astrbot/core/platform/sources/lark/lark_adapter.py:191` </location>
<code_context>
+
+        return at_map
+
+    async def _parse_message_components(
+        self,
+        *,
</code_context>

<issue_to_address>
**issue (complexity):** 考虑将新的消息解析和回复构建辅助函数拆分为更小的、按类型划分的函数以及共享工具函数,以避免出现体量过大的“上帝函数”,同时让调用流程更易理解和测试。

新的辅助函数是一个很好的改进,但现在 `_parse_message_components``_build_reply_from_parent_id` 变成了核心的“上帝函数”。你可以在保持现有功能不变的前提下,通过以下方式让代码更易理解:

### 1. 将 `_parse_message_components` 拆分为按类型划分的处理函数

目前 `_parse_message_components` 同时负责所有分支逻辑以及所有 IO。你可以保持对外的公共 API 完全不变,但在内部把处理委托给更小的函数:

```python
async def _parse_message_components(
    self,
    *,
    message_id: str | None,
    message_type: str,
    content: dict[str, Any],
    at_map: dict[str, Comp.At],
) -> list[Comp.BaseMessageComponent]:
    handlers: dict[str, Callable[..., Awaitable[list[Comp.BaseMessageComponent]]]] = {
        "text": self._parse_text_components,
        "post": self._parse_post_or_image_components,
        "image": self._parse_post_or_image_components,
        "file": self._parse_file_components,
        "audio": self._parse_audio_components,
        "media": self._parse_media_components,
    }

    handler = handlers.get(message_type)
    if handler is None:
        return []

    return await handler(message_id=message_id, content=content, at_map=at_map)
```

每个处理函数就可以只专注于自己的类型:

```python
async def _parse_text_components(
    self,
    *,
    message_id: str | None,
    content: dict[str, Any],
    at_map: dict[str, Comp.At],
) -> list[Comp.BaseMessageComponent]:
    message_str_raw = str(content.get("text", ""))
    at_pattern = r"(@_user_\d+)"
    parts = re.split(at_pattern, message_str_raw)

    components: list[Comp.BaseMessageComponent] = []
    for part in parts:
        segment = part.strip()
        if not segment:
            continue
        if segment in at_map:
            components.append(at_map[segment])
        else:
            components.append(Comp.Plain(segment))
    return components
```

```python
async def _parse_post_or_image_components(
    self,
    *,
    message_id: str | None,
    content: dict[str, Any],
    at_map: dict[str, Comp.At],
) -> list[Comp.BaseMessageComponent]:
    components: list[Comp.BaseMessageComponent] = []

    comp_list = (
        self._parse_post_content(content)
        if content.get("content") is not None  # post
        else [{"tag": "img", "image_key": content.get("image_key")}]
    )

    for comp in comp_list:
        tag = comp.get("tag")
        # reuse your existing tag-handling logic here...
        # (at/text/a) branches stay here
        if tag == "img" or tag == "media":
            media_comp = await self._build_media_component_from_tag(
                message_id=message_id,
                tag=tag,
                comp=comp,
                message_type="post_media" if tag == "media" else "image",
            )
            if media_comp:
                components.append(media_comp)

    return components
```

这样可以保持现有行为不变,同时降低单个方法的圈复杂度,让每个处理函数都可以单独进行测试。

### 2. 抽取一个可复用的“基于文件的消息”辅助函数

`file``audio``media` 分支几乎完全一致:校验 `message_id`/`file_key`,通过 `_download_file_resource_to_temp` 下载,然后用对应的组件进行封装。

你可以把这个模式抽取成一个小的辅助函数:

```python
async def _build_file_based_component(
    self,
    *,
    message_id: str | None,
    file_key: str | None,
    message_type: str,
    file_name: str,
    default_suffix: str,
    factory: Callable[[str], Comp.BaseMessageComponent],
) -> Comp.BaseMessageComponent | None:
    if not message_id:
        logger.error(f"[Lark] {message_type} 消息缺少 message_id")
        return None
    if not file_key:
        logger.error(f"[Lark] {message_type} 消息缺少 file_key")
        return None

    file_path = await self._download_file_resource_to_temp(
        message_id=message_id,
        file_key=file_key,
        message_type=message_type,
        file_name=file_name,
        default_suffix=default_suffix,
    )
    if not file_path:
        return None
    return factory(file_path)
```

这样,每种类型的分支都可以变得非常精简:

```python
async def _parse_file_components(...):
    file_key = str(content.get("file_key", "")).strip()
    file_name = str(content.get("file_name", "")).strip() or "lark_file"

    comp = await self._build_file_based_component(
        message_id=message_id,
        file_key=file_key,
        message_type="file",
        file_name=file_name,
        default_suffix=".bin",
        factory=lambda path: Comp.File(name=file_name, file=path),
    )
    return [comp] if comp else []
```

```python
async def _parse_audio_components(...):
    file_key = str(content.get("file_key", "")).strip()
    comp = await self._build_file_based_component(
        message_id=message_id,
        file_key=file_key,
        message_type="audio",
        file_name="lark_audio",
        default_suffix=".opus",
        factory=lambda path: Comp.Record(file=path, url=path),
    )
    return [comp] if comp else []
```

```python
async def _parse_media_components(...):
    file_key = str(content.get("file_key", "")).strip()
    file_name = str(content.get("file_name", "")).strip() or "lark_media.mp4"

    comp = await self._build_file_based_component(
        message_id=message_id,
        file_key=file_key,
        message_type="media",
        file_name=file_name,
        default_suffix=".mp4",
        factory=lambda path: Comp.Video(file=path, path=path),
    )
    return [comp] if comp else []
```

这样可以保留所有 I/O 和命名行为,同时把三个几乎相同的分支统一到一个模式中。

### 3. 拆分 `_build_reply_from_parent_id`

`_build_reply_from_parent_id` 目前同时执行拉取、归一化、解析以及 `Comp.Reply` 构造。你可以把它拆分成三个各自专注的辅助函数:

```python
async def _fetch_parent_message(
    self, parent_message_id: str
):
    if self.lark_api.im is None:
        logger.error("[Lark] API Client im 模块未初始化")
        return None

    request = GetMessageRequest.builder().message_id(parent_message_id).build()
    response = await self.lark_api.im.v1.message.aget(request)
    if not response.success() or not response.data or not response.data.items:
        logger.error(
            f"[Lark] 获取引用消息失败 id={parent_message_id}, "
            f"code={response.code}, msg={response.msg}"
        )
        return None
    return response.data.items[0]
```

```python
def _normalize_parent_message(self, parent_message) -> dict[str, Any]:
    quoted_message_id = parent_message.message_id or "unknown"
    quoted_sender_id = (
        parent_message.sender.id
        if parent_message.sender and parent_message.sender.id
        else "unknown"
    )
    quoted_time_raw = parent_message.create_time or 0
    quoted_time = (
        quoted_time_raw // 1000
        if isinstance(quoted_time_raw, int) and quoted_time_raw > 10**11
        else quoted_time_raw
    )

    raw_content = (parent_message.body.content if parent_message.body else "") or ""
    content_json: dict[str, Any] = {}
    if raw_content:
        try:
            parsed = json.loads(raw_content)
            if isinstance(parsed, dict):
                content_json = parsed
        except json.JSONDecodeError:
            logger.warning(f"[Lark] 解析引用消息内容失败 id={quoted_message_id}")

    return {
        "id": quoted_message_id,
        "sender_id": quoted_sender_id,
        "time": quoted_time,
        "type": parent_message.msg_type or "",
        "content": content_json,
        "mentions": parent_message.mentions,
    }
```

然后 `_build_reply_from_parent_id` 就变成了一个“胶水函数”:

```python
async def _build_reply_from_parent_id(
    self, parent_message_id: str
) -> Comp.Reply | None:
    parent = await self._fetch_parent_message(parent_message_id)
    if parent is None:
        return None

    normalized = self._normalize_parent_message(parent)
    quoted_at_map = self._build_at_map(normalized["mentions"])
    quoted_chain = await self._parse_message_components(
        message_id=normalized["id"],
        message_type=normalized["type"],
        content=normalized["content"],
        at_map=quoted_at_map,
    )
    quoted_text = self._build_message_str_from_components(quoted_chain)
    sender_id = normalized["sender_id"]
    sender_nickname = sender_id[:8] if sender_id != "unknown" else "unknown"

    return Comp.Reply(
        id=normalized["id"],
        chain=quoted_chain,
        sender_id=sender_id,
        sender_nickname=sender_nickname,
        time=normalized["time"],
        message_str=quoted_text,
        text=quoted_text,
    )
```

这样既能复用你已有的辅助函数,又能保留全部行为(包括时间戳归一化和 JSON 解析),同时让引用消息处理逻辑更容易测试和维护。
</issue_to_address>

Sourcery 对开源项目是免费的——如果你喜欢我们的代码审查,请考虑帮我们分享一下 ✨
帮我变得更有用!请在每条评论上点 👍 或 👎,我会根据你的反馈来改进代码审查质量。
Original comment in English

Hey - I've found 1 issue, and left some high level feedback:

  • In _download_file_resource_to_temp, file_name is embedded in the filename and suffix is derived from file_name again, which will produce names like ..._<file_name>_<uuid>.ext where <file_name> already contains the extension (e.g. video.mp4_abcd.mp4); consider stripping the extension from file_name before formatting or not appending suffix when file_name already has one.
  • The logic in _parse_message_components for handling media and post media tags is nearly identical to the file/audio branches; consider extracting a small helper for the common 'download to temp and build component' workflow to avoid duplication and keep the branching logic easier to maintain.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `_download_file_resource_to_temp`, `file_name` is embedded in the filename and `suffix` is derived from `file_name` again, which will produce names like `..._<file_name>_<uuid>.ext` where `<file_name>` already contains the extension (e.g. `video.mp4_abcd.mp4`); consider stripping the extension from `file_name` before formatting or not appending `suffix` when `file_name` already has one.
- The logic in `_parse_message_components` for handling `media` and `post` `media` tags is nearly identical to the `file`/`audio` branches; consider extracting a small helper for the common 'download to temp and build component' workflow to avoid duplication and keep the branching logic easier to maintain.

## Individual Comments

### Comment 1
<location> `astrbot/core/platform/sources/lark/lark_adapter.py:191` </location>
<code_context>
+
+        return at_map
+
+    async def _parse_message_components(
+        self,
+        *,
</code_context>

<issue_to_address>
**issue (complexity):** Consider decomposing the new message parsing and reply-building helpers into smaller, type-specific functions and shared utilities to avoid large god functions and make the flow easier to follow and test.

The new helpers are a good step, but `_parse_message_components` and `_build_reply_from_parent_id` are now central “god functions”. You can keep all functionality and make the code easier to follow by:

### 1. Split `_parse_message_components` into type-specific handlers

Right now `_parse_message_components` does all branching plus all IO. You can keep the public API exactly as-is, but internally delegate to smaller functions:

```python
async def _parse_message_components(
    self,
    *,
    message_id: str | None,
    message_type: str,
    content: dict[str, Any],
    at_map: dict[str, Comp.At],
) -> list[Comp.BaseMessageComponent]:
    handlers: dict[str, Callable[..., Awaitable[list[Comp.BaseMessageComponent]]]] = {
        "text": self._parse_text_components,
        "post": self._parse_post_or_image_components,
        "image": self._parse_post_or_image_components,
        "file": self._parse_file_components,
        "audio": self._parse_audio_components,
        "media": self._parse_media_components,
    }

    handler = handlers.get(message_type)
    if handler is None:
        return []

    return await handler(message_id=message_id, content=content, at_map=at_map)
```

Each handler can then focus only on that type:

```python
async def _parse_text_components(
    self,
    *,
    message_id: str | None,
    content: dict[str, Any],
    at_map: dict[str, Comp.At],
) -> list[Comp.BaseMessageComponent]:
    message_str_raw = str(content.get("text", ""))
    at_pattern = r"(@_user_\d+)"
    parts = re.split(at_pattern, message_str_raw)

    components: list[Comp.BaseMessageComponent] = []
    for part in parts:
        segment = part.strip()
        if not segment:
            continue
        if segment in at_map:
            components.append(at_map[segment])
        else:
            components.append(Comp.Plain(segment))
    return components
```

```python
async def _parse_post_or_image_components(
    self,
    *,
    message_id: str | None,
    content: dict[str, Any],
    at_map: dict[str, Comp.At],
) -> list[Comp.BaseMessageComponent]:
    components: list[Comp.BaseMessageComponent] = []

    comp_list = (
        self._parse_post_content(content)
        if content.get("content") is not None  # post
        else [{"tag": "img", "image_key": content.get("image_key")}]
    )

    for comp in comp_list:
        tag = comp.get("tag")
        # reuse your existing tag-handling logic here...
        # (at/text/a) branches stay here
        if tag == "img" or tag == "media":
            media_comp = await self._build_media_component_from_tag(
                message_id=message_id,
                tag=tag,
                comp=comp,
                message_type="post_media" if tag == "media" else "image",
            )
            if media_comp:
                components.append(media_comp)

    return components
```

This preserves behavior but reduces cyclomatic complexity in a single method and makes each handler testable on its own.

### 2. Extract a reusable “file-based message” helper

The `file`, `audio`, and `media` branches are nearly identical: validate `message_id`/`file_key`, download via `_download_file_resource_to_temp`, then wrap in the appropriate component.

You can factor that pattern into a small helper:

```python
async def _build_file_based_component(
    self,
    *,
    message_id: str | None,
    file_key: str | None,
    message_type: str,
    file_name: str,
    default_suffix: str,
    factory: Callable[[str], Comp.BaseMessageComponent],
) -> Comp.BaseMessageComponent | None:
    if not message_id:
        logger.error(f"[Lark] {message_type} 消息缺少 message_id")
        return None
    if not file_key:
        logger.error(f"[Lark] {message_type} 消息缺少 file_key")
        return None

    file_path = await self._download_file_resource_to_temp(
        message_id=message_id,
        file_key=file_key,
        message_type=message_type,
        file_name=file_name,
        default_suffix=default_suffix,
    )
    if not file_path:
        return None
    return factory(file_path)
```

Then your per-type branches become very small:

```python
async def _parse_file_components(...):
    file_key = str(content.get("file_key", "")).strip()
    file_name = str(content.get("file_name", "")).strip() or "lark_file"

    comp = await self._build_file_based_component(
        message_id=message_id,
        file_key=file_key,
        message_type="file",
        file_name=file_name,
        default_suffix=".bin",
        factory=lambda path: Comp.File(name=file_name, file=path),
    )
    return [comp] if comp else []
```

```python
async def _parse_audio_components(...):
    file_key = str(content.get("file_key", "")).strip()
    comp = await self._build_file_based_component(
        message_id=message_id,
        file_key=file_key,
        message_type="audio",
        file_name="lark_audio",
        default_suffix=".opus",
        factory=lambda path: Comp.Record(file=path, url=path),
    )
    return [comp] if comp else []
```

```python
async def _parse_media_components(...):
    file_key = str(content.get("file_key", "")).strip()
    file_name = str(content.get("file_name", "")).strip() or "lark_media.mp4"

    comp = await self._build_file_based_component(
        message_id=message_id,
        file_key=file_key,
        message_type="media",
        file_name=file_name,
        default_suffix=".mp4",
        factory=lambda path: Comp.Video(file=path, path=path),
    )
    return [comp] if comp else []
```

This keeps all I/O and naming behavior intact while collapsing three near-identical branches into one pattern.

### 3. Decompose `_build_reply_from_parent_id`

`_build_reply_from_parent_id` currently performs fetch, normalization, parsing, and `Comp.Reply` construction. You can break it down into three focused helpers:

```python
async def _fetch_parent_message(
    self, parent_message_id: str
):
    if self.lark_api.im is None:
        logger.error("[Lark] API Client im 模块未初始化")
        return None

    request = GetMessageRequest.builder().message_id(parent_message_id).build()
    response = await self.lark_api.im.v1.message.aget(request)
    if not response.success() or not response.data or not response.data.items:
        logger.error(
            f"[Lark] 获取引用消息失败 id={parent_message_id}, "
            f"code={response.code}, msg={response.msg}"
        )
        return None
    return response.data.items[0]
```

```python
def _normalize_parent_message(self, parent_message) -> dict[str, Any]:
    quoted_message_id = parent_message.message_id or "unknown"
    quoted_sender_id = (
        parent_message.sender.id
        if parent_message.sender and parent_message.sender.id
        else "unknown"
    )
    quoted_time_raw = parent_message.create_time or 0
    quoted_time = (
        quoted_time_raw // 1000
        if isinstance(quoted_time_raw, int) and quoted_time_raw > 10**11
        else quoted_time_raw
    )

    raw_content = (parent_message.body.content if parent_message.body else "") or ""
    content_json: dict[str, Any] = {}
    if raw_content:
        try:
            parsed = json.loads(raw_content)
            if isinstance(parsed, dict):
                content_json = parsed
        except json.JSONDecodeError:
            logger.warning(f"[Lark] 解析引用消息内容失败 id={quoted_message_id}")

    return {
        "id": quoted_message_id,
        "sender_id": quoted_sender_id,
        "time": quoted_time,
        "type": parent_message.msg_type or "",
        "content": content_json,
        "mentions": parent_message.mentions,
    }
```

Then `_build_reply_from_parent_id` becomes glue:

```python
async def _build_reply_from_parent_id(
    self, parent_message_id: str
) -> Comp.Reply | None:
    parent = await self._fetch_parent_message(parent_message_id)
    if parent is None:
        return None

    normalized = self._normalize_parent_message(parent)
    quoted_at_map = self._build_at_map(normalized["mentions"])
    quoted_chain = await self._parse_message_components(
        message_id=normalized["id"],
        message_type=normalized["type"],
        content=normalized["content"],
        at_map=quoted_at_map,
    )
    quoted_text = self._build_message_str_from_components(quoted_chain)
    sender_id = normalized["sender_id"]
    sender_nickname = sender_id[:8] if sender_id != "unknown" else "unknown"

    return Comp.Reply(
        id=normalized["id"],
        chain=quoted_chain,
        sender_id=sender_id,
        sender_nickname=sender_nickname,
        time=normalized["time"],
        message_str=quoted_text,
        text=quoted_text,
    )
```

This reuses your existing helpers, preserves all behaviors (including timestamp normalization and JSON parsing), but makes parent-message handling much easier to test and maintain.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.


return at_map

async def _parse_message_components(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): 考虑将新的消息解析和回复构建辅助函数拆分为更小的、按类型划分的函数以及共享工具函数,以避免出现体量过大的“上帝函数”,同时让调用流程更易理解和测试。

新的辅助函数是一个很好的改进,但现在 _parse_message_components_build_reply_from_parent_id 变成了核心的“上帝函数”。你可以在保持现有功能不变的前提下,通过以下方式让代码更易理解:

1. 将 _parse_message_components 拆分为按类型划分的处理函数

目前 _parse_message_components 同时负责所有分支逻辑以及所有 IO。你可以保持对外的公共 API 完全不变,但在内部把处理委托给更小的函数:

async def _parse_message_components(
    self,
    *,
    message_id: str | None,
    message_type: str,
    content: dict[str, Any],
    at_map: dict[str, Comp.At],
) -> list[Comp.BaseMessageComponent]:
    handlers: dict[str, Callable[..., Awaitable[list[Comp.BaseMessageComponent]]]] = {
        "text": self._parse_text_components,
        "post": self._parse_post_or_image_components,
        "image": self._parse_post_or_image_components,
        "file": self._parse_file_components,
        "audio": self._parse_audio_components,
        "media": self._parse_media_components,
    }

    handler = handlers.get(message_type)
    if handler is None:
        return []

    return await handler(message_id=message_id, content=content, at_map=at_map)

每个处理函数就可以只专注于自己的类型:

async def _parse_text_components(
    self,
    *,
    message_id: str | None,
    content: dict[str, Any],
    at_map: dict[str, Comp.At],
) -> list[Comp.BaseMessageComponent]:
    message_str_raw = str(content.get("text", ""))
    at_pattern = r"(@_user_\d+)"
    parts = re.split(at_pattern, message_str_raw)

    components: list[Comp.BaseMessageComponent] = []
    for part in parts:
        segment = part.strip()
        if not segment:
            continue
        if segment in at_map:
            components.append(at_map[segment])
        else:
            components.append(Comp.Plain(segment))
    return components
async def _parse_post_or_image_components(
    self,
    *,
    message_id: str | None,
    content: dict[str, Any],
    at_map: dict[str, Comp.At],
) -> list[Comp.BaseMessageComponent]:
    components: list[Comp.BaseMessageComponent] = []

    comp_list = (
        self._parse_post_content(content)
        if content.get("content") is not None  # post
        else [{"tag": "img", "image_key": content.get("image_key")}]
    )

    for comp in comp_list:
        tag = comp.get("tag")
        # reuse your existing tag-handling logic here...
        # (at/text/a) branches stay here
        if tag == "img" or tag == "media":
            media_comp = await self._build_media_component_from_tag(
                message_id=message_id,
                tag=tag,
                comp=comp,
                message_type="post_media" if tag == "media" else "image",
            )
            if media_comp:
                components.append(media_comp)

    return components

这样可以保持现有行为不变,同时降低单个方法的圈复杂度,让每个处理函数都可以单独进行测试。

2. 抽取一个可复用的“基于文件的消息”辅助函数

fileaudiomedia 分支几乎完全一致:校验 message_id/file_key,通过 _download_file_resource_to_temp 下载,然后用对应的组件进行封装。

你可以把这个模式抽取成一个小的辅助函数:

async def _build_file_based_component(
    self,
    *,
    message_id: str | None,
    file_key: str | None,
    message_type: str,
    file_name: str,
    default_suffix: str,
    factory: Callable[[str], Comp.BaseMessageComponent],
) -> Comp.BaseMessageComponent | None:
    if not message_id:
        logger.error(f"[Lark] {message_type} 消息缺少 message_id")
        return None
    if not file_key:
        logger.error(f"[Lark] {message_type} 消息缺少 file_key")
        return None

    file_path = await self._download_file_resource_to_temp(
        message_id=message_id,
        file_key=file_key,
        message_type=message_type,
        file_name=file_name,
        default_suffix=default_suffix,
    )
    if not file_path:
        return None
    return factory(file_path)

这样,每种类型的分支都可以变得非常精简:

async def _parse_file_components(...):
    file_key = str(content.get("file_key", "")).strip()
    file_name = str(content.get("file_name", "")).strip() or "lark_file"

    comp = await self._build_file_based_component(
        message_id=message_id,
        file_key=file_key,
        message_type="file",
        file_name=file_name,
        default_suffix=".bin",
        factory=lambda path: Comp.File(name=file_name, file=path),
    )
    return [comp] if comp else []
async def _parse_audio_components(...):
    file_key = str(content.get("file_key", "")).strip()
    comp = await self._build_file_based_component(
        message_id=message_id,
        file_key=file_key,
        message_type="audio",
        file_name="lark_audio",
        default_suffix=".opus",
        factory=lambda path: Comp.Record(file=path, url=path),
    )
    return [comp] if comp else []
async def _parse_media_components(...):
    file_key = str(content.get("file_key", "")).strip()
    file_name = str(content.get("file_name", "")).strip() or "lark_media.mp4"

    comp = await self._build_file_based_component(
        message_id=message_id,
        file_key=file_key,
        message_type="media",
        file_name=file_name,
        default_suffix=".mp4",
        factory=lambda path: Comp.Video(file=path, path=path),
    )
    return [comp] if comp else []

这样可以保留所有 I/O 和命名行为,同时把三个几乎相同的分支统一到一个模式中。

3. 拆分 _build_reply_from_parent_id

_build_reply_from_parent_id 目前同时执行拉取、归一化、解析以及 Comp.Reply 构造。你可以把它拆分成三个各自专注的辅助函数:

async def _fetch_parent_message(
    self, parent_message_id: str
):
    if self.lark_api.im is None:
        logger.error("[Lark] API Client im 模块未初始化")
        return None

    request = GetMessageRequest.builder().message_id(parent_message_id).build()
    response = await self.lark_api.im.v1.message.aget(request)
    if not response.success() or not response.data or not response.data.items:
        logger.error(
            f"[Lark] 获取引用消息失败 id={parent_message_id}, "
            f"code={response.code}, msg={response.msg}"
        )
        return None
    return response.data.items[0]
def _normalize_parent_message(self, parent_message) -> dict[str, Any]:
    quoted_message_id = parent_message.message_id or "unknown"
    quoted_sender_id = (
        parent_message.sender.id
        if parent_message.sender and parent_message.sender.id
        else "unknown"
    )
    quoted_time_raw = parent_message.create_time or 0
    quoted_time = (
        quoted_time_raw // 1000
        if isinstance(quoted_time_raw, int) and quoted_time_raw > 10**11
        else quoted_time_raw
    )

    raw_content = (parent_message.body.content if parent_message.body else "") or ""
    content_json: dict[str, Any] = {}
    if raw_content:
        try:
            parsed = json.loads(raw_content)
            if isinstance(parsed, dict):
                content_json = parsed
        except json.JSONDecodeError:
            logger.warning(f"[Lark] 解析引用消息内容失败 id={quoted_message_id}")

    return {
        "id": quoted_message_id,
        "sender_id": quoted_sender_id,
        "time": quoted_time,
        "type": parent_message.msg_type or "",
        "content": content_json,
        "mentions": parent_message.mentions,
    }

然后 _build_reply_from_parent_id 就变成了一个“胶水函数”:

async def _build_reply_from_parent_id(
    self, parent_message_id: str
) -> Comp.Reply | None:
    parent = await self._fetch_parent_message(parent_message_id)
    if parent is None:
        return None

    normalized = self._normalize_parent_message(parent)
    quoted_at_map = self._build_at_map(normalized["mentions"])
    quoted_chain = await self._parse_message_components(
        message_id=normalized["id"],
        message_type=normalized["type"],
        content=normalized["content"],
        at_map=quoted_at_map,
    )
    quoted_text = self._build_message_str_from_components(quoted_chain)
    sender_id = normalized["sender_id"]
    sender_nickname = sender_id[:8] if sender_id != "unknown" else "unknown"

    return Comp.Reply(
        id=normalized["id"],
        chain=quoted_chain,
        sender_id=sender_id,
        sender_nickname=sender_nickname,
        time=normalized["time"],
        message_str=quoted_text,
        text=quoted_text,
    )

这样既能复用你已有的辅助函数,又能保留全部行为(包括时间戳归一化和 JSON 解析),同时让引用消息处理逻辑更容易测试和维护。

Original comment in English

issue (complexity): Consider decomposing the new message parsing and reply-building helpers into smaller, type-specific functions and shared utilities to avoid large god functions and make the flow easier to follow and test.

The new helpers are a good step, but _parse_message_components and _build_reply_from_parent_id are now central “god functions”. You can keep all functionality and make the code easier to follow by:

1. Split _parse_message_components into type-specific handlers

Right now _parse_message_components does all branching plus all IO. You can keep the public API exactly as-is, but internally delegate to smaller functions:

async def _parse_message_components(
    self,
    *,
    message_id: str | None,
    message_type: str,
    content: dict[str, Any],
    at_map: dict[str, Comp.At],
) -> list[Comp.BaseMessageComponent]:
    handlers: dict[str, Callable[..., Awaitable[list[Comp.BaseMessageComponent]]]] = {
        "text": self._parse_text_components,
        "post": self._parse_post_or_image_components,
        "image": self._parse_post_or_image_components,
        "file": self._parse_file_components,
        "audio": self._parse_audio_components,
        "media": self._parse_media_components,
    }

    handler = handlers.get(message_type)
    if handler is None:
        return []

    return await handler(message_id=message_id, content=content, at_map=at_map)

Each handler can then focus only on that type:

async def _parse_text_components(
    self,
    *,
    message_id: str | None,
    content: dict[str, Any],
    at_map: dict[str, Comp.At],
) -> list[Comp.BaseMessageComponent]:
    message_str_raw = str(content.get("text", ""))
    at_pattern = r"(@_user_\d+)"
    parts = re.split(at_pattern, message_str_raw)

    components: list[Comp.BaseMessageComponent] = []
    for part in parts:
        segment = part.strip()
        if not segment:
            continue
        if segment in at_map:
            components.append(at_map[segment])
        else:
            components.append(Comp.Plain(segment))
    return components
async def _parse_post_or_image_components(
    self,
    *,
    message_id: str | None,
    content: dict[str, Any],
    at_map: dict[str, Comp.At],
) -> list[Comp.BaseMessageComponent]:
    components: list[Comp.BaseMessageComponent] = []

    comp_list = (
        self._parse_post_content(content)
        if content.get("content") is not None  # post
        else [{"tag": "img", "image_key": content.get("image_key")}]
    )

    for comp in comp_list:
        tag = comp.get("tag")
        # reuse your existing tag-handling logic here...
        # (at/text/a) branches stay here
        if tag == "img" or tag == "media":
            media_comp = await self._build_media_component_from_tag(
                message_id=message_id,
                tag=tag,
                comp=comp,
                message_type="post_media" if tag == "media" else "image",
            )
            if media_comp:
                components.append(media_comp)

    return components

This preserves behavior but reduces cyclomatic complexity in a single method and makes each handler testable on its own.

2. Extract a reusable “file-based message” helper

The file, audio, and media branches are nearly identical: validate message_id/file_key, download via _download_file_resource_to_temp, then wrap in the appropriate component.

You can factor that pattern into a small helper:

async def _build_file_based_component(
    self,
    *,
    message_id: str | None,
    file_key: str | None,
    message_type: str,
    file_name: str,
    default_suffix: str,
    factory: Callable[[str], Comp.BaseMessageComponent],
) -> Comp.BaseMessageComponent | None:
    if not message_id:
        logger.error(f"[Lark] {message_type} 消息缺少 message_id")
        return None
    if not file_key:
        logger.error(f"[Lark] {message_type} 消息缺少 file_key")
        return None

    file_path = await self._download_file_resource_to_temp(
        message_id=message_id,
        file_key=file_key,
        message_type=message_type,
        file_name=file_name,
        default_suffix=default_suffix,
    )
    if not file_path:
        return None
    return factory(file_path)

Then your per-type branches become very small:

async def _parse_file_components(...):
    file_key = str(content.get("file_key", "")).strip()
    file_name = str(content.get("file_name", "")).strip() or "lark_file"

    comp = await self._build_file_based_component(
        message_id=message_id,
        file_key=file_key,
        message_type="file",
        file_name=file_name,
        default_suffix=".bin",
        factory=lambda path: Comp.File(name=file_name, file=path),
    )
    return [comp] if comp else []
async def _parse_audio_components(...):
    file_key = str(content.get("file_key", "")).strip()
    comp = await self._build_file_based_component(
        message_id=message_id,
        file_key=file_key,
        message_type="audio",
        file_name="lark_audio",
        default_suffix=".opus",
        factory=lambda path: Comp.Record(file=path, url=path),
    )
    return [comp] if comp else []
async def _parse_media_components(...):
    file_key = str(content.get("file_key", "")).strip()
    file_name = str(content.get("file_name", "")).strip() or "lark_media.mp4"

    comp = await self._build_file_based_component(
        message_id=message_id,
        file_key=file_key,
        message_type="media",
        file_name=file_name,
        default_suffix=".mp4",
        factory=lambda path: Comp.Video(file=path, path=path),
    )
    return [comp] if comp else []

This keeps all I/O and naming behavior intact while collapsing three near-identical branches into one pattern.

3. Decompose _build_reply_from_parent_id

_build_reply_from_parent_id currently performs fetch, normalization, parsing, and Comp.Reply construction. You can break it down into three focused helpers:

async def _fetch_parent_message(
    self, parent_message_id: str
):
    if self.lark_api.im is None:
        logger.error("[Lark] API Client im 模块未初始化")
        return None

    request = GetMessageRequest.builder().message_id(parent_message_id).build()
    response = await self.lark_api.im.v1.message.aget(request)
    if not response.success() or not response.data or not response.data.items:
        logger.error(
            f"[Lark] 获取引用消息失败 id={parent_message_id}, "
            f"code={response.code}, msg={response.msg}"
        )
        return None
    return response.data.items[0]
def _normalize_parent_message(self, parent_message) -> dict[str, Any]:
    quoted_message_id = parent_message.message_id or "unknown"
    quoted_sender_id = (
        parent_message.sender.id
        if parent_message.sender and parent_message.sender.id
        else "unknown"
    )
    quoted_time_raw = parent_message.create_time or 0
    quoted_time = (
        quoted_time_raw // 1000
        if isinstance(quoted_time_raw, int) and quoted_time_raw > 10**11
        else quoted_time_raw
    )

    raw_content = (parent_message.body.content if parent_message.body else "") or ""
    content_json: dict[str, Any] = {}
    if raw_content:
        try:
            parsed = json.loads(raw_content)
            if isinstance(parsed, dict):
                content_json = parsed
        except json.JSONDecodeError:
            logger.warning(f"[Lark] 解析引用消息内容失败 id={quoted_message_id}")

    return {
        "id": quoted_message_id,
        "sender_id": quoted_sender_id,
        "time": quoted_time,
        "type": parent_message.msg_type or "",
        "content": content_json,
        "mentions": parent_message.mentions,
    }

Then _build_reply_from_parent_id becomes glue:

async def _build_reply_from_parent_id(
    self, parent_message_id: str
) -> Comp.Reply | None:
    parent = await self._fetch_parent_message(parent_message_id)
    if parent is None:
        return None

    normalized = self._normalize_parent_message(parent)
    quoted_at_map = self._build_at_map(normalized["mentions"])
    quoted_chain = await self._parse_message_components(
        message_id=normalized["id"],
        message_type=normalized["type"],
        content=normalized["content"],
        at_map=quoted_at_map,
    )
    quoted_text = self._build_message_str_from_components(quoted_chain)
    sender_id = normalized["sender_id"]
    sender_nickname = sender_id[:8] if sender_id != "unknown" else "unknown"

    return Comp.Reply(
        id=normalized["id"],
        chain=quoted_chain,
        sender_id=sender_id,
        sender_nickname=sender_nickname,
        time=normalized["time"],
        message_str=quoted_text,
        text=quoted_text,
    )

This reuses your existing helpers, preserves all behaviors (including timestamp normalization and JSON parsing), but makes parent-message handling much easier to test and maintain.

@dosubot dosubot bot added the area:platform The bug / feature is about IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on. label Feb 11, 2026
@Soulter Soulter merged commit 80e1231 into master Feb 11, 2026
6 checks passed
@Soulter Soulter deleted the feat/feishu-supports-receive-media-files branch February 11, 2026 05:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:platform The bug / feature is about IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on. size:L This PR changes 100-499 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant