Skip to content

Commit

Permalink
Merge pull request #195 from OpenKMIP/feat/add-opp-policy-name-to-client
Browse files Browse the repository at this point in the history
Adding operation policy name support to the client
  • Loading branch information
PeterHamilton authored Oct 6, 2016
2 parents bcb5e7d + 894a7ac commit 843c75d
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 13 deletions.
57 changes: 45 additions & 12 deletions kmip/pie/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,16 @@ def close(self):
self.logger.exception("could not close client connection", e)
raise e

def create(self, algorithm, length):
def create(self, algorithm, length, operation_policy_name=None):
"""
Create a symmetric key on a KMIP appliance.
Args:
algorithm (CryptographicAlgorithm): An enumeration defining the
algorithm to use to generate the symmetric key.
length (int): The length in bits for the symmetric key.
operation_policy_name (string): The name of the operation policy
to use for the new symmetric key. Optional, defaults to None.
Returns:
string: The uid of the newly created symmetric key.
Expand All @@ -164,8 +166,12 @@ def create(self, algorithm, length):
raise exceptions.ClientConnectionNotOpen()

# Create the template containing the attributes
attributes = self._build_key_attributes(algorithm, length)
template = cobjects.TemplateAttribute(attributes=attributes)
common_attributes = self._build_common_attributes(
operation_policy_name
)
key_attributes = self._build_key_attributes(algorithm, length)
key_attributes.extend(common_attributes)
template = cobjects.TemplateAttribute(attributes=key_attributes)

# Create the symmetric key and handle the results
result = self.proxy.create(enums.ObjectType.SYMMETRIC_KEY, template)
Expand All @@ -179,14 +185,16 @@ def create(self, algorithm, length):
message = result.result_message.value
raise exceptions.KmipOperationFailure(status, reason, message)

def create_key_pair(self, algorithm, length):
def create_key_pair(self, algorithm, length, operation_policy_name=None):
"""
Create an asymmetric key pair on a KMIP appliance.
Args:
algorithm (CryptographicAlgorithm): An enumeration defining the
algorithm to use to generate the key pair.
length (int): The length in bits for the key pair.
operation_policy_name (string): The name of the operation policy
to use for the new key pair. Optional, defaults to None.
Returns:
string: The uid of the newly created public key.
Expand All @@ -209,8 +217,12 @@ def create_key_pair(self, algorithm, length):
raise exceptions.ClientConnectionNotOpen()

# Create the template containing the attributes
attributes = self._build_key_attributes(algorithm, length)
template = cobjects.CommonTemplateAttribute(attributes=attributes)
common_attributes = self._build_common_attributes(
operation_policy_name
)
key_attributes = self._build_key_attributes(algorithm, length)
key_attributes.extend(common_attributes)
template = cobjects.CommonTemplateAttribute(attributes=key_attributes)

# Create the asymmetric key pair and handle the results
result = self.proxy.create_key_pair(common_template_attribute=template)
Expand Down Expand Up @@ -250,15 +262,22 @@ def register(self, managed_object):
raise exceptions.ClientConnectionNotOpen()

# Extract and create attributes
attributes = list()
object_attributes = list()

if hasattr(managed_object, 'cryptographic_usage_masks'):
mask_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
managed_object.cryptographic_usage_masks)

attributes.append(mask_attribute)

template = cobjects.TemplateAttribute(attributes=attributes)
managed_object.cryptographic_usage_masks
)
object_attributes.append(mask_attribute)
if hasattr(managed_object, 'operation_policy_name'):
opn_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.OPERATION_POLICY_NAME,
managed_object.operation_policy_name
)
object_attributes.append(opn_attribute)

template = cobjects.TemplateAttribute(attributes=object_attributes)
object_type = managed_object.object_type

# Register the managed object and handle the results
Expand Down Expand Up @@ -391,6 +410,20 @@ def _build_key_attributes(self, algorithm, length):

return [algorithm_attribute, length_attribute, mask_attribute]

def _build_common_attributes(self, operation_policy_name=None):
# Build a list of common attributes.
common_attributes = []

if operation_policy_name:
common_attributes.append(
self.attribute_factory.create_attribute(
enums.AttributeType.OPERATION_POLICY_NAME,
operation_policy_name
)
)

return common_attributes

def __enter__(self):
self.open()
return self
Expand Down
7 changes: 6 additions & 1 deletion kmip/pie/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ class ManagedObject(sql.Base):
_names = relationship('ManagedObjectName', back_populates='mo',
cascade='all, delete-orphan')
names = association_proxy('_names', 'name')
operation_policy_name = Column(
'operation_policy_name',
String(50),
default='default'
)

