Skip to content

Commit

Permalink
Merge pull request #95 from Yelp/audit-diff-functionality
Browse files Browse the repository at this point in the history
Adding `audit --diff` functionality
  • Loading branch information
domanchi committed Nov 26, 2018
2 parents c3eae1d + 81418a0 commit 839f02b
Show file tree
Hide file tree
Showing 8 changed files with 428 additions and 7 deletions.
2 changes: 1 addition & 1 deletion detect_secrets/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
VERSION = '0.10.5'
VERSION = '0.11.0'
215 changes: 212 additions & 3 deletions detect_secrets/core/audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@ def __init__(self, line):
)


class RedundantComparisonError(Exception):
    """Raised when a baseline file is asked to be compared with itself."""


def audit_baseline(baseline_filename):
original_baseline = _get_baseline_from_file(baseline_filename)
if not original_baseline:
return

files_removed = _remove_nonexistent_files_from_baseline(original_baseline)

current_secret_index = 0
all_secrets = list(_secret_generator(original_baseline))
secrets_with_choices = [
(filename, secret) for filename, secret in all_secrets
Expand All @@ -41,6 +44,7 @@ def audit_baseline(baseline_filename):
total_choices = len(secrets_with_choices)
secret_iterator = BidirectionalIterator(secrets_with_choices)

current_secret_index = 0
for filename, secret in secret_iterator:
_clear_screen()

Expand Down Expand Up @@ -84,6 +88,87 @@ def audit_baseline(baseline_filename):
_save_baseline_to_file(baseline_filename, original_baseline)


def compare_baselines(old_baseline_filename, new_baseline_filename):
    """
    This function enables developers to more easily configure plugin
    settings, by comparing two generated baselines and highlighting
    their differences.

    For effective use, a few assumptions are made:
        1. Baselines are sorted by (filename, line_number, hash).
           This allows for a deterministic order, when doing a side-by-side
           comparison.

        2. Baselines are generated for the same codebase snapshot.
           This means that we won't have cases where secrets are moved around;
           only added or removed.

    NOTE: We don't want to do a version check, because we want to be able to
    use this functionality across versions (to see how the new version fares
    compared to the old one).

    :type old_baseline_filename: str
    :param old_baseline_filename: path to the baseline used as the reference.

    :type new_baseline_filename: str
    :param new_baseline_filename: path to the baseline being compared
        against the reference.

    :raises: RedundantComparisonError
        if both arguments name the same file.
    """
    if old_baseline_filename == new_baseline_filename:
        raise RedundantComparisonError

    old_baseline = _get_baseline_from_file(old_baseline_filename)
    new_baseline = _get_baseline_from_file(new_baseline_filename)
    if not old_baseline or not new_baseline:
        # Same guard as audit_baseline: without two usable baselines there
        # is nothing to compare, and indexing into a missing one would crash.
        return

    _remove_nonexistent_files_from_baseline(old_baseline)
    _remove_nonexistent_files_from_baseline(new_baseline)

    # We aggregate the secrets first, so that we can display a total count.
    secrets_to_compare = _get_secrets_to_compare(old_baseline, new_baseline)
    total_reviews = len(secrets_to_compare)
    current_index = 0

    secret_iterator = BidirectionalIterator(secrets_to_compare)
    for filename, secret, is_removed in secret_iterator:
        _clear_screen()
        current_index += 1

        # A removed secret only exists in the old baseline, so it must be
        # displayed with the plugin settings that originally found it.
        if is_removed:
            plugins_used = old_baseline['plugins_used']
            status = BashColor.color('REMOVED', Color.RED)
        else:
            plugins_used = new_baseline['plugins_used']
            status = BashColor.color('ADDED', Color.LIGHT_GREEN)

        header = '{} {}'.format(
            BashColor.color('Status:', Color.BOLD),
            '>> {} <<'.format(status),
        )

        try:
            _print_context(
                filename,
                secret,
                current_index,
                total_reviews,
                plugins_used,
                additional_header_lines=header,
            )
            decision = _get_user_decision(
                can_step_back=secret_iterator.can_step_back(),
                prompt_secret_decision=False,
            )
        except SecretNotFoundOnSpecifiedLineError:
            decision = _get_user_decision(prompt_secret_decision=False)

        if decision == 'q':
            print('Quitting...')
            break

        if decision == 'b':  # pragma: no cover
            # The next iteration adds one again, so subtracting two leaves
            # the counter pointing at the previous entry.
            current_index -= 2
            secret_iterator.step_back_on_next_iteration()


def _get_baseline_from_file(filename): # pragma: no cover
try:
with open(filename) as f:
Expand All @@ -109,11 +194,128 @@ def _secret_generator(baseline):
yield filename, secret


def _get_secrets_to_compare(old_baseline, new_baseline):
    """
    Collect every secret that appears in only one of the two baselines.

    :type old_baseline: dict
    :type new_baseline: dict

    :rtype: list(tuple)
    :returns: each tuple is in the following format:
        filename: str; filename where identified secret is found
        secret: dict; PotentialSecret json representation
        is_secret_removed: bool; has the secret been removed from the
            new baseline?

    NOTE: two entries of the same file that share line_number and
    hashed_secret, but differ in any other field, are paired together
    and only the old entry is reported (as removed).
    """
    def _order_strings(left, right):
        if left == right:
            return 0
        return -1 if left < right else 1

    def _order_secrets(left, right):
        if left == right:
            return 0

        # Line number is the primary key; the hash only breaks ties.
        if left['line_number'] < right['line_number']:
            return -1
        if left['line_number'] > right['line_number']:
            return 1

        return _order_strings(left['hashed_secret'], right['hashed_secret'])

    old_results = old_baseline['results']
    new_results = new_baseline['results']

    differences = []
    filename_pairs = _comparison_generator(
        sorted(old_results.keys()),
        sorted(new_results.keys()),
        compare_fn=_order_strings,
    )
    for old_filename, new_filename in filename_pairs:
        if not new_filename:
            # File only exists in the old baseline: everything was removed.
            differences.extend(
                (old_filename, entry, True)
                for entry in old_results[old_filename]
            )
            continue

        if not old_filename:
            # File only exists in the new baseline: everything was added.
            differences.extend(
                (new_filename, entry, False)
                for entry in new_results[new_filename]
            )
            continue

        secret_pairs = _comparison_generator(
            old_results[old_filename],
            new_results[new_filename],
            compare_fn=_order_secrets,
        )
        for old_secret, new_secret in secret_pairs:
            if old_secret == new_secret:
                # If they are the same, no point flagging it.
                continue

            if old_secret:
                differences.append((old_filename, old_secret, True))
            else:
                differences.append((new_filename, new_secret, False))

    return differences


def _comparison_generator(old_list, new_list, compare_fn):
"""
:type old_list: sorted list
:type new_list: sorted list
:type compare_fn: function
:param compare_fn:
takes two arguments, A and B
returns 0 if equal
returns -1 if A is less than B
else returns 1
"""
old_index = 0
new_index = 0
while old_index < len(old_list) and new_index < len(new_list):
old_value = old_list[old_index]
new_value = new_list[new_index]

status = compare_fn(old_value, new_value)
if status == 0:
yield (old_value, new_value,)
old_index += 1
new_index += 1
elif status == -1:
yield (old_value, None,)
old_index += 1
else:
yield (None, new_value,)
new_index += 1

# Catch leftovers. Only one of these while statements should run.
while old_index < len(old_list):
yield (old_list[old_index], None,)
old_index += 1
while new_index < len(new_list):
yield (None, new_list[new_index],)
new_index += 1


def _clear_screen():  # pragma: no cover
    """Run the external ``clear`` command to wipe the terminal display."""
    clear_command = ['clear']
    subprocess.call(clear_command)


def _print_context(filename, secret, count, total, plugin_settings): # pragma: no cover
def _print_context( # pragma: no cover
filename,
secret,
count,
total,
plugin_settings,
additional_header_lines=None,
):
"""
:type filename: str
:param filename: the file currently scanned.
Expand All @@ -130,6 +332,10 @@ def _print_context(filename, secret, count, total, plugin_settings): # pragma:
:type plugin_settings: list
:param plugin_settings: plugins used to create baseline.
:type additional_header_lines: str
:param additional_header_lines: any additional lines to add to the
header of the interactive audit display.
:raises: SecretNotFoundOnSpecifiedLineError
"""
print('{} {} {} {}\n{} {}\n{} {}'.format(
Expand All @@ -142,6 +348,9 @@ def _print_context(filename, secret, count, total, plugin_settings): # pragma:
BashColor.color('Secret Type:', Color.BOLD),
BashColor.color(secret['type'], Color.PURPLE),
))
if additional_header_lines:
print(additional_header_lines)

print('-' * 10)

error_obj = None
Expand Down Expand Up @@ -334,7 +543,7 @@ def _highlight_secret(secret_line, secret_lineno, secret, filename, plugin_setti
# copy the secret out of the line because .lower() from secret
# generator may be different from the original value:
secret_line[index_of_secret:end_of_secret],
Color.RED,
Color.RED_BACKGROUND,
),
secret_line[index_of_secret + len(raw_secret):],
)
Expand Down
3 changes: 3 additions & 0 deletions detect_secrets/core/bidirectional_iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ def __next__(self):
self.step_back_once = False
else:
self.index += 1

