Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhanced Key Vault Certificate Download and AAD SP Integration #3003

Merged
merged 7 commits into from
Apr 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/azure-cli-core/azure/cli/core/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,13 @@ def read_file_content(file_path, allow_binary=False):
for encoding in ['utf-8-sig', 'utf-8', 'utf-16', 'utf-16le', 'utf-16be']:
try:
with codecs_open(file_path, encoding=encoding) as f:
logger.debug("attempting to read file %s as %s", file_path, encoding)
return f.read()
except UnicodeDecodeError:
if allow_binary:
with open(file_path, 'rb') as input_file:
return input_file.read()
logger.debug("attempting to read file %s as binary", file_path)
return base64.b64encode(input_file.read()).decode("utf-8")
else:
raise
except UnicodeError:
Expand Down
1 change: 1 addition & 0 deletions src/command_modules/azure-cli-keyvault/HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Release History
unreleased
++++++++++++++++++++

* BC:`az keyvault certificate download` change -e from string or binary to PEM or DER to better represent the options
* BC: Remove --expires and --not-before from `keyvault certificate create` as these parameters are not supported by the service.
* Adds the --validity parameter to `keyvault certificate create` to selectively override the value in --policy
* Fixes issue in `keyvault certificate get-default-policy` where 'expires' and 'not_before' were exposed but 'validity_in_months' was not.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,21 @@
short-summary: Manage certificates.
"""

helps['keyvault certificate download'] = """
long-summary: >
Download the public portion of a Key Vault certificate formatted as either PEM or DER. PEM
formatting is the default.
examples:
- name: Download a PEM and check it's fingerprint in openssl
text: >
az keyvault certificate download --vault-name vault -n cert-name -f cert.pem \n
openssl x509 -in cert.pem -inform PEM -noout -sha1 -fingerprint
- name: Download a DER and check it's fingerprint in openssl
text: >
az keyvault certificate download --vault-name vault -n cert-name -f cert.crt -e DER \n
openssl x509 -in cert.crt -inform DER -noout -sha1 -fingerprint
"""

helps['keyvault certificate get-default-policy'] = """
type: command
short-summary: Get a default policy for a self-signed certificate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def completer(prefix, action, parsed_args, **kwargs): # pylint: disable=unused-a
certificate_permission_values = ', '.join([x.value for x in CertificatePermissions])
json_web_key_op_values = ', '.join([x.value for x in JsonWebKeyOperation])
secret_encoding_values = secret_text_encoding_values + secret_binary_encoding_values
certificate_file_encoding_values = ['binary', 'string']
certificate_format_values = ['PEM', 'DER']

# KEY ATTRIBUTE PARAMETER REGISTRATION

