diff --git a/detect_secrets/core/audit.py b/detect_secrets/core/audit.py index 81e09754d..f36d0c22e 100644 --- a/detect_secrets/core/audit.py +++ b/detect_secrets/core/audit.py @@ -1,16 +1,21 @@ from __future__ import print_function from __future__ import unicode_literals +import codecs import json import os import subprocess import sys from builtins import input from collections import defaultdict +from copy import deepcopy from ..plugins.common import initialize from ..plugins.common.filetype import determine_file_type +from ..plugins.common.util import get_mapping_from_secret_type_to_class_name from ..plugins.high_entropy_strings import HighEntropyStringsPlugin +from ..util import get_git_remotes +from ..util import get_git_sha from .baseline import merge_results from .bidirectional_iterator import BidirectionalIterator from .code_snippet import CodeSnippetHighlighter @@ -32,6 +37,22 @@ class RedundantComparisonError(Exception): pass +AUDIT_RESULT_TO_STRING = { + True: 'positive', + False: 'negative', + None: 'unknown', +} + +EMPTY_PLUGIN_AUDIT_RESULT = { + 'results': { + 'positive': [], + 'negative': [], + 'unknown': [], + }, + 'config': {}, +} + + def audit_baseline(baseline_filename): original_baseline = _get_baseline_from_file(baseline_filename) if not original_baseline: @@ -176,6 +197,81 @@ def compare_baselines(old_baseline_filename, new_baseline_filename): secret_iterator.step_back_on_next_iteration() +def determine_audit_results(baseline, baseline_path): + """ + Given a baseline which has been audited, returns + a dictionary describing the results of each plugin in the following form ("repo_info" is only included when both a git sha and a git remote are found): + { + "results": { + "plugin_name1": { + "results": { + "positive": [list of secrets with is_secret: true caught by this plugin], + "negative": [list of secrets with is_secret: false caught by this plugin], + "unknown": [list of secrets with no is_secret entry caught by this plugin] + }, + "config": {configuration used for the plugin} + }, + ... 
+ }, + "repo_info": { + "remote": "remote url", + "sha": "sha of repo checkout" + }, + } + """ + all_secrets = _secret_generator(baseline) + + audit_results = { + 'results': defaultdict(lambda: deepcopy(EMPTY_PLUGIN_AUDIT_RESULT)), + } + + secret_type_to_plugin_name = get_mapping_from_secret_type_to_class_name() + + for filename, secret in all_secrets: + plaintext_line = _get_file_line(filename, secret['line_number']) + try: + secret_plaintext = get_raw_secret_value( + secret_line=plaintext_line, + secret=secret, + plugin_settings=baseline['plugins_used'], + filename=filename, + ) + except SecretNotFoundOnSpecifiedLineError: + secret_plaintext = plaintext_line + + plugin_name = secret_type_to_plugin_name[secret['type']] + audit_result = AUDIT_RESULT_TO_STRING[secret.get('is_secret')] + audit_results['results'][plugin_name]['results'][audit_result].append(secret_plaintext) + + for plugin_config in baseline['plugins_used']: + plugin_name = plugin_config['name'] + if plugin_name not in audit_results['results']: + continue + + audit_results['results'][plugin_name]['config'].update(plugin_config) + + git_repo_path = os.path.dirname(os.path.abspath(baseline_path)) + git_sha = get_git_sha(git_repo_path) + git_remotes = get_git_remotes(git_repo_path) + + if git_sha and git_remotes: + audit_results['repo_info'] = { + 'remote': git_remotes[0], + 'sha': git_sha, + } + + return audit_results + + +def print_audit_results(baseline_filename): + baseline = _get_baseline_from_file(baseline_filename) + if not baseline: + print('Failed to retrieve baseline from {filename}'.format(filename=baseline_filename)) + return + + print(json.dumps(determine_audit_results(baseline, baseline_filename))) + + def _get_baseline_from_file(filename): # pragma: no cover try: with open(filename) as f: @@ -424,6 +520,17 @@ def _handle_user_decision(decision, secret): del secret['is_secret'] +def _get_file_line(filename, line_number): + """ + Attempts to read a given line from the input file. 
+ """ + try: + with codecs.open(filename, encoding='utf-8') as f: + return f.read().splitlines()[line_number - 1] # line numbers are 1-indexed + except (OSError, IOError, IndexError): + return None + + def _get_secret_with_context( filename, secret, diff --git a/detect_secrets/core/usage.py b/detect_secrets/core/usage.py index a9ee590a0..c60df451b 100644 --- a/detect_secrets/core/usage.py +++ b/detect_secrets/core/usage.py @@ -209,7 +209,9 @@ def add_arguments(self): ), ) - self.parser.add_argument( + action_parser = self.parser.add_mutually_exclusive_group() + + action_parser.add_argument( '--diff', action='store_true', help=( @@ -219,6 +221,15 @@ def add_arguments(self): ), ) + action_parser.add_argument( + '--display-results', + action='store_true', + help=( + 'Displays the results of an interactive auditing session ' + 'which have been saved to a baseline file.' + ), + ) + return self diff --git a/detect_secrets/main.py b/detect_secrets/main.py index f294c00ad..5e3012746 100644 --- a/detect_secrets/main.py +++ b/detect_secrets/main.py @@ -60,10 +60,14 @@ def main(argv=None): ) elif args.action == 'audit': - if not args.diff: + if not args.diff and not args.display_results: audit.audit_baseline(args.filename[0]) return 0 + if args.display_results: + audit.print_audit_results(args.filename[0]) + return 0 + if len(args.filename) != 2: print( 'Must specify two files to compare!', diff --git a/detect_secrets/plugins/common/initialize.py b/detect_secrets/plugins/common/initialize.py index 294b81708..c7b80b30c 100644 --- a/detect_secrets/plugins/common/initialize.py +++ b/detect_secrets/plugins/common/initialize.py @@ -1,13 +1,9 @@ """Intelligent initialization of plugins.""" -try: - from functools import lru_cache -except ImportError: # pragma: no cover - from functools32 import lru_cache - from ..artifactory import ArtifactoryDetector # noqa: F401 from ..aws import AWSKeyDetector # noqa: F401 from ..base import BasePlugin from ..basic_auth import BasicAuthDetector # 
noqa: F401 +from ..common.util import get_mapping_from_secret_type_to_class_name from ..high_entropy_strings import Base64HighEntropyString # noqa: F401 from ..high_entropy_strings import HexHighEntropyString # noqa: F401 from ..keyword import KeywordDetector # noqa: F401 @@ -195,7 +191,7 @@ def from_secret_type(secret_type, settings): ... }, ... ] """ - mapping = _get_mapping_from_secret_type_to_class_name() + mapping = get_mapping_from_secret_type_to_class_name() try: classname = mapping[secret_type] except KeyError: @@ -214,17 +210,3 @@ def from_secret_type(secret_type, settings): **plugin_init_vars ) - - -@lru_cache(maxsize=1) -def _get_mapping_from_secret_type_to_class_name(): - """Returns secret_type => plugin classname""" - mapping = {} - for key, value in globals().items(): - try: - if issubclass(value, BasePlugin) and value != BasePlugin: - mapping[value.secret_type] = key - except TypeError: - pass - - return mapping diff --git a/detect_secrets/plugins/common/util.py b/detect_secrets/plugins/common/util.py new file mode 100644 index 000000000..6bef437f4 --- /dev/null +++ b/detect_secrets/plugins/common/util.py @@ -0,0 +1,31 @@ +try: + from functools import lru_cache +except ImportError: # pragma: no cover + from functools32 import lru_cache + +# These plugins need to be imported here so that globals() +# can find them. 
+from ..artifactory import ArtifactoryDetector # noqa: F401 +from ..aws import AWSKeyDetector # noqa: F401 +from ..base import BasePlugin +from ..basic_auth import BasicAuthDetector # noqa: F401 +from ..high_entropy_strings import Base64HighEntropyString # noqa: F401 +from ..high_entropy_strings import HexHighEntropyString # noqa: F401 +from ..keyword import KeywordDetector # noqa: F401 +from ..private_key import PrivateKeyDetector # noqa: F401 +from ..slack import SlackDetector # noqa: F401 +from ..stripe import StripeDetector # noqa: F401 + + +@lru_cache(maxsize=1) +def get_mapping_from_secret_type_to_class_name(): + """Returns secret_type => plugin classname""" + mapping = {} + for key, value in globals().items(): + try: + if issubclass(value, BasePlugin) and value != BasePlugin: + mapping[value.secret_type] = key + except TypeError: + pass + + return mapping diff --git a/detect_secrets/util.py b/detect_secrets/util.py index 286d0cbc2..b11c30d54 100644 --- a/detect_secrets/util.py +++ b/detect_secrets/util.py @@ -1,4 +1,5 @@ import os +import subprocess def get_root_directory(): # pragma: no cover @@ -15,3 +16,50 @@ def get_relative_path(root, path): return os.path.realpath( os.path.join(root, path), )[len(os.getcwd() + '/'):] + + +def get_git_sha(path): + """Returns the sha of the git checkout at the input path + + :type path: str + :param path: directory of the git checkout + + :rtype: str|None + :returns: git sha of the input path + """ + try: + with open(os.devnull, 'w') as fnull: + return subprocess.check_output( + ['git', 'rev-parse', '--verify', 'HEAD'], + stderr=fnull, + cwd=path, + ).decode('utf-8').split()[0] + except (subprocess.CalledProcessError, OSError, IndexError): # pragma: no cover + return None + + +def get_git_remotes(path): + """Returns a list of unique git remotes of the checkout + at the input path + + :type path: str + :param path: directory of the git checkout + + :rtype: List|None + :returns: A list of unique git urls + """ + try: + 
 with open(os.devnull, 'w') as fnull: + git_remotes = subprocess.check_output( + ['git', 'remote', '-v'], + stderr=fnull, + cwd=path, + ).decode('utf-8').split('\n') + return list({ + git_remote.split()[1] + for git_remote + in git_remotes + if len(git_remote) > 2 # split('\n') leaves a trailing empty string + }) + except (subprocess.CalledProcessError, OSError): # pragma: no cover + return None diff --git a/tests/core/audit_test.py b/tests/core/audit_test.py index bf9f8d392..58e8e5cc2 100644 --- a/tests/core/audit_test.py +++ b/tests/core/audit_test.py @@ -475,6 +475,227 @@ def new_baseline(self): } +class TestDetermineAuditResults(object): + + @pytest.fixture + def mock_get_raw_secret_value(self): + with mock.patch.object( + audit, + 'get_raw_secret_value', + autospec=True, + ) as _mock: + yield _mock + + @pytest.fixture + def mock_get_git_sha(self): + with mock.patch( + 'detect_secrets.core.audit.get_git_sha', + return_value=None, + autospec=True, + ) as _mock: + yield _mock + + @pytest.fixture + def mock_get_git_remotes(self): + with mock.patch( + 'detect_secrets.core.audit.get_git_remotes', + return_value=None, + autospec=True, + ) as _mock: + yield _mock + + def get_audited_baseline(self, plugin_config, is_secret): + """ + Returns a baseline in dict form with 1 plugin and 1 secret. + :param plugin_config: An optional dict for the plugin's config. + :param is_secret: An optional bool for whether the secret has been + audited. 
+ """ + baseline_fixture = { + 'plugins_used': [ + { + 'name': 'HexHighEntropyString', + }, + ], + 'results': { + 'file': [ + { + 'hashed_secret': 'a837eb90d815a852f68f56f70b1b3fab24c46c84', + 'line_number': 1, + 'type': 'Hex High Entropy String', + }, + ], + }, + } + + if plugin_config: + baseline_fixture['plugins_used'][0].update(plugin_config) + + if is_secret is not None: + baseline_fixture['results']['file'][0]['is_secret'] = is_secret + + return baseline_fixture + + @pytest.mark.parametrize( + 'plugin_config', [{}, {'hex_limit': 2}], + ) + def test_determine_audit_results_plugin_config( + self, + mock_get_raw_secret_value, + mock_get_git_remotes, + mock_get_git_sha, + plugin_config, + ): + plaintext_secret = 'some_plaintext_secret' + mock_get_raw_secret_value.return_value = plaintext_secret + baseline = self.get_audited_baseline(plugin_config=plugin_config, is_secret=None) + + results = audit.determine_audit_results(baseline, '.secrets.baseline') + + assert results['results']['HexHighEntropyString']['config'].items() \ + >= plugin_config.items() + + @pytest.mark.parametrize( + 'is_secret, expected_audited_result', + [ + (True, 'positive'), + (False, 'negative'), + (None, 'unknown'), + ], + ) + def test_determine_audit_results_is_secret( + self, + mock_get_raw_secret_value, + mock_get_git_remotes, + mock_get_git_sha, + is_secret, + expected_audited_result, + ): + plaintext_secret = 'some_plaintext_secret' + mock_get_raw_secret_value.return_value = plaintext_secret + baseline = self.get_audited_baseline(plugin_config={}, is_secret=is_secret) + + results = audit.determine_audit_results(baseline, '.secrets.baseline') + + for audited_result, list_of_secrets \ + in results['results']['HexHighEntropyString']['results'].items(): + if audited_result == expected_audited_result: + assert plaintext_secret in list_of_secrets + else: + assert len(list_of_secrets) == 0 + + @pytest.mark.parametrize( + 'git_remotes, git_sha, expected_git_info', + [ + (None, None, None), + 
(None, 'abc', None), + (['git.com/git.git'], None, None), + ( + ['git.com/git.git'], + 'abc', + {'remote': 'git.com/git.git', 'sha': 'abc'}, + ), + ( + ['git.com/git.git', 'hub.com/git.git'], + 'abc', + {'remote': 'git.com/git.git', 'sha': 'abc'}, + ), + ], + ) + def test_determine_audit_results_git_info( + self, + mock_get_raw_secret_value, + mock_get_git_remotes, + mock_get_git_sha, + git_remotes, + git_sha, + expected_git_info, + ): + plaintext_secret = 'some_plaintext_secret' + mock_get_raw_secret_value.return_value = plaintext_secret + mock_get_git_remotes.return_value = git_remotes + mock_get_git_sha.return_value = git_sha + + baseline = self.get_audited_baseline(plugin_config={}, is_secret=True) + + results = audit.determine_audit_results(baseline, '.secrets.baseline') + + if expected_git_info: + assert results['repo_info'] == expected_git_info + else: + assert 'repo_info' not in results + + def test_determine_audit_results_secret_not_found( + self, + mock_get_raw_secret_value, + mock_get_git_remotes, + mock_get_git_sha, + ): + mock_get_raw_secret_value.side_effect = audit.SecretNotFoundOnSpecifiedLineError(1) + baseline = self.get_audited_baseline(plugin_config={}, is_secret=True) + + whole_plaintext_line = 'a plaintext line' + + with mock.patch.object( + audit, + '_get_file_line', + return_value=whole_plaintext_line, + autospec=True, + ): + results = audit.determine_audit_results(baseline, '.secrets.baseline') + + assert whole_plaintext_line in \ + results['results']['HexHighEntropyString']['results']['positive'] + + +class TestPrintAuditResults(): + + @contextmanager + def mock_env(self, baseline): + with mock.patch.object( + # We mock this, so we don't need to do any file I/O. 
+ audit, + '_get_baseline_from_file', + return_value=baseline, + ) as _mock: + yield _mock + + @pytest.mark.parametrize( + 'mock_baseline, expected_message', + [ + ( + {}, + 'Failed to retrieve baseline', + ), + ( + None, + 'Failed to retrieve baseline', + ), + ( + {'plugins_used': {'name': 'MyFakePlugin'}, 'results': {}}, + '{}', + ), + ], + ) + def test_print_audit_results_none( + self, mock_printer, mock_baseline, expected_message, + ): + """ + This doesn't actually test for correctness; we rely on + good tests for determine_audit_results. + """ + with self.mock_env( + baseline=mock_baseline, + ), mock.patch.object( + audit, + 'determine_audit_results', + return_value={}, + ): + audit.print_audit_results('somefilename') + + assert expected_message in mock_printer.message + + class TestPrintContext(object): def run_logic(self, secret=None, secret_lineno=15, settings=None): diff --git a/tests/util_test.py b/tests/util_test.py new file mode 100644 index 000000000..bd03cdd68 --- /dev/null +++ b/tests/util_test.py @@ -0,0 +1,54 @@ +import subprocess + +import mock +import pytest + +from detect_secrets import util + +GIT_REPO_SHA = b'cbb33d8c545ccf5c55fdcc7d5b0218078598e677' +GIT_REMOTES_VERBOSE_ONE_URL = ( + b'origin\tgit://a.com/a/a.git\t(fetch)\n' + b'origin\tgit://a.com/a/a.git\t(push)\n' +) +GIT_REMOTES_VERBOSE_TWO_URLS = ( + b'origin\tgit://a.com/a/a.git\t(fetch)\n' + b'origin\tgit://a.com/a/a.git\t(push)\n' + b'origin\tgit://b.com/b/b.git\t(fetch)\n' + b'origin\tgit://b.com/b/b.git\t(push)\n' +) + + +def test_get_git_sha(): + with mock.patch.object( + subprocess, + 'check_output', + autospec=True, + return_value=GIT_REPO_SHA, + ): + assert util.get_git_sha('.') == GIT_REPO_SHA.decode('utf-8') + + +@pytest.mark.parametrize( + 'git_remotes_result, expected_urls', + [ + ( + GIT_REMOTES_VERBOSE_ONE_URL, + {'git://a.com/a/a.git'}, + ), + ( + GIT_REMOTES_VERBOSE_TWO_URLS, + {'git://a.com/a/a.git', 'git://b.com/b/b.git'}, + ), + ], +) +def test_get_git_remotes( + 
git_remotes_result, + expected_urls, +): + with mock.patch.object( + subprocess, + 'check_output', + autospec=True, + return_value=git_remotes_result, + ): + assert expected_urls == set(util.get_git_remotes('.'))