Skip to content

Commit

Permalink
feat: adds X509 workload cert logic (#1527)
Browse files Browse the repository at this point in the history
* feat: adds X509 workload cert logic

* add JSON checking, and edits comments

* Adds comment with more explanation

* fix test coverage

---------

Co-authored-by: Leo <39062083+lsirac@users.noreply.github.com>
  • Loading branch information
aeitzman and lsirac committed Jun 10, 2024
1 parent 6c15c9a commit 05220e0
Show file tree
Hide file tree
Showing 3 changed files with 339 additions and 62 deletions.
146 changes: 136 additions & 10 deletions google/auth/transport/_mtls_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

import json
import logging
from os import path
from os import environ, path
import re
import subprocess

from google.auth import exceptions

CONTEXT_AWARE_METADATA_PATH = "~/.secureConnect/context_aware_metadata.json"
_CERTIFICATE_CONFIGURATION_DEFAULT_PATH = "~/.config/gcloud/certificate_config.json"
_CERTIFICATE_CONFIGURATION_ENV = "GOOGLE_API_CERTIFICATE_CONFIG"
_CERT_PROVIDER_COMMAND = "cert_provider_command"
_CERT_REGEX = re.compile(
b"-----BEGIN CERTIFICATE-----.+-----END CERTIFICATE-----\r?\n?", re.DOTALL
Expand Down Expand Up @@ -63,26 +65,150 @@ def _check_dca_metadata_path(metadata_path):
return metadata_path


def _read_dca_metadata_file(metadata_path):
"""Loads context aware metadata from the given path.
def _load_json_file(path):
"""Reads and loads JSON from the given path. Used to read both X509 workload certificate and
secure connect configurations.
Args:
metadata_path (str): context aware metadata path.
path (str): the path to read from.
Returns:
Dict[str, str]: The metadata.
Dict[str, str]: The JSON stored at the file.
Raises:
google.auth.exceptions.ClientCertError: If failed to parse metadata as JSON.
google.auth.exceptions.ClientCertError: If failed to parse the file as JSON.
"""
try:
with open(metadata_path) as f:
metadata = json.load(f)
with open(path) as f:
json_data = json.load(f)
except ValueError as caught_exc:
new_exc = exceptions.ClientCertError(caught_exc)
raise new_exc from caught_exc

return metadata
return json_data


def _get_workload_cert_and_key(certificate_config_path=None):
"""Read the workload identity cert and key files specified in the certificate config provided.
If no config path is provided, check the environment variable: "GOOGLE_API_CERTIFICATE_CONFIG"
first, then the well known gcloud location: "~/.config/gcloud/certificate_config.json".
Args:
certificate_config_path (string): The certificate config path. If no path is provided,
the environment variable will be checked first, then the well known gcloud location.
Returns:
Tuple[Optional[bytes], Optional[bytes]]: client certificate bytes in PEM format and key
bytes in PEM format.
Raises:
google.auth.exceptions.ClientCertError: if problems occurs when retrieving
the certificate or key information.
"""
absolute_path = _get_cert_config_path(certificate_config_path)
if absolute_path is None:
return None, None
data = _load_json_file(absolute_path)

if "cert_configs" not in data:
raise exceptions.ClientCertError(
'Certificate config file {} is in an invalid format, a "cert configs" object is expected'.format(
absolute_path
)
)
cert_configs = data["cert_configs"]

if "workload" not in cert_configs:
raise exceptions.ClientCertError(
'Certificate config file {} is in an invalid format, a "workload" cert config is expected'.format(
absolute_path
)
)
workload = cert_configs["workload"]

if "cert_path" not in workload:
raise exceptions.ClientCertError(
'Certificate config file {} is in an invalid format, a "cert_path" is expected in the workload cert config'.format(
absolute_path
)
)
cert_path = workload["cert_path"]

if "key_path" not in workload:
raise exceptions.ClientCertError(
'Certificate config file {} is in an invalid format, a "key_path" is expected in the workload cert config'.format(
absolute_path
)
)
key_path = workload["key_path"]

return _read_cert_and_key_files(cert_path, key_path)


def _get_cert_config_path(certificate_config_path=None):
"""Gets the certificate configuration full path using the following order of precedence:
1: Explicit override, if set
2: Environment variable, if set
3: Well-known location
Returns "None" if the selected config file does not exist.
Args:
certificate_config_path (string): The certificate config path. If provided, the well known
location and environment variable will be ignored.
Returns:
The absolute path of the certificate config file, and None if the file does not exist.
"""

if certificate_config_path is None:
env_path = environ.get(_CERTIFICATE_CONFIGURATION_ENV, None)
if env_path is not None and env_path != "":
certificate_config_path = env_path
else:
certificate_config_path = _CERTIFICATE_CONFIGURATION_DEFAULT_PATH

certificate_config_path = path.expanduser(certificate_config_path)
if not path.exists(certificate_config_path):
return None
return certificate_config_path


def _read_cert_and_key_files(cert_path, key_path):
cert_data = _read_cert_file(cert_path)
key_data = _read_key_file(key_path)

return cert_data, key_data


def _read_cert_file(cert_path):
with open(cert_path, "rb") as cert_file:
cert_data = cert_file.read()

cert_match = re.findall(_CERT_REGEX, cert_data)
if len(cert_match) != 1:
raise exceptions.ClientCertError(
"Certificate file {} is in an invalid format, a single PEM formatted certificate is expected".format(
cert_path
)
)
return cert_match[0]


def _read_key_file(key_path):
with open(key_path, "rb") as key_file:
key_data = key_file.read()

key_match = re.findall(_KEY_REGEX, key_data)
if len(key_match) != 1:
raise exceptions.ClientCertError(
"Private key file {} is in an invalid format, a single PEM formatted private key is expected".format(
key_path
)
)

return key_match[0]


def _run_cert_provider_command(command, expect_encrypted_key=False):
Expand Down Expand Up @@ -163,7 +289,7 @@ def get_client_ssl_credentials(
metadata_path = _check_dca_metadata_path(context_aware_metadata_path)

if metadata_path:
metadata_json = _read_dca_metadata_file(metadata_path)
metadata_json = _load_json_file(metadata_path)

if _CERT_PROVIDER_COMMAND not in metadata_json:
raise exceptions.ClientCertError("Cert provider command is not found")
Expand Down
Loading

0 comments on commit 05220e0

Please sign in to comment.