Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding secret verification as builtin filter #356

Merged
merged 5 commits into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions detect_secrets/core/plugins/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from . import initialize # noqa: F401
from .util import Plugin # noqa: F401
43 changes: 39 additions & 4 deletions detect_secrets/core/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
from ..transformers import get_transformers
from ..transformers import ParsingError
from ..types import SelfAwareCallable
from ..util.code_snippet import get_code_snippet
from ..util.inject import get_injectable_variables
from ..util.inject import inject_variables_into_function
from .log import log
from .plugins.util import Plugin
from .plugins import Plugin
from .potential_secret import PotentialSecret


Expand All @@ -26,12 +27,28 @@ def scan_line(line: str) -> Generator[PotentialSecret, None, None]:
)

for plugin in get_plugins():
yield from _scan_line(
for secret in _scan_line(
plugin=plugin,
filename='adhoc-string-scan',
line=line,
line_number=0,
)
):
for filter_fn in get_filters_with_parameter('context'):
if inject_variables_into_function(
filter_fn,
filename=secret.filename,
secret=secret.secret_value,
plugin=plugin,
line=line,
context=get_code_snippet(
lines=[line],
line_number=1,
),
):
log.debug(f'Skipping "{secret.secret_value}" due to `{filter_fn.path}`.')
break
else:
yield secret


def scan_file(filename: str) -> Generator[PotentialSecret, None, None]:
Expand Down Expand Up @@ -140,6 +157,8 @@ def _process_line_based_plugins(
lines: List[Tuple[int, str]],
filename: str,
) -> Generator[PotentialSecret, None, None]:
line_content = [line[1] for line in lines]

# NOTE: We iterate through lines *then* plugins, because we want to quit early if any of the
# filters return True.
for line_number, line in lines:
Expand All @@ -153,7 +172,23 @@ def _process_line_based_plugins(
continue

for plugin in get_plugins():
yield from _scan_line(plugin, filename, line, line_number)
for secret in _scan_line(plugin, filename, line, line_number):
for filter_fn in get_filters_with_parameter('context'):
if inject_variables_into_function(
filter_fn,
filename=secret.filename,
secret=secret.secret_value,
plugin=plugin,
line=line,
context=get_code_snippet(
lines=line_content,
line_number=line_number,
),
):
log.debug(f'Skipping "{secret.secret_value}" due to `{filter_fn.path}`.')
break
else:
yield secret


def _scan_line(
Expand Down
7 changes: 7 additions & 0 deletions detect_secrets/core/upgrades/v1_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ def _migrate_filters(baseline: Dict[str, Any]) -> None:
{
'path': 'detect_secrets.filters.heuristic.is_likely_id_string',
},
{
'path': 'detect_secrets.filters.common.is_ignored_due_to_verification_policies',

# Hard-code this, just in case VerifiedResult enum values changes.
# This corresponds to VerifiedResult.UNVERIFIED
'min_level': 2,
},
]

if baseline.get('exclude'):
Expand Down
24 changes: 23 additions & 1 deletion detect_secrets/core/usage/filters.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import argparse

from ... import filters
from ...constants import VerifiedResult
from ...settings import get_settings
from .common import valid_path

Expand All @@ -14,12 +15,18 @@ def add_filter_options(parent: argparse.ArgumentParser) -> None:
),
)

