Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,12 @@ FEISHU_APP_SECRET=
BINDING_DB_PATH=data/stickerhub.db
BIND_MAGIC_TTL_SECONDS=600

# 飞书 Webhook 域名白名单(JSON 格式的字符串数组)
# 不设置:使用默认白名单 ["open.feishu.cn","open.larksuite.com"]
# 设为 []:禁用白名单校验(允许任意域名,请谨慎使用)
# 设为自定义列表:如 ["open.feishu.cn","custom.domain.com"]
# FEISHU_WEBHOOK_ALLOWED_HOSTS=["open.feishu.cn","open.larksuite.com"]
FEISHU_WEBHOOK_ALLOWED_HOSTS=

# 日志级别(DEBUG / INFO / WARNING / ERROR)
LOG_LEVEL=INFO
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,24 @@ FEISHU_APP_SECRET=

BINDING_DB_PATH=data/stickerhub.db
BIND_MAGIC_TTL_SECONDS=600

# 飞书 Webhook 域名白名单(JSON 格式的字符串数组)
# 不设置:使用默认白名单 ["open.feishu.cn","open.larksuite.com"]
# 设为 []:禁用白名单校验(允许任意域名,请谨慎使用)
# 设为自定义列表:如 ["open.feishu.cn","custom.domain.com"]
FEISHU_WEBHOOK_ALLOWED_HOSTS=

