Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for imersonated_credentials.Sign, IDToken #348

Merged
merged 7 commits into from
Aug 7, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ also provides integration with several HTTP libraries.

- Support for Google :func:`Application Default Credentials <google.auth.default>`.
- Support for signing and verifying :mod:`JWTs <google.auth.jwt>`.
- Support for creating `Google ID Tokens <user-guide.html#identity-tokens>`__.
- Support for verifying and decoding :mod:`ID Tokens <google.oauth2.id_token>`.
- Support for Google :mod:`Service Account credentials <google.oauth2.service_account>`.
- Support for Google :mod:`Impersonated Credentials <google.auth.impersonated_credentials>`.
Expand Down
50 changes: 50 additions & 0 deletions docs/user-guide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,56 @@ In the example above `source_credentials` does not have direct access to list bu
in the target project. Using `ImpersonatedCredentials` will allow the source_credentials
to assume the identity of a target_principal that does have access.

Identity Tokens
+++++++++++++++

`Google OpenID Connect`_ tokens are avaiable through both ServiceAccount, ImpersonatedCredentials,
and Compute modules. These tokens can be used to authenticate against `Cloud Functions`_,
`Cloud Run`_, a user service behind `Identity Aware Proxy`_ or any other service capable
of verifying a `Google ID Token`_.

ServiceAccount ::

from google.oauth2 import service_account

target_audience = 'https://example.com'

creds = service_account.IDTokenCredentials.from_service_account_file(
'/path/to/svc.json',
target_audience=target_audience)


Compute ::

from google.auth import compute_engine
import google.auth.transport.requests

target_audience = 'https://example.com'

request = google.auth.transport.requests.Request()
creds = compute_engine.IDTokenCredentials(request,
target_audience=target_audience)

Impersonated ::

from google.auth import impersonated_credentials

# get target_credentials from a source_credentials

target_audience = 'https://example.com'

creds = impersonated_credentials.IDTokenCredentials(
target_credentials,
target_audience=target_audience)

IDToken verification can be done for various type of IDTokens using the :class:`google.oauth2.id_token` module

.. _Cloud Functions: https://cloud.google.com/functions/
.. _Cloud Run: https://cloud.google.com/run/
.. _Identity Aware Proxy: https://cloud.google.com/iap/
.. _Google OpenID Connect: https://developers.google.com/identity/protocols/OpenIDConnect
.. _Google ID Token: https://developers.google.com/identity/protocols/OpenIDConnect#validatinganidtoken

Making authenticated requests
-----------------------------

Expand Down
123 changes: 121 additions & 2 deletions google/auth/impersonated_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
https://cloud.google.com/iam/credentials/reference/rest/
"""

import base64
import copy
from datetime import datetime
import json
Expand All @@ -35,6 +36,8 @@
from google.auth import _helpers
from google.auth import credentials
from google.auth import exceptions
from google.auth import jwt
from google.auth.transport.requests import AuthorizedSession

_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds

Expand All @@ -43,8 +46,18 @@
_IAM_ENDPOINT = ('https://iamcredentials.googleapis.com/v1/projects/-' +
'/serviceAccounts/{}:generateAccessToken')

_IAM_SIGN_ENDPOINT = ('https://iamcredentials.googleapis.com/v1/projects/-' +
'/serviceAccounts/{}:signBlob')

_IAM_IDTOKEN_ENDPOINT = ('https://iamcredentials.googleapis.com/v1/' +
'projects/-/serviceAccounts/{}:generateIdToken')

_REFRESH_ERROR = 'Unable to acquire impersonated credentials'

_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds

_DEFAULT_TOKEN_URI = 'https://oauth2.googleapis.com/token'


def _make_iam_token_request(request, principal, headers, body):
"""Makes a request to the Google Cloud IAM service for an access token.
Expand Down Expand Up @@ -94,7 +107,7 @@ def _make_iam_token_request(request, principal, headers, body):
six.raise_from(new_exc, caught_exc)


class Credentials(credentials.Credentials):
class Credentials(credentials.Credentials, credentials.Signing):
"""This module defines impersonated credentials which are essentially
impersonated identities.

Expand Down Expand Up @@ -172,7 +185,8 @@ def __init__(self, source_credentials, target_principal,
granted to the prceeding identity. For example, if set to
[serviceAccountB, serviceAccountC], the source_credential
must have the Token Creator role on serviceAccountB.
serviceAccountB must have the Token Creator on serviceAccountC.
credentials.refresh(request) must have the Token Creator on
salrashid123 marked this conversation as resolved.
Show resolved Hide resolved
serviceAccountC.
Finally, C must have Token Creator on target_principal.
If left unset, source_credential must have that role on
target_principal.
Expand Down Expand Up @@ -229,3 +243,108 @@ def _update_token(self, request):
principal=self._target_principal,
headers=headers,
body=body)

def sign_bytes(self, message):

iam_sign_endpoint = _IAM_SIGN_ENDPOINT.format(self._target_principal)

body = {
"payload": base64.b64encode(message),
"delegates": self._delegates
}

headers = {
'Content-Type': 'application/json',
}

authed_session = AuthorizedSession(self._source_credentials)

salrashid123 marked this conversation as resolved.
Show resolved Hide resolved
response = authed_session.post(
url=iam_sign_endpoint,
headers=headers,
data=json.dumps(body).encode('utf-8'))

return base64.b64decode(response.json()['signedBlob'])

@property
def signer_email(self):
return self._target_principal

@property
def service_account_email(self):
return self._target_principal

@property
def signer(self):
return self


class IDTokenCredentials(credentials.Credentials):
"""Open ID Connect ID Token-based service account credentials.

