Skip to content

Commit

Permalink
improve keyvault certificate download
Browse files Browse the repository at this point in the history
  • Loading branch information
devigned committed Apr 27, 2017
1 parent 7ee49ba commit 0392afe
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 30 deletions.
4 changes: 3 additions & 1 deletion src/azure-cli-core/azure/cli/core/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,13 @@ def read_file_content(file_path, allow_binary=False):
for encoding in ['utf-8-sig', 'utf-8', 'utf-16', 'utf-16le', 'utf-16be']:
try:
with codecs_open(file_path, encoding=encoding) as f:
logger.debug("attempting to read file %s as %s", file_path, encoding)
return f.read()
except UnicodeDecodeError:
if allow_binary:
with open(file_path, 'rb') as input_file:
return input_file.read()
logger.debug("attempting to read file %s as binary", file_path)
return base64.b64encode(input_file.read()).decode("utf-8")
else:
raise
except UnicodeError:
Expand Down
1 change: 1 addition & 0 deletions src/command_modules/azure-cli-keyvault/HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Release History
unreleased
++++++++++++++++++++

* Change `az keyvault certificate download` to better reflect the encoding options (PEM and DER).
* BC: Remove --expires and --not-before from `keyvault certificate create` as these parameters are not supported by the service.
* Adds the --validity parameter to `keyvault certificate create` to selectively override the value in --policy
* Fixes issue in `keyvault certificate get-default-policy` where 'expires' and 'not_before' were exposed but 'validity_in_months' was not.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def completer(prefix, action, parsed_args, **kwargs): # pylint: disable=unused-a
json_web_key_op_values = ', '.join([x.value for x in JsonWebKeyOperation])
secret_encoding_values = secret_text_encoding_values + secret_binary_encoding_values
certificate_file_encoding_values = ['binary', 'string']
certificate_format_values = ['PEM', 'DER']

# KEY ATTRIBUTE PARAMETER REGISTRATION

