diff --git a/.env.example b/.env.example index b61f26f..92e9648 100644 --- a/.env.example +++ b/.env.example @@ -9,5 +9,12 @@ FEISHU_APP_SECRET= BINDING_DB_PATH=data/stickerhub.db BIND_MAGIC_TTL_SECONDS=600 +# 飞书 Webhook 域名白名单(JSON 格式的字符串数组) +# 不设置:使用默认白名单 ["open.feishu.cn","open.larksuite.com"] +# 设为 []:禁用白名单校验(允许任意域名,请谨慎使用) +# 设为自定义列表:如 ["open.feishu.cn","custom.domain.com"] +# FEISHU_WEBHOOK_ALLOWED_HOSTS=["open.feishu.cn","open.larksuite.com"] +FEISHU_WEBHOOK_ALLOWED_HOSTS= + # 日志级别(DEBUG / INFO / WARNING / ERROR) LOG_LEVEL=INFO diff --git a/README.md b/README.md index ebc3211..7b3ea44 100644 --- a/README.md +++ b/README.md @@ -55,6 +55,13 @@ FEISHU_APP_SECRET= BINDING_DB_PATH=data/stickerhub.db BIND_MAGIC_TTL_SECONDS=600 + +# 飞书 Webhook 域名白名单(JSON 格式的字符串数组) +# 不设置:使用默认白名单 ["open.feishu.cn","open.larksuite.com"] +# 设为 []:禁用白名单校验(允许任意域名,请谨慎使用) +# 设为自定义列表:如 ["open.feishu.cn","custom.domain.com"] +FEISHU_WEBHOOK_ALLOWED_HOSTS= + LOG_LEVEL=INFO ``` @@ -62,6 +69,10 @@ LOG_LEVEL=INFO - `TELEGRAM_BOT_API_TOKEN`:必填,Telegram Bot API Token - `FEISHU_APP_ID` / `FEISHU_APP_SECRET`:可选,填写后启用飞书转发和 `/bind` 功能(包括 webhook 绑定所需的图片上传能力) +- `FEISHU_WEBHOOK_ALLOWED_HOSTS`:飞书 Webhook 域名白名单(JSON 格式,防止 SSRF 攻击) + - 不设置:使用默认白名单 `["open.feishu.cn", "open.larksuite.com"]` + - 设为 `[]`:禁用白名单校验(允许任意域名,**请谨慎使用**) + - 设为自定义列表:如 `["open.feishu.cn", "custom.domain.com"]` - 飞书需在应用后台开启机器人收发消息权限(im:message)以及获取与上传图片或文件资源权限(im:resource) ![lark_permission.png](docs/lark_permission.png) - 飞书事件添加接收消息(im.message.receive_v1)并启用长连接事件能力。 ![lark_event.png](docs/lark_event.png) diff --git a/src/stickerhub/adapters/feishu_sender.py b/src/stickerhub/adapters/feishu_sender.py index e2de91b..e672a5c 100644 --- a/src/stickerhub/adapters/feishu_sender.py +++ b/src/stickerhub/adapters/feishu_sender.py @@ -1,9 +1,11 @@ import json import logging +from typing import Literal import httpx from stickerhub.core.models import StickerAsset +from stickerhub.utils.url_masking import mask_url logger = 
logging.getLogger(__name__) @@ -18,11 +20,15 @@ def __init__( self._app_secret = app_secret self._base_url = "https://open.feishu.cn/open-apis" - async def send(self, asset: StickerAsset, target_mode: str, target: str) -> None: + async def send( + self, asset: StickerAsset, target_mode: Literal["bot", "webhook"], target: str + ) -> None: + # 避免在日志中暴露 webhook URL 中的敏感 token + safe_target = target if target_mode == "bot" else mask_url(target) logger.debug( "准备发送图片到飞书: mode=%s target=%s file=%s mime=%s size=%s", target_mode, - target, + safe_target, asset.file_name, asset.mime_type, len(asset.content), diff --git a/src/stickerhub/adapters/telegram_source.py b/src/stickerhub/adapters/telegram_source.py index b2ef8c0..5b8a107 100644 --- a/src/stickerhub/adapters/telegram_source.py +++ b/src/stickerhub/adapters/telegram_source.py @@ -122,6 +122,8 @@ async def handle_bind(update: Update, context: ContextTypes.DEFAULT_TYPE) -> Non if not update.message or not update.effective_user: return + _cleanup_pending_webhook_requests(pending_webhook_requests) + try: arg = context.args[0] if context.args else None telegram_user_id = str(update.effective_user.id) @@ -150,6 +152,8 @@ async def handle_bind_mode_callback(update: Update, context: ContextTypes.DEFAUL return await query.answer() + _cleanup_pending_webhook_requests(pending_webhook_requests) + data = query.data or "" parsed = _parse_bind_mode_callback_data(data) if not parsed: @@ -194,6 +198,7 @@ async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> return _cleanup_pending_requests(pending_pack_requests) + _cleanup_pending_webhook_requests(pending_webhook_requests) try: asset = await _extract_asset(update.message, context) diff --git a/src/stickerhub/config.py b/src/stickerhub/config.py index 890cc49..30d2a54 100644 --- a/src/stickerhub/config.py +++ b/src/stickerhub/config.py @@ -21,6 +21,16 @@ class Settings(BaseSettings): validation_alias=AliasChoices("BINDING_DB_PATH", "BINDING_STORE_PATH"), ) 
def get_webhook_allowed_hosts(self) -> list[str]:
    """Return the effective Feishu webhook host allow-list.

    Resolves the raw ``feishu_webhook_allowed_hosts`` setting:

    - ``None`` (variable not set): the built-in default allow-list
      ``["open.feishu.cn", "open.larksuite.com"]``.
    - ``[]``: an empty list, meaning the allow-list check is disabled.
    - any other list: that custom allow-list.

    Returns:
        A new list on every call; never ``None``. Returning a copy keeps
        callers from mutating the settings object's own list.
    """
    configured = self.feishu_webhook_allowed_hosts
    if configured is None:
        return ["open.feishu.cn", "open.larksuite.com"]
    return list(configured)
def __init__(
    self,
    store: BindingStore,
    magic_ttl_seconds: int = 600,
    webhook_allowed_hosts: list[str] | None = None,
) -> None:
    """Create the binding service.

    Args:
        store: Persistence backend for bindings.
        magic_ttl_seconds: Lifetime of a bind magic code, in seconds.
        webhook_allowed_hosts: Host allow-list applied to Feishu webhook
            URLs. ``None`` selects the default allow-list
            ``["open.feishu.cn", "open.larksuite.com"]``; ``[]`` disables
            the host check; any other list is used as given.
    """
    self._store = store
    self._magic_ttl_seconds = magic_ttl_seconds
    # Resolve None here: the URL validator skips the host check for any
    # falsy value, so storing None would silently disable SSRF protection
    # for a service constructed without an explicit allow-list.
    if webhook_allowed_hosts is None:
        webhook_allowed_hosts = ["open.feishu.cn", "open.larksuite.com"]
    # Copy so later mutation of the caller's list cannot change the policy.
    self._webhook_allowed_hosts = list(webhook_allowed_hosts)
handle_bind_webhook( ), source_platform, source_user_id, - details.get("previous_webhook"), + previous_webhook_masked, details.get("replaced_user_id"), ) return "绑定成功,已切换为飞书 Webhook 转发模式" @@ -488,7 +505,20 @@ async def get_feishu_target( return None -def _normalize_feishu_webhook_url(url: str) -> str | None: +def _normalize_feishu_webhook_url(url: str, allowed_hosts: list[str] | None) -> str | None: + """ + 验证并归一化飞书 Webhook URL。 + - 必须是 https 协议 + - 域名必须在白名单内(防止 SSRF),除非白名单为空列表(禁用白名单) + - 路径必须包含 /open-apis/bot/v2/hook/ + + Args: + url: 待验证的 webhook URL + allowed_hosts: 域名白名单。 + - None: 使用默认白名单 ["open.feishu.cn", "open.larksuite.com"] + - []: 禁用白名单校验(允许任意域名) + - [...]: 使用指定的自定义白名单 + """ normalized = url.strip() if not normalized: return None @@ -496,8 +526,18 @@ def _normalize_feishu_webhook_url(url: str) -> str | None: parsed = urlparse(normalized) if parsed.scheme.lower() != "https": return None - if not parsed.netloc: + + # 必须有合法主机名 + if not parsed.hostname: return None + + # 域名白名单校验(SSRF 防护)——仅基于 hostname,不限制端口 + # allowed_hosts 为空列表或 None 时的处理在外层逻辑中已完成 + if allowed_hosts: + hostname = parsed.hostname.lower() + if hostname not in [host.lower() for host in allowed_hosts]: + return None + if "/open-apis/bot/v2/hook/" not in parsed.path: return None return normalized diff --git a/src/stickerhub/utils/url_masking.py b/src/stickerhub/utils/url_masking.py new file mode 100644 index 0000000..4f14cb5 --- /dev/null +++ b/src/stickerhub/utils/url_masking.py @@ -0,0 +1,36 @@ +"""Shared constants and utilities for URL masking to prevent credential leakage in logs.""" + +from urllib.parse import urlparse + +# URL masking constants +PATH_PREFIX_LENGTH = 20 +PATH_SUFFIX_LENGTH = 8 +PATH_MASK_THRESHOLD = PATH_PREFIX_LENGTH + PATH_SUFFIX_LENGTH + + +def mask_url(url: str) -> str: + """ + 脱敏 URL 用于日志输出,避免泄露敏感 token。 + + 仅保留协议、域名(hostname)、路径前 20 个字符和后 8 个字符,中间用 ... 
async def _bind_webhook_domain_whitelist(db_path: str) -> None:
    """Check the webhook host allow-list (SSRF protection) with a custom list."""
    store = BindingStore(db_path)
    # Custom allow-list: only open.feishu.cn is permitted.
    service = BindingService(
        store=store, magic_ttl_seconds=600, webhook_allowed_hosts=["open.feishu.cn"]
    )
    await service.initialize()

    # An allow-listed host is accepted.
    valid_url = "https://open.feishu.cn/open-apis/bot/v2/hook/valid_token"
    reply = await service.handle_bind_webhook("telegram", "tg_whitelist_ok", valid_url)
    assert "绑定成功" in reply

    # Only the hostname is checked, so a custom port is accepted too.
    valid_url_with_port = "https://open.feishu.cn:8443/open-apis/bot/v2/hook/token_with_port"
    reply = await service.handle_bind_webhook("telegram", "tg_whitelist_port", valid_url_with_port)
    assert "绑定成功" in reply

    # A host outside the allow-list is rejected.
    blocked_url = "https://evil.com/open-apis/bot/v2/hook/malicious"
    reply = await service.handle_bind_webhook("telegram", "tg_whitelist_block", blocked_url)
    assert "白名单" in reply

    # open.larksuite.com is not in this custom allow-list, so it is rejected.
    larksuite_url = "https://open.larksuite.com/open-apis/bot/v2/hook/token"
    reply = await service.handle_bind_webhook("telegram", "tg_whitelist_lark", larksuite_url)
    assert "白名单" in reply

    # Userinfo trick: the allowed name appears before "@", but the real host
    # is evil.com, so the URL must be rejected.
    userinfo_url = "https://open.feishu.cn@evil.com/open-apis/bot/v2/hook/malicious"
    reply = await service.handle_bind_webhook("telegram", "tg_whitelist_userinfo", userinfo_url)
    assert "白名单" in reply

    # Look-alike host: the allowed name is only a prefix of an attacker domain.
    lookalike_url = "https://open.feishu.cn.evil.com/open-apis/bot/v2/hook/malicious"
    reply = await service.handle_bind_webhook("telegram", "tg_whitelist_suffix", lookalike_url)
    assert "白名单" in reply
def test_cleanup_pending_webhook_requests_removes_expired_only() -> None:
    """Cleanup drops the hour-old webhook bind request and keeps the fresh one."""
    current_ts = int(time.time())
    created_at_by_user = {
        "expired_user": current_ts - 3600,
        "fresh_user": current_ts,
    }
    pending = {
        user_id: PendingWebhookBindRequest(telegram_user_id=user_id, created_at=created_at)
        for user_id, created_at in created_at_by_user.items()
    }

    _cleanup_pending_webhook_requests(pending)

    assert "expired_user" not in pending
    assert "fresh_user" in pending