Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds X509 workload cert logic #1527

Merged
merged 8 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 136 additions & 10 deletions google/auth/transport/_mtls_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

import json
import logging
from os import path
from os import environ, path
import re
import subprocess

from google.auth import exceptions

CONTEXT_AWARE_METADATA_PATH = "~/.secureConnect/context_aware_metadata.json"
_CERTIFICATE_CONFIGURATION_DEFAULT_PATH = "~/.config/gcloud/certificate_config.json"
clundin25 marked this conversation as resolved.
Show resolved Hide resolved
_CERTIFICATE_CONFIGURATION_ENV = "GOOGLE_API_CERTIFICATE_CONFIG"
_CERT_PROVIDER_COMMAND = "cert_provider_command"
_CERT_REGEX = re.compile(
b"-----BEGIN CERTIFICATE-----.+-----END CERTIFICATE-----\r?\n?", re.DOTALL
Expand Down Expand Up @@ -63,26 +65,150 @@ def _check_dca_metadata_path(metadata_path):
return metadata_path


def _read_dca_metadata_file(metadata_path):
"""Loads context aware metadata from the given path.
def _load_json_file(path):
"""Reads and loads JSON from the given path. Used to read both X509 workload certificate and
secure connect configurations.

Args:
metadata_path (str): context aware metadata path.
path (str): the path to read from.

Returns:
Dict[str, str]: The metadata.
Dict[str, str]: The JSON stored at the file.

Raises:
google.auth.exceptions.ClientCertError: If failed to parse metadata as JSON.
google.auth.exceptions.ClientCertError: If failed to parse the file as JSON.
"""
try:
with open(metadata_path) as f:
metadata = json.load(f)
with open(path) as f:
json_data = json.load(f)
except ValueError as caught_exc:
new_exc = exceptions.ClientCertError(caught_exc)
raise new_exc from caught_exc

return metadata
return json_data


def _get_workload_cert_and_key(certificate_config_path=None):
"""Read the workload identity cert and key files specified in the certificate config provided.
If no config path is provided, check the environment variable: "GOOGLE_API_CERTIFICATE_CONFIG"
first, then the well known gcloud location: "~/.config/gcloud/certificate_config.json".

Args:
certificate_config_path (string): The certificate config path. If no path is provided,
the environment variable will be checked first, then the well known gcloud location.

Returns:
Tuple[Optional[bytes], Optional[bytes]]: client certificate bytes in PEM format and key
bytes in PEM format.

Raises:
google.auth.exceptions.ClientCertError: if problems occurs when retrieving
the certificate or key information.
"""
absolute_path = _get_cert_config_path(certificate_config_path)
if absolute_path is None:
return None, None
clundin25 marked this conversation as resolved.
Show resolved Hide resolved
data = _load_json_file(absolute_path)

if "cert_configs" not in data:
raise exceptions.ClientCertError(
'Certificate config file {} is in an invalid format, a "cert configs" object is expected'.format(
absolute_path
)
)
cert_configs = data["cert_configs"]

if "workload" not in cert_configs:
raise exceptions.ClientCertError(
'Certificate config file {} is in an invalid format, a "workload" cert config is expected'.format(
absolute_path
)
)
workload = cert_configs["workload"]

if "cert_path" not in workload:
raise exceptions.ClientCertError(
'Certificate config file {} is in an invalid format, a "cert_path" is expected in the workload cert config'.format(
absolute_path
)
)
cert_path = workload["cert_path"]

if "key_path" not in workload:
raise exceptions.ClientCertError(
'Certificate config file {} is in an invalid format, a "key_path" is expected in the workload cert config'.format(
absolute_path
)
)
key_path = workload["key_path"]

return _read_cert_and_key_files(cert_path, key_path)


def _get_cert_config_path(certificate_config_path=None):
"""Gets the certificate configuration full path using the following order of precedence:

1: Explicit override, if set
2: Environment variable, if set
3: Well-known location

Returns "None" if the selected config file does not exist.

Args:
certificate_config_path (string): The certificate config path. If provided, the well known
location and environment variable will be ignored.

Returns:
The absolute path of the certificate config file, and None if the file does not exist.
"""

if certificate_config_path is None:
env_path = environ.get(_CERTIFICATE_CONFIGURATION_ENV, None)
if env_path is not None and env_path != "":
certificate_config_path = env_path
else:
certificate_config_path = _CERTIFICATE_CONFIGURATION_DEFAULT_PATH

certificate_config_path = path.expanduser(certificate_config_path)
if not path.exists(certificate_config_path):
return None
return certificate_config_path


def _read_cert_and_key_files(cert_path, key_path):
cert_data = _read_cert_file(cert_path)
key_data = _read_key_file(key_path)

return cert_data, key_data


def _read_cert_file(cert_path):
with open(cert_path, "rb") as cert_file:
cert_data = cert_file.read()

cert_match = re.findall(_CERT_REGEX, cert_data)
if len(cert_match) != 1:
raise exceptions.ClientCertError(
"Certificate file {} is in an invalid format, a single PEM formatted certificate is expected".format(
cert_path
)
)
return cert_match[0]


def _read_key_file(key_path):
with open(key_path, "rb") as key_file:
key_data = key_file.read()

key_match = re.findall(_KEY_REGEX, key_data)
if len(key_match) != 1:
raise exceptions.ClientCertError(
"Private key file {} is in an invalid format, a single PEM formatted private key is expected".format(
key_path
)
)

return key_match[0]


def _run_cert_provider_command(command, expect_encrypted_key=False):
Expand Down Expand Up @@ -163,7 +289,7 @@ def get_client_ssl_credentials(
metadata_path = _check_dca_metadata_path(context_aware_metadata_path)

if metadata_path:
metadata_json = _read_dca_metadata_file(metadata_path)
metadata_json = _load_json_file(metadata_path)

if _CERT_PROVIDER_COMMAND not in metadata_json:
raise exceptions.ClientCertError("Cert provider command is not found")
Expand Down
Loading