if self.index < 0:
raise StopIteration

try:
result = self.collection[self.index]
except IndexError:
raise StopIteration

return result

def next(self):
Expand Down
3 changes: 2 additions & 1 deletion detect_secrets/core/color.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ class Color(Enum):
NORMAL = '[0m'
BOLD = '[1m'

RED = '[41m'
RED = '[91m'
RED_BACKGROUND = '[41m'
LIGHT_GREEN = '[92m'
PURPLE = '[95m'

Expand Down
12 changes: 11 additions & 1 deletion detect_secrets/core/usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,23 @@ def __init__(self, subparser):
def add_arguments(self):
    """
    Register the command line arguments of the audit action.

    :returns: self, so that calls can be chained.
    """
    self.parser.add_argument(
        'filename',
        # One or more values: a plain audit reads the first file, while
        # --diff needs exactly two baselines to compare.
        nargs='+',
        help=(
            'Audit a given baseline file to distinguish the difference '
            'between false and true positives.'
        ),
    )

    self.parser.add_argument(
        '--diff',
        action='store_true',
        help=(
            'Allows the comparison of two baseline files, in order to '
            'effectively distinguish the difference between various '
            'plugin configurations.'
        ),
    )

    return self


Expand Down
19 changes: 18 additions & 1 deletion detect_secrets/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,24 @@ def main(argv=None):
print(output)

elif args.action == 'audit':
audit.audit_baseline(args.filename[0])
if not args.diff:
audit.audit_baseline(args.filename[0])
return 0

if len(args.filename) != 2:
print(
'Must specify two files to compare!',
file=sys.stderr,
)
return 1

try:
audit.compare_baselines(args.filename[0], args.filename[1])
except audit.RedundantComparisonError:
print(
'No difference, because it\'s the same file!',
file=sys.stderr,
)

return 0

Expand Down
5 changes: 5 additions & 0 deletions test_data/each_secret.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
red_herring = 'DEADBEEF'

base64_secret = 'c2VjcmV0IG1lc3NhZ2Ugc28geW91J2xsIG5ldmVyIGd1ZXNzIG15IHBhc3N3b3Jk'
hex_secret = '8b1118b376c313ed420e5133ba91307817ed52c2'
basic_auth = 'http://username:whywouldyouusehttpforpasswords@example.com'
Loading

0 comments on commit 839f02b

Please sign in to comment.