Skip to content

Commit

Permalink
ecdsa cot verification test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
escapewindow committed Jan 4, 2019
1 parent c2f2870 commit c6965a6
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 44 deletions.
3 changes: 3 additions & 0 deletions scriptworker/test/data/ecdsa/foo.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"a": "b"
}
1 change: 1 addition & 0 deletions scriptworker/test/data/ecdsa/foo.json.scriptworker.sig
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
忓⹠䙴鐥霹ȵ펔﵎瑽鈫Ŏ��탵͵�쓢뢵ᵨ痨鱶꼧쒥⃾痜轺찛⤱揱ඡ䘏⹧챾ྌ�띈륞剠⻞鯖뎔祭⨉
Expand Down
137 changes: 93 additions & 44 deletions scriptworker/test/test_cot_verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import scriptworker.cot.verify as cotverify
from scriptworker.exceptions import CoTError, ScriptWorkerGPGException, DownloadError
import scriptworker.context as swcontext
from scriptworker.utils import makedirs, load_json_or_yaml
from scriptworker.utils import format_json, makedirs, load_json_or_yaml, write_to_file
from . import noop_async, noop_sync, rw_context, mobile_rw_context, tmpdir, touch
from scriptworker.artifacts import (
get_single_upstream_artifact_full_path,
Expand All @@ -37,6 +37,7 @@

COTV2_DIR = os.path.join(os.path.dirname(__file__), "data", "cotv2")
COTV4_DIR = os.path.join(os.path.dirname(__file__), "data", "cotv4")
ECDSA_DIR = os.path.join(os.path.dirname(__file__), "data", "ecdsa")


def write_artifact(context, task_id, path, contents):
Expand All @@ -50,6 +51,10 @@ async def die_async(*args, **kwargs):
raise CoTError("x")


def die_sync(*args, **kwargs):
raise CoTError("x")


@pytest.yield_fixture(scope='function')
def chain(rw_context):
yield _craft_chain(rw_context)
Expand Down Expand Up @@ -933,65 +938,109 @@ async def test_get_all_artifacts_per_task_id(chain, decision_link, build_link,
assert expected == cotverify.get_all_artifacts_per_task_id(chain, upstream_artifacts)


@pytest.mark.parametrize('upstream_artifacts, raises', ((
[{'taskId': 'build_task_id', 'paths': ['path1', 'path2']}],
True,
# verify_link_gpg_cot_signature {{{1
@pytest.mark.parametrize('sig_verifies, unsigned_path_exists, unsigned_path_matches, raises', ((
True, True, True, False
), (
[{'taskId': 'build_task_id', 'paths': ['failed_path'], 'optional': True}],
False,
True, False, False, False
), (
[{'taskId': 'build_task_id', 'paths': ['path1', 'path2']},
{'taskId': 'build_task_id', 'paths': ['failed_path'], 'optional': True}],
True,
True, True, False, True
), (
[],
False,
False, False, False, True
)))
def test_verify_cot_signatures_no_file(chain, build_link, mocker, upstream_artifacts, raises):
chain.links = [build_link]
mocker.patch.object(cotverify, 'GPG', new=noop_sync)
def test_verify_link_gpg_cot_signature(chain, build_link, mocker, sig_verifies,
unsigned_path_exists, unsigned_path_matches, raises):
contents = {
'taskId': 'build_task_id',
'a': 'b',
}
bad_contents = {
'taskId': 'build_task_id',
'q': 'r',
}

chain.task['payload']['upstreamArtifacts'] = upstream_artifacts
def fake_body(*args, **kwargs):
if sig_verifies:
return format_json(contents)
else:
raise ScriptWorkerGPGException("Foo")

gpg_path = os.path.join(build_link.cot_dir, 'public/chainOfTrust.json.asc')
unsigned_path = os.path.join(build_link.cot_dir, 'public/chain-of-trust.json')
makedirs(os.path.dirname(gpg_path))
touch(gpg_path)
if unsigned_path_exists:
with open(unsigned_path, 'w') as fh:
if unsigned_path_matches:
write_to_file(unsigned_path, contents, file_type='json')
else:
write_to_file(unsigned_path, bad_contents, file_type='json')
build_link._cot = None
mocker.patch.object(cotverify, 'read_from_file', new=noop_sync)
mocker.patch.object(cotverify, 'GPG', new=noop_sync)
mocker.patch.object(cotverify, 'get_body', new=fake_body)
if raises:
with pytest.raises(CoTError):
cotverify.verify_cot_signatures(chain)
cotverify.verify_link_gpg_cot_signature(chain, build_link, unsigned_path)
else:
cotverify.verify_cot_signatures(chain)
cotverify.verify_link_gpg_cot_signature(chain, build_link, unsigned_path)
assert build_link.cot == contents


def test_verify_cot_signatures_bad_sig(chain, build_link, mocker):
# verify_link_ecdsa_cot_signature {{{1
@pytest.mark.parametrize('unsigned_path, signature_path, verifying_key_path, raises', ((
# Good
os.path.join(ECDSA_DIR, 'foo.json'),
os.path.join(ECDSA_DIR, 'foo.json.scriptworker.sig'),
os.path.join(ECDSA_DIR, 'scriptworker_public.pem'),
False,
), (
# nonexistent unsigned_path
os.path.join(ECDSA_DIR, 'NONEXISTENT_PATH'),
os.path.join(ECDSA_DIR, 'foo.json.scriptworker.sig'),
os.path.join(ECDSA_DIR, 'scriptworker_public.pem'),
True,
), (
# Bad verifying key
os.path.join(ECDSA_DIR, 'foo.json'),
os.path.join(ECDSA_DIR, 'foo.json.scriptworker.sig'),
os.path.join(ECDSA_DIR, 'docker-worker_public.pem'),
True,
)))
def test_verify_link_ecdsa_cot_signature(chain, build_link, mocker, unsigned_path,
signature_path, verifying_key_path, raises):
chain.context.config['ecdsa_public_key_paths'][build_link.worker_impl] = verifying_key_path
build_link._cot = None
build_link.task_id = None
if raises:
with pytest.raises(CoTError):
cotverify.verify_link_ecdsa_cot_signature(chain, build_link, unsigned_path, signature_path)
else:
contents = load_json_or_yaml(unsigned_path, is_path=True)
cotverify.verify_link_ecdsa_cot_signature(chain, build_link, unsigned_path, signature_path)
assert build_link.cot == contents

def die(*args, **kwargs):
raise ScriptWorkerGPGException("x")

path = os.path.join(build_link.cot_dir, 'public/chainOfTrust.json.asc')
makedirs(os.path.dirname(path))
touch(path)
# verify_cot_signatures {{{1
@pytest.mark.parametrize('ecdsa_mock, gpg_mock, raises', ((
noop_sync, die_sync, False
), (
die_sync, noop_sync, False
), (
die_sync, die_sync, True
)))
def test_verify_link_gpg_cot_signature_bad_sig(chain, mocker, build_link, ecdsa_mock, gpg_mock, raises):
mocker.patch.object(cotverify, 'verify_link_ecdsa_cot_signature', new=ecdsa_mock)
mocker.patch.object(cotverify, 'verify_link_gpg_cot_signature', new=gpg_mock)
chain.links = [build_link]
mocker.patch.object(cotverify, 'GPG', new=noop_sync)
mocker.patch.object(cotverify, 'get_body', new=die)
with pytest.raises(CoTError):
if raises:
with pytest.raises(CoTError):
cotverify.verify_cot_signatures(chain)
else:
cotverify.verify_cot_signatures(chain)


def test_verify_cot_signatures(chain, build_link, mocker):

def fake_body(*args, **kwargs):
return '{"taskId": "build_task_id"}'

build_link._cot = None
unsigned_path = os.path.join(build_link.cot_dir, 'public/chainOfTrust.json.asc')
path = os.path.join(build_link.cot_dir, 'chainOfTrust.json')
makedirs(os.path.dirname(unsigned_path))
touch(unsigned_path)
chain.links = [build_link]
mocker.patch.object(cotverify, 'GPG', new=noop_sync)
mocker.patch.object(cotverify, 'get_body', new=fake_body)
cotverify.verify_cot_signatures(chain)
assert os.path.exists(path)
with open(path, "r") as fh:
assert json.load(fh) == {"taskId": "build_task_id"}

# _take_expires_out_from_artifacts_in_payload {{{1
@pytest.mark.parametrize('payload, expected', (
({}, {}),
(
Expand Down

0 comments on commit c6965a6

Please sign in to comment.