From 670c2822a7fd6e54edfa81b4be26f12b110a414d Mon Sep 17 00:00:00 2001 From: Aaron Loo Date: Tue, 6 Nov 2018 17:06:45 -0800 Subject: [PATCH 1/2] audit diff functionality --- detect_secrets/__init__.py | 2 +- detect_secrets/core/audit.py | 215 +++++++++++++++++- detect_secrets/core/bidirectional_iterator.py | 3 + detect_secrets/core/color.py | 3 +- detect_secrets/core/usage.py | 12 +- detect_secrets/main.py | 19 +- 6 files changed, 247 insertions(+), 7 deletions(-) diff --git a/detect_secrets/__init__.py b/detect_secrets/__init__.py index 5308a7d1e..d3c01ed5c 100644 --- a/detect_secrets/__init__.py +++ b/detect_secrets/__init__.py @@ -1 +1 @@ -VERSION = '0.10.5' +VERSION = '0.11.0' diff --git a/detect_secrets/core/audit.py b/detect_secrets/core/audit.py index 880cc6aaa..54fb70717 100644 --- a/detect_secrets/core/audit.py +++ b/detect_secrets/core/audit.py @@ -25,6 +25,10 @@ def __init__(self, line): ) +class RedundantComparisonError(Exception): + pass + + def audit_baseline(baseline_filename): original_baseline = _get_baseline_from_file(baseline_filename) if not original_baseline: @@ -32,7 +36,6 @@ def audit_baseline(baseline_filename): files_removed = _remove_nonexistent_files_from_baseline(original_baseline) - current_secret_index = 0 all_secrets = list(_secret_generator(original_baseline)) secrets_with_choices = [ (filename, secret) for filename, secret in all_secrets @@ -41,6 +44,7 @@ def audit_baseline(baseline_filename): total_choices = len(secrets_with_choices) secret_iterator = BidirectionalIterator(secrets_with_choices) + current_secret_index = 0 for filename, secret in secret_iterator: _clear_screen() @@ -84,6 +88,87 @@ def audit_baseline(baseline_filename): _save_baseline_to_file(baseline_filename, original_baseline) +def compare_baselines(old_baseline_filename, new_baseline_filename): + """ + This function enables developers to more easily configure plugin + settings, by comparing two generated baselines and highlighting + their differences. + + For effective use, a few assumptions are made: + 1. Baselines are sorted by (filename, line_number, hash). + This allows for a deterministic order, when doing a side-by-side + comparison. + + 2. Baselines are generated for the same codebase snapshot. + This means that we won't have cases where secrets are moved around; + only added or removed. + + NOTE: We don't want to do a version check, because we want to be able to + use this functionality across versions (to see how the new version fares + compared to the old one). + """ + if old_baseline_filename == new_baseline_filename: + raise RedundantComparisonError + + old_baseline = _get_baseline_from_file(old_baseline_filename) + new_baseline = _get_baseline_from_file(new_baseline_filename) + + _remove_nonexistent_files_from_baseline(old_baseline) + _remove_nonexistent_files_from_baseline(new_baseline) + + # We aggregate the secrets first, so that we can display a total count. + secrets_to_compare = _get_secrets_to_compare(old_baseline, new_baseline) + total_reviews = len(secrets_to_compare) + current_index = 0 + + secret_iterator = BidirectionalIterator(secrets_to_compare) + for filename, secret, is_removed in secret_iterator: + _clear_screen() + current_index += 1 + + header = '{} {}' + if is_removed: + plugins_used = old_baseline['plugins_used'] + header = header.format( + BashColor.color('Status:', Color.BOLD), + '>> {} <<'.format( + BashColor.color('REMOVED', Color.RED), + ), + ) + else: + plugins_used = new_baseline['plugins_used'] + header = header.format( + BashColor.color('Status:', Color.BOLD), + '>> {} <<'.format( + BashColor.color('ADDED', Color.LIGHT_GREEN), + ), + ) + + try: + _print_context( + filename, + secret, + current_index, + total_reviews, + plugins_used, + additional_header_lines=header, + ) + decision = _get_user_decision( + can_step_back=secret_iterator.can_step_back(), + prompt_secret_decision=False, + ) + except SecretNotFoundOnSpecifiedLineError: + decision = _get_user_decision(prompt_secret_decision=False) + + if decision == 'q': + print('Quitting...') + break + + if decision == 'b': + current_index -= 2 + secret_iterator.step_back_on_next_iteration() + + def _get_baseline_from_file(filename): # pragma: no cover try: with open(filename) as f: @@ -109,11 +194,128 @@ def _secret_generator(baseline): yield filename, secret +def _get_secrets_to_compare(old_baseline, new_baseline): + """ + :rtype: list(tuple) + :param: tuple is in the following format: + filename: str; filename where identified secret is found + secret: dict; PotentialSecret json representation + is_secret_removed: bool; has the secret been removed from the + new baseline? + """ + def _check_string(a, b): + if a == b: + return 0 + if a < b: + return -1 + return 1 + + def _check_secret(a, b): + if a == b: + return 0 + + if a['line_number'] < b['line_number']: + return -1 + elif a['line_number'] > b['line_number']: + return 1 + + return _check_string(a['hashed_secret'], b['hashed_secret']) + + secrets_to_compare = [] + for old_filename, new_filename in _comparison_generator( + sorted(old_baseline['results'].keys()), + sorted(new_baseline['results'].keys()), + compare_fn=_check_string, + ): + if not new_filename: + secrets_to_compare += list( + map( + lambda x: (old_filename, x, True,), + old_baseline['results'][old_filename], + ), + ) + continue + elif not old_filename: + secrets_to_compare += list( + map( + lambda x: (new_filename, x, False,), + new_baseline['results'][new_filename], + ), + ) + continue + + for old_secret, new_secret in _comparison_generator( + old_baseline['results'][old_filename], + new_baseline['results'][new_filename], + compare_fn=_check_secret, + ): + if old_secret == new_secret: + # If they are the same, no point flagging it. + continue + + if old_secret: + secrets_to_compare.append( + (old_filename, old_secret, True,), + ) + else: + secrets_to_compare.append( + (new_filename, new_secret, False,), + ) + + return secrets_to_compare + + +def _comparison_generator(old_list, new_list, compare_fn): + """ + :type old_list: sorted list + :type new_list: sorted list + + :type compare_fn: function + :param compare_fn: + takes two arguments, A and B + returns 0 if equal + returns -1 if A is less than B + else returns 1 + """ + old_index = 0 + new_index = 0 + while old_index < len(old_list) and new_index < len(new_list): + old_value = old_list[old_index] + new_value = new_list[new_index] + + status = compare_fn(old_value, new_value) + if status == 0: + yield (old_value, new_value,) + old_index += 1 + new_index += 1 + elif status == -1: + yield (old_value, None,) + old_index += 1 + else: + yield (None, new_value,) + new_index += 1 + + # Catch leftovers. Only one of these while statements should run. + while old_index < len(old_list): + yield (old_list[old_index], None,) + old_index += 1 + while new_index < len(new_list): + yield (None, new_list[new_index],) + new_index += 1 + + def _clear_screen(): # pragma: no cover subprocess.call(['clear']) -def _print_context(filename, secret, count, total, plugin_settings): # pragma: no cover +def _print_context( # pragma: no cover + filename, + secret, + count, + total, + plugin_settings, + additional_header_lines=None, +): """ :type filename: str :param filename: the file currently scanned. @@ -130,6 +332,10 @@ def _print_context(filename, secret, count, total, plugin_settings): # pragma: :type plugin_settings: list :param plugin_settings: plugins used to create baseline. + :type additional_header_lines: str + :param additional_header_lines: any additional lines to add to the + header of the interactive audit display. + :raises: SecretNotFoundOnSpecifiedLineError """ print('{} {} {} {}\n{} {}\n{} {}'.format( @@ -142,6 +348,9 @@ def _print_context(filename, secret, count, total, plugin_settings): # pragma: BashColor.color('Secret Type:', Color.BOLD), BashColor.color(secret['type'], Color.PURPLE), )) + if additional_header_lines: + print(additional_header_lines) + print('-' * 10) error_obj = None @@ -334,7 +543,7 @@ def _highlight_secret(secret_line, secret_lineno, secret, filename, plugin_setti # copy the secret out of the line because .lower() from secret # generator may be different from the original value: secret_line[index_of_secret:end_of_secret], - Color.RED, + Color.RED_BACKGROUND, ), secret_line[index_of_secret + len(raw_secret):], ) diff --git a/detect_secrets/core/bidirectional_iterator.py b/detect_secrets/core/bidirectional_iterator.py index 1ba361f55..b9d41c821 100644 --- a/detect_secrets/core/bidirectional_iterator.py +++ b/detect_secrets/core/bidirectional_iterator.py @@ -10,12 +10,15 @@ def __next__(self): self.step_back_once = False else: self.index += 1 + if self.index < 0: raise StopIteration + try: result = self.collection[self.index] except IndexError: raise StopIteration + return result def next(self): diff --git a/detect_secrets/core/color.py b/detect_secrets/core/color.py index c4e27dcba..756eb68b1 100644 --- a/detect_secrets/core/color.py +++ b/detect_secrets/core/color.py @@ -5,7 +5,8 @@ class Color(Enum): NORMAL = '[0m' BOLD = '[1m' - RED = '[41m' + RED = '[91m' + RED_BACKGROUND = '[41m' LIGHT_GREEN = '[92m' PURPLE = '[95m' diff --git a/detect_secrets/core/usage.py b/detect_secrets/core/usage.py index 7776bcfdc..a76ffbe3d 100644 --- a/detect_secrets/core/usage.py +++ b/detect_secrets/core/usage.py @@ -149,13 +149,23 @@ def __init__(self, subparser): def add_arguments(self): self.parser.add_argument( 'filename', - nargs=1, + nargs='+', help=( 'Audit a given baseline file to distinguish the difference ' 'between false and true positives.' ), ) + self.parser.add_argument( + '--diff', + action='store_true', + help=( + 'Allows the comparison of two baseline files, in order to ' + 'effectively distinguish the difference between various ' + 'plugin configurations.' + ), + ) + return self diff --git a/detect_secrets/main.py b/detect_secrets/main.py index b0ef5d3b2..718955e6d 100644 --- a/detect_secrets/main.py +++ b/detect_secrets/main.py @@ -48,7 +48,24 @@ def main(argv=None): print(output) elif args.action == 'audit': - audit.audit_baseline(args.filename[0]) + if not args.diff: + audit.audit_baseline(args.filename[0]) + return 0 + + if len(args.filename) != 2: + print( + 'Must specify two files to compare!', + file=sys.stderr, + ) + return 1 + + try: + audit.compare_baselines(args.filename[0], args.filename[1]) + except audit.RedundantComparisonError: + print( + 'No difference, because it\'s the same file!', + file=sys.stderr, + ) return 0 From 81418a08e1a5d4df80e55c30e00f5fef08cca7cc Mon Sep 17 00:00:00 2001 From: Aaron Loo Date: Tue, 6 Nov 2018 18:55:52 -0800 Subject: [PATCH 2/2] adding test cases --- detect_secrets/core/audit.py | 2 +- test_data/each_secret.py | 5 + tests/core/audit_test.py | 176 +++++++++++++++++++++++++++++++++++ 3 files changed, 182 insertions(+), 1 deletion(-) create mode 100644 test_data/each_secret.py diff --git a/detect_secrets/core/audit.py b/detect_secrets/core/audit.py index 54fb70717..00dda578a 100644 --- a/detect_secrets/core/audit.py +++ b/detect_secrets/core/audit.py @@ -164,7 +164,7 @@ def compare_baselines(old_baseline_filename, new_baseline_filename): print('Quitting...') break - if decision == 'b': + if decision == 'b': # pragma: no cover current_index -= 2 secret_iterator.step_back_on_next_iteration() diff --git a/test_data/each_secret.py b/test_data/each_secret.py new file mode 100644 index 000000000..8faa5d0db --- /dev/null +++ b/test_data/each_secret.py @@ -0,0 +1,5 @@ +red_herring = 'DEADBEEF' + +base64_secret = 'c2VjcmV0IG1lc3NhZ2Ugc28geW91J2xsIG5ldmVyIGd1ZXNzIG15IHBhc3N3b3Jk' +hex_secret = '8b1118b376c313ed420e5133ba91307817ed52c2' +basic_auth = 'http://username:whywouldyouusehttpforpasswords@example.com' diff --git a/tests/core/audit_test.py b/tests/core/audit_test.py index be9936b10..43335a8ff 100644 --- a/tests/core/audit_test.py +++ b/tests/core/audit_test.py @@ -285,6 +285,182 @@ def leapfrog_baseline(self): } +class TestCompareBaselines(object): + + def setup(self): + BashColor.disable_color() + + def teardown(self): + BashColor.enable_color() + + def test_raises_error_if_comparing_same_file(self): + with pytest.raises(audit.RedundantComparisonError): + audit.compare_baselines('foo/bar', 'foo/bar') + + def test_compare(self, mock_printer): + with self.mock_env(): + audit.compare_baselines('baselineA', 'baselineB') + + # Break up the printed messages, because we're only interested + # in the headers. + headers = [] + start_capture = True + buffer = '' + for line in mock_printer.message.splitlines(): + if line[0] == '-': + start_capture = not start_capture + continue + + if start_capture: + buffer += line + '\n' + elif buffer: + headers.append(buffer) + buffer = '' + + # This comes first, because it's found at line 1. + assert headers[0] == textwrap.dedent(""" + Secret: 1 of 4 + Filename: test_data/each_secret.py + Secret Type: Hex High Entropy String + Status: >> ADDED << + """)[1:] + + assert headers[1] == textwrap.dedent(""" + Secret: 2 of 4 + Filename: test_data/each_secret.py + Secret Type: Base64 High Entropy String + Status: >> REMOVED << + """)[1:] + + # These files come after, because filenames are sorted first + assert headers[2] == textwrap.dedent(""" + Secret: 3 of 4 + Filename: test_data/short_files/first_line.py + Secret Type: Hex High Entropy String + Status: >> REMOVED << + """)[1:] + + assert headers[3] == textwrap.dedent(""" + Secret: 4 of 4 + Filename: test_data/short_files/last_line.ini + Secret Type: Hex High Entropy String + Status: >> ADDED << + """)[1:] + + @contextmanager + def mock_env(self): + baseline_count = [0] + + def _get_baseline_from_file(_): + if baseline_count[0] == 0: + baseline_count[0] += 1 + return self.old_baseline + else: + return self.new_baseline + + with mock.patch.object( + # This mock allows us to have separate baseline values + audit, + '_get_baseline_from_file', + _get_baseline_from_file, + ), mock.patch.object( + # We don't want this test to clear the screen + audit, + '_clear_screen', + ), mock_user_input( + ['s'] * 4, + ): + yield + + @property + def old_baseline(self): + return { + 'plugins_used': [ + { + 'name': 'Base64HighEntropyString', + 'base64_limit': 4.5, + }, + { + 'name': 'HexHighEntropyString', + 'hex_limit': 3, + }, + ], + 'results': { + 'file_will_be_removed': [], + + # This file is shared, so the code should check each secret + 'test_data/each_secret.py': [ + # This secret is removed + { + 'hashed_secret': '1ca6beea06a87d5f77fa8e4523d0dc1f0965e2ce', + 'line_number': 3, + 'type': 'Base64 High Entropy String', + }, + + # This is the same secret + { + 'hashed_secret': '871deb5e9ff5ce5f777c8d3327511d05f581e755', + 'line_number': 4, + 'type': 'Hex High Entropy String', + }, + ], + + # This entire file will be "removed" + 'test_data/short_files/first_line.py': [ + { + 'hashed_secret': '0de9a11b3f37872868ca49ecd726c955e25b6e21', + 'line_number': 1, + 'type': 'Hex High Entropy String', + }, + ], + }, + } + + @property + def new_baseline(self): + return { + 'plugins_used': [ + { + 'name': 'Base64HighEntropyString', + 'base64_limit': 5.5, + }, + { + 'name': 'HexHighEntropyString', + 'hex_limit': 2, + }, + ], + 'results': { + 'file_will_be_removed': [], + + # This file is shared, so the code should check each secret + 'test_data/each_secret.py': [ + # This secret is added + { + 'hashed_secret': 'a837eb90d815a852f68f56f70b1b3fab24c46c84', + 'line_number': 1, + 'type': 'Hex High Entropy String', + }, + + # This is the same secret + { + 'hashed_secret': '871deb5e9ff5ce5f777c8d3327511d05f581e755', + 'line_number': 4, + 'type': 'Hex High Entropy String', + }, + ], + + # This entire file will be "added" + 'test_data/short_files/last_line.ini': [ + { + 'hashed_secret': '0de9a11b3f37872868ca49ecd726c955e25b6e21', + 'line_number': 5, + 'type': 'Hex High Entropy String', + }, + ], + }, + } + + class TestPrintContext(object): def setup(self):