From 2f7e51364b3b852e02bc63f861fe78ca137ff7e9 Mon Sep 17 00:00:00 2001 From: Mitchell Yuwono Date: Tue, 7 Feb 2017 12:57:23 +1100 Subject: [PATCH] update key to string conversion util to fixtures --- src/main/python/covata/delta/crypto.py | 6 ++--- src/unittest/python/conftest.py | 27 +++++++++++----------- src/unittest/python/test_api_client.py | 10 ++++---- src/unittest/python/test_crypto_service.py | 23 ++++++------------ 4 files changed, 29 insertions(+), 37 deletions(-) diff --git a/src/main/python/covata/delta/crypto.py b/src/main/python/covata/delta/crypto.py index c94c04b..45e2413 100644 --- a/src/main/python/covata/delta/crypto.py +++ b/src/main/python/covata/delta/crypto.py @@ -47,7 +47,7 @@ def save(self, private_key, file_name): os.makedirs(self.key_store_path) with open(file_path, 'w') as f: - f.write(pem) + f.write(pem.decode(encoding='utf8')) def load(self, file_name): # type: (str) -> rsa.RSAPrivateKey @@ -81,7 +81,7 @@ def generate_key(): @staticmethod def serialized(public_key): - # type: (rsa.RSAPublicKey) -> str + # type: (rsa.RSAPublicKey) -> unicode """ :param :class:`RSAPublicKey` public_key: the public Key object @@ -90,4 +90,4 @@ def serialized(public_key): """ der = public_key.public_bytes(encoding=serialization.Encoding.DER, format=serialization.PublicFormat.PKCS1) - return str(base64.b64encode(der)) + return base64.b64encode(der).decode(encoding='utf8') diff --git a/src/unittest/python/conftest.py b/src/unittest/python/conftest.py index eb339be..fe2d2cd 100644 --- a/src/unittest/python/conftest.py +++ b/src/unittest/python/conftest.py @@ -19,8 +19,6 @@ from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives import serialization -from mock import mock_open - import covata.delta.crypto as crypto @@ -43,15 +41,16 @@ def private_key(): backend=default_backend()) -@pytest.fixture(scope="function") -def mock_file(mocker, crypto_service, private_key): - mock_pem_file = private_key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.BestAvailableEncryption( - crypto_service.key_store_passphrase)) - mocker.patch('os.path.isdir', return_value=True) - return mocker.patch( - 'covata.delta.crypto.open', - mock_open(read_data=mock_pem_file), - create=True) +@pytest.fixture(scope="session") +def key2bytes(): + def convert(key): + if isinstance(key, rsa.RSAPrivateKey): + return key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption()) + elif isinstance(key, rsa.RSAPublicKey): + return key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.PKCS1) + return convert diff --git a/src/unittest/python/test_api_client.py b/src/unittest/python/test_api_client.py index d91f263..4657a7d 100644 --- a/src/unittest/python/test_api_client.py +++ b/src/unittest/python/test_api_client.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json import uuid import responses @@ -21,16 +20,19 @@ @responses.activate -def test_register_identity(mocker, crypto_service, private_key): +def test_register_identity(mocker, crypto_service, private_key, key2bytes): expected_id = str(uuid.uuid4()) responses.add(responses.POST, ApiClient.DELTA_URL + ApiClient.RESOURCE_IDENTITIES, status=201, - body=json.dumps(dict(identityId=expected_id)), - content_type='application/json') + json=dict(identityId=expected_id)) mocker.patch.object(crypto_service, 'generate_key', return_value=private_key) api_client = RequestsApiClient(crypto_service) identity_id = api_client.register_identity("1", {}) + crypto_key = crypto_service.load("%s.crypto.pem" % identity_id) + signing_key = crypto_service.load("%s.signing.pem" % identity_id) assert identity_id == expected_id + assert key2bytes(crypto_key) == key2bytes(private_key) + assert key2bytes(signing_key) == key2bytes(private_key) diff --git a/src/unittest/python/test_crypto_service.py b/src/unittest/python/test_crypto_service.py index 54a26c8..f731bb7 100644 --- a/src/unittest/python/test_crypto_service.py +++ b/src/unittest/python/test_crypto_service.py @@ -13,7 +13,6 @@ # limitations under the License. import base64 -import os from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import rsa @@ -32,30 +31,22 @@ def test_should_serialize_public_key_to_b64_encoded_der_format( expected = base64.b64encode(public_key.public_bytes( encoding=serialization.Encoding.DER, - format=serialization.PublicFormat.PKCS1)) + format=serialization.PublicFormat.PKCS1)) # type: bytes - assert crypto_service.serialized(public_key) == str(expected) + assert crypto_service.serialized(public_key) == expected.decode() -def test_should_decrypt_private_key(crypto_service, private_key, mock_file): - retrieved = as_string(crypto_service.load("mock.pem")) - expected = as_string(private_key) - mock_file.assert_called_once_with( - os.path.join(crypto_service.key_store_path, "mock.pem"), 'r') +def test_should_decrypt_private_key(crypto_service, private_key, key2bytes): + crypto_service.save(private_key, "mock.pem") + retrieved = key2bytes(crypto_service.load("mock.pem")) + expected = key2bytes(private_key) assert retrieved == expected -def test_should_encrypt_to_file(mocker, crypto_service, private_key, mock_file): +def test_should_encrypt_to_file(mocker, crypto_service, private_key): mock_makedirs = mocker.patch('os.makedirs') mocker.patch('os.path.isdir', return_value=False) crypto_service.save(private_key, "mock.pem") - mock_file.assert_called_once_with( - os.path.join(crypto_service.key_store_path, "mock.pem"), 'w') mock_makedirs.assert_called_once_with(crypto_service.key_store_path) -def as_string(private_key): - return private_key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption())