Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding adhoc GitHub script #56

Merged
merged 7 commits into from
Mar 27, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
Empty file.
90 changes: 90 additions & 0 deletions detect_secrets_server/adhoc/github/webhook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""
For organizations that integrate with Github, they have the ability to setup
a webhook to receive events for all repos under the entire organization. In
such cases, this script allows you to scan other fields in which secrets may
be leaked, rather than just focusing on secrets in code.
"""
import io
from contextlib import redirect_stdout
from typing import Any
from typing import Dict
from typing import Tuple

from detect_secrets.main import main as run_detect_secrets


def scan_for_secrets(event_type: str, body: Dict[str, Any]) -> str:
"""
:param event_type: a full list can be found
https://developer.github.com/v3/activity/events/types/

:returns: link to field with leaked secret
"""
mapping = {
'commit_comment': _parse_comment,
'issue_comment': _parse_comment,
'pull_request_review_comment': _parse_comment,
'issues': _parse_issue,
'pull_request': _parse_pull_request,

# NOTE: We're currently ignoring `project*` events, because we don't use
# it. Pull requests welcome!
}
try:
payload, attribution_link = mapping[event_type](body)
except KeyError:
# Not an applicable event.
return None

f = io.StringIO()
with redirect_stdout(f):
run_detect_secrets([
'scan',
'--string', payload,
])

has_results = any([
line
for line in f.getvalue().splitlines()
if 'True' in line.split(':')[1]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Could add # For e.g. 'SomeDetector : Bool' output

])

return attribution_link if has_results else None


def _parse_comment(body: Dict[str, Any]) -> Tuple[str, str]:
if body.get('action', 'created') == 'deleted':
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could be wrong, but I don't think this does what you want it to.

'created' is the return value of .get if 'action' is not a present key.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's accurate. Some comments don't have action on them, and we want to scan the contents as default behavior.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess my feedback is since 'created' == 'deleted' will always be false, it feels weird, but not a strong opinion.

# This indicates that this is not an applicable event.
raise KeyError

return (
body['comment']['body'],
body['comment']['html_url'],
)


def _parse_issue(body: Dict[str, Any]) -> Tuple[str, str]:
if body['action'] not in {'opened', 'edited',}:
# This indicates that this is not an applicable event.
raise KeyError

# NOTE: Explicitly ignoring the issue "title" here, because
# I trust developers enough (hopefully, not famous last words).
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 😄

# I think a secret in the title falls under the same threat
# vector as a secret in the labels.
return (
body['issue']['body'],
body['issue']['html_url'],
)


def _parse_pull_request(body: Dict[str, Any]) -> Tuple[str, str]:
if body['action'] not in {'opened', 'edited',}:
# This indicates that this is not an applicable event.
raise KeyError

return (
body['pull_request']['body'],
body['pull_request']['html_url'],
)

3 changes: 3 additions & 0 deletions testing/util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from detect_secrets_server.core.usage.common import storage


EICAR = 'aHR0cHM6Ly93d3cueW91dHViZS5jb20vd2F0Y2g/dj1vSGc1U0pZUkhBMA=='


def cache_buster():
storage.get_storage_options.cache_clear()
storage.should_enable_s3_options.cache_clear()
147 changes: 147 additions & 0 deletions tests/adhoc/github/github_webhook_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import json
import os

import pytest

from detect_secrets_server.adhoc.github.webhook import scan_for_secrets
from testing.util import EICAR


@pytest.mark.parametrize(
'action',
{
'created',
'edited',
},
)
@pytest.mark.parametrize(
'event',
{
'commit_comment',
'issue_comment',
'pull_request_review_comment',
},
)
def test_comment_with_secret(event, action):
payload = get_payload(event)

# We make sure to add "multiple words" here, since we want to make
# sure that it supports multi-word bodies (as we would expect in
# regular usage).
payload['comment']['body'] = 'multiple words {}'.format(EICAR)
if 'action' in payload:
payload['action'] = action

assert scan_for_secrets(event, payload)


@pytest.mark.parametrize(
'action',
{
'created',
'edited',
},
)
@pytest.mark.parametrize(
'event',
{
'commit_comment',
'issue_comment',
'pull_request_review_comment',
},
)
def test_comment_no_secret(event, action):
payload = get_payload(event)
if 'action' in payload:
payload['action'] = action

assert not scan_for_secrets(event, payload)


@pytest.mark.parametrize(
'event',
{
'commit_comment',
'issue_comment',
'pull_request_review_comment',
},
)
def test_comment_deleted(event):
payload = get_payload(event)
payload['comment']['body'] = 'multiple words {}'.format(EICAR)
if 'action' in payload:
payload['action'] = 'deleted'

assert not scan_for_secrets(event, payload)


@pytest.mark.parametrize(
'event_key',
{
'issues,issue',
'pull_request,pull_request',
},
)
@pytest.mark.parametrize(
'action',
{
'opened',
'edited',
},
)
def test_issue_success(event_key, action):
event, key = event_key.split(',')
payload = get_payload(event)
payload['action'] = action
payload[key]['body'] = 'multiple words {}'.format(EICAR)

assert scan_for_secrets(event, payload)


@pytest.mark.parametrize(
'event_key',
{
'issues,issue',
'pull_request,pull_request',
},
)
@pytest.mark.parametrize(
'action',
{
'opened',
'edited',
},
)
def test_issue_no_secret(event_key, action):
event, key = event_key.split(',')
payload = get_payload(event)
payload['action'] = action

assert not scan_for_secrets(event, payload)


@pytest.mark.parametrize(
'event_key',
{
'issues,issue',
'pull_request,pull_request',
},
)
def test_issue_not_applicable(event_key):
event, key = event_key.split(',')
payload = get_payload(event)
payload['action'] = 'deleted'
payload[key]['body'] = 'multiple words {}'.format(EICAR)

assert not scan_for_secrets(event, payload)


def get_payload(name):
filepath = os.path.join(
os.path.dirname(__file__),
'../../../testing/github/',
'{}.json'.format(name),
)

with open(filepath) as f:
return json.loads(f.read())