#!/usr/bin/env python3
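"""
Gather hostnames from one or more gatherer modules (in gatherers/) and write
them to a single de-duplicated CSV in the results directory, recording which
source(s) reported each hostname. (Forked from 18F/domain-scan.)

Illustrative invocation (the gatherer names and file paths are examples,
assuming the corresponding modules and files exist):

    ./gather dap,censys --suffix=.gov --parents=parents.csv --sort
"""
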
import os
import sys
import csv
import requests
import logging
import importlib

from scanners import utils

options = utils.options()
utils.configure_logging(options)

utils.mkdir_p(utils.cache_dir())
utils.mkdir_p(utils.results_dir())

# some metadata about the scan itself
start_time = utils.utc_timestamp()
start_command = str.join(" ", sys.argv)


def run(options=None):
    if not options["_"]:
        logging.error("Specify a gatherer.")
        exit(1)

    # For now, require a --suffix.
    suffix = utils.normalize_suffix(options.get("suffix"))
    if suffix is None:
        logging.error("--suffix is required.")
        exit(1)

    suffix_pattern = utils.suffix_pattern(suffix)
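
    # Illustrative effect (assuming the utils helpers behave as their names
    # suggest): `--suffix=.gov` is normalized and compiled into a pattern that
    # the gathering loop below uses to keep only hostnames ending in .gov.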

    # Opt in to include parent (second-level) domains.
    include_parents = options.get("include-parents", False)

    # --parents should be a CSV whose first column is parent domains
    # that will act as a whitelist for which subdomains to gather.
    parents = get_parent_domains(options)

    # Import the gatherer(s).
    sources = options["_"][0].split(",")
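
    # For example (the gatherer names are illustrative, assuming gatherers/dap.py
    # and gatherers/censys.py exist): `./gather dap,censys ...` makes
    # sources == ["dap", "censys"], and each one is imported in the loop below.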

    # Open CSV writer.
    if len(sources) > 1:
        filename = "gathered"
    else:
        filename = sources[0]  # backwards compatibility, mostly

    # De-duping hostnames. This will cause the system to hold all
    # hostnames in memory at once, but oh well.
    hostnames_cache = {}

    for source in sources:
        extra = {}

        try:
            gatherer = importlib.import_module("gatherers.%s" % source)
        except ImportError:
            # If it's not a registered module, allow it to be "hot registered"
            # as long as the user gave us a flag with that name that can be
            # used as the --url option to the URL module.
            if options.get(source):
                gatherer = importlib.import_module("gatherers.url")
                extra['name'] = source
            else:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                logging.error("[%s] Gatherer not found, or had an error during loading.\n\tERROR: %s\n\t%s" % (source, exc_type, exc_value))
                exit(1)
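
        # Illustrative fallback (the "other" flag name is made up): running
        # `./gather other --other=https://example.com/other.csv --suffix=.gov`
        # would hot-register "other" through gatherers/url.py, with
        # extra['name'] set to "other" so the url gatherer can find that flag.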

        # Iterate over each hostname.
        for domain in gatherer.gather(suffix, options, extra):

            # Always apply the suffix to returned names.
            if not suffix_pattern.search(domain):
                continue

            base = utils.base_domain_for(domain)

            # Unless --include-parents is specified, exclude them.
            if not include_parents:
                if (domain == base) or (domain == "www.%s" % base):
                    continue

            # Apply --parent domain whitelist, if present.
            if parents:
                if base not in parents:
                    continue

            # Use hostname cache to de-dupe, if seen before.
            if domain not in hostnames_cache:
                hostnames_cache[domain] = [source]
            elif source not in hostnames_cache[domain]:
                hostnames_cache[domain] += [source]

    # Now that we've gone through all sources and logged when each
    # domain appears in each one, go through cache and write
    # all of them to disk.

    # Assemble headers.
    headers = ["Domain", "Base Domain"]

    # Add headers dynamically for each source.
    headers += sources
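
    # Illustrative result (values are made up): with sources == ["dap", "censys"],
    # headers == ["Domain", "Base Domain", "dap", "censys"], and hostnames_cache
    # might hold {"foo.agency.gov": ["dap"]}.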

    # Open CSV file.
    gathered_filename = "%s/%s.csv" % (utils.results_dir(), filename)
    gathered_file = open(gathered_filename, 'w', newline='')
    gathered_writer = csv.writer(gathered_file)
    gathered_writer.writerow(headers)

    # Write each hostname to disk, with all discovered sources.
    hostnames = list(hostnames_cache.keys())
    hostnames.sort()
    for hostname in hostnames:
        base = utils.base_domain_for(hostname)
        row = [hostname, base]
        for source in sources:
            row += [source in hostnames_cache[hostname]]
        gathered_writer.writerow(row)

    # Close CSV file.
    gathered_file.close()
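
    # Each row is the hostname, its base domain, and one True/False cell per
    # source, e.g. (illustrative): ["foo.agency.gov", "agency.gov", True, False].
    # The file is written to "<results_dir>/<filename>.csv".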

    # If sort requested, sort in place by domain.
    if options.get("sort"):
        utils.sort_csv(gathered_filename)

    logging.warning("Results written to CSV.")

    # Save metadata.
    metadata = {
        'start_time': start_time,
        'end_time': utils.utc_timestamp(),
        'command': start_command
    }
    utils.write(utils.json_for(metadata), "%s/meta.json" % utils.results_dir())


# Read in parent domains from the first column of a given CSV.
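# Expected format, inferred from the parsing below (not from upstream docs):
# one parent domain per row in the first column; a header row whose first cell
# starts with "domain" is skipped, and any extra columns are ignored.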
def get_parent_domains(options):
    parents = options.get("parents")
    if not parents:
        return None

    # If --parents is a URL, we want to download it now,
    # and then adjust the value to be the path of the cached download.
    if parents.startswith("http:") or parents.startswith("https:"):
        # Though it's saved in cache/, it will be downloaded every time.
        parents_path = os.path.join(utils.cache_dir(), "parents.csv")

        try:
            response = requests.get(parents)
            utils.write(response.text, parents_path)
        except:
            logging.error("Parent domains URL not downloaded successfully.")
            print(utils.format_last_exception())
            exit(1)

        parents = parents_path

    parent_domains = []
    with open(parents, encoding='utf-8', newline='') as csvfile:
        for row in csv.reader(csvfile):
            if (not row[0]) or (row[0].lower().startswith("domain")):
                continue
            parent_domains.append(row[0].lower())

    return parent_domains


if __name__ == '__main__':
    run(options)