-
Notifications
You must be signed in to change notification settings - Fork 486
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Aaron Loo
committed
Nov 8, 2020
1 parent
982f5ba
commit 9aaae64
Showing
33 changed files
with
1,417 additions
and
2,037 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
from . import analytics # noqa: F401 | ||
from .audit import audit_baseline # noqa: F401 | ||
from .compare import compare_baselines # noqa: F401 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
""" | ||
The analytics module produces a machine-readable breakdown of true and false positives | ||
for a given audited baseline. | ||
""" | ||
from collections import defaultdict | ||
from typing import Any | ||
from typing import Dict | ||
from typing import Tuple | ||
|
||
from ..core.plugins.util import get_mapping_from_secret_type_to_class | ||
from ..core.potential_secret import PotentialSecret | ||
from .common import get_baseline_from_file | ||
|
||
|
||
def calculate_statistics_for_baseline(
    filename: str,
    **kwargs: Any,
) -> 'StatisticsAggregator':
    """
    Load the baseline stored at `filename` and tally every secret it contains.

    :param filename: path of the baseline file to read.
    :param kwargs: forwarded unchanged to the StatisticsAggregator constructor.
    :returns: the aggregator, after every secret in the baseline has been recorded.
    :raises: InvalidBaselineError
    """
    baseline_secrets = get_baseline_from_file(filename)
    statistics = StatisticsAggregator(**kwargs)

    # Iterating the baseline yields pairs; only the second element (the secret) is used.
    for _unused, found_secret in baseline_secrets:
        # TODO: gather real secrets?
        # TODO: do we need repo_info?
        statistics.record_secret(found_secret)

    return statistics
|
||
|
||
class StatisticsAggregator: | ||
def __init__(self) -> None: | ||
framework = { | ||
'stats': StatisticsCounter, | ||
} | ||
|
||
self.data = defaultdict( | ||
lambda: { | ||
key: value() | ||
for key, value in framework.items() | ||
}, | ||
) | ||
|
||
def record_secret(self, secret: PotentialSecret) -> None: | ||
# NOTE: We don't do anything with verified secrets, because this function | ||
# is solely to measure statistics on labelled results. | ||
counter = self._get_plugin_counter(secret.type) | ||
if secret.is_secret is True: | ||
counter.correct += 1 | ||
elif secret.is_secret is False: | ||
counter.incorrect += 1 | ||
else: | ||
counter.unknown += 1 | ||
|
||
def _get_plugin_counter(self, secret_type: str) -> 'StatisticsCounter': | ||
return self.data[secret_type]['stats'] | ||
|
||
def __str__(self) -> str: | ||
raise NotImplementedError | ||
|
||
def json(self) -> Dict[str, Any]: | ||
output = {} | ||
for secret_type, framework in self.data.items(): | ||
output[get_mapping_from_secret_type_to_class()[secret_type].__name__] = { | ||
key: value.json() | ||
for key, value in framework.items() | ||
} | ||
|
||
return output | ||
|
||
|
||
class StatisticsCounter:
    """
    Tallies audit labels and derives precision / recall scores from them.
    """

    def __init__(self) -> None:
        # Reported by `json()` as `true-positives`.
        self.correct: int = 0
        # Reported by `json()` as `false-positives`.
        self.incorrect: int = 0
        # Reported by `json()` as `unknown`.
        self.unknown: int = 0

    def __repr__(self) -> str:
        # Both fragments need the `f` prefix; otherwise the second half is emitted
        # verbatim, braces and all.
        return (
            f'{self.__class__.__name__}(correct={self.correct}, '
            f'incorrect={self.incorrect}, unknown={self.unknown},)'
        )

    def json(self) -> Dict[str, Any]:
        """
        :returns: the raw counts under `raw`, and the derived `precision` and `recall`
            (each rounded to four decimal places) under `score`.
        """
        # Guard on the denominator rather than on both operands being non-zero:
        # a counter with true positives and no false positives has a precision of 1.0.
        labelled = self.correct + self.incorrect
        precision = (
            round(float(self.correct) / labelled, 4)
            if labelled
            else 0.0
        )

        # NOTE(2020-11-08|domanchi): This isn't the formal definition of `recall`, however,
        # this is the definition that we're going to attribute to it.
        #
        # Rationale: If we follow the formal definition of `recall` (i.e. TP / (TP + FN)),
        # we would need some way to measure false negatives. However, this is impossible
        # since we don't know what we don't know. The only way to get proper "recall" is
        # to measure this against a known set of secrets, and see how effective our rules
        # are against them.
        #
        # This is a common problem with Machine Learning. One way to address this is by
        # splitting the labelled data you have into a "test set" and a "training set",
        # train your model on the training set, and test its performance on its counterpart.
        # This works great for RegexBasedDetectors, but not so much for more heuristic
        # scanners (e.g. entropy scanning, or keyword scanning). The primary reason is
        # that no labelled data that we can compile will be a representative sample of
        # the different types of secrets out there. And as such, we'd be overfitting it
        # to whatever sample set we attempt this with.
        #
        # There is however, an alternative method. If we know these ratios for a certain
        # configuration, then change the configuration to be more liberal, we would expect
        # our *precision* to decrease, and our *recall* to increase (i.e. catching more
        # false positives, in hopes to reduce false negatives). Then, we can work to
        # **increase** our precision with this same data set, which is a much more
        # measurable way to do this than "decreasing false negatives".
        #
        # Essentially, if we make our scans more liberal (catching more things), but
        # our precision stays the same, we would be catching more real secrets. This
        # definition of `recall` allows us to do this.
        recall = (
            round(float(self.correct) / (self.correct + self.unknown), 4)
            if (self.correct + self.unknown)
            else 0.0
        )

        return {
            'raw': {
                'true-positives': self.correct,
                'false-positives': self.incorrect,
                'unknown': self.unknown,
            },
            'score': {
                'precision': precision,
                'recall': recall,
            },
        }
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
""" | ||
The audit module allows analysts to easily scan the baseline, and verify whether | ||
the secrets flagged are actually secrets. | ||
""" | ||
from . import io | ||
from ..core import baseline | ||
from ..types import SecretContext | ||
from ..util.code_snippet import get_code_snippet | ||
from .common import get_baseline_from_file | ||
from .common import get_raw_secret_from_file | ||
from .common import open_file | ||
from .exceptions import SecretNotFoundOnSpecifiedLineError | ||
from .iterator import BidirectionalIterator | ||
from .iterator import get_secret_iterator | ||
|
||
|
||
def audit_baseline(filename: str) -> None:
    """
    Run an interactive audit over the baseline at `filename`, writing the baseline
    back to the same file only if the audit reported changes.

    :raises: InvalidBaselineError
    """
    collection = get_baseline_from_file(filename)
    collection.trim()

    made_changes = _classify_secrets(get_secret_iterator(collection))
    if not made_changes:
        # Nothing was relabelled, so the file on disk is left untouched.
        return

    io.print_message('Saving progress...')
    baseline.save_to_file(collection, filename)