LOG_LEVEL=INFO
```

说明:

- `TELEGRAM_BOT_API_TOKEN`:必填,Telegram Bot API Token
- `FEISHU_APP_ID` / `FEISHU_APP_SECRET`:可选,填写后启用飞书转发和 `/bind` 功能(包括 webhook 绑定所需的图片上传能力)
- `FEISHU_WEBHOOK_ALLOWED_HOSTS`:飞书 Webhook 域名白名单(JSON 格式,防止 SSRF 攻击)
- 不设置:使用默认白名单 `["open.feishu.cn", "open.larksuite.com"]`
- 设为 `[]`:禁用白名单校验(允许任意域名,**请谨慎使用**)
- 设为自定义列表:如 `["open.feishu.cn", "custom.domain.com"]`
- 飞书需在应用后台开启机器人收发消息权限(im:message)以及获取与上传图片或文件资源权限(im:resource) ![lark_permission.png](docs/lark_permission.png)
- 飞书事件添加接收消息(im.message.receive_v1)并启用长连接事件能力。 ![lark_event.png](docs/lark_event.png)

Expand Down
10 changes: 8 additions & 2 deletions src/stickerhub/adapters/feishu_sender.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import json
import logging
from typing import Literal

import httpx

from stickerhub.core.models import StickerAsset
from stickerhub.utils.url_masking import mask_url

logger = logging.getLogger(__name__)

Expand All @@ -18,11 +20,15 @@ def __init__(
self._app_secret = app_secret
self._base_url = "https://open.feishu.cn/open-apis"

async def send(self, asset: StickerAsset, target_mode: str, target: str) -> None:
async def send(
self, asset: StickerAsset, target_mode: Literal["bot", "webhook"], target: str
) -> None:
# 避免在日志中暴露 webhook URL 中的敏感 token
safe_target = target if target_mode == "bot" else mask_url(target)
logger.debug(
"准备发送图片到飞书: mode=%s target=%s file=%s mime=%s size=%s",
target_mode,
target,
safe_target,
asset.file_name,
asset.mime_type,
len(asset.content),
Expand Down
5 changes: 5 additions & 0 deletions src/stickerhub/adapters/telegram_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ async def handle_bind(update: Update, context: ContextTypes.DEFAULT_TYPE) -> Non
if not update.message or not update.effective_user:
return

_cleanup_pending_webhook_requests(pending_webhook_requests)

try:
arg = context.args[0] if context.args else None
telegram_user_id = str(update.effective_user.id)
Expand Down Expand Up @@ -150,6 +152,8 @@ async def handle_bind_mode_callback(update: Update, context: ContextTypes.DEFAUL
return
await query.answer()

_cleanup_pending_webhook_requests(pending_webhook_requests)

data = query.data or ""
parsed = _parse_bind_mode_callback_data(data)
if not parsed:
Expand Down Expand Up @@ -194,6 +198,7 @@ async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) ->
return

_cleanup_pending_requests(pending_pack_requests)
_cleanup_pending_webhook_requests(pending_webhook_requests)

try:
asset = await _extract_asset(update.message, context)
Expand Down
21 changes: 21 additions & 0 deletions src/stickerhub/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ class Settings(BaseSettings):
validation_alias=AliasChoices("BINDING_DB_PATH", "BINDING_STORE_PATH"),
)
bind_magic_ttl_seconds: int = Field(default=600, alias="BIND_MAGIC_TTL_SECONDS")
feishu_webhook_allowed_hosts: list[str] | None = Field(
default=None,
alias="FEISHU_WEBHOOK_ALLOWED_HOSTS",
description=(
"飞书 Webhook 域名白名单(JSON 格式,如 "
'["open.feishu.cn","open.larksuite.com"])。'
"设为 null 或空列表 [] 禁用白名单校验。"
"不设置时使用默认白名单。"
),
)
log_level: str = Field(default="INFO", alias="LOG_LEVEL")

model_config = SettingsConfigDict(
Expand All @@ -29,3 +39,14 @@ class Settings(BaseSettings):
case_sensitive=False,
extra="ignore",
)

def get_webhook_allowed_hosts(self) -> list[str] | None:
"""
获取 webhook 域名白名单。
- None: 使用默认白名单 ["open.feishu.cn", "open.larksuite.com"]
- []: 禁用白名单校验
- [...]: 使用自定义白名单
"""
if self.feishu_webhook_allowed_hosts is None:
return ["open.feishu.cn", "open.larksuite.com"]
return self.feishu_webhook_allowed_hosts
6 changes: 4 additions & 2 deletions src/stickerhub/core/ports.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Protocol
from typing import Literal, Protocol

from stickerhub.core.models import StickerAsset

Expand All @@ -9,5 +9,7 @@ async def normalize(self, asset: StickerAsset) -> StickerAsset:


class TargetPlatformSender(Protocol):
    """Structural interface for anything that can deliver an asset to a target platform."""

    async def send(
        self,
        asset: StickerAsset,
        target_mode: Literal["bot", "webhook"],
        target: str,
    ) -> None:
        """Send the asset to the target platform."""
1 change: 1 addition & 0 deletions src/stickerhub/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ async def async_main() -> None:
binding_service = BindingService(
store=BindingStore(settings.binding_db_path),
magic_ttl_seconds=settings.bind_magic_ttl_seconds,
webhook_allowed_hosts=settings.get_webhook_allowed_hosts(),
)
await binding_service.initialize()

Expand Down
54 changes: 47 additions & 7 deletions src/stickerhub/services/binding.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from typing import Literal
from urllib.parse import urlparse

from stickerhub.utils.url_masking import mask_url

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -344,9 +346,16 @@ def _connect(self) -> sqlite3.Connection:


class BindingService:
def __init__(self, store: BindingStore, magic_ttl_seconds: int = 600) -> None:
def __init__(
self,
store: BindingStore,
magic_ttl_seconds: int = 600,
webhook_allowed_hosts: list[str] | None = None,
) -> None:
self._store = store
self._magic_ttl_seconds = magic_ttl_seconds
# None 表示使用默认白名单,[] 表示禁用白名单,其他表示自定义白名单
self._webhook_allowed_hosts = webhook_allowed_hosts

async def initialize(self) -> None:
await self._store.ensure_initialized()
Expand Down Expand Up @@ -407,15 +416,19 @@ async def handle_bind_webhook(
source_user_id: str,
webhook_url: str,
) -> str:
normalized_url = _normalize_feishu_webhook_url(webhook_url)
normalized_url = _normalize_feishu_webhook_url(webhook_url, self._webhook_allowed_hosts)
if not normalized_url:
# 脱敏 URL 用于日志
masked_url = mask_url(webhook_url)
logger.warning(
"Webhook 绑定失败: 平台=%s user=%s 原因=URL格式不合法",
"Webhook 绑定失败: 平台=%s user=%s 原因=URL格式不合法或域名不在白名单内 url=%s",
source_platform,
source_user_id,
masked_url,
Comment on lines +419 to +427
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handle_bind_webhook() 虽然在失败日志里对输入 URL 做了脱敏,但后续成功日志仍会输出 details.get("previous_webhook")(旧 webhook URL),这通常包含敏感 token,依然会造成凭据泄露。建议对 previous_webhook 同样使用 mask_url() 后再记录,或只记录是否存在/哈希摘要而不记录 URL 本身。

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已修复,commit 7851551

现在成功日志中的 previous_webhook 会先使用 mask_url() 脱敏后再记录,防止旧凭据泄露:

previous_webhook_masked = (
    mask_url(details["previous_webhook"])
    if details.get("previous_webhook")
    else None
)
logger.info("... previous_webhook=%s ...", previous_webhook_masked)

)
# 不在用户消息中暴露完整白名单,避免为攻击者提供信息
return (
"绑定失败: Webhook 地址格式不合法。\n"
"绑定失败: Webhook 地址格式不合法或域名不在白名单内。\n"
"请填写飞书自定义机器人 Webhook 地址,例如:\n"
"https://open.feishu.cn/open-apis/bot/v2/hook/xxxx"
)
Expand All @@ -426,14 +439,18 @@ async def handle_bind_webhook(
await self._store.bind_platform(source_platform, source_user_id, hub_id)

details = await self._store.bind_feishu_webhook(hub_id, normalized_url)
# 脱敏 previous_webhook 避免泄露旧凭据
previous_webhook_masked = (
mask_url(details["previous_webhook"]) if details.get("previous_webhook") else None
)
logger.info(
(
"Webhook 绑定成功: source_platform=%s source_user=%s "
"previous_webhook=%s replaced_user=%s"
),
source_platform,
source_user_id,
details.get("previous_webhook"),
previous_webhook_masked,
details.get("replaced_user_id"),
)
return "绑定成功,已切换为飞书 Webhook 转发模式"
Expand Down Expand Up @@ -488,16 +505,39 @@ async def get_feishu_target(
return None


def _normalize_feishu_webhook_url(url: str) -> str | None:
def _normalize_feishu_webhook_url(url: str, allowed_hosts: list[str] | None) -> str | None:
"""
验证并归一化飞书 Webhook URL。
- 必须是 https 协议
- 域名必须在白名单内(防止 SSRF),除非白名单为空列表(禁用白名单)
- 路径必须包含 /open-apis/bot/v2/hook/

Args:
url: 待验证的 webhook URL
allowed_hosts: 域名白名单。
- None: 使用默认白名单 ["open.feishu.cn", "open.larksuite.com"]
- []: 禁用白名单校验(允许任意域名)
- [...]: 使用指定的自定义白名单
"""
normalized = url.strip()
if not normalized:
return None

parsed = urlparse(normalized)
if parsed.scheme.lower() != "https":
return None
if not parsed.netloc:

# 必须有合法主机名
if not parsed.hostname:
return None

# 域名白名单校验(SSRF 防护)——仅基于 hostname,不限制端口
# allowed_hosts 为空列表或 None 时的处理在外层逻辑中已完成
if allowed_hosts:
hostname = parsed.hostname.lower()
if hostname not in [host.lower() for host in allowed_hosts]:
return None

if "/open-apis/bot/v2/hook/" not in parsed.path:
return None
return normalized
36 changes: 36 additions & 0 deletions src/stickerhub/utils/url_masking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Shared constants and utilities for URL masking to prevent credential leakage in logs."""

from urllib.parse import urlparse

# URL masking constants
PATH_PREFIX_LENGTH = 20
PATH_SUFFIX_LENGTH = 8
PATH_MASK_THRESHOLD = PATH_PREFIX_LENGTH + PATH_SUFFIX_LENGTH


def mask_url(url: str) -> str:
    """Return a redacted form of *url* that is safe to write to logs.

    Only the scheme, the hostname and a (possibly truncated) path are kept.
    A path longer than ``PATH_MASK_THRESHOLD`` characters is reduced to its
    first ``PATH_PREFIX_LENGTH`` and last ``PATH_SUFFIX_LENGTH`` characters
    joined by ``...``; shorter paths are kept as-is. Userinfo, port, query
    string and fragment are never included.

    Args:
        url: The URL to redact.

    Returns:
        The redacted URL, or the literal ``"[url_masked]"`` when the input
        cannot be parsed or lacks a scheme or hostname.
    """
    try:
        parsed = urlparse(url)
        host = parsed.hostname

        # Only rebuild a URL when both scheme and hostname are unambiguous.
        if not parsed.scheme or not host:
            return "[url_masked]"

        # ``hostname`` drops the brackets around an IPv6 literal; put them
        # back so the logged value is still a well-formed URL.
        if ":" in host:
            host = f"[{host}]"

        path = parsed.path
        if len(path) > PATH_MASK_THRESHOLD:
            path = f"{path[:PATH_PREFIX_LENGTH]}...{path[-PATH_SUFFIX_LENGTH:]}"

        # Built from hostname only, so userinfo and port never reach the log.
        return f"{parsed.scheme}://{host}{path}"
    except (ValueError, TypeError, AttributeError):
        # urlparse can raise ValueError on malformed input; non-str input
        # surfaces as TypeError/AttributeError.
        return "[url_masked]"
58 changes: 58 additions & 0 deletions tests/test_binding_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,54 @@ async def _bind_webhook_invalid_url(db_path: str) -> None:
assert "格式不合法" in reply


async def _bind_webhook_domain_whitelist(db_path: str) -> None:
    """Exercise the webhook host allowlist (SSRF protection) with a custom list.

    The service is configured to allow only ``open.feishu.cn``. Each case is
    bound in order and the reply must contain the expected fragment.
    """
    service = BindingService(
        store=BindingStore(db_path),
        magic_ttl_seconds=600,
        webhook_allowed_hosts=["open.feishu.cn"],
    )
    await service.initialize()

    # (source user id, webhook URL, fragment expected in the reply)
    cases = (
        # Allowed host is accepted.
        (
            "tg_whitelist_ok",
            "https://open.feishu.cn/open-apis/bot/v2/hook/valid_token",
            "绑定成功",
        ),
        # Allowed host with a custom port is accepted too (ports are not restricted).
        (
            "tg_whitelist_port",
            "https://open.feishu.cn:8443/open-apis/bot/v2/hook/token_with_port",
            "绑定成功",
        ),
        # A host outside the allowlist is rejected.
        (
            "tg_whitelist_block",
            "https://evil.com/open-apis/bot/v2/hook/malicious",
            "白名单",
        ),
        # open.larksuite.com is not in the custom allowlist, so it is rejected.
        (
            "tg_whitelist_lark",
            "https://open.larksuite.com/open-apis/bot/v2/hook/token",
            "白名单",
        ),
    )
    for user_id, webhook_url, expected_fragment in cases:
        reply = await service.handle_bind_webhook("telegram", user_id, webhook_url)
        assert expected_fragment in reply


async def _bind_webhook_whitelist_disabled(db_path: str) -> None:
    """Check that an empty allowlist disables host checking entirely."""
    # An empty list means the allowlist is turned off.
    service = BindingService(
        store=BindingStore(db_path),
        magic_ttl_seconds=600,
        webhook_allowed_hosts=[],
    )
    await service.initialize()

    # Arbitrary hosts must be accepted while the allowlist is disabled.
    cases = (
        ("tg_no_whitelist", "https://custom.domain.com/open-apis/bot/v2/hook/custom_token"),
        ("tg_no_whitelist2", "https://example.org/open-apis/bot/v2/hook/another_token"),
    )
    for user_id, webhook_url in cases:
        reply = await service.handle_bind_webhook("telegram", user_id, webhook_url)
        assert "绑定成功" in reply


def test_bind_flow_with_sqlite(tmp_path) -> None:
    """Run the bind-flow scenario against a fresh SQLite file under ``tmp_path``."""
    asyncio.run(_bind_flow(str(tmp_path / "binding.db")))
Expand Down Expand Up @@ -175,3 +223,13 @@ def test_switch_from_webhook_to_bot(tmp_path) -> None:
def test_bind_webhook_invalid_url(tmp_path) -> None:
    """Run the invalid-webhook-URL scenario against a fresh SQLite file under ``tmp_path``."""
    asyncio.run(_bind_webhook_invalid_url(str(tmp_path / "binding.db")))


def test_bind_webhook_domain_whitelist(tmp_path) -> None:
    """Run the custom-allowlist scenario against a fresh SQLite file under ``tmp_path``."""
    asyncio.run(_bind_webhook_domain_whitelist(str(tmp_path / "binding.db")))


def test_bind_webhook_whitelist_disabled(tmp_path) -> None:
    """Run the disabled-allowlist scenario against a fresh SQLite file under ``tmp_path``."""
    asyncio.run(_bind_webhook_whitelist_disabled(str(tmp_path / "binding.db")))
19 changes: 19 additions & 0 deletions tests/test_telegram_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
BIND_MODE_CALLBACK_PREFIX,
PACK_CALLBACK_PREFIX,
PendingStickerPackRequest,
PendingWebhookBindRequest,
RunningStickerPackTask,
_cleanup_pending_requests,
_cleanup_pending_webhook_requests,
_deduplicate_filename,
_detect_sticker_mime,
_has_running_task_for_user,
Expand Down Expand Up @@ -176,3 +178,20 @@ async def reply_document(self, *args: object, **kwargs: object) -> None:

assert message.document_called is True
assert message.animation_called is False


def test_cleanup_pending_webhook_requests_removes_expired_only() -> None:
    """Cleanup must drop the request created an hour ago and keep the one created now."""
    current = int(time.time())
    # Age in seconds of each pending request, keyed by Telegram user id.
    ages = {"expired_user": 3600, "fresh_user": 0}
    pending = {
        user_id: PendingWebhookBindRequest(
            telegram_user_id=user_id,
            created_at=current - age,
        )
        for user_id, age in ages.items()
    }

    _cleanup_pending_webhook_requests(pending)

    assert "expired_user" not in pending
    assert "fresh_user" in pending