Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 67 additions & 10 deletions 05-代码示例/api_security_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,19 @@
- API8: 配置错误
- API9: 库存清单管理(影子 API)
- API10: 不安全消费第三方

⚠️ SECURITY · 法律 / 合规 ⚠️
本模块所有 offensive 函数默认 **refuse**。授权方式:
export TAGENT_PENTEST_AUTHORIZED=1 # 显式确认拥有书面渗透授权
仅在以下场景启用:
- 你拥有的资产
- 持书面授权的客户 / CTF 靶场 / bug bounty in-scope target
未授权对第三方系统运行可能违反:CFAA(美国)/ 网络安全法(中国)/ 等同地方法律。

特别风险:
- test_ssrf AWS metadata + file:// 预设默认 **关闭**(confirm_metadata_probe=True 才启用)。
理由:探 169.254.169.254 会触发 GuardDuty / CloudWatch 告警;file:// 属 LFI signature。
- test_csrf / test_jwt_none_alg 实际伪造请求,可能改写目标数据。
"""
import json
import logging
Expand All @@ -23,6 +36,21 @@
logger = logging.getLogger(__name__)


# ===== 授权 gate =====

PENTEST_AUTHORIZED = os.getenv("TAGENT_PENTEST_AUTHORIZED") == "1"


def _require_authorized(op: str) -> None:
"""Refuse offensive op unless TAGENT_PENTEST_AUTHORIZED=1."""
if not PENTEST_AUTHORIZED:
raise RuntimeError(
f"pentest op '{op}' refused: set TAGENT_PENTEST_AUTHORIZED=1 to enable. "
"Authorize ONLY on assets you own / have written pentest authorization. "
"Risks: CFAA / 网络安全法 violation, IDS alerts, target data corruption."
)


# ===== API1 BOLA / IDOR(越权访问对象) =====

def test_idor(base_url: str, endpoint_template: str,
Expand All @@ -32,6 +60,7 @@ def test_idor(base_url: str, endpoint_template: str,
用 own_token 访问 other_id 的资源,应被拒绝(403/404)。
endpoint_template: "/api/users/{id}/profile"
"""
_require_authorized("test_idor")
url = urljoin(base_url, endpoint_template.format(id=other_id))
r = requests.get(url, headers={"Authorization": f"Bearer {own_token}"}, timeout=10)
blocked = r.status_code in (401, 403, 404)
Expand All @@ -49,6 +78,7 @@ def test_idor(base_url: str, endpoint_template: str,

def test_no_auth(url: str, method: str = "GET") -> Dict:
"""无 token 访问受保护资源应被拒"""
_require_authorized("test_no_auth")
r = requests.request(method, url, timeout=10)
return {
"test": "missing_auth",
Expand All @@ -59,6 +89,7 @@ def test_no_auth(url: str, method: str = "GET") -> Dict:


def test_invalid_token(url: str, invalid_token: str = "invalid.jwt.token") -> Dict:
_require_authorized("test_invalid_token")
r = requests.get(url, headers={"Authorization": f"Bearer {invalid_token}"}, timeout=10)
return {
"test": "invalid_token",
Expand All @@ -71,6 +102,7 @@ def test_invalid_token(url: str, invalid_token: str = "invalid.jwt.token") -> Di

def test_rate_limit(url: str, total: int = 100, headers: Optional[Dict] = None) -> Dict:
"""连续请求 N 次,验证限流是否生效(429 状态)"""
_require_authorized("test_rate_limit")
statuses = []
for _ in range(total):
try:
Expand All @@ -90,14 +122,28 @@ def test_rate_limit(url: str, total: int = 100, headers: Optional[Dict] = None)
# ===== API7 SSRF =====

def test_ssrf(url: str, vulnerable_param: str = "url",
probe_targets: Optional[List[str]] = None) -> Dict:
"""探测 SSRF:用内网地址试探"""
probe_targets = probe_targets or [
"http://169.254.169.254/latest/meta-data/", # AWS metadata
"http://localhost:22",
"http://127.0.0.1:6379",
"file:///etc/passwd",
]
probe_targets: Optional[List[str]] = None,
confirm_metadata_probe: bool = False) -> Dict:
"""探测 SSRF:用内网地址试探。

必须传 probe_targets,否则 raises ValueError。
AWS metadata (169.254.169.254) + file:///etc/passwd + localhost 高风险预设仅在
confirm_metadata_probe=True 时启用(触发 GuardDuty / LFI signature)。
"""
_require_authorized("test_ssrf")
if probe_targets is None:
if not confirm_metadata_probe:
raise ValueError(
"test_ssrf requires explicit probe_targets list. "
"Pass confirm_metadata_probe=True to use AWS metadata + file:// preset "
"(triggers GuardDuty / LFI signature; ONLY on AWS accounts you own)."
)
probe_targets = [
"http://169.254.169.254/latest/meta-data/", # AWS metadata
"http://localhost:22",
"http://127.0.0.1:6379",
"file:///etc/passwd",
]
findings = []
for probe in probe_targets:
try:
Expand All @@ -115,6 +161,7 @@ def test_ssrf(url: str, vulnerable_param: str = "url",

def test_jwt_none_alg(target_url: str, original_token: str) -> Dict:
"""JWT alg=none 攻击:把签名置空,alg 改 none"""
_require_authorized("test_jwt_none_alg")
import base64
parts = original_token.split(".")
if len(parts) != 3:
Expand Down Expand Up @@ -151,6 +198,7 @@ def test_cors(url: str, malicious_origin: str = "https://evil.com") -> Dict:

def test_csrf(post_url: str, body: Dict, session_cookie: Optional[Dict] = None) -> Dict:
"""无 CSRF token / 无 origin 验证下的写操作"""
_require_authorized("test_csrf")
headers = {"Origin": "https://evil.com"}
r = requests.post(post_url, json=body, cookies=session_cookie or {}, headers=headers, timeout=10)
return {
Expand Down Expand Up @@ -185,12 +233,21 @@ def scan_api(base_url: str, endpoints: List[Dict], token: str) -> Dict:
parser = argparse.ArgumentParser(description="API 安全扫描")
sub = parser.add_subparsers(dest="cmd")
rl = sub.add_parser("rate-limit"); rl.add_argument("url"); rl.add_argument("--total", type=int, default=100)
ss = sub.add_parser("ssrf"); ss.add_argument("url"); ss.add_argument("--param", default="url")
ss = sub.add_parser("ssrf")
ss.add_argument("url")
ss.add_argument("--param", default="url")
ss.add_argument("--probe", action="append", dest="probe_targets",
help="可重复; e.g. --probe http://10.0.0.1/")
ss.add_argument("--confirm-metadata-probe", action="store_true",
help="启用 AWS metadata + file:// 预设 probe (触发 GuardDuty / LFI signature)")
cr = sub.add_parser("cors"); cr.add_argument("url")
args = parser.parse_args()
if args.cmd == "rate-limit":
print(json.dumps(test_rate_limit(args.url, args.total), indent=2))
elif args.cmd == "ssrf":
print(json.dumps(test_ssrf(args.url, args.param), indent=2, ensure_ascii=False))
print(json.dumps(test_ssrf(args.url, args.param,
probe_targets=args.probe_targets,
confirm_metadata_probe=args.confirm_metadata_probe),
indent=2, ensure_ascii=False))
elif args.cmd == "cors":
print(json.dumps(test_cors(args.url), indent=2))
2 changes: 1 addition & 1 deletion SECURITY.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
|------|------|
| `02-专家定义/15-渗透测试.md` | 渗透测试 Agent(调用 sqlmap / Metasploit / Hydra 等真实攻击工具) |
| `03-技能定义/pentest-*.md`(7 项) | 渗透 skill 系列(api / coordinator / exploit / recon / report / vuln / web) |
| `05-代码示例/api_security_scanner.py` | API 安全扫描器(SSRF probe 默认 169.254.169.254 / file:///etc/passwd) |
| `05-代码示例/api_security_scanner.py` | API 安全扫描器(SSRF / IDOR / JWT / CSRF; 默认 refuse,需 `TAGENT_PENTEST_AUTHORIZED=1` + AWS metadata 探针需 `confirm_metadata_probe=True`) |
| `05-代码示例/ai_adversarial.py` | AI 对抗测试(含 JAILBREAK_PROMPTS + PROMPT_INJECTION_TEMPLATES 模板) |
| `05-代码示例/security_scanner.py` | 通用安全扫描器(调用 ZAP / Burp) |

Expand Down
Loading