Skip to content

Commit

Permalink
update key to string conversion util to fixtures
Browse files Browse the repository at this point in the history
  • Loading branch information
myuwono committed Feb 7, 2017
1 parent d962e0b commit 2f7e513
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 37 deletions.
6 changes: 3 additions & 3 deletions src/main/python/covata/delta/crypto.py
Expand Up @@ -47,7 +47,7 @@ def save(self, private_key, file_name):
os.makedirs(self.key_store_path)

with open(file_path, 'w') as f:
f.write(pem)
f.write(pem.decode(encoding='utf8'))

def load(self, file_name):
# type: (str) -> rsa.RSAPrivateKey
Expand Down Expand Up @@ -81,7 +81,7 @@ def generate_key():

@staticmethod
def serialized(public_key):
# type: (rsa.RSAPublicKey) -> str
# type: (rsa.RSAPublicKey) -> unicode
"""
:param :class:`RSAPublicKey` public_key: the public Key object
Expand All @@ -90,4 +90,4 @@ def serialized(public_key):
"""
der = public_key.public_bytes(encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.PKCS1)
return str(base64.b64encode(der))
return base64.b64encode(der).decode(encoding='utf8')
27 changes: 13 additions & 14 deletions src/unittest/python/conftest.py
Expand Up @@ -19,8 +19,6 @@
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization

from mock import mock_open

import covata.delta.crypto as crypto


Expand All @@ -43,15 +41,16 @@ def private_key():
backend=default_backend())


@pytest.fixture(scope="function")
def mock_file(mocker, crypto_service, private_key):
mock_pem_file = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.BestAvailableEncryption(
crypto_service.key_store_passphrase))
mocker.patch('os.path.isdir', return_value=True)
return mocker.patch(
'covata.delta.crypto.open',
mock_open(read_data=mock_pem_file),
create=True)
@pytest.fixture(scope="session")
def key2bytes():
def convert(key):
if isinstance(key, rsa.RSAPrivateKey):
return key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption())
elif isinstance(key, rsa.RSAPublicKey):
return key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.PKCS1)
return convert
10 changes: 6 additions & 4 deletions src/unittest/python/test_api_client.py
Expand Up @@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import uuid

import responses
Expand All @@ -21,16 +20,19 @@


@responses.activate
def test_register_identity(mocker, crypto_service, private_key):
def test_register_identity(mocker, crypto_service, private_key, key2bytes):
expected_id = str(uuid.uuid4())
responses.add(responses.POST,
ApiClient.DELTA_URL + ApiClient.RESOURCE_IDENTITIES,
status=201,
body=json.dumps(dict(identityId=expected_id)),
content_type='application/json')
json=dict(identityId=expected_id))

mocker.patch.object(crypto_service, 'generate_key', return_value=private_key)

api_client = RequestsApiClient(crypto_service)
identity_id = api_client.register_identity("1", {})
crypto_key = crypto_service.load("%s.crypto.pem" % identity_id)
signing_key = crypto_service.load("%s.signing.pem" % identity_id)
assert identity_id == expected_id
assert key2bytes(crypto_key) == key2bytes(private_key)
assert key2bytes(signing_key) == key2bytes(private_key)
23 changes: 7 additions & 16 deletions src/unittest/python/test_crypto_service.py
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.

import base64
import os

from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
Expand All @@ -32,30 +31,22 @@ def test_should_serialize_public_key_to_b64_encoded_der_format(

expected = base64.b64encode(public_key.public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.PKCS1))
format=serialization.PublicFormat.PKCS1)) # type: bytes

assert crypto_service.serialized(public_key) == str(expected)
assert crypto_service.serialized(public_key) == expected.decode()


def test_should_decrypt_private_key(crypto_service, private_key, mock_file):
retrieved = as_string(crypto_service.load("mock.pem"))
expected = as_string(private_key)
mock_file.assert_called_once_with(
os.path.join(crypto_service.key_store_path, "mock.pem"), 'r')
def test_should_decrypt_private_key(crypto_service, private_key, key2bytes):
crypto_service.save(private_key, "mock.pem")
retrieved = key2bytes(crypto_service.load("mock.pem"))
expected = key2bytes(private_key)
assert retrieved == expected


def test_should_encrypt_to_file(mocker, crypto_service, private_key, mock_file):
def test_should_encrypt_to_file(mocker, crypto_service, private_key):
mock_makedirs = mocker.patch('os.makedirs')
mocker.patch('os.path.isdir', return_value=False)
crypto_service.save(private_key, "mock.pem")
mock_file.assert_called_once_with(
os.path.join(crypto_service.key_store_path, "mock.pem"), 'w')
mock_makedirs.assert_called_once_with(crypto_service.key_store_path)


def as_string(private_key):
return private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption())

0 comments on commit 2f7e513

Please sign in to comment.