Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions astrbot/core/config/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,6 @@ class ChatProviderTemplate(TypedDict):
"type": "kook",
"enable": False,
"kook_bot_token": "",
"kook_bot_nickname": "",
"kook_reconnect_delay": 1,
"kook_max_reconnect_delay": 60,
"kook_max_retry_delay": 60,
Expand Down Expand Up @@ -809,11 +808,6 @@ class ChatProviderTemplate(TypedDict):
"type": "string",
"hint": "必填项。从 KOOK 开发者平台获取的机器人 Token。",
},
"kook_bot_nickname": {
"description": "Bot Nickname",
"type": "string",
"hint": "可选项。若发送者昵称与此值一致,将忽略该消息以避免广播风暴。",
},
"kook_reconnect_delay": {
"description": "重连延迟",
"type": "int",
Expand Down
217 changes: 131 additions & 86 deletions astrbot/core/platform/sources/kook/kook_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,28 @@
PlatformMetadata,
register_platform_adapter,
)
from astrbot.core.message.components import File, Record, Video
from astrbot.core.platform.astr_message_event import MessageSesion

from .kook_client import KookClient
from .kook_config import KookConfig
from .kook_event import KookEvent
from .kook_types import (
ContainerModule,
FileModule,
HeaderModule,
ImageGroupModule,
KmarkdownElement,
KookCardMessageContainer,
KookChannelType,
KookMessageEventData,
KookMessageType,
KookModuleType,
PlainTextElement,
SectionModule,
)

KOOK_AT_SELECTOR_REGEX = re.compile(r"\(met\)([^()]+)\(met\)")


@register_platform_adapter(
Expand Down Expand Up @@ -57,35 +74,26 @@ def meta(self) -> PlatformMetadata:
name="kook", description="KOOK 适配器", id=self.kook_config.id
)

def _should_ignore_event_by_bot_nickname(self, payload: dict) -> bool:
bot_nickname = self.kook_config.bot_nickname.strip()
if not bot_nickname:
return False

author = payload.get("extra", {}).get("author", {})
if not isinstance(author, dict):
return False

author_nickname = author.get("nickname") or author.get("username") or ""
if not isinstance(author_nickname, str):
author_nickname = str(author_nickname)

return author_nickname.strip().casefold() == bot_nickname.casefold()

async def _on_received(self, data: dict):
logger.debug(f"KOOK 收到数据: {data}")
if "d" in data and data["s"] == 0:
payload = data["d"]
event_type = payload.get("type")
# 支持type=9(文本)和type=10(卡片)
if event_type in (9, 10):
if self._should_ignore_event_by_bot_nickname(payload):
return
try:
abm = await self.convert_message(payload)
await self.handle_msg(abm)
except Exception as e:
logger.error(f"[KOOK] 消息处理异常: {e}")
def _should_ignore_event_by_bot_nickname(self, author_id: str) -> bool:
return self.client.bot_id == author_id

async def _on_received(self, event: KookMessageEventData):
logger.debug(
f'[KOOK] 收到来自"{event.channel_type.name}"渠道的消息, 消息类型为: {event.type.name}({event.type.value})'
)
event_type = event.type
if event_type in (KookMessageType.KMARKDOWN, KookMessageType.CARD):
if self._should_ignore_event_by_bot_nickname(event.author_id):
logger.debug("[KOOK] 收到来自机器人自身的消息, 忽略此消息")
return
try:
abm = await self.convert_message(event)
await self.handle_msg(abm)
except Exception as e:
logger.error(f"[KOOK] 消息处理异常: {e}")
elif event_type == KookMessageType.SYSTEM:
logger.debug(f'[KOOK] 消息为系统通知, 通知类型为: "{event.extra.type}"')
logger.debug(f"[KOOK] 原始消息数据: {event.to_json()}")

async def run(self):
"""主运行循环"""
Expand Down Expand Up @@ -184,18 +192,26 @@ async def _cleanup(self):
logger.info("[KOOK] 资源清理完成")

