Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add audit result view #205

Merged
merged 7 commits into from
Jul 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions detect_secrets/core/audit.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
from __future__ import print_function
from __future__ import unicode_literals

import codecs
import json
import os
import subprocess
import sys
from builtins import input
from collections import defaultdict
from copy import deepcopy

from ..plugins.common import initialize
from ..plugins.common.filetype import determine_file_type
from ..plugins.common.util import get_mapping_from_secret_type_to_class_name
from ..plugins.high_entropy_strings import HighEntropyStringsPlugin
from ..util import get_git_remotes
from ..util import get_git_sha
from .baseline import merge_results
from .bidirectional_iterator import BidirectionalIterator
from .code_snippet import CodeSnippetHighlighter
Expand All @@ -32,6 +37,22 @@ class RedundantComparisonError(Exception):
pass


# Maps a secret's `is_secret` audit label (True / False / missing) to the
# bucket name used when grouping audit results.
AUDIT_RESULT_TO_STRING = {
    True: 'positive',
    False: 'negative',
    None: 'unknown',
}

# Template for a single plugin's entry in the audit results. It contains
# mutable containers, so it must be copied before being filled in.
EMPTY_PLUGIN_AUDIT_RESULT = {
    # One empty bucket per possible audit label.
    'results': {
        bucket_name: []
        for bucket_name in AUDIT_RESULT_TO_STRING.values()
    },
    'config': {},
}


def audit_baseline(baseline_filename):
original_baseline = _get_baseline_from_file(baseline_filename)
if not original_baseline:
Expand Down Expand Up @@ -176,6 +197,81 @@ def compare_baselines(old_baseline_filename, new_baseline_filename):
secret_iterator.step_back_on_next_iteration()


def determine_audit_results(baseline, baseline_path):
    """
    Summarises an audited baseline on a per-plugin basis.

    The returned dictionary has the following shape:
    {
        "results": {
            "plugin_name1": {
                "results": {
                    "positive": [secrets labelled is_secret: true for this plugin],
                    "negative": [secrets labelled is_secret: false for this plugin],
                    "unknown": [secrets without an is_secret label for this plugin]
                },
                "config": {configuration used for the plugin}
            },
            ...
        },
        "repo_info": {
            "remote": "remote url",
            "sha": "sha of repo checkout"
        },
    }

    "repo_info" is only present when both a git sha and at least one git
    remote could be determined for the directory containing the baseline.

    :type baseline: dict
    :param baseline: contents of an audited baseline file

    :type baseline_path: str
    :param baseline_path: path to the baseline file; its directory is
        used to look up git information

    :rtype: dict
    """
    secrets = _secret_generator(baseline)

    # Plugin data is kept under its own key rather than at the top level,
    # so that consumers can iterate over plugins without having to filter
    # out sibling keys such as 'repo_info'.
    audit_results = {
        'results': defaultdict(lambda: deepcopy(EMPTY_PLUGIN_AUDIT_RESULT)),
    }
    per_plugin = audit_results['results']

    plugin_name_for_type = get_mapping_from_secret_type_to_class_name()

    for filename, secret in secrets:
        line_text = _get_file_line(filename, secret['line_number'])
        try:
            raw_value = get_raw_secret_value(
                secret_line=line_text,
                secret=secret,
                plugin_settings=baseline['plugins_used'],
                filename=filename,
            )
        except SecretNotFoundOnSpecifiedLineError:
            # Fall back to the whole line when the secret cannot be isolated.
            raw_value = line_text

        owner = plugin_name_for_type[secret['type']]
        bucket = AUDIT_RESULT_TO_STRING[secret.get('is_secret')]
        per_plugin[owner]['results'][bucket].append(raw_value)

    # Attach configuration only for plugins that actually reported a secret.
    for plugin_config in baseline['plugins_used']:
        configured_name = plugin_config['name']
        if configured_name in per_plugin:
            per_plugin[configured_name]['config'].update(plugin_config)

    repo_dir = os.path.dirname(os.path.abspath(baseline_path))
    sha = get_git_sha(repo_dir)
    remotes = get_git_remotes(repo_dir)

    if sha and remotes:
        audit_results['repo_info'] = {
            'remote': remotes[0],
            'sha': sha,
        }

    return audit_results


def print_audit_results(baseline_filename):
    """
    Prints the audit results of a baseline file to stdout as JSON.

    If the baseline cannot be retrieved, a failure message is printed
    instead.

    :type baseline_filename: str
    :param baseline_filename: path to the audited baseline file
    """
    baseline = _get_baseline_from_file(baseline_filename)
    if baseline:
        audit_results = determine_audit_results(baseline, baseline_filename)
        print(json.dumps(audit_results))
        return

    print('Failed to retrieve baseline from {filename}'.format(filename=baseline_filename))


def _get_baseline_from_file(filename): # pragma: no cover
try:
with open(filename) as f:
Expand Down Expand Up @@ -424,6 +520,17 @@ def _handle_user_decision(decision, secret):
del secret['is_secret']


def _get_file_line(filename, line_number):
"""
Attempts to read a given line from the input file.
"""
try:
with codecs.open(filename, encoding='utf-8') as f:
return f.read().splitlines()[line_number - 1] # line numbers are 1-indexed
except (OSError, IOError, IndexError):
return None


def _get_secret_with_context(
filename,
secret,
Expand Down
13 changes: 12 additions & 1 deletion detect_secrets/core/usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,9 @@ def add_arguments(self):
),
)

self.parser.add_argument(
action_parser = self.parser.add_mutually_exclusive_group()

action_parser.add_argument(
'--diff',
action='store_true',
help=(
Expand All @@ -219,6 +221,15 @@ def add_arguments(self):
),
)

action_parser.add_argument(
'--display-results',
action='store_true',
help=(
'Displays the results of an interactive auditing session '
'which have been saved to a baseline file.'
),
)

return self


Expand Down
6 changes: 5 additions & 1 deletion detect_secrets/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,14 @@ def main(argv=None):
)

