/
truffleHog.py
242 lines (217 loc) · 9.12 KB
/
truffleHog.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import shutil
import sys
import math
import datetime
import argparse
import tempfile
import os
import json
import stat
from regexChecks import regexes
from git import Repo
def main():
    """Parse the command line, scan the repository, then delete the clone.

    The positional ``git_url`` is handed to ``find_strings`` together with
    the optional switches.  ``find_strings`` reports where it cloned the
    repository under the ``"project_path"`` key of its return value; that
    directory is removed once the scan has returned.
    """
    parser = argparse.ArgumentParser(description='Find secrets hidden in the depths of git.')
    parser.add_argument('--json', dest="output_json", action="store_true", help="Output in JSON")
    parser.add_argument("--regex", dest="do_regex", action="store_true", help="Enable high signal regex checks")
    # Kept as a raw string with a default of None: str2bool() treats a
    # missing value as "enabled", so no default may be installed here.
    parser.add_argument("--entropy", dest="do_entropy", help="Enable entropy checks")
    parser.add_argument("--since_commit", dest="since_commit", help="Only scan from a given commit hash")
    # type=int so a malformed depth is rejected by argparse up front.
    parser.add_argument("--max_depth", dest="max_depth", type=int, help="The max commit depth to go back when searching for secrets")
    parser.add_argument('git_url', type=str, help='URL for secret searching')
    # Defaults must be keyed by each option's ``dest``; a default stored
    # under any other name is never read by the code below.
    parser.set_defaults(do_regex=False)
    parser.set_defaults(max_depth=1000000)
    parser.set_defaults(since_commit=None)
    args = parser.parse_args()

    do_entropy = str2bool(args.do_entropy)
    output = find_strings(args.git_url, args.since_commit, args.max_depth, args.output_json, args.do_regex, do_entropy)

    # del_rw lets rmtree get past entries it could not delete at first try.
    project_path = output["project_path"]
    shutil.rmtree(project_path, onerror=del_rw)