__mapper_args__ = {
'polymorphic_identity': 'ManagedObject',
Expand All @@ -71,14 +76,14 @@ def __init__(self):
self.unique_identifier = None
self.name_index = 0
self.names = list()
self.operation_policy_name = None
self._object_type = None

# All remaining attributes are not considered part of the public API
# and are subject to change.
self._application_specific_informations = list()
self._contact_information = None
self._object_groups = list()
self._operation_policy_name = None

# The following attributes are placeholders for attributes that are
# unsupported by kmip.core
Expand Down
124 changes: 124 additions & 0 deletions kmip/tests/unit/pie/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,55 @@ def test_create(self):
self.assertIsInstance(uid, six.string_types)
self.assertEqual(uid, key_id)

@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_create_with_operation_policy_name(self):
"""
Test that a symmetric key can be created with proper inputs,
specifically testing that the operation policy name is correctly
sent with the request.
"""
# Create the template to test the create call
algorithm = enums.CryptographicAlgorithm.AES
length = 256
algorithm_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, algorithm)
length_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH, length)
mask_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
[enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT])
opn_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.OPERATION_POLICY_NAME,
'test'
)

key_attributes = [
algorithm_attribute,
length_attribute,
mask_attribute,
opn_attribute
]
template = obj.TemplateAttribute(attributes=key_attributes)

key_id = 'aaaaaaaa-1111-2222-3333-ffffffffffff'
status = enums.ResultStatus.SUCCESS
result = results.CreateResult(
contents.ResultStatus(status),
uuid=attr.UniqueIdentifier(key_id))

with ProxyKmipClient() as client:
client.proxy.create.return_value = result

client.create(
algorithm,
length,
operation_policy_name='test'
)
client.proxy.create.assert_called_with(
enums.ObjectType.SYMMETRIC_KEY, template)

@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_create_on_invalid_algorithm(self):
Expand Down Expand Up @@ -311,6 +360,58 @@ def test_create_key_pair(self):
self.assertIsInstance(public_uid, six.string_types)
self.assertIsInstance(private_uid, six.string_types)

@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_create_key_pair_with_operation_policy_name(self):
"""
Test that an asymmetric key pair can be created with proper inputs,
specifically testing that the operation policy name is correctly
sent with the request.
"""
# Create the template to test the create key pair call
algorithm = enums.CryptographicAlgorithm.RSA
length = 2048
algorithm_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, algorithm)
length_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH, length)
mask_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
[enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT])
opn_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.OPERATION_POLICY_NAME,
'test'
)

pair_attributes = [
algorithm_attribute,
length_attribute,
mask_attribute,
opn_attribute
]
template = obj.CommonTemplateAttribute(attributes=pair_attributes)

status = enums.ResultStatus.SUCCESS
result = results.CreateKeyPairResult(
contents.ResultStatus(status),
public_key_uuid=attr.PublicKeyUniqueIdentifier(
'aaaaaaaa-1111-2222-3333-ffffffffffff'),
private_key_uuid=attr.PrivateKeyUniqueIdentifier(
'ffffffff-3333-2222-1111-aaaaaaaaaaaa'))

with ProxyKmipClient() as client:
client.proxy.create_key_pair.return_value = result

public_uid, private_uid = client.create_key_pair(
enums.CryptographicAlgorithm.RSA,
2048,
operation_policy_name='test'
)

kwargs = {'common_template_attribute': template}
client.proxy.create_key_pair.assert_called_with(**kwargs)

@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_create_key_pair_on_invalid_algorithm(self):
Expand Down Expand Up @@ -680,3 +781,26 @@ def test_register_on_operation_failure(self):

self.assertRaisesRegexp(
KmipOperationFailure, error_msg, client.register, *args)

@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_build_common_attributes(self):
"""
Test that the right attribute objects are created.
"""
client = ProxyKmipClient()
client.open()

operation_policy_name = 'test'
common_attributes = client._build_common_attributes(
operation_policy_name
)

self.assertEqual(1, len(common_attributes))

opn = common_attributes[0]
self.assertIsInstance(opn, obj.Attribute)
self.assertIsInstance(opn.attribute_name, obj.Attribute.AttributeName)
self.assertIsInstance(opn.attribute_value, attr.OperationPolicyName)
self.assertEqual(opn.attribute_name.value, 'Operation Policy Name')
self.assertEqual(opn.attribute_value.value, 'test')

0 comments on commit 843c75d

Please sign in to comment.