From c4b7389559e1c2ca06a521989ea2fee0f3e2e780 Mon Sep 17 00:00:00 2001 From: xiaoxing0135 <706015750@qq.com> Date: Wed, 13 May 2026 23:08:07 +0800 Subject: [PATCH] fix(bundle2-W5-2): api_security_scanner env-var gate + SSRF metadata opt-in MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 7 offensive functions (test_idor, test_no_auth, test_invalid_token, test_rate_limit, test_ssrf, test_jwt_none_alg, test_csrf) now refuse unless TAGENT_PENTEST_AUTHORIZED=1 — same pattern as W5-1 chaos_helper. test_ssrf additionally: - requires explicit probe_targets list (raises ValueError if missing) - AWS metadata / file:// / localhost preset gated behind confirm_metadata_probe=True (triggers GuardDuty / LFI signature) - CLI gains --probe (repeatable) + --confirm-metadata-probe flags SECURITY.md row updated to reflect refuse-by-default behavior. test_cors left ungated (passive OPTIONS preflight only). scan_api inherits gate via inner calls — no separate guard needed. --- .../api_security_scanner.py" | 77 ++++++++++++++++--- SECURITY.md | 2 +- 2 files changed, 68 insertions(+), 11 deletions(-) diff --git "a/05-\344\273\243\347\240\201\347\244\272\344\276\213/api_security_scanner.py" "b/05-\344\273\243\347\240\201\347\244\272\344\276\213/api_security_scanner.py" index 8c02f5b..9117354 100644 --- "a/05-\344\273\243\347\240\201\347\244\272\344\276\213/api_security_scanner.py" +++ "b/05-\344\273\243\347\240\201\347\244\272\344\276\213/api_security_scanner.py" @@ -11,6 +11,19 @@ - API8: 配置错误 - API9: 库存清单管理(影子 API) - API10: 不安全消费第三方 + +⚠️ SECURITY · 法律 / 合规 ⚠️ +本模块所有 offensive 函数默认 **refuse**。授权方式: + export TAGENT_PENTEST_AUTHORIZED=1 # 显式确认拥有书面渗透授权 +仅在以下场景启用: + - 你拥有的资产 + - 持书面授权的客户 / CTF 靶场 / bug bounty in-scope target +未授权对第三方系统运行可能违反:CFAA(美国)/ 网络安全法(中国)/ 等同地方法律。 + +特别风险: + - test_ssrf AWS metadata + file:// 预设默认 **关闭**(confirm_metadata_probe=True 才启用)。 + 理由:探 169.254.169.254 会触发 GuardDuty / CloudWatch 告警;file:// 属 LFI signature。 + - test_csrf / test_jwt_none_alg 实际伪造请求,可能改写目标数据。 """ import json import logging @@ -23,6 +36,21 @@ logger = logging.getLogger(__name__) +# ===== 授权 gate ===== + +PENTEST_AUTHORIZED = os.getenv("TAGENT_PENTEST_AUTHORIZED") == "1" + + +def _require_authorized(op: str) -> None: + """Refuse offensive op unless TAGENT_PENTEST_AUTHORIZED=1.""" + if not PENTEST_AUTHORIZED: + raise RuntimeError( + f"pentest op '{op}' refused: set TAGENT_PENTEST_AUTHORIZED=1 to enable. " + "Authorize ONLY on assets you own / have written pentest authorization. " + "Risks: CFAA / 网络安全法 violation, IDS alerts, target data corruption." + ) + + # ===== API1 BOLA / IDOR(越权访问对象) ===== def test_idor(base_url: str, endpoint_template: str, @@ -32,6 +60,7 @@ def test_idor(base_url: str, endpoint_template: str, 用 own_token 访问 other_id 的资源,应被拒绝(403/404)。 endpoint_template: "/api/users/{id}/profile" """ + _require_authorized("test_idor") url = urljoin(base_url, endpoint_template.format(id=other_id)) r = requests.get(url, headers={"Authorization": f"Bearer {own_token}"}, timeout=10) blocked = r.status_code in (401, 403, 404) @@ -49,6 +78,7 @@ def test_idor(base_url: str, endpoint_template: str, def test_no_auth(url: str, method: str = "GET") -> Dict: """无 token 访问受保护资源应被拒""" + _require_authorized("test_no_auth") r = requests.request(method, url, timeout=10) return { "test": "missing_auth", @@ -59,6 +89,7 @@ def test_no_auth(url: str, method: str = "GET") -> Dict: def test_invalid_token(url: str, invalid_token: str = "invalid.jwt.token") -> Dict: + _require_authorized("test_invalid_token") r = requests.get(url, headers={"Authorization": f"Bearer {invalid_token}"}, timeout=10) return { "test": "invalid_token", @@ -71,6 +102,7 @@ def test_invalid_token(url: str, invalid_token: str = "invalid.jwt.token") -> Di def test_rate_limit(url: str, total: int = 100, headers: Optional[Dict] = None) -> Dict: """连续请求 N 次,验证限流是否生效(429 状态)""" + _require_authorized("test_rate_limit") statuses = [] for _ in range(total): try: @@ -90,14 +122,28 @@ def test_rate_limit(url: str, total: int = 100, headers: Optional[Dict] = None) # ===== API7 SSRF ===== def test_ssrf(url: str, vulnerable_param: str = "url", - probe_targets: Optional[List[str]] = None) -> Dict: - """探测 SSRF:用内网地址试探""" - probe_targets = probe_targets or [ - "http://169.254.169.254/latest/meta-data/", # AWS metadata - "http://localhost:22", - "http://127.0.0.1:6379", - "file:///etc/passwd", - ] + probe_targets: Optional[List[str]] = None, + confirm_metadata_probe: bool = False) -> Dict: + """探测 SSRF:用内网地址试探。 + + 必须传 probe_targets,否则 raises ValueError。 + AWS metadata (169.254.169.254) + file:///etc/passwd + localhost 高风险预设仅在 + confirm_metadata_probe=True 时启用(触发 GuardDuty / LFI signature)。 + """ + _require_authorized("test_ssrf") + if probe_targets is None: + if not confirm_metadata_probe: + raise ValueError( + "test_ssrf requires explicit probe_targets list. " + "Pass confirm_metadata_probe=True to use AWS metadata + file:// preset " + "(triggers GuardDuty / LFI signature; ONLY on AWS accounts you own)." + ) + probe_targets = [ + "http://169.254.169.254/latest/meta-data/", # AWS metadata + "http://localhost:22", + "http://127.0.0.1:6379", + "file:///etc/passwd", + ] findings = [] for probe in probe_targets: try: @@ -115,6 +161,7 @@ def test_ssrf(url: str, vulnerable_param: str = "url", def test_jwt_none_alg(target_url: str, original_token: str) -> Dict: """JWT alg=none 攻击:把签名置空,alg 改 none""" + _require_authorized("test_jwt_none_alg") import base64 parts = original_token.split(".") if len(parts) != 3: @@ -151,6 +198,7 @@ def test_cors(url: str, malicious_origin: str = "https://evil.com") -> Dict: def test_csrf(post_url: str, body: Dict, session_cookie: Optional[Dict] = None) -> Dict: """无 CSRF token / 无 origin 验证下的写操作""" + _require_authorized("test_csrf") headers = {"Origin": "https://evil.com"} r = requests.post(post_url, json=body, cookies=session_cookie or {}, headers=headers, timeout=10) return { @@ -185,12 +233,21 @@ def scan_api(base_url: str, endpoints: List[Dict], token: str) -> Dict: parser = argparse.ArgumentParser(description="API 安全扫描") sub = parser.add_subparsers(dest="cmd") rl = sub.add_parser("rate-limit"); rl.add_argument("url"); rl.add_argument("--total", type=int, default=100) - ss = sub.add_parser("ssrf"); ss.add_argument("url"); ss.add_argument("--param", default="url") + ss = sub.add_parser("ssrf") + ss.add_argument("url") + ss.add_argument("--param", default="url") + ss.add_argument("--probe", action="append", dest="probe_targets", + help="可重复; e.g. --probe http://10.0.0.1/") + ss.add_argument("--confirm-metadata-probe", action="store_true", + help="启用 AWS metadata + file:// 预设 probe (触发 GuardDuty / LFI signature)") cr = sub.add_parser("cors"); cr.add_argument("url") args = parser.parse_args() if args.cmd == "rate-limit": print(json.dumps(test_rate_limit(args.url, args.total), indent=2)) elif args.cmd == "ssrf": - print(json.dumps(test_ssrf(args.url, args.param), indent=2, ensure_ascii=False)) + print(json.dumps(test_ssrf(args.url, args.param, + probe_targets=args.probe_targets, + confirm_metadata_probe=args.confirm_metadata_probe), + indent=2, ensure_ascii=False)) elif args.cmd == "cors": print(json.dumps(test_cors(args.url), indent=2)) diff --git a/SECURITY.md b/SECURITY.md index 92bda60..ae1531b 100644 --- a/SECURITY.md +++ b/SECURITY.md @@ -68,7 +68,7 @@ |------|------| | `02-专家定义/15-渗透测试.md` | 渗透测试 Agent(调用 sqlmap / Metasploit / Hydra 等真实攻击工具) | | `03-技能定义/pentest-*.md`(7 项) | 渗透 skill 系列(api / coordinator / exploit / recon / report / vuln / web) | -| `05-代码示例/api_security_scanner.py` | API 安全扫描器(含 SSRF probe 默认 169.254.169.254 / file:///etc/passwd) | +| `05-代码示例/api_security_scanner.py` | API 安全扫描器(SSRF / IDOR / JWT / CSRF; 默认 refuse,需 `TAGENT_PENTEST_AUTHORIZED=1` + AWS metadata 探针需 `confirm_metadata_probe=True`) | | `05-代码示例/ai_adversarial.py` | AI 对抗测试(含 JAILBREAK_PROMPTS + PROMPT_INJECTION_TEMPLATES 模板) | | `05-代码示例/security_scanner.py` | 通用安全扫描器(调用 ZAP / Burp) |