|
||
|
||
def _classify_secrets(iterator: BidirectionalIterator) -> bool:
    """
    Show each secret from `iterator` to the user and apply the label they choose.

    :returns: True if changes were made.
    """
    # NOTE: Technically, this is a conservative estimate. If an entry was changed to the same
    # value, we would return True as well.
    changed = False

    for secret in iterator:
        io.clear_screen()
        try:
            secret.secret_value = get_raw_secret_from_file(secret)

            # Evaluated in the same order as the keyword arguments they feed.
            position = iterator.index + 1
            total = len(iterator.collection)
            snippet = get_code_snippet(
                lines=open_file(secret.filename),
                line_number=secret.line_number,
            )
            io.print_context(
                SecretContext(
                    current_index=position,
                    num_total_secrets=total,
                    secret=secret,
                    snippet=snippet,
                ),
            )

            decision = io.get_user_decision(can_step_back=iterator.can_step_back())
        except SecretNotFoundOnSpecifiedLineError as e:
            # The secret could not be located, so report that instead and do not
            # ask the user for a secret decision.
            missing_context = SecretContext(
                current_index=iterator.index + 1,
                num_total_secrets=len(iterator.collection),
                secret=secret,
                error=e,
            )
            io.print_secret_not_found(missing_context)

            decision = io.get_user_decision(
                prompt_secret_decision=False,
                can_step_back=iterator.can_step_back(),
            )

        if decision == io.InputOptions.QUIT:
            io.print_message('Quitting...')
            break

        if decision == io.InputOptions.BACK:
            iterator.step_back_on_next_iteration()
            continue

        if decision == io.InputOptions.YES:
            secret.is_secret = True
        elif decision == io.InputOptions.NO:
            secret.is_secret = False
        elif decision == io.InputOptions.SKIP and secret.is_secret is not None:
            # This handles the case of back-stepping to clear a mistake.
            # This is not triggered for pre-labelled secrets, as pre-labelled secrets will be
            # excluded from this iterator.
            secret.is_secret = None
        else:
            # No label was applied, so there is nothing to flag as changed.
            continue

        changed = True

    return changed
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.