parser.add_argument(
verify_group = parser.add_mutually_exclusive_group()
verify_group.add_argument(
'-n',
'--no-verify',
action='store_true',
help='Disables additional verification of secrets via network call.',
)
verify_group.add_argument(
'--only-verified',
action='store_true',
help='Only flags secrets that can be verified.',
)

parser.add_argument(
'--exclude-lines',
Expand Down Expand Up @@ -60,3 +67,18 @@ def parse_args(args: argparse.Namespace) -> None:
and args.word_list_file
):
filters.wordlist.initialize(args.word_list_file)

if not args.no_verify:
get_settings().filters[
'detect_secrets.filters.common.is_ignored_due_to_verification_policies'
] = {
'min_level': (
VerifiedResult.VERIFIED_TRUE
if args.only_verified
else VerifiedResult.UNVERIFIED
).value,
}
else:
get_settings().disable_filters(
'detect_secrets.filters.common.is_ignored_due_to_verification_policies',
)
4 changes: 2 additions & 2 deletions detect_secrets/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ class InvalidBaselineError(ValueError):
pass


class SecretNotFoundOnSpecifiedLineError(Exception):
class SecretNotFoundOnSpecifiedLineError(ValueError):
def __init__(self, line):
super(SecretNotFoundOnSpecifiedLineError, self).__init__(
super().__init__(
'ERROR: Secret not found on line {}!\n'.format(line)
+ 'Try recreating your baseline to fix this issue.',
)
51 changes: 51 additions & 0 deletions detect_secrets/filters/common.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import os
from functools import lru_cache

import requests

from ..constants import VerifiedResult
from ..core.plugins import Plugin
from ..settings import get_settings
from ..util.code_snippet import CodeSnippet
from ..util.inject import get_injectable_variables
from ..util.inject import inject_variables_into_function
from .util import get_caller_path


Expand All @@ -17,3 +24,47 @@ def is_baseline_file(filename: str) -> bool:
def _get_baseline_filename() -> str:
path = get_caller_path(offset=1)
return get_settings().filters[path]['filename']


def is_ignored_due_to_verification_policies(
secret: str,
plugin: Plugin,
context: CodeSnippet,
) -> bool:
"""
Valid policies include:
- Only VERIFIED_TRUE
- Can be UNVERIFIED or VERIFIED_TRUE
- Disabled check.

There's no such thing as "only verified false", because if you're going to verify
something, and it's verified false, why are you still including it as a valid secret?
"""
function = plugin.__class__.verify
if not hasattr(function, 'injectable_variables'):
function.injectable_variables = set(get_injectable_variables(plugin.verify))
function.path = f'{plugin.__class__.__name__}.verify'

try:
verify_result = inject_variables_into_function(
function,
self=plugin,
secret=secret,
context=context,
)
except requests.exceptions.RequestException:
verify_result = VerifiedResult.UNVERIFIED

if not verify_result:
return False

if verify_result.value < _get_verification_policy().value:
return True

return False


@lru_cache(maxsize=1)
def _get_verification_policy() -> VerifiedResult:
path = get_caller_path(offset=1)
return VerifiedResult(get_settings().filters[path]['min_level'])
18 changes: 9 additions & 9 deletions detect_secrets/plugins/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
import string
import textwrap
from datetime import datetime
from typing import List
from typing import Union

import requests

from ..constants import VerifiedResult
from ..util.code_snippet import CodeSnippet
from .base import RegexBasedDetector


Expand All @@ -22,19 +25,19 @@ class AWSKeyDetector(RegexBasedDetector):
re.compile(r'AKIA[0-9A-Z]{16}'),
)

def verify(self, token, context):
def verify(self, secret: str, context: CodeSnippet) -> VerifiedResult:
secret_access_key_candidates = get_secret_access_keys(context)
if not secret_access_key_candidates:
return VerifiedResult.UNVERIFIED

for candidate in secret_access_key_candidates:
if verify_aws_secret_access_key(token, candidate):
if verify_aws_secret_access_key(secret, candidate):
return VerifiedResult.VERIFIED_TRUE

return VerifiedResult.VERIFIED_FALSE


def get_secret_access_keys(content):
def get_secret_access_keys(content: CodeSnippet) -> List[str]:
# AWS secret access keys are 40 characters long.
# e.g. some_function('AKIA...', '[secret key]')
# e.g. secret_access_key = '[secret key]'
Expand All @@ -46,21 +49,18 @@ def get_secret_access_keys(content):

return [
match[2]
for line in content.splitlines()
for line in content
for match in regex.findall(line)
]


def verify_aws_secret_access_key(key, secret): # pragma: no cover
def verify_aws_secret_access_key(key: str, secret: str) -> bool: # pragma: no cover
"""
Using requests, because we don't want to require boto3 for this one
optional verification step.

Loosely based off:
https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html

:type key: str
:type secret: str
"""
now = datetime.utcnow()
amazon_datetime = now.strftime('%Y%m%dT%H%M%SZ')
Expand Down Expand Up @@ -176,7 +176,7 @@ def verify_aws_secret_access_key(key, secret): # pragma: no cover
return True


def _sign(key, message, hex=False): # pragma: no cover
def _sign(key, message: str, hex: bool = False) -> Union[str, bytes]: # pragma: no cover
value = hmac.new(key, message.encode('utf-8'), hashlib.sha256)
if not hex:
return value.digest()
Expand Down
31 changes: 25 additions & 6 deletions detect_secrets/plugins/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
from typing import Pattern
from typing import Set

import requests

from ..constants import VerifiedResult
from ..core.potential_secret import PotentialSecret
from ..settings import get_settings


class BasePlugin(metaclass=ABCMeta):
Expand Down Expand Up @@ -70,19 +73,35 @@ def format_scan_result(self, secret: Optional[PotentialSecret]) -> str:
if not secret:
return 'False'

return 'True'
# TODO: check settings for verification
# if not should_verify:
# # This is a secret, but we can't verify it. So this is the best we can do.
# return 'True'
try:
verification_level = VerifiedResult(
get_settings().filters[
'detect_secrets.filters.common.is_ignored_due_to_verification_policies'
]['min_level'],
)
except KeyError:
verification_level = VerifiedResult.VERIFIED_FALSE

if verification_level == VerifiedResult.VERIFIED_FALSE:
# This is a secret, but we can't verify it. So this is the best we can do.
return 'True'

if not secret.secret_value and not secret.is_verified:
# If the secret isn't verified, but we don't know the true secret value, this
# is also the best we can do.
return 'True (unverified)'

if not secret.is_verified:
verified_result = self.verify(secret.secret_value)
try:
# NOTE: There is no context here, since in this frame, we're only aware of the
# secret itself.
verified_result = self.verify(secret.secret_value)
except (requests.exceptions.RequestException, TypeError):
# NOTE: A TypeError is raised when the function expects a `context` to be supplied.
# However, if this function is run through a context-less situation (e.g. adhoc
# string scanning), we don't have that context to provide. As such, the secret is
# UNVERIFIED.
verified_result = VerifiedResult.UNVERIFIED
else:
# It's not going to be VERIFIED_FALSE, otherwise, we won't have the secret object
# to format.
Expand Down
13 changes: 7 additions & 6 deletions detect_secrets/plugins/cloudant.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import re
from typing import List

import requests

from ..constants import VerifiedResult
from ..util.code_snippet import CodeSnippet
from .base import RegexBasedDetector


Expand Down Expand Up @@ -60,19 +62,18 @@ class CloudantDetector(RegexBasedDetector):
),
]

def verify(self, token, context):

def verify(self, secret: str, context: CodeSnippet) -> VerifiedResult:
hosts = find_account(context)
if not hosts:
return VerifiedResult.UNVERIFIED

for host in hosts:
return verify_cloudant_key(host, token)
return verify_cloudant_key(host, secret)

return VerifiedResult.VERIFIED_FALSE


def find_account(context):
def find_account(context: CodeSnippet) -> List[str]:
opt_hostname_keyword = r'(?:hostname|host|username|id|user|userid|user-id|user-name|' \
'name|user_id|user_name|uname|account)'
account = r'(\w[\w\-]*)'
Expand All @@ -98,13 +99,13 @@ def find_account(context):

return [
match
for line in context.splitlines()
for line in context
for regex in regexes
for match in regex.findall(line)
]


def verify_cloudant_key(hostname, token):
def verify_cloudant_key(hostname: str, token: str) -> VerifiedResult:
headers = {'Content-type': 'application/json'}
request_url = 'https://{hostname}:' \
'{token}' \
Expand Down
Loading