Expand Down Expand Up @@ -170,7 +170,7 @@ def register_attributes_argument(scope, name, attr_class, create=False, ignore=N
register_extra_cli_argument('keyvault certificate import', 'disabled', help='Import the certificate in disabled state.', **three_state_flag())

register_cli_argument('keyvault certificate download', 'file_path', options_list=('--file', '-f'), type=file_type, completer=FilesCompleter(), help='File to receive the binary certificate contents.')
register_cli_argument('keyvault certificate download', 'encoding', options_list=('--encoding', '-e'), help='How to store base64 certificate contents in file.', **enum_choice_list(certificate_file_encoding_values))
register_cli_argument('keyvault certificate download', 'encoding', options_list=('--encoding', '-e'), help='Encoding of the certificate. DER will create a binary DER formatted x509 certificate, and PEM will create a base64 PEM x509 certificate.', **enum_choice_list(certificate_format_values))

register_cli_argument('keyvault certificate pending merge', 'x509_certificates', options_list=('--file', '-f'), type=file_type, completer=FilesCompleter(), help='File containing the certificate or certificate chain to merge.', validator=validate_x509_certificate_chain)
register_attributes_argument('keyvault certificate pending merge', 'certificate', CertificateAttributes, True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ def import_certificate(client, vault_base_url, certificate_name, certificate_dat


def download_certificate(client, vault_base_url, certificate_name, file_path,
encoding='binary', certificate_version=''):
encoding='PEM', certificate_version=''):
""" Download a certificate from a KeyVault. """
if os.path.isfile(file_path) or os.path.isdir(file_path):
raise CLIError("File or directory named '{}' already exists.".format(file_path))
Expand All @@ -625,15 +625,16 @@ def download_certificate(client, vault_base_url, certificate_name, file_path,

try:
with open(file_path, 'wb') as f:
if encoding == 'binary':
if encoding == 'DER':
f.write(cert)
else:
import base64
try:
f.write(base64.encodebytes(cert))
except AttributeError:
f.write(base64.encodestring(cert)) # pylint: disable=deprecated-method
except Exception as ex: # pylint: disable=broad-except
encoded = base64.encodestring(cert) # pylint:disable=deprecated-method
if isinstance(encoded, bytes):
encoded = encoded.decode("utf-8") # pylint:disable=redefined-variable-type
encoded = '-----BEGIN CERTIFICATE-----\n' + encoded + '-----END CERTIFICATE-----\n'
f.write(encoded.encode("utf-8"))
except Exception as ex: # pylint: disable=broad-except
if os.path.isfile(file_path):
os.remove(file_path)
raise ex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ def test_parse_asn1_date(self):
self.assertEqual(_asn1_to_iso8601("20170424163720Z"), expected)



class KeyVaultMgmtScenarioTest(ResourceGroupVCRTestBase):

def __init__(self, test_method):
Expand Down Expand Up @@ -382,6 +381,8 @@ def _test_keyvault_pending_certificate(self):
allowed_exceptions='Pending certificate not found')

def _test_certificate_download(self):
import OpenSSL.crypto

kv = self.keyvault_name
pem_file = os.path.join(TEST_DIR, 'import_pem_plain.pem')
pem_policy_path = os.path.join(TEST_DIR, 'policy_import_pem.json')
Expand All @@ -391,25 +392,32 @@ def _test_certificate_download(self):
dest_binary = os.path.join(TEST_DIR, 'download-binary')
dest_string = os.path.join(TEST_DIR, 'download-string')

expected_pem = "-----BEGIN CERTIFICATE-----\n" + \
cert_data + \
'-----END CERTIFICATE-----\n'
expected_pem = expected_pem.replace('\n', '')

try:
self.cmd('keyvault certificate download --vault-name {} -n pem-cert1 --file "{}"'.format(kv, dest_binary))
self.cmd('keyvault certificate download --vault-name {} -n pem-cert1 --file "{}" -e string'.format(kv, dest_string))
self.cmd('keyvault certificate download --vault-name {} -n pem-cert1 --file "{}" -e DER'.format(kv, dest_binary))
self.cmd('keyvault certificate download --vault-name {} -n pem-cert1 --file "{}" -e PEM'.format(kv, dest_string))
self.cmd('keyvault certificate delete --vault-name {} -n pem-cert1'.format(kv))
with open(dest_binary, 'rb') as f:
import base64
downloaded_binary = base64.b64encode(f.read()).decode('utf-8')

with open(dest_string, 'r') as f:
downloaded_string = f.read().replace('\r\n', '\n').replace('\n', '')
def verify(path, file_type):
with open(path, 'rb') as f:
x509 = OpenSSL.crypto.load_certificate(file_type, f.read())
actual_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, x509)
if isinstance(actual_pem, bytes):
actual_pem = actual_pem.decode("utf-8")
self.assertIn(expected_pem, actual_pem.replace('\n', ''))

verify(dest_binary, OpenSSL.crypto.FILETYPE_ASN1)
verify(dest_string, OpenSSL.crypto.FILETYPE_PEM)
finally:
if os.path.exists(dest_binary):
os.remove(dest_binary)
if os.path.exists(dest_string):
os.remove(dest_string)

self.assertEqual(cert_data, downloaded_string)
self.assertEqual(cert_data, downloaded_binary)

def body(self):
_create_keyvault(self, self.keyvault_name, self.resource_group, self.location)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
text: az role assignment create --assignee <name> --role Contributor
- name: Revoke the service principal when done with it.
text: az ad app delete --id <name>
- name: Create using certificate from Key Vault
text: >
az keyvault certificate download --vault-name vault -n cert-name -f cert.pem \n
az ad sp create-for-rbac --cert @cert.pem
"""

helps['role'] = """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from azure.cli.core.commands import CliArgumentType
from azure.cli.core.commands import register_cli_argument
from azure.cli.core.commands.parameters import enum_choice_list
from .custom import get_role_definition_name_completion_list
from .custom import get_role_definition_name_completion_list, x509_type
from ._validators import validate_group, validate_member_id, VARIANT_GROUP_ID_ARGS

register_cli_argument('ad app', 'application_object_id', options_list=('--object-id',))
Expand Down Expand Up @@ -36,6 +36,8 @@
register_cli_argument('ad sp create-for-rbac', 'role', completer=get_role_definition_name_completion_list)
register_cli_argument('ad sp create-for-rbac', 'skip_assignment', action='store_true', help='do not create default assignment')
register_cli_argument('ad sp create-for-rbac', 'expanded_view', action='store_true', help='Once created, display more information like subscription and cloud environments')
register_cli_argument('ad sp create-for-rbac', 'cert', type=x509_type)

register_cli_argument('ad sp reset-credentials', 'name', name_arg_type)
register_cli_argument('ad sp reset-credentials', 'years', type=int, default=None)
register_cli_argument('ad sp reset-credentials', 'create_cert', action='store_true', help='re-create and upload self-signed certificate')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ def get_role_definition_name_completion_list(prefix, **kwargs): # pylint: disab
return [x.properties.role_name for x in list(definitions)]


def x509_type(cert):
x509 = _try_x509_pem(cert) or _try_x509_der(cert)
if not x509:
raise CLIError("The value provided for --cert was not a PEM or a DER file.")
return x509


def create_role_definition(role_definition):
return _create_update_role_definition(role_definition, for_update=False)

Expand Down Expand Up @@ -508,7 +515,8 @@ def create_service_principal_for_rbac(
'''create a service principal and configure its access to Azure resources
:param str name: a display name or an app id uri. Command will generate one if missing.
:param str password: the password used to login. If missing, command will generate one.
:param str cert: PEM formatted public certificate. Do not include private key info.
:param str cert: PEM or DER formatted public certificate using string or `@<file path>` to
load from a file. Do not include private key info.
:param str years: Years the password will be valid. Default: 1 year
:param str scopes: space separated scopes the service principal's role assignment applies to.
Defaults to the root of the current subscription.
Expand Down Expand Up @@ -682,17 +690,49 @@ def _create_self_signed_cert(years):
return (cert_string, creds_file)


def _normalize_cert(cert):
pem_data = cert
cert_header = '-----BEGIN CERTIFICATE-----'
cert_end = '-----END CERTIFICATE-----'
if not pem_data.startswith(cert_header):
pem_data = cert_header + '\n' + pem_data + '\n' + cert_end
def _try_x509_pem(cert):
import OpenSSL.crypto
try:
return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
except OpenSSL.crypto.Error:
# could not load the pem, try with headers
try:
pem_with_headers = '-----BEGIN CERTIFICATE-----\n' \
+ cert + \
'-----END CERTIFICATE-----\n'
return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem_with_headers)
except OpenSSL.crypto.Error:
return None
except UnicodeEncodeError:
# this must be a binary encoding
return None


def _try_x509_der(cert):
import OpenSSL.crypto
import base64
try:
cert = base64.b64decode(cert)
return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert)
except OpenSSL.crypto.Error:
return None


def _get_public(x509):
import OpenSSL.crypto
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem_data)
pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, x509)
if isinstance(pem, bytes):
pem = pem.decode("utf-8")
stripped = pem.replace('-----BEGIN CERTIFICATE-----\n', '')
stripped = stripped.replace('-----END CERTIFICATE-----\n', '')
return stripped


def _normalize_cert(x509):
logger.debug("normalizing x509 certificate with fingerprint %s", x509.digest("sha1"))
pkey = x509.get_notAfter().decode()
end_date = dateutil.parser.parse(pkey)
return cert.replace(cert_header, '').replace(cert_end, '').strip(), end_date
return _get_public(x509), end_date


def reset_service_principal_credential(name, password=None, create_cert=False,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def test_create_for_rbac_password_plumbed_through(self, graph_client_mock, auth_
@mock.patch('azure.cli.command_modules.role.custom._graph_client_factory', autospec=True)
@mock.patch('azure.cli.command_modules.role.custom.logger', autospec=True)
def test_create_for_rbac_use_cert_date(self, logger_mock, graph_client_mock, auth_client_mock):
import OpenSSL.crypto
test_app_id = 'app_id_123'
app = Application(app_id=test_app_id)

Expand All @@ -140,7 +141,7 @@ def mock_app_create(parameters):
curr_dir = os.path.dirname(os.path.realpath(__file__))
cert_file = os.path.join(curr_dir, 'cert.pem').replace('\\', '\\\\')
with open(cert_file) as f:
cert = f.read()
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, f.read())

name = 'mysp'
faked_graph_client.applications.create.side_effect = mock_app_create
Expand Down