Expand Down Expand Up @@ -170,7 +171,7 @@ def register_attributes_argument(scope, name, attr_class, create=False, ignore=N
register_extra_cli_argument('keyvault certificate import', 'disabled', help='Import the certificate in disabled state.', **three_state_flag())

register_cli_argument('keyvault certificate download', 'file_path', options_list=('--file', '-f'), type=file_type, completer=FilesCompleter(), help='File to receive the binary certificate contents.')
register_cli_argument('keyvault certificate download', 'encoding', options_list=('--encoding', '-e'), help='How to store base64 certificate contents in file.', **enum_choice_list(certificate_file_encoding_values))
register_cli_argument('keyvault certificate download', 'encoding', options_list=('--encoding', '-e'), help='Encoding of the certificate. DER will create a binary DER formatted x509 certificate, and PEM will create a base64 PEM x509 certificate.', **enum_choice_list(certificate_format_values))

register_cli_argument('keyvault certificate pending merge', 'x509_certificates', options_list=('--file', '-f'), type=file_type, completer=FilesCompleter(), help='File containing the certificate or certificate chain to merge.', validator=validate_x509_certificate_chain)
register_attributes_argument('keyvault certificate pending merge', 'certificate', CertificateAttributes, True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ def import_certificate(client, vault_base_url, certificate_name, certificate_dat


def download_certificate(client, vault_base_url, certificate_name, file_path,
encoding='binary', certificate_version=''):
encoding='PEM', certificate_version=''):
""" Download a certificate from a KeyVault. """
if os.path.isfile(file_path) or os.path.isdir(file_path):
raise CLIError("File or directory named '{}' already exists.".format(file_path))
Expand All @@ -625,15 +625,16 @@ def download_certificate(client, vault_base_url, certificate_name, file_path,

try:
with open(file_path, 'wb') as f:
if encoding == 'binary':
if encoding == 'DER':
f.write(cert)
else:
import base64
try:
f.write(base64.encodebytes(cert))
except AttributeError:
f.write(base64.encodestring(cert)) # pylint: disable=deprecated-method
except Exception as ex: # pylint: disable=broad-except
encoded = base64.encodestring(cert) # pylint:disable=deprecated-method
if isinstance(encoded, bytes):
encoded = encoded.decode("utf-8") # pylint:disable=redefined-variable-type
encoded = '-----BEGIN CERTIFICATE-----\n' + encoded + '-----END CERTIFICATE-----\n'
f.write(encoded.encode("utf-8"))
except Exception as ex: # pylint: disable=broad-except
if os.path.isfile(file_path):
os.remove(file_path)
raise ex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ def test_parse_asn1_date(self):
self.assertEqual(_asn1_to_iso8601("20170424163720Z"), expected)



class KeyVaultMgmtScenarioTest(ResourceGroupVCRTestBase):

def __init__(self, test_method):
Expand Down Expand Up @@ -382,6 +381,8 @@ def _test_keyvault_pending_certificate(self):
allowed_exceptions='Pending certificate not found')

def _test_certificate_download(self):
import OpenSSL.crypto

kv = self.keyvault_name
pem_file = os.path.join(TEST_DIR, 'import_pem_plain.pem')
pem_policy_path = os.path.join(TEST_DIR, 'policy_import_pem.json')
Expand All @@ -391,25 +392,32 @@ def _test_certificate_download(self):
dest_binary = os.path.join(TEST_DIR, 'download-binary')
dest_string = os.path.join(TEST_DIR, 'download-string')

expected_pem = "-----BEGIN CERTIFICATE-----\n" + \
cert_data + \
'-----END CERTIFICATE-----\n'
expected_pem = expected_pem.replace('\n', '')

try:
self.cmd('keyvault certificate download --vault-name {} -n pem-cert1 --file "{}"'.format(kv, dest_binary))
self.cmd('keyvault certificate download --vault-name {} -n pem-cert1 --file "{}" -e string'.format(kv, dest_string))
self.cmd('keyvault certificate download --vault-name {} -n pem-cert1 --file "{}" -e DER'.format(kv, dest_binary))
self.cmd('keyvault certificate download --vault-name {} -n pem-cert1 --file "{}" -e PEM'.format(kv, dest_string))
self.cmd('keyvault certificate delete --vault-name {} -n pem-cert1'.format(kv))
with open(dest_binary, 'rb') as f:
import base64
downloaded_binary = base64.b64encode(f.read()).decode('utf-8')

with open(dest_string, 'r') as f:
downloaded_string = f.read().replace('\r\n', '\n').replace('\n', '')
def verify(path, file_type):
with open(path, 'rb') as f:
x509 = OpenSSL.crypto.load_certificate(file_type, f.read())
actual_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, x509)
if isinstance(actual_pem, bytes):
actual_pem = actual_pem.decode("utf-8")
self.assertIn(expected_pem, actual_pem.replace('\n', ''))

verify(dest_binary, OpenSSL.crypto.FILETYPE_ASN1)
verify(dest_string, OpenSSL.crypto.FILETYPE_PEM)
finally:
if os.path.exists(dest_binary):
os.remove(dest_binary)
if os.path.exists(dest_string):
os.remove(dest_string)

self.assertEqual(cert_data, downloaded_string)
self.assertEqual(cert_data, downloaded_binary)

def body(self):
_create_keyvault(self, self.keyvault_name, self.resource_group, self.location)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from azure.cli.core.commands import CliArgumentType
from azure.cli.core.commands import register_cli_argument
from azure.cli.core.commands.parameters import enum_choice_list
from .custom import get_role_definition_name_completion_list
from .custom import get_role_definition_name_completion_list, x509_type
from ._validators import validate_group, validate_member_id, VARIANT_GROUP_ID_ARGS

register_cli_argument('ad app', 'application_object_id', options_list=('--object-id',))
Expand Down Expand Up @@ -36,6 +36,8 @@
register_cli_argument('ad sp create-for-rbac', 'role', completer=get_role_definition_name_completion_list)
register_cli_argument('ad sp create-for-rbac', 'skip_assignment', action='store_true', help='do not create default assignment')
register_cli_argument('ad sp create-for-rbac', 'expanded_view', action='store_true', help='Once created, display more information like subscription and cloud environments')
register_cli_argument('ad sp create-for-rbac', 'cert', type=x509_type)

register_cli_argument('ad sp reset-credentials', 'name', name_arg_type)
register_cli_argument('ad sp reset-credentials', 'years', type=int, default=None)
register_cli_argument('ad sp reset-credentials', 'create_cert', action='store_true', help='re-create and upload self-signed certificate')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ def get_role_definition_name_completion_list(prefix, **kwargs): # pylint: disab
return [x.properties.role_name for x in list(definitions)]


def x509_type(cert):
x509 = _try_x509_pem(cert) or _try_x509_der(cert)
if not x509:
raise CLIError("The value provided for --cert was not a PEM or a DER file.")
return x509


def create_role_definition(role_definition):
return _create_update_role_definition(role_definition, for_update=False)

Expand Down Expand Up @@ -508,7 +515,8 @@ def create_service_principal_for_rbac(
'''create a service principal and configure its access to Azure resources
:param str name: a display name or an app id uri. Command will generate one if missing.
:param str password: the password used to login. If missing, command will generate one.
:param str cert: string or @file_path PEM formatted public certificate. Do not include private key info.
:param str cert: string or @file_path PEM formatted public certificate. Do not include private
key info.
:param str years: Years the password will be valid. Default: 1 year
:param str scopes: space separated scopes the service principal's role assignment applies to.
Defaults to the root of the current subscription.
Expand Down Expand Up @@ -682,17 +690,49 @@ def _create_self_signed_cert(years):
return (cert_string, creds_file)


def _normalize_cert(cert):
pem_data = cert
cert_header = '-----BEGIN CERTIFICATE-----'
cert_end = '-----END CERTIFICATE-----'
if not pem_data.startswith(cert_header):
pem_data = cert_header + '\n' + pem_data + '\n' + cert_end
def _try_x509_pem(cert):
import OpenSSL.crypto
try:
return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
except OpenSSL.crypto.Error:
# could not load the pem, try with headers
try:
pem_with_headers = '-----BEGIN CERTIFICATE-----\n' \
+ cert + \
'-----END CERTIFICATE-----\n'
return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem_with_headers)
except OpenSSL.crypto.Error:
return None
except UnicodeEncodeError:
# this must be a binary encoding
return None


def _try_x509_der(cert):
import OpenSSL.crypto
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem_data)
import base64
try:
cert = base64.b64decode(cert)
return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert)
except OpenSSL.crypto.Error:
return None


def _get_public(x509):
import OpenSSL.crypto
pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, x509)
if isinstance(pem, bytes):
pem = pem.decode("utf-8")
stripped = pem.replace('-----BEGIN CERTIFICATE-----\n', '')
stripped = stripped.replace('-----END CERTIFICATE-----\n', '')
return stripped


def _normalize_cert(x509):
logger.debug("normalizing x509 certificate with fingerprint %s", x509.digest("sha1"))
pkey = x509.get_notAfter().decode()
end_date = dateutil.parser.parse(pkey)
return cert.replace(cert_header, '').replace(cert_end, '').strip(), end_date
return _get_public(x509), end_date


def reset_service_principal_credential(name, password=None, create_cert=False,
Expand Down

0 comments on commit 0392afe

Please sign in to comment.