Skip to content

Commit

Permalink
Merge pull request #205 from OiCMudkips/add_audit_result_view
Browse files Browse the repository at this point in the history
Add audit result view
  • Loading branch information
OiCMudkips committed Jul 9, 2019
2 parents 13575ed + 69a109e commit d16ec18
Show file tree
Hide file tree
Showing 8 changed files with 480 additions and 22 deletions.
107 changes: 107 additions & 0 deletions detect_secrets/core/audit.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
from __future__ import print_function
from __future__ import unicode_literals

import codecs
import json
import os
import subprocess
import sys
from builtins import input
from collections import defaultdict
from copy import deepcopy

from ..plugins.common import initialize
from ..plugins.common.filetype import determine_file_type
from ..plugins.common.util import get_mapping_from_secret_type_to_class_name
from ..plugins.high_entropy_strings import HighEntropyStringsPlugin
from ..util import get_git_remotes
from ..util import get_git_sha
from .baseline import merge_results
from .bidirectional_iterator import BidirectionalIterator
from .code_snippet import CodeSnippetHighlighter
Expand All @@ -32,6 +37,22 @@ class RedundantComparisonError(Exception):
pass


AUDIT_RESULT_TO_STRING = {
True: 'positive',
False: 'negative',
None: 'unknown',
}

EMPTY_PLUGIN_AUDIT_RESULT = {
'results': {
'positive': [],
'negative': [],
'unknown': [],
},
'config': {},
}


def audit_baseline(baseline_filename):
original_baseline = _get_baseline_from_file(baseline_filename)
if not original_baseline:
Expand Down Expand Up @@ -176,6 +197,81 @@ def compare_baselines(old_baseline_filename, new_baseline_filename):
secret_iterator.step_back_on_next_iteration()


def determine_audit_results(baseline, baseline_path):
"""
Given a baseline which has been audited, returns
a dictionary describing the results of each plugin in the following form:
{
"results": {
"plugin_name1": {
"results": {
"positive": [list of secrets with is_secret: true caught by this plugin],
"negative": [list of secrets with is_secret: false caught by this plugin],
"unknown": [list of secrets with no is_secret entry caught by this plugin]
},
"config": {configuration used for the plugin}
},
...
},
"repo_info": {
"remote": "remote url",
"sha": "sha of repo checkout"
},
}
"""
all_secrets = _secret_generator(baseline)

audit_results = {
'results': defaultdict(lambda: deepcopy(EMPTY_PLUGIN_AUDIT_RESULT)),
}

secret_type_to_plugin_name = get_mapping_from_secret_type_to_class_name()

for filename, secret in all_secrets:
plaintext_line = _get_file_line(filename, secret['line_number'])
try:
secret_plaintext = get_raw_secret_value(
secret_line=plaintext_line,
secret=secret,
plugin_settings=baseline['plugins_used'],
filename=filename,
)
except SecretNotFoundOnSpecifiedLineError:
secret_plaintext = plaintext_line

plugin_name = secret_type_to_plugin_name[secret['type']]
audit_result = AUDIT_RESULT_TO_STRING[secret.get('is_secret')]
audit_results['results'][plugin_name]['results'][audit_result].append(secret_plaintext)

for plugin_config in baseline['plugins_used']:
plugin_name = plugin_config['name']
if plugin_name not in audit_results['results']:
continue

audit_results['results'][plugin_name]['config'].update(plugin_config)

git_repo_path = os.path.dirname(os.path.abspath(baseline_path))
git_sha = get_git_sha(git_repo_path)
git_remotes = get_git_remotes(git_repo_path)

if git_sha and git_remotes:
audit_results['repo_info'] = {
'remote': git_remotes[0],
'sha': git_sha,
}

return audit_results


def print_audit_results(baseline_filename):
baseline = _get_baseline_from_file(baseline_filename)
if not baseline:
print('Failed to retrieve baseline from {filename}'.format(filename=baseline_filename))
return

print(json.dumps(determine_audit_results(baseline, baseline_filename)))


def _get_baseline_from_file(filename): # pragma: no cover
try:
with open(filename) as f:
Expand Down Expand Up @@ -424,6 +520,17 @@ def _handle_user_decision(decision, secret):
del secret['is_secret']


def _get_file_line(filename, line_number):
"""
Attempts to read a given line from the input file.
"""
try:
with codecs.open(filename, encoding='utf-8') as f:
return f.read().splitlines()[line_number - 1] # line numbers are 1-indexed
except (OSError, IOError, IndexError):
return None


def _get_secret_with_context(
filename,
secret,
Expand Down
13 changes: 12 additions & 1 deletion detect_secrets/core/usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,9 @@ def add_arguments(self):
),
)

