Skip to content

Commit

Permalink
fix: add fetch_id_token_credentials (#866)
Browse files Browse the repository at this point in the history
  • Loading branch information
arithmetic1728 authored Oct 29, 2021
1 parent 77d7f1b commit 8f1e9cf
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 47 deletions.
75 changes: 63 additions & 12 deletions google/oauth2/id_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
from google.auth import environment_vars
from google.auth import exceptions
from google.auth import jwt
import google.auth.transport.requests


# The URL that provides public certificates for verifying ID tokens issued
Expand Down Expand Up @@ -201,8 +202,8 @@ def verify_firebase_token(id_token, request, audience=None, clock_skew_in_second
)


def fetch_id_token(request, audience):
"""Fetch the ID Token from the current environment.
def fetch_id_token_credentials(audience, request=None):
"""Create the ID Token credentials from the current environment.
This function acquires ID token from the environment in the following order.
See https://google.aip.dev/auth/4110.
Expand All @@ -224,15 +225,22 @@ def fetch_id_token(request, audience):
request = google.auth.transport.requests.Request()
target_audience = "https://pubsub.googleapis.com"
id_token = google.oauth2.id_token.fetch_id_token(request, target_audience)
# Create ID token credentials.
credentials = google.oauth2.id_token.fetch_id_token_credentials(target_audience, request=request)
# Refresh the credential to obtain an ID token.
credentials.refresh(request)
id_token = credentials.token
id_token_expiry = credentials.expiry
Args:
request (google.auth.transport.Request): A callable used to make
HTTP requests.
audience (str): The audience that this ID token is intended for.
request (Optional[google.auth.transport.Request]): A callable used to make
HTTP requests. A request object will be created if not provided.
Returns:
str: The ID token.
google.auth.credentials.Credentials: The ID token credentials.
Raises:
~google.auth.exceptions.DefaultCredentialsError:
Expand All @@ -257,11 +265,9 @@ def fetch_id_token(request, audience):

info = json.load(f)
if info.get("type") == "service_account":
credentials = service_account.IDTokenCredentials.from_service_account_info(
return service_account.IDTokenCredentials.from_service_account_info(
info, target_audience=audience
)
credentials.refresh(request)
return credentials.token
except ValueError as caught_exc:
new_exc = exceptions.DefaultCredentialsError(
"GOOGLE_APPLICATION_CREDENTIALS is not valid service account credentials.",
Expand All @@ -275,15 +281,60 @@ def fetch_id_token(request, audience):
from google.auth import compute_engine
from google.auth.compute_engine import _metadata

# Create a request object if not provided.
if not request:
request = google.auth.transport.requests.Request()

if _metadata.ping(request):
credentials = compute_engine.IDTokenCredentials(
return compute_engine.IDTokenCredentials(
request, audience, use_metadata_identity_endpoint=True
)
credentials.refresh(request)
return credentials.token
except (ImportError, exceptions.TransportError):
pass

raise exceptions.DefaultCredentialsError(
"Neither metadata server or valid service account credentials are found."
)


def fetch_id_token(request, audience):
"""Fetch the ID Token from the current environment.
This function acquires ID token from the environment in the following order.
See https://google.aip.dev/auth/4110.
1. If the environment variable ``GOOGLE_APPLICATION_CREDENTIALS`` is set
to the path of a valid service account JSON file, then ID token is
acquired using this service account credentials.
2. If the application is running in Compute Engine, App Engine or Cloud Run,
then the ID token are obtained from the metadata server.
3. If metadata server doesn't exist and no valid service account credentials
are found, :class:`~google.auth.exceptions.DefaultCredentialsError` will
be raised.
Example::
import google.oauth2.id_token
import google.auth.transport.requests
request = google.auth.transport.requests.Request()
target_audience = "https://pubsub.googleapis.com"
id_token = google.oauth2.id_token.fetch_id_token(request, target_audience)
Args:
request (google.auth.transport.Request): A callable used to make
HTTP requests.
audience (str): The audience that this ID token is intended for.
Returns:
str: The ID token.
Raises:
~google.auth.exceptions.DefaultCredentialsError:
If metadata server doesn't exist and no valid service account
credentials are found.
"""
id_token_credentials = fetch_id_token_credentials(audience, request=request)
id_token_credentials.refresh(request)
return id_token_credentials.token
88 changes: 53 additions & 35 deletions tests/oauth2/test_id_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
from google.auth import environment_vars
from google.auth import exceptions
from google.auth import transport
import google.auth.compute_engine._metadata
from google.oauth2 import id_token
from google.oauth2 import service_account

SERVICE_ACCOUNT_FILE = os.path.join(
os.path.dirname(__file__), "../data/service_account.json"
)
ID_TOKEN_AUDIENCE = "https://pubsub.googleapis.com"


def make_request(status, data=None):
Expand Down Expand Up @@ -201,93 +201,111 @@ def test_verify_firebase_token_clock_skew(verify_token):
)


def test_fetch_id_token_from_metadata_server(monkeypatch):
def test_fetch_id_token_credentials_optional_request(monkeypatch):
monkeypatch.delenv(environment_vars.CREDENTIALS, raising=False)

def mock_init(self, request, audience, use_metadata_identity_endpoint):
assert use_metadata_identity_endpoint
self.token = "id_token"

# Test a request object is created if not provided
with mock.patch("google.auth.compute_engine._metadata.ping", return_value=True):
with mock.patch.multiple(
google.auth.compute_engine.IDTokenCredentials,
__init__=mock_init,
refresh=mock.Mock(),
with mock.patch(
"google.auth.compute_engine.IDTokenCredentials.__init__", return_value=None
):
request = mock.Mock()
token = id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
assert token == "id_token"
with mock.patch(
"google.auth.transport.requests.Request.__init__", return_value=None
) as mock_request:
id_token.fetch_id_token_credentials(ID_TOKEN_AUDIENCE)
mock_request.assert_called()


def test_fetch_id_token_from_explicit_cred_json_file(monkeypatch):
monkeypatch.setenv(environment_vars.CREDENTIALS, SERVICE_ACCOUNT_FILE)
def test_fetch_id_token_credentials_from_metadata_server(monkeypatch):
monkeypatch.delenv(environment_vars.CREDENTIALS, raising=False)

mock_req = mock.Mock()

with mock.patch("google.auth.compute_engine._metadata.ping", return_value=True):
with mock.patch(
"google.auth.compute_engine.IDTokenCredentials.__init__", return_value=None
) as mock_init:
id_token.fetch_id_token_credentials(ID_TOKEN_AUDIENCE, request=mock_req)
mock_init.assert_called_once_with(
mock_req, ID_TOKEN_AUDIENCE, use_metadata_identity_endpoint=True
)

def mock_refresh(self, request):
self.token = "id_token"

with mock.patch.object(service_account.IDTokenCredentials, "refresh", mock_refresh):
request = mock.Mock()
token = id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
assert token == "id_token"
def test_fetch_id_token_credentials_from_explicit_cred_json_file(monkeypatch):
monkeypatch.setenv(environment_vars.CREDENTIALS, SERVICE_ACCOUNT_FILE)

cred = id_token.fetch_id_token_credentials(ID_TOKEN_AUDIENCE)
assert isinstance(cred, service_account.IDTokenCredentials)
assert cred._target_audience == ID_TOKEN_AUDIENCE


def test_fetch_id_token_no_cred_exists(monkeypatch):
def test_fetch_id_token_credentials_no_cred_exists(monkeypatch):
monkeypatch.delenv(environment_vars.CREDENTIALS, raising=False)

with mock.patch(
"google.auth.compute_engine._metadata.ping",
side_effect=exceptions.TransportError(),
):
with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
request = mock.Mock()
id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
id_token.fetch_id_token_credentials(ID_TOKEN_AUDIENCE)
assert excinfo.match(
r"Neither metadata server or valid service account credentials are found."
)

with mock.patch("google.auth.compute_engine._metadata.ping", return_value=False):
with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
request = mock.Mock()
id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
id_token.fetch_id_token_credentials(ID_TOKEN_AUDIENCE)
assert excinfo.match(
r"Neither metadata server or valid service account credentials are found."
)


def test_fetch_id_token_invalid_cred_file_type(monkeypatch):
def test_fetch_id_token_credentials_invalid_cred_file_type(monkeypatch):
user_credentials_file = os.path.join(
os.path.dirname(__file__), "../data/authorized_user.json"
)
monkeypatch.setenv(environment_vars.CREDENTIALS, user_credentials_file)

with mock.patch("google.auth.compute_engine._metadata.ping", return_value=False):
with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
request = mock.Mock()
id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
id_token.fetch_id_token_credentials(ID_TOKEN_AUDIENCE)
assert excinfo.match(
r"Neither metadata server or valid service account credentials are found."
)


def test_fetch_id_token_invalid_json(monkeypatch):
def test_fetch_id_token_credentials_invalid_json(monkeypatch):
not_json_file = os.path.join(os.path.dirname(__file__), "../data/public_cert.pem")
monkeypatch.setenv(environment_vars.CREDENTIALS, not_json_file)

with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
request = mock.Mock()
id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
id_token.fetch_id_token_credentials(ID_TOKEN_AUDIENCE)
assert excinfo.match(
r"GOOGLE_APPLICATION_CREDENTIALS is not valid service account credentials."
)


def test_fetch_id_token_invalid_cred_path(monkeypatch):
def test_fetch_id_token_credentials_invalid_cred_path(monkeypatch):
not_json_file = os.path.join(os.path.dirname(__file__), "../data/not_exists.json")
monkeypatch.setenv(environment_vars.CREDENTIALS, not_json_file)

with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
request = mock.Mock()
id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
id_token.fetch_id_token_credentials(ID_TOKEN_AUDIENCE)
assert excinfo.match(
r"GOOGLE_APPLICATION_CREDENTIALS path is either not found or invalid."
)


def test_fetch_id_token(monkeypatch):
mock_cred = mock.MagicMock()
mock_cred.token = "token"

mock_req = mock.Mock()

with mock.patch(
"google.oauth2.id_token.fetch_id_token_credentials", return_value=mock_cred
) as mock_fetch:
token = id_token.fetch_id_token(mock_req, ID_TOKEN_AUDIENCE)
mock_fetch.assert_called_once_with(ID_TOKEN_AUDIENCE, request=mock_req)
mock_cred.refresh.assert_called_once_with(mock_req)
assert token == "token"

0 comments on commit 8f1e9cf

Please sign in to comment.