-
Notifications
You must be signed in to change notification settings - Fork 11
/
zgrab2scannersmtp.py
162 lines (140 loc) · 5.81 KB
/
zgrab2scannersmtp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
"""smtp"""
import os
import signal
import traceback
import uuid
from datacontract.iscandataset.iscantask import IscanTask
from .zgrab2scannerbase import Zgrab2ScannerBase
from ..zgrab2parser import Zgrab2ParserSMTP
class Zgrab2ScannerSMTP(Zgrab2ScannerBase):
    """zgrab2 SMTP scanner.

    Runs the ``zgrab2 smtp`` module against a set of hosts and passes the
    resulting output file to :class:`Zgrab2ParserSMTP` for parsing.
    """

    def __init__(self, zgrab_path: str):
        # NOTE(review): ``zgrab_path`` is accepted but not used in this class;
        # the binary path is supplied per call through ``zgrab2path``.
        Zgrab2ScannerBase.__init__(self, "zgrab2smtp")
        self._parser: Zgrab2ParserSMTP = Zgrab2ParserSMTP()

    @staticmethod
    def _has_option(args, *names: str) -> bool:
        """Return True if any option in *names* was supplied in *args*.

        An option counts as supplied when an element of *args* is exactly the
        option name, or starts with the name followed by ``=`` or a space
        (e.g. ``--ehlo-domain=foo`` or ``-f hosts.txt``).
        """
        for arg in args:
            if not isinstance(arg, str):
                continue
            for name in names:
                if arg == name or arg.startswith((name + "=", name + " ")):
                    return True
        return False

    def get_banner_smtp(
        self,
        task: IscanTask,
        level,
        pinfo_dict,
        port,
        *args,
        zgrab2path: str = "zgrab2",
        sudo: bool = False,
        timeout: float = 600,
    ) -> None:
        """Scan SMTP services on *port* and parse the collected banners.

        The keys of *pinfo_dict* are written to a temporary hosts file, zgrab2
        is run against that file, and the output file is passed to the parser
        together with *pinfo_dict*. Both temporary files are removed before
        returning. Any exception (including an invalid port) is logged and
        swallowed; nothing is returned.

        Args:
            task: the scan task the results belong to.
            level: passed through unchanged to the scanner and the parser.
            pinfo_dict: mapping whose keys are the hosts to scan.
            port: TCP port to scan; must be an int in ``0..65535``.
            *args: extra command-line arguments forwarded to zgrab2.
            zgrab2path: path or name of the zgrab2 executable.
            sudo: forwarded to the process runner.
            timeout: seconds, used both as zgrab2's ``-t`` value and as the
                limit for waiting on the process.
        """
        hostfi = None
        outfi = None
        try:
            if not isinstance(port, int) or port < 0 or port > 65535:
                raise Exception("Invalid port: {}".format(port))

            hosts = pinfo_dict.keys()
            hostfi = self._write_hosts_to_file(task, hosts)
            if hostfi is None:
                return

            outfi = self._scan_smtp(
                task,
                level,
                hostfi,
                port,
                *args,
                zgrab2path=zgrab2path,
                sudo=sudo,
                timeout=timeout,
            )
            if outfi is None or not os.path.isfile(outfi):
                return

            self._parser.parse_banner(task, level, pinfo_dict, outfi)
        except Exception:
            self._logger.error("Scan smtp error: {}".format(traceback.format_exc()))
        finally:
            # Always clean up the temporary input/output files.
            if hostfi is not None and os.path.isfile(hostfi):
                os.remove(hostfi)
            if outfi is not None and os.path.isfile(outfi):
                os.remove(outfi)

    def _scan_smtp(
        self,
        task: IscanTask,
        level,
        host_file: str,
        port: int,
        *args,
        zgrab2path: str = "zgrab2",
        sudo: bool = False,
        timeout: float = 600,
    ) -> str:
        """Run ``zgrab2 smtp`` against *host_file* and return the output path.

        Builds the zgrab2 command line (filling in defaults for options the
        caller did not supply in *args*), runs the process and waits for it.

        Returns:
            The path of the output file chosen for this run, or ``None`` when
            anything failed (non-zero exit code, timeout, other exception);
            in that case a partially written output file is removed.
        """
        outfi: str = None
        try:
            enhanced_args = []
            # add hosts and ports to args
            enhanced_args.append("smtp")
            enhanced_args.append(f"-p {port}")
            # zgrab2 smtp --send-ehlo --ehlo-domain="mail.example.com" --starttls --keep-client-logs -f host.txt -o smtp3.json
            enhanced_args.append(f"-t {timeout}")
            if "--send-ehlo" not in args:
                enhanced_args.append("--send-ehlo")
            # Only add the default EHLO domain when the caller gave none; the
            # caller's value arrives as "--ehlo-domain=<value>", so an exact
            # membership test would never see it.
            if not self._has_option(args, "--ehlo-domain"):
                enhanced_args.append('--ehlo-domain="mail.example.com"')
            if "--keep-client-logs" not in args:
                enhanced_args.append("--keep-client-logs")

            # args is almost always empty; it only carries values when the
            # caller has special requirements, so it is kept here for now.
            enhanced_args.extend(args)

            if port == 25:
                # Port 25 negotiates TLS via STARTTLS; never combine it with
                # implicit TLS (--smtps).
                if "--starttls" not in enhanced_args:
                    enhanced_args.append("--starttls")
                if "--smtps" in enhanced_args:
                    enhanced_args.remove("--smtps")
            elif port == 465:
                # although it have scanned tls certificate in advance,
                # here should scan it again in order to check if the
                # certificate is different from the one previously scanned.
                if "--smtps" not in enhanced_args:
                    enhanced_args.append("--smtps")
                if "--starttls" in enhanced_args:
                    enhanced_args.remove("--starttls")

            # Use our generated hosts file unless the caller supplied one.
            if not self._has_option(args, "--input-file", "-f"):
                enhanced_args.append(f"-f {host_file}")  # input file

            # Pick an output file name that does not exist yet.
            with self._outfile_locker:
                outfi = os.path.join(
                    self._tmpdir, "{}_{}.smtp".format(str(uuid.uuid1()), port)
                )
                while os.path.isfile(outfi):
                    outfi = os.path.join(
                        self._tmpdir, "{}_{}.smtp".format(str(uuid.uuid1()), port)
                    )

            # The output option is always appended (and appended last): the
            # return value of this method, and the caller's parsing and
            # cleanup, depend on zgrab2 writing to exactly this file.
            # here must use -o, use '--output-file' will cause exception 'No such file or directory'
            # this may be a bug
            # (the tool's documentation does not say --output-file is usable)
            enhanced_args.append(f"-o {outfi}")  # output file

            # Create the output directory if it does not exist yet.
            outdir = os.path.dirname(outfi)
            if not os.path.exists(outdir) or not os.path.isdir(outdir):
                os.makedirs(outdir)

            curr_process = None
            try:
                curr_process = self._run_process(
                    zgrab2path, *enhanced_args, rootDir=outdir, sudo=sudo
                )
                stdout, stderr = curr_process.communicate(timeout=timeout)
                exitcode = curr_process.wait(timeout=10)
                if stdout is not None:
                    self._logger.trace(stdout)
                if stderr is not None:
                    self._logger.trace(stderr)
                if exitcode != 0:
                    raise Exception(f"Scan smtp error: {stdout}\n{stderr}")
                self._logger.info(
                    f"Scan smtp exitcode={str(exitcode)}\ntaskid:{task.taskid}\nbatchid:{task.batchid}\nport:{port}"
                )
            finally:
                # Make sure the child does not outlive this call (e.g. after
                # a communicate() timeout).
                if curr_process is not None:
                    curr_process.kill()
        except Exception:
            # Drop any partial output and report failure to the caller via
            # a None return value; keep the traceback in the log.
            if outfi is not None and os.path.isfile(outfi):
                os.remove(outfi)
            outfi = None
            self._logger.error(
                f"Scan smtp error\ntaskid:{task.taskid}\nbatchid:{task.batchid}\nport:{port}\n{traceback.format_exc()}"
            )
        return outfi