def str2bool(v):
    """Interpret a command-line style truth value.

    Args:
        v: ``None``, a ``bool``, or a string such as ``"yes"`` / ``"no"``.
            Strings are matched case-insensitively.

    Returns:
        ``True`` when ``v`` is ``None`` (option not supplied), ``v`` itself
        when it already is a ``bool``, otherwise the boolean the string
        spells out.

    Raises:
        argparse.ArgumentTypeError: If the string is not a recognised
            spelling of true or false.
    """
    if v is None:
        return True
    if isinstance(v, bool):
        # Already converted; calling .lower() on a bool would fail.
        return v
    normalized = v.lower()
    if normalized in ('yes', 'true', 't', 'y', '1'):
        return True
    if normalized in ('no', 'false', 'f', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')
# Characters treated as belonging to a base64 run (includes the '=' padding
# character); used both to extract candidate runs and to score their entropy.
BASE64_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/="
# Characters treated as belonging to a hexadecimal run (both letter cases).
HEX_CHARS = "1234567890abcdefABCDEF"
def del_rw(action, name, exc):
    """``shutil.rmtree`` error handler: make the path writable and delete it.

    Args:
        action: The function that failed inside ``rmtree``.
        name: Path the failing function was called with.
        exc: Exception information supplied by ``rmtree`` (unused).
    """
    os.chmod(name, stat.S_IWRITE)
    if action is os.rmdir:
        # The failed operation was a directory removal; os.remove() cannot
        # delete directories, so retry with os.rmdir instead.
        os.rmdir(name)
    else:
        os.remove(name)
def shannon_entropy(data, iterator):
    """
    Shannon entropy, in bits, of ``data`` over the symbols in ``iterator``.

    Only symbols listed in ``iterator`` contribute; each contributes
    ``-p * log2(p)`` where ``p`` is its share of ``len(data)``.  Empty
    ``data`` yields 0.

    Borrowed from http://blog.dkbza.org/2007/05/scanning-data-for-entropy-anomalies.html
    """
    if not data:
        return 0
    total = len(data)
    bits = 0
    for symbol in iterator:
        occurrences = data.count(symbol)
        if occurrences <= 0:
            # Absent symbols contribute nothing (and log(0) is undefined).
            continue
        probability = float(occurrences) / total
        bits -= probability * math.log(probability, 2)
    return bits
def get_strings_of_set(word, char_set, threshold=20):
    """Return the runs of ``char_set`` characters in ``word`` longer than ``threshold``.

    ``word`` is read left to right; consecutive characters that are members
    of ``char_set`` form a run, and any other character ends it.  A run is
    kept only when its length is strictly greater than ``threshold``.
    Runs are returned in order of appearance.
    """
    runs = []
    current = []
    for ch in word:
        if ch in char_set:
            current.append(ch)
            continue
        # A character outside the set closes whatever run was in progress.
        if len(current) > threshold:
            runs.append("".join(current))
        current = []
    # The word may end while a run is still open.
    if len(current) > threshold:
        runs.append("".join(current))
    return runs
class bcolors:
    """Terminal escape sequences used to decorate printed output."""
    # Reset and text attributes.
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
    # Foreground colour codes, in ascending numeric order (91-95).
    FAIL = '\033[91m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    OKBLUE = '\033[94m'
    HEADER = '\033[95m'
def clone_git_repo(git_url):
    """Clone ``git_url`` into a fresh temporary directory and return its path.

    The caller owns the returned directory and is responsible for deleting
    it.  If the clone itself fails, the directory is removed here before the
    original exception propagates, so a failed clone leaves nothing behind.
    """
    project_path = tempfile.mkdtemp()
    try:
        Repo.clone_from(git_url, project_path)
    except BaseException:
        # Nobody else knows this path yet; clean up and re-raise unchanged.
        shutil.rmtree(project_path, ignore_errors=True)
        raise
    return project_path
def print_results(printJson, issue):
    """Print one finding, either as JSON or as a coloured text report.

    Args:
        printJson: When true, ``issue`` is dumped as indented JSON with
            sorted keys and nothing else is printed.
        issue: Mapping with the keys ``date``, ``branch``, ``commit``,
            ``printDiff``, ``commitHash``, ``reason`` and ``path``.  All of
            them are read before any output is produced.
    """
    commit_time, branch_name, prev_commit, printableDiff, commitHash, reason, path = [
        issue[key]
        for key in ('date', 'branch', 'commit', 'printDiff', 'commitHash', 'reason', 'path')
    ]

    if printJson:
        print(json.dumps(issue, sort_keys=True, indent=4))
        return

    if sys.version_info >= (3, 0):
        def as_output(text):
            return text
    else:
        # On Python 2 the branch, commit message and diff are encoded to
        # UTF-8 bytes before being formatted or printed.
        def as_output(text):
            return text.encode('utf-8')

    separator = "~~~~~~~~~~~~~~~~~~~~~"
    # (template, value, whether the value goes through as_output)
    header_lines = (
        ("{}Reason: {}{}", reason, False),
        ("{}Date: {}{}", commit_time, False),
        ("{}Hash: {}{}", commitHash, False),
        ("{}Filepath: {}{}", path, False),
        ("{}Branch: {}{}", branch_name, True),
        ("{}Commit: {}{}", prev_commit, True),
    )

    print(separator)
    for template, value, convert in header_lines:
        shown = as_output(value) if convert else value
        print(template.format(bcolors.OKGREEN, shown, bcolors.ENDC))
    print(as_output(printableDiff))
    print(separator)
def find_entropy(printableDiff, commit_time, branch_name, prev_commit, blob, commitHash):
    """Flag high-entropy base64 and hex runs in a single diff.

    Every whitespace-separated word of ``printableDiff`` is searched for
    runs of base64 characters and runs of hex characters (as extracted by
    ``get_strings_of_set``).  A base64 run is flagged when its Shannon
    entropy exceeds 4.5, a hex run when it exceeds 3.

    Args:
        printableDiff: Decoded text of the diff.
        commit_time: Value stored under ``date`` in the result.
        branch_name: Value stored under ``branch``.
        prev_commit: Object whose ``message`` is stored under ``commit``.
        blob: Diff entry; ``b_path`` / ``a_path`` supply the path and
            ``diff`` (bytes) the raw patch.
        commitHash: Value stored under ``commitHash``.

    Returns:
        ``None`` when nothing was flagged, otherwise a dict describing the
        finding.  ``stringsFound`` lists every flagged run in encounter
        order (repeats included); ``printDiff`` is the diff with each
        distinct flagged string wrapped in warning colour codes.
    """
    stringsFound = []
    # Words are taken from the untouched diff, before any highlighting.
    for word in printableDiff.split():
        for char_set, min_entropy in ((BASE64_CHARS, 4.5), (HEX_CHARS, 3)):
            for string in get_strings_of_set(word, char_set):
                if shannon_entropy(string, char_set) > min_entropy:
                    stringsFound.append(string)

    if not stringsFound:
        return None

    # str.replace() rewrites every occurrence at once, so each distinct
    # string is wrapped only once; repeating it would nest escape codes.
    highlighted = set()
    for string in stringsFound:
        if string in highlighted:
            continue
        highlighted.add(string)
        printableDiff = printableDiff.replace(string, bcolors.WARNING + string + bcolors.ENDC)

    return {
        'date': commit_time,
        'path': blob.b_path or blob.a_path,
        'branch': branch_name,
        'commit': prev_commit.message,
        'diff': blob.diff.decode('utf-8', errors='replace'),
        'stringsFound': stringsFound,
        'printDiff': printableDiff,
        'commitHash': commitHash,
        'reason': "High Entropy",
    }
def regex_check(printableDiff, commit_time, branch_name, prev_commit, blob, commitHash):
    """Run every pattern in ``regexes`` against one diff.

    Args:
        printableDiff: Decoded text of the diff.
        commit_time: Value stored under ``date`` in each result.
        branch_name: Value stored under ``branch``.
        prev_commit: Object whose ``message`` is stored under ``commit``.
        blob: Diff entry; ``b_path`` / ``a_path`` supply the path and
            ``diff`` (bytes) the raw patch.
        commitHash: Value stored under ``commitHash``.

    Returns:
        A list with one dict per pattern that matched.  ``reason`` is the
        pattern's key in ``regexes``, ``stringsFound`` is the ``findall``
        result, and ``printDiff`` is the diff text with each distinct match
        wrapped in warning colour codes.
    """
    regex_matches = []
    for key in regexes:
        found_strings = regexes[key].findall(printableDiff)
        if not found_strings:
            continue

        # Highlight inside the diff; replacing the diff by the match itself
        # would discard the context and every match but the last.
        found_diff = printableDiff
        highlighted = set()
        for found_string in found_strings:
            # Skip empty matches (replace('') would insert between every
            # character) and repeats (would nest the colour codes).
            if not found_string or found_string in highlighted:
                continue
            highlighted.add(found_string)
            found_diff = found_diff.replace(found_string, bcolors.WARNING + found_string + bcolors.ENDC)

        regex_matches.append({
            'date': commit_time,
            'path': blob.b_path or blob.a_path,
            'branch': branch_name,
            'commit': prev_commit.message,
            'diff': blob.diff.decode('utf-8', errors='replace'),
            'stringsFound': found_strings,
            'printDiff': found_diff,
            'reason': key,
            'commitHash': commitHash,
        })
    return regex_matches
def _scan_commit_pair(prev_commit, curr_commit, branch_name, printJson, do_regex, do_entropy):
    """Scan and print the findings of the diff between two adjacent commits."""
    # The date, message and hash reported for a finding all come from
    # prev_commit so that the three fields describe the same commit.
    commit_time = datetime.datetime.fromtimestamp(prev_commit.committed_date).strftime('%Y-%m-%d %H:%M:%S')
    commitHash = prev_commit.hexsha
    for blob in prev_commit.diff(curr_commit, create_patch=True):
        printableDiff = blob.diff.decode('utf-8', errors='replace')
        if printableDiff.startswith("Binary files"):
            continue
        foundIssues = []
        if do_entropy:
            entropicDiff = find_entropy(printableDiff, commit_time, branch_name, prev_commit, blob, commitHash)
            if entropicDiff:
                foundIssues.append(entropicDiff)
        if do_regex:
            foundIssues += regex_check(printableDiff, commit_time, branch_name, prev_commit, blob, commitHash)
        for foundIssue in foundIssues:
            print_results(printJson, foundIssue)


def find_strings(git_url, since_commit=None, max_depth=None, printJson=False, do_regex=False, do_entropy=True):
    """Clone ``git_url`` and scan the history of every fetched branch.

    For each ref returned by fetching ``origin``, the commits are walked
    as yielded by ``iter_commits`` and the diff between each pair of
    adjacent commits is scanned.  A pair already scanned via another
    branch is skipped.  Findings are printed as they are discovered.

    Args:
        git_url: Repository location handed to ``clone_git_repo``.
        since_commit: Optional commit hash; walking a branch stops as soon
            as this commit is reached.
        max_depth: Passed to ``iter_commits`` as ``max_count``.
        printJson: Forwarded to ``print_results``.
        do_regex: Run ``regex_check`` on every diff.
        do_entropy: Run ``find_entropy`` on every diff.

    Returns:
        dict with ``"project_path"`` (the clone directory, which the caller
        must delete) and ``"entropicDiffs"`` (a list this function creates
        empty and does not add to).
    """
    output = {"entropicDiffs": []}
    project_path = clone_git_repo(git_url)
    repo = Repo(project_path)
    already_searched = set()

    for remote_branch in repo.remotes.origin.fetch():
        # Strip only the leading remote component so that names containing
        # further slashes stay intact and names without one do not fail.
        branch_name = remote_branch.name.split('/', 1)[-1]
        try:
            repo.git.checkout(remote_branch, b=branch_name)
        except Exception:
            # Best effort: the checkout can fail (for instance when the
            # local branch already exists); scanning below does not rely
            # on it because the ref to walk is named explicitly.
            pass

        prev_commit = None
        # Walk the fetched ref itself rather than whatever HEAD happens to
        # be, so a failed checkout cannot make one branch's commits appear
        # under another branch's name.
        for curr_commit in repo.iter_commits(remote_branch.name, max_count=max_depth):
            if since_commit and curr_commit.hexsha == since_commit:
                # Everything from this commit on is skipped, so stop here.
                break
            if prev_commit is not None:
                # avoid searching the same diffs
                hashes = str(prev_commit) + str(curr_commit)
                if hashes not in already_searched:
                    already_searched.add(hashes)
                    _scan_commit_pair(prev_commit, curr_commit, branch_name, printJson, do_regex, do_entropy)
            prev_commit = curr_commit

    output["project_path"] = project_path
    return output
# Run the command-line interface only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    main()