self.parser.add_argument(
action_parser = self.parser.add_mutually_exclusive_group()

action_parser.add_argument(
'--diff',
action='store_true',
help=(
Expand All @@ -219,6 +221,15 @@ def add_arguments(self):
),
)

action_parser.add_argument(
'--display-results',
action='store_true',
help=(
'Displays the results of an interactive auditing session '
'which have been saved to a baseline file.'
),
)

return self


Expand Down
6 changes: 5 additions & 1 deletion detect_secrets/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,14 @@ def main(argv=None):
)

elif args.action == 'audit':
if not args.diff:
if not args.diff and not args.display_results:
audit.audit_baseline(args.filename[0])
return 0

if args.display_results:
audit.print_audit_results(args.filename[0])
return 0

if len(args.filename) != 2:
print(
'Must specify two files to compare!',
Expand Down
22 changes: 2 additions & 20 deletions detect_secrets/plugins/common/initialize.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
"""Intelligent initialization of plugins."""
try:
from functools import lru_cache
except ImportError: # pragma: no cover
from functools32 import lru_cache

from ..artifactory import ArtifactoryDetector # noqa: F401
from ..aws import AWSKeyDetector # noqa: F401
from ..base import BasePlugin
from ..basic_auth import BasicAuthDetector # noqa: F401
from ..common.util import get_mapping_from_secret_type_to_class_name
from ..high_entropy_strings import Base64HighEntropyString # noqa: F401
from ..high_entropy_strings import HexHighEntropyString # noqa: F401
from ..keyword import KeywordDetector # noqa: F401
Expand Down Expand Up @@ -195,7 +191,7 @@ def from_secret_type(secret_type, settings):
... },
... ]
"""
mapping = _get_mapping_from_secret_type_to_class_name()
mapping = get_mapping_from_secret_type_to_class_name()
try:
classname = mapping[secret_type]
except KeyError:
Expand All @@ -214,17 +210,3 @@ def from_secret_type(secret_type, settings):

**plugin_init_vars
)


@lru_cache(maxsize=1)
def _get_mapping_from_secret_type_to_class_name():
"""Returns secret_type => plugin classname"""
mapping = {}
for key, value in globals().items():
try:
if issubclass(value, BasePlugin) and value != BasePlugin:
mapping[value.secret_type] = key
except TypeError:
pass

return mapping
31 changes: 31 additions & 0 deletions detect_secrets/plugins/common/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
try:
from functools import lru_cache
except ImportError: # pragma: no cover
from functools32 import lru_cache

# These plugins need to be imported here so that globals()
# can find them.
from ..artifactory import ArtifactoryDetector # noqa: F401
from ..aws import AWSKeyDetector # noqa: F401
from ..base import BasePlugin
from ..basic_auth import BasicAuthDetector # noqa: F401
from ..high_entropy_strings import Base64HighEntropyString # noqa: F401
from ..high_entropy_strings import HexHighEntropyString # noqa: F401
from ..keyword import KeywordDetector # noqa: F401
from ..private_key import PrivateKeyDetector # noqa: F401
from ..slack import SlackDetector # noqa: F401
from ..stripe import StripeDetector # noqa: F401


@lru_cache(maxsize=1)
def get_mapping_from_secret_type_to_class_name():
"""Returns secret_type => plugin classname"""
mapping = {}
for key, value in globals().items():
try:
if issubclass(value, BasePlugin) and value != BasePlugin:
mapping[value.secret_type] = key
except TypeError:
pass

return mapping
48 changes: 48 additions & 0 deletions detect_secrets/util.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import subprocess


def get_root_directory(): # pragma: no cover
Expand All @@ -15,3 +16,50 @@ def get_relative_path(root, path):
return os.path.realpath(
os.path.join(root, path),
)[len(os.getcwd() + '/'):]


def get_git_sha(path):
"""Returns the sha of the git checkout at the input path
:type path: str
:param path: directory of the git checkout
:rtype: str|None
:returns: git sha of the input path
"""
try:
with open(os.devnull, 'w') as fnull:
return subprocess.check_output(
['git', 'rev-parse', '--verify', 'HEAD'],
stderr=fnull,
cwd=path,
).decode('utf-8').split()[0]
except (subprocess.CalledProcessError, OSError, IndexError): # pragma: no cover
return None


def get_git_remotes(path):
"""Returns a list of unique git remotes of the checkout
at the input path
:type path: str
:param path: directory of the git checkout
:rtype: List<str>|None
:returns: A list of unique git urls
"""
try:
with open(os.devnull, 'w') as fnull:
git_remotes = subprocess.check_output(
['git', 'remote', '-v'],
stderr=fnull,
cwd=path,
).decode('utf-8').split('\n')
return list({
git_remote.split()[1]
for git_remote
in git_remotes
if len(git_remote) > 2 # split('\n') produces an empty list
})
except (subprocess.CalledProcessError, OSError): # pragma: no cover
return None
Loading

0 comments on commit d16ec18

Please sign in to comment.