Skip to content

Commit

Permalink
feat: ADC can load an impersonated service account credentials. (#956)
Browse files Browse the repository at this point in the history
* Make code changes in _default

* Add unit tests.

* Fix docstring.

Co-authored-by: arithmetic1728 <58957152+arithmetic1728@users.noreply.github.com>
  • Loading branch information
liuchaoren and arithmetic1728 committed Jan 25, 2022
1 parent 87706fd commit a8eb4c8
Show file tree
Hide file tree
Showing 5 changed files with 251 additions and 36 deletions.
151 changes: 115 additions & 36 deletions google/auth/_default.py
Expand Up @@ -35,7 +35,13 @@
_AUTHORIZED_USER_TYPE = "authorized_user"
_SERVICE_ACCOUNT_TYPE = "service_account"
_EXTERNAL_ACCOUNT_TYPE = "external_account"
_VALID_TYPES = (_AUTHORIZED_USER_TYPE, _SERVICE_ACCOUNT_TYPE, _EXTERNAL_ACCOUNT_TYPE)
_IMPERSONATED_SERVICE_ACCOUNT_TYPE = "impersonated_service_account"
_VALID_TYPES = (
_AUTHORIZED_USER_TYPE,
_SERVICE_ACCOUNT_TYPE,
_EXTERNAL_ACCOUNT_TYPE,
_IMPERSONATED_SERVICE_ACCOUNT_TYPE,
)

# Help message when no credentials can be found.
_HELP_MESSAGE = """\
Expand Down Expand Up @@ -79,7 +85,8 @@ def load_credentials_from_file(
"""Loads Google credentials from a file.
The credentials file must be a service account key, stored authorized
user credentials or external account credentials.
user credentials, external account credentials, or impersonated service
account credentials.
Args:
filename (str): The full path to the credentials file.
Expand Down Expand Up @@ -119,42 +126,25 @@ def load_credentials_from_file(
"File {} is not a valid json file.".format(filename), caught_exc
)
six.raise_from(new_exc, caught_exc)
return _load_credentials_from_info(
filename, info, scopes, default_scopes, quota_project_id, request
)


# The type key should indicate that the file is either a service account
# credentials file or an authorized user credentials file.
def _load_credentials_from_info(
filename, info, scopes, default_scopes, quota_project_id, request
):
credential_type = info.get("type")

if credential_type == _AUTHORIZED_USER_TYPE:
from google.oauth2 import credentials

try:
credentials = credentials.Credentials.from_authorized_user_info(
info, scopes=scopes
)
except ValueError as caught_exc:
msg = "Failed to load authorized user credentials from {}".format(filename)
new_exc = exceptions.DefaultCredentialsError(msg, caught_exc)
six.raise_from(new_exc, caught_exc)
if quota_project_id:
credentials = credentials.with_quota_project(quota_project_id)
if not credentials.quota_project_id:
_warn_about_problematic_credentials(credentials)
return credentials, None
credentials, project_id = _get_authorized_user_credentials(
filename, info, scopes
)

elif credential_type == _SERVICE_ACCOUNT_TYPE:
from google.oauth2 import service_account

try:
credentials = service_account.Credentials.from_service_account_info(
info, scopes=scopes, default_scopes=default_scopes
)
except ValueError as caught_exc:
msg = "Failed to load service account credentials from {}".format(filename)
new_exc = exceptions.DefaultCredentialsError(msg, caught_exc)
six.raise_from(new_exc, caught_exc)
if quota_project_id:
credentials = credentials.with_quota_project(quota_project_id)
return credentials, info.get("project_id")
credentials, project_id = _get_service_account_credentials(
filename, info, scopes, default_scopes
)

elif credential_type == _EXTERNAL_ACCOUNT_TYPE:
credentials, project_id = _get_external_account_credentials(
Expand All @@ -164,17 +154,19 @@ def load_credentials_from_file(
default_scopes=default_scopes,
request=request,
)
if quota_project_id:
credentials = credentials.with_quota_project(quota_project_id)
return credentials, project_id

elif credential_type == _IMPERSONATED_SERVICE_ACCOUNT_TYPE:
credentials, project_id = _get_impersonated_service_account_credentials(
filename, info, scopes
)
else:
raise exceptions.DefaultCredentialsError(
"The file {file} does not have a valid type. "
"Type is {type}, expected one of {valid_types}.".format(
file=filename, type=credential_type, valid_types=_VALID_TYPES
)
)
credentials = _apply_quota_project_id(credentials, quota_project_id)
return credentials, project_id


def _get_gcloud_sdk_credentials(quota_project_id=None):
Expand Down Expand Up @@ -371,6 +363,93 @@ def get_api_key_credentials(api_key_value):
return api_key.Credentials(api_key_value)


def _get_authorized_user_credentials(filename, info, scopes=None):
from google.oauth2 import credentials

try:
credentials = credentials.Credentials.from_authorized_user_info(
info, scopes=scopes
)
except ValueError as caught_exc:
msg = "Failed to load authorized user credentials from {}".format(filename)
new_exc = exceptions.DefaultCredentialsError(msg, caught_exc)
six.raise_from(new_exc, caught_exc)
return credentials, None


def _get_service_account_credentials(filename, info, scopes=None, default_scopes=None):
from google.oauth2 import service_account

try:
credentials = service_account.Credentials.from_service_account_info(
info, scopes=scopes, default_scopes=default_scopes
)
except ValueError as caught_exc:
msg = "Failed to load service account credentials from {}".format(filename)
new_exc = exceptions.DefaultCredentialsError(msg, caught_exc)
six.raise_from(new_exc, caught_exc)
return credentials, info.get("project_id")


def _get_impersonated_service_account_credentials(filename, info, scopes):
from google.auth import impersonated_credentials

try:
source_credentials_info = info.get("source_credentials")
source_credentials_type = source_credentials_info.get("type")
if source_credentials_type == _AUTHORIZED_USER_TYPE:
source_credentials, _ = _get_authorized_user_credentials(
filename, source_credentials_info
)
elif source_credentials_type == _SERVICE_ACCOUNT_TYPE:
source_credentials, _ = _get_service_account_credentials(
filename, source_credentials_info
)
else:
raise ValueError(
"source credential of type {} is not supported.".format(
source_credentials_type
)
)
impersonation_url = info.get("service_account_impersonation_url")
start_index = impersonation_url.rfind("/")
end_index = impersonation_url.find(":generateAccessToken")
if start_index == -1 or end_index == -1 or start_index > end_index:
raise ValueError(
"Cannot extract target principal from {}".format(impersonation_url)
)
target_principal = impersonation_url[start_index + 1 : end_index]
delegates = info.get("delegates")
quota_project_id = info.get("quota_project_id")
credentials = impersonated_credentials.Credentials(
source_credentials,
target_principal,
scopes,
delegates,
quota_project_id=quota_project_id,
)
except ValueError as caught_exc:
msg = "Failed to load impersonated service account credentials from {}".format(
filename
)
new_exc = exceptions.DefaultCredentialsError(msg, caught_exc)
six.raise_from(new_exc, caught_exc)
return credentials, None


def _apply_quota_project_id(credentials, quota_project_id):
if quota_project_id:
credentials = credentials.with_quota_project(quota_project_id)

from google.oauth2 import credentials as authorized_user_credentials

if isinstance(credentials, authorized_user_credentials.Credentials) and (
not credentials.quota_project_id
):
_warn_about_problematic_credentials(credentials)
return credentials


def default(scopes=None, request=None, quota_project_id=None, default_scopes=None):
"""Gets the default credentials for the current environment.
Expand Down
@@ -0,0 +1,13 @@
{
"delegates": [
"service-account-delegate@example.com"
],
"service_account_impersonation_url": "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/service-account-target@example.com:generateAccessToken",
"source_credentials": {
"client_id": "123",
"client_secret": "secret",
"refresh_token": "alabalaportocala",
"type": "authorized_user"
},
"type": "impersonated_service_account"
}
@@ -0,0 +1,17 @@
{
"delegates": [
"service-account-delegate@example.com"
],
"service_account_impersonation_url": "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/service-account-target@example.com:generateAccessToken",
"source_credentials": {
"type": "service_account",
"project_id": "example-project",
"private_key_id": "1",
"private_key": "-----BEGIN RSA PRIVATE KEY-----\nMIIEpAIBAAKCAQEA4ej0p7bQ7L/r4rVGUz9RN4VQWoej1Bg1mYWIDYslvKrk1gpj\n7wZgkdmM7oVK2OfgrSj/FCTkInKPqaCR0gD7K80q+mLBrN3PUkDrJQZpvRZIff3/\nxmVU1WeruQLFJjnFb2dqu0s/FY/2kWiJtBCakXvXEOb7zfbINuayL+MSsCGSdVYs\nSliS5qQpgyDap+8b5fpXZVJkq92hrcNtbkg7hCYUJczt8n9hcCTJCfUpApvaFQ18\npe+zpyl4+WzkP66I28hniMQyUlA1hBiskT7qiouq0m8IOodhv2fagSZKjOTTU2xk\nSBc//fy3ZpsL7WqgsZS7Q+0VRK8gKfqkxg5OYQIDAQABAoIBAQDGGHzQxGKX+ANk\nnQi53v/c6632dJKYXVJC+PDAz4+bzU800Y+n/bOYsWf/kCp94XcG4Lgsdd0Gx+Zq\nHD9CI1IcqqBRR2AFscsmmX6YzPLTuEKBGMW8twaYy3utlFxElMwoUEsrSWRcCA1y\nnHSDzTt871c7nxCXHxuZ6Nm/XCL7Bg8uidRTSC1sQrQyKgTPhtQdYrPQ4WZ1A4J9\nIisyDYmZodSNZe5P+LTJ6M1SCgH8KH9ZGIxv3diMwzNNpk3kxJc9yCnja4mjiGE2\nYCNusSycU5IhZwVeCTlhQGcNeV/skfg64xkiJE34c2y2ttFbdwBTPixStGaF09nU\nZ422D40BAoGBAPvVyRRsC3BF+qZdaSMFwI1yiXY7vQw5+JZh01tD28NuYdRFzjcJ\nvzT2n8LFpj5ZfZFvSMLMVEFVMgQvWnN0O6xdXvGov6qlRUSGaH9u+TCPNnIldjMP\nB8+xTwFMqI7uQr54wBB+Poq7dVRP+0oHb0NYAwUBXoEuvYo3c/nDoRcZAoGBAOWl\naLHjMv4CJbArzT8sPfic/8waSiLV9Ixs3Re5YREUTtnLq7LoymqB57UXJB3BNz/2\neCueuW71avlWlRtE/wXASj5jx6y5mIrlV4nZbVuyYff0QlcG+fgb6pcJQuO9DxMI\naqFGrWP3zye+LK87a6iR76dS9vRU+bHZpSVvGMKJAoGAFGt3TIKeQtJJyqeUWNSk\nklORNdcOMymYMIlqG+JatXQD1rR6ThgqOt8sgRyJqFCVT++YFMOAqXOBBLnaObZZ\nCFbh1fJ66BlSjoXff0W+SuOx5HuJJAa5+WtFHrPajwxeuRcNa8jwxUsB7n41wADu\nUqWWSRedVBg4Ijbw3nWwYDECgYB0pLew4z4bVuvdt+HgnJA9n0EuYowVdadpTEJg\nsoBjNHV4msLzdNqbjrAqgz6M/n8Ztg8D2PNHMNDNJPVHjJwcR7duSTA6w2p/4k28\nbvvk/45Ta3XmzlxZcZSOct3O31Cw0i2XDVc018IY5be8qendDYM08icNo7vQYkRH\n504kQQKBgQDjx60zpz8ozvm1XAj0wVhi7GwXe+5lTxiLi9Fxq721WDxPMiHDW2XL\nYXfFVy/9/GIMvEiGYdmarK1NW+VhWl1DC5xhDg0kvMfxplt4tynoq1uTsQTY31Mx\nBeF5CT/JuNYk3bEBF0H/Q3VGO1/ggVS+YezdFbLWIRoMnLj6XCFEGg==\n-----END RSA PRIVATE KEY-----\n",
"client_email": "service-account@example.com",
"client_id": "1234",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://accounts.google.com/o/oauth2/token"
},
"type": "impersonated_service_account"
}
14 changes: 14 additions & 0 deletions tests/data/impersonated_service_account_with_quota_project.json
@@ -0,0 +1,14 @@
{
"delegates": [
"service-account-delegate@example.com"
],
"quota_project_id": "quota_project",
"service_account_impersonation_url": "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/service-account-target@example.com:generateAccessToken",
"source_credentials": {
"client_id": "123",
"client_secret": "secret",
"refresh_token": "alabalaportocala",
"type": "authorized_user"
},
"type": "impersonated_service_account"
}
92 changes: 92 additions & 0 deletions tests/test__default.py
Expand Up @@ -28,6 +28,7 @@
from google.auth import exceptions
from google.auth import external_account
from google.auth import identity_pool
from google.auth import impersonated_credentials
from google.oauth2 import service_account
import google.oauth2.credentials

Expand Down Expand Up @@ -128,6 +129,19 @@
"workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
}

IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_FILE = os.path.join(
DATA_DIR, "impersonated_service_account_authorized_user_source.json"
)

IMPERSONATED_SERVICE_ACCOUNT_WITH_QUOTA_PROJECT_FILE = os.path.join(
DATA_DIR, "impersonated_service_account_with_quota_project.json"
)

IMPERSONATED_SERVICE_ACCOUNT_SERVICE_ACCOUNT_SOURCE_FILE = os.path.join(
DATA_DIR, "impersonated_service_account_service_account_source.json"
)


MOCK_CREDENTIALS = mock.Mock(spec=credentials.CredentialsWithQuotaProject)
MOCK_CREDENTIALS.with_quota_project.return_value = MOCK_CREDENTIALS

Expand Down Expand Up @@ -278,6 +292,84 @@ def test_load_credentials_from_file_service_account_bad_format(tmpdir):
assert excinfo.match(r"missing fields")


def test_load_credentials_from_file_impersonated_with_authorized_user_source():
credentials, project_id = _default.load_credentials_from_file(
IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_FILE
)
assert isinstance(credentials, impersonated_credentials.Credentials)
assert isinstance(
credentials._source_credentials, google.oauth2.credentials.Credentials
)
assert credentials.service_account_email == "service-account-target@example.com"
assert credentials._delegates == ["service-account-delegate@example.com"]
assert not credentials._quota_project_id
assert not credentials._target_scopes
assert project_id is None


def test_load_credentials_from_file_impersonated_with_quota_project():
credentials, _ = _default.load_credentials_from_file(
IMPERSONATED_SERVICE_ACCOUNT_WITH_QUOTA_PROJECT_FILE
)
assert isinstance(credentials, impersonated_credentials.Credentials)
assert credentials._quota_project_id == "quota_project"


def test_load_credentials_from_file_impersonated_with_service_account_source():
credentials, _ = _default.load_credentials_from_file(
IMPERSONATED_SERVICE_ACCOUNT_SERVICE_ACCOUNT_SOURCE_FILE
)
assert isinstance(credentials, impersonated_credentials.Credentials)
assert isinstance(credentials._source_credentials, service_account.Credentials)
assert not credentials._quota_project_id


def test_load_credentials_from_file_impersonated_passing_quota_project():
credentials, _ = _default.load_credentials_from_file(
IMPERSONATED_SERVICE_ACCOUNT_SERVICE_ACCOUNT_SOURCE_FILE,
quota_project_id="new_quota_project",
)
assert credentials._quota_project_id == "new_quota_project"


def test_load_credentials_from_file_impersonated_passing_scopes():
credentials, _ = _default.load_credentials_from_file(
IMPERSONATED_SERVICE_ACCOUNT_SERVICE_ACCOUNT_SOURCE_FILE,
scopes=["scope1", "scope2"],
)
assert credentials._target_scopes == ["scope1", "scope2"]


def test_load_credentials_from_file_impersonated_wrong_target_principal(tmpdir):

with open(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_FILE) as fh:
impersonated_credentials_info = json.load(fh)
impersonated_credentials_info[
"service_account_impersonation_url"
] = "something_wrong"

jsonfile = tmpdir.join("invalid.json")
jsonfile.write(json.dumps(impersonated_credentials_info))
with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
_default.load_credentials_from_file(str(jsonfile))

assert excinfo.match(r"Cannot extract target principal")


def test_load_credentials_from_file_impersonated_wrong_source_type(tmpdir):

with open(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_FILE) as fh:
impersonated_credentials_info = json.load(fh)
impersonated_credentials_info["source_credentials"]["type"] = "external_account"

jsonfile = tmpdir.join("invalid.json")
jsonfile.write(json.dumps(impersonated_credentials_info))
with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
_default.load_credentials_from_file(str(jsonfile))

assert excinfo.match(r"source credential of type external_account is not supported")


@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
def test_load_credentials_from_file_external_account_identity_pool(
get_project_id, tmpdir
Expand Down

0 comments on commit a8eb4c8

Please sign in to comment.