"""
def __init__(self, target_credentials,
target_audience=None, include_email=False):
"""
Args:
target_credentials (google.auth.Credentials): The target
credential used as to acquire the id tokens for.
target_audience (string): Audience to issue the token for.
include_email (bool): Include email in IdToken
"""
super(IDTokenCredentials, self).__init__()

if not isinstance(target_credentials,
Credentials):
raise exceptions.GoogleAuthError("Provided Credential must be "
"impersonated_credentials")
self._target_credentials = target_credentials
self._target_audience = target_audience
self._include_email = include_email

def from_credentials(self, target_credentials,
target_audience=None):
return self.__class__(
target_credentials=self._target_credentials,
target_audience=target_audience)

def with_target_audience(self, target_audience):
return self.__class__(
target_credentials=self._target_credentials,
target_audience=target_audience)

def with_include_email(self, include_email):
return self.__class__(
target_credentials=self._target_credentials,
target_audience=self._target_audience,
include_email=include_email)

@_helpers.copy_docstring(credentials.Credentials)
def refresh(self, request):

iam_sign_endpoint = _IAM_IDTOKEN_ENDPOINT.format(self.
_target_credentials.
signer_email)

body = {
"audience": self._target_audience,
"delegates": self._target_credentials._delegates,
"includeEmail": self._include_email
}

headers = {
'Content-Type': 'application/json',
}

authed_session = AuthorizedSession(self._target_credentials.
_source_credentials)

response = authed_session.post(
salrashid123 marked this conversation as resolved.
Show resolved Hide resolved
url=iam_sign_endpoint,
headers=headers,
data=json.dumps(body).encode('utf-8'))

id_token = response.json()['token']
self.token = id_token
self.expiry = datetime.fromtimestamp(jwt.decode(id_token,
verify=False)['exp'])
96 changes: 94 additions & 2 deletions tests/test_impersonated_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@ class TestImpersonatedCredentials(object):
SOURCE_CREDENTIALS = service_account.Credentials(
SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI)

def make_credentials(self, lifetime=LIFETIME):
def make_credentials(self, lifetime=LIFETIME,
target_principal=TARGET_PRINCIPAL):

return Credentials(
source_credentials=self.SOURCE_CREDENTIALS,
target_principal=self.TARGET_PRINCIPAL,
target_principal=target_principal,
target_scopes=self.TARGET_SCOPES,
delegates=self.DELEGATES,
lifetime=lifetime)
Expand Down Expand Up @@ -176,3 +178,93 @@ def test_refresh_failure_http_error(self, mock_donor_credentials):
def test_expired(self):
credentials = self.make_credentials(lifetime=None)
assert credentials.expired

def test_signer(self):
credentials = self.make_credentials()
assert isinstance(credentials.signer,
impersonated_credentials.Credentials)

def test_signer_email(self):
credentials = self.make_credentials(
target_principal=self.TARGET_PRINCIPAL)
assert credentials.signer_email == self.TARGET_PRINCIPAL

def test_service_account_email(self):
credentials = self.make_credentials(
target_principal=self.TARGET_PRINCIPAL)
assert credentials.service_account_email == self.TARGET_PRINCIPAL

def test_sign_bytes(self, mock_donor_credentials):
credentials = self.make_credentials(lifetime=None)
token = 'token'

expire_time = (
_helpers.utcnow().replace(microsecond=0) +
datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
response_body = {
"accessToken": token,
"expireTime": expire_time
}

request = self.make_request(
data=json.dumps(response_body),
status=http_client.OK)

# sign_response_body = {
# "keyId": "1",
# "signedBlob": "c2lnbmF0dXJl"
# }
# sign_response = mock.create_autospec(transport.Response,
# instance=True)
# sign_response.data = sign_response_body
# sign_response.status = http_client.OK
# sign_request = self.make_request(
# data=json.dumps(sign_response_body),
# status=http_client.OK, side_effect=[sign_response])

credentials.refresh(request)

assert credentials.valid
assert not credentials.expired

# signature = credentials.sign_bytes(b"some bytes")
# assert signature == b'signature'

def test_id_token_success(self, mock_donor_credentials):
credentials = self.make_credentials(lifetime=None)
token = 'token'
# idtoken = 'idtoken'
# target_audience = 'https://foo.bar'

expire_time = (
_helpers.utcnow().replace(microsecond=0) +
datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
response_body = {
"accessToken": token,
"expireTime": expire_time
}

request = self.make_request(
data=json.dumps(response_body),
status=http_client.OK)

# id_token_response_body = {
# "token": idtoken
# }
# id_token_response = mock.create_autospec(transport.Response,
# instance=True)
# id_token_response.data = id_token_response_body
# id_token_response.status = http_client.OK
# sign_request = self.make_request(
# data=json.dumps(id_token_response_body),
# status=http_client.OK)

credentials.refresh(request)

assert credentials.valid
assert not credentials.expired

# id_creds = impersonated_credentials.IDTokenCredentials(
# credentials, target_audience=target_audience)
# id_creds.refresh(request)
# assert id_creds.token == idtoken