def _parse_kmarkdown_text_message(
self, data: dict, self_id: str
self, data: KookMessageEventData, self_id: str
) -> tuple[list, str]:
kmarkdown = data.get("extra", {}).get("kmarkdown", {})
content = data.get("content") or ""
raw_content = kmarkdown.get("raw_content") or content
kmarkdown = data.extra.kmarkdown
content = data.content or ""
if kmarkdown is None:
logger.error(
f'[KOOK] 无法转换"{KookMessageType.KMARKDOWN.name}"消息, 消息中找不到kmarkdown字段'
)
logger.error(f"[KOOK] 原始消息内容: {data.to_json()}")
return [], ""

raw_content = kmarkdown.raw_content or content
if not isinstance(content, str):
content = str(content)
if not isinstance(raw_content, str):
raw_content = str(raw_content)

# TODO 后面的pydantic类型替换,以后再来探索吧 :(
mention_name_map: dict[str, str] = {}
mention_part = kmarkdown.get("mention_part", [])
mention_part = kmarkdown.mention_part
if isinstance(mention_part, list):
for item in mention_part:
if not isinstance(item, dict):
Expand All @@ -207,7 +223,7 @@ def _parse_kmarkdown_text_message(

components = []
cursor = 0
for match in re.finditer(r"\(met\)([^()]+)\(met\)", content):
for match in KOOK_AT_SELECTOR_REGEX.finditer(content):
if match.start() > cursor:
plain_text = content[cursor : match.start()]
if plain_text:
Expand Down Expand Up @@ -254,77 +270,109 @@ def _parse_kmarkdown_text_message(

return components, message_str

def _parse_card_message(self, data: dict) -> tuple[list, str]:
content = data.get("content", "[]")
def _parse_card_message(self, data: KookMessageEventData) -> tuple[list, str]:
content = data.content
if not isinstance(content, str):
content = str(content)
card_list = json.loads(content)

card_list = KookCardMessageContainer.from_dict(json.loads(content))

text_parts: list[str] = []
images: list[str] = []
files: list[tuple[KookModuleType, str, str]] = []

for card in card_list:
if not isinstance(card, dict):
continue
for module in card.get("modules", []):
if not isinstance(module, dict):
continue
for module in card.modules:
match module:
case SectionModule():
if content := self._handle_section_text(module):
text_parts.append(content)

module_type = module.get("type")
if module_type == "section":
section_text = module.get("text", {}).get("content", "")
if section_text:
text_parts.append(str(section_text))
continue
case ContainerModule() | ImageGroupModule():
urls = self._handle_image_group(module)
images.extend(urls)
text_parts.append(" [image]" * len(urls))

if module_type != "container":
continue
case HeaderModule():
text_parts.append(module.text.content)

for element in module.get("elements", []):
if not isinstance(element, dict):
continue
if element.get("type") != "image":
continue
case FileModule():
files.append((module.type, module.title, module.src))
text_parts.append(f" [{module.type.value}]")

image_src = element.get("src")
if not isinstance(image_src, str):
logger.warning(
f'[KOOK] 处理卡片中的图片时发生错误,图片url "{image_src}" 应该为str类型, 而不是 "{type(image_src)}" '
)
continue
if not image_src.startswith(("http://", "https://")):
logger.warning(f"[KOOK] 屏蔽非http图片url: {image_src}")
continue
images.append(image_src)
case _:
logger.debug(f"[KOOK] 跳过或未处理模块: {module.type}")

text = "".join(text_parts)
message = []

if text:
for search in KOOK_AT_SELECTOR_REGEX.finditer(text):
search_text = search.group(1).strip()
if search_text == "all":
message.append(AtAll())
continue
message.append(At(qq=search_text))
text = text.replace(f"(met){search_text}(met)", "")

message.append(Plain(text=text))

for img_url in images:
message.append(Image(file=img_url))
for file in files:
file_type = file[0]
file_name = file[1]
file_url = file[2]
if file_type == KookModuleType.FILE:
message.append(File(name=file_name, file=file_url))
elif file_type == KookModuleType.VIDEO:
message.append(Video(file=file_url))
elif file_type == KookModuleType.AUDIO:
message.append(Record(file=file_url))
else:
logger.warning(f"[KOOK] 跳过未知文件类型: {file_type.name}")

return message, text

async def convert_message(self, data: dict) -> AstrBotMessage:
def _handle_section_text(self, module: SectionModule) -> str:
"""专门处理 Section 里的文本提取"""
if isinstance(module.text, (KmarkdownElement, PlainTextElement)):
return module.text.content or ""
return ""

def _handle_image_group(
self, module: ContainerModule | ImageGroupModule
) -> list[str]:
"""专门处理图片组/容器里的合法 URL 提取"""
valid_urls = []
for el in module.elements:
image_src = el.src
if not el.src.startswith(("http://", "https://")):
logger.warning(f"[KOOK] 屏蔽非http图片url: {image_src}")
continue
valid_urls.append(el.src)
return valid_urls

async def convert_message(self, data: KookMessageEventData) -> AstrBotMessage:
abm = AstrBotMessage()
abm.raw_message = data
abm.raw_message = data.to_dict()
abm.self_id = self.client.bot_id

channel_type = data.get("channel_type")
author_id = data.get("author_id", "unknown")
channel_type = data.channel_type
author_id = data.author_id
# channel_type定义: https://developer.kookapp.cn/doc/event/event-introduction
match channel_type:
case "GROUP":
session_id = data.get("target_id") or "unknown"
case KookChannelType.GROUP:
session_id = data.target_id or "unknown"
abm.type = MessageType.GROUP_MESSAGE
abm.group_id = session_id
abm.session_id = session_id
case "PERSON":
case KookChannelType.PERSON:
abm.type = MessageType.FRIEND_MESSAGE
abm.group_id = ""
abm.session_id = data.get("author_id", "unknown")
case "BROADCAST":
session_id = data.get("target_id") or "unknown"
abm.session_id = data.author_id or "unknown"
case KookChannelType.BROADCAST:
session_id = data.target_id or "unknown"
abm.type = MessageType.OTHER_MESSAGE
abm.group_id = session_id
abm.session_id = session_id
Expand All @@ -333,28 +381,25 @@ async def convert_message(self, data: dict) -> AstrBotMessage:

abm.sender = MessageMember(
user_id=author_id,
nickname=data.get("extra", {}).get("author", {}).get("username", ""),
nickname=data.extra.author.username if data.extra.author else "unknown",
)

abm.message_id = data.get("msg_id", "unknown")
abm.message_id = data.msg_id or "unknown"

# 普通文本消息
if data.get("type") == 9:
message, message_str = self._parse_kmarkdown_text_message(
data, str(abm.self_id)
)
if data.type == KookMessageType.KMARKDOWN:
message, message_str = self._parse_kmarkdown_text_message(data, abm.self_id)
abm.message = message
abm.message_str = message_str
# 卡片消息
elif data.get("type") == 10:
elif data.type == KookMessageType.CARD:
try:
abm.message, abm.message_str = self._parse_card_message(data)
except Exception as exp:
logger.error(f"[KOOK] 卡片消息解析失败: {exp}")
logger.error(f"[KOOK] 原始消息内容: {data.to_json()}")
abm.message_str = "[卡片消息解析失败]"
abm.message = [Plain(text="[卡片消息解析失败]")]
else:
logger.warning(f'[KOOK] 不支持的kook消息类型: "{data.get("type")}"')
logger.warning(f'[KOOK] 不支持的kook消息类型: "{data.type.name}"')
abm.message_str = "[不支持的消息类型]"
abm.message = [Plain(text="[不支持的消息类型]")]

Expand Down
Loading