elif args.action == 'audit':
if not args.diff:
if not args.diff and not args.display_results:
audit.audit_baseline(args.filename[0])
return 0

if args.display_results:
audit.print_audit_results(args.filename[0])
return 0

if len(args.filename) != 2:
print(
'Must specify two files to compare!',
Expand Down
22 changes: 2 additions & 20 deletions detect_secrets/plugins/common/initialize.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
"""Intelligent initialization of plugins."""
try:
from functools import lru_cache
except ImportError: # pragma: no cover
from functools32 import lru_cache

from ..artifactory import ArtifactoryDetector # noqa: F401
from ..aws import AWSKeyDetector # noqa: F401
from ..base import BasePlugin
from ..basic_auth import BasicAuthDetector # noqa: F401
from ..common.util import get_mapping_from_secret_type_to_class_name
from ..high_entropy_strings import Base64HighEntropyString # noqa: F401
from ..high_entropy_strings import HexHighEntropyString # noqa: F401
from ..keyword import KeywordDetector # noqa: F401
Expand Down Expand Up @@ -195,7 +191,7 @@ def from_secret_type(secret_type, settings):
... },
... ]
"""
mapping = _get_mapping_from_secret_type_to_class_name()
mapping = get_mapping_from_secret_type_to_class_name()
try:
classname = mapping[secret_type]
except KeyError:
Expand All @@ -214,17 +210,3 @@ def from_secret_type(secret_type, settings):

**plugin_init_vars
)


@lru_cache(maxsize=1)
def _get_mapping_from_secret_type_to_class_name():
    """Returns secret_type => plugin classname

    Built by scanning this module's globals for subclasses of BasePlugin
    (excluding BasePlugin itself).
    """
    registry = {}
    for class_name, candidate in globals().items():
        try:
            if not issubclass(candidate, BasePlugin):
                continue

            if candidate != BasePlugin:
                registry[candidate.secret_type] = class_name
        except TypeError:
            # issubclass() rejects globals that are not classes.
            continue

    return registry
31 changes: 31 additions & 0 deletions detect_secrets/plugins/common/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
try:
from functools import lru_cache
except ImportError: # pragma: no cover
from functools32 import lru_cache

# These plugins need to be imported here so that globals()
# can find them.
from ..artifactory import ArtifactoryDetector # noqa: F401
from ..aws import AWSKeyDetector # noqa: F401
from ..base import BasePlugin
from ..basic_auth import BasicAuthDetector # noqa: F401
from ..high_entropy_strings import Base64HighEntropyString # noqa: F401
from ..high_entropy_strings import HexHighEntropyString # noqa: F401
from ..keyword import KeywordDetector # noqa: F401
from ..private_key import PrivateKeyDetector # noqa: F401
from ..slack import SlackDetector # noqa: F401
from ..stripe import StripeDetector # noqa: F401


@lru_cache(maxsize=1)
def get_mapping_from_secret_type_to_class_name():
    """Returns secret_type => plugin classname

    Built by scanning this module's globals for subclasses of BasePlugin
    (excluding BasePlugin itself), which is why the plugin classes have
    to be imported into this module.
    """
    registry = {}
    for class_name, candidate in globals().items():
        try:
            if not issubclass(candidate, BasePlugin):
                continue

            if candidate != BasePlugin:
                registry[candidate.secret_type] = class_name
        except TypeError:
            # issubclass() rejects globals that are not classes.
            continue

    return registry
48 changes: 48 additions & 0 deletions detect_secrets/util.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import subprocess


def get_root_directory(): # pragma: no cover
Expand All @@ -15,3 +16,50 @@ def get_relative_path(root, path):
return os.path.realpath(
os.path.join(root, path),
)[len(os.getcwd() + '/'):]


def get_git_sha(path):
    """Returns the sha of the git checkout at the input path

    :type path: str
    :param path: directory of the git checkout

    :rtype: str|None
    :returns: git sha of HEAD at the input path, or None if it could
        not be determined
    """
    command = ['git', 'rev-parse', '--verify', 'HEAD']
    try:
        # git's stderr output is discarded; failures are reported as None.
        with open(os.devnull, 'w') as devnull:
            raw_output = subprocess.check_output(
                command,
                stderr=devnull,
                cwd=path,
            )

        tokens = raw_output.decode('utf-8').split()
        return tokens[0]
    except (subprocess.CalledProcessError, OSError, IndexError):  # pragma: no cover
        return None


def get_git_remotes(path):
    """Returns a list of unique git remotes of the checkout
    at the input path

    :type path: str
    :param path: directory of the git checkout

    :rtype: List<str>|None
    :returns: A list of unique git urls, in the order in which git
        reports them, or None if git could not be run at the input path
    """
    try:
        with open(os.devnull, 'w') as fnull:
            output = subprocess.check_output(
                ['git', 'remote', '-v'],
                stderr=fnull,
                cwd=path,
            ).decode('utf-8')
    except (subprocess.CalledProcessError, OSError):  # pragma: no cover
        return None

    remotes = []
    for line in output.splitlines():
        # Each line of `git remote -v` looks like "<name> <url> (fetch|push)".
        # Blank or malformed lines without a url field are skipped instead
        # of raising IndexError.
        fields = line.split()
        if len(fields) < 2:
            continue

        url = fields[1]
        # De-duplicate while preserving order (every remote is listed once
        # for fetch and once for push), so that callers taking the first
        # entry get a deterministic result.
        if url not in remotes:
            remotes.append(url)

    return remotes
Loading