diff --git a/google/auth/impersonated_credentials.py b/google/auth/impersonated_credentials.py index 429980210..c6822776b 100644 --- a/google/auth/impersonated_credentials.py +++ b/google/auth/impersonated_credentials.py @@ -226,10 +226,6 @@ def __init__( def refresh(self, request): self._update_token(request) - @property - def expired(self): - return _helpers.utcnow() >= self.expiry - def _update_token(self, request): """Updates credentials with a new access_token representing the impersonated account. @@ -239,8 +235,9 @@ def _update_token(self, request): to use for refreshing credentials. """ - # Refresh our source credentials. - self._source_credentials.refresh(request) + # Refresh our source credentials if it is not valid. + if not self._source_credentials.valid: + self._source_credentials.refresh(request) body = { "delegates": self._delegates, @@ -347,7 +344,9 @@ def refresh(self, request): headers = {"Content-Type": "application/json"} - authed_session = AuthorizedSession(self._target_credentials._source_credentials) + authed_session = AuthorizedSession( + self._target_credentials._source_credentials, auth_request=request + ) response = authed_session.post( url=iam_sign_endpoint, diff --git a/tests/test_impersonated_credentials.py b/tests/test_impersonated_credentials.py index 19e2f3421..e0b5b1179 100644 --- a/tests/test_impersonated_credentials.py +++ b/tests/test_impersonated_credentials.py @@ -172,6 +172,44 @@ def test_refresh_success(self, use_data_bytes, mock_donor_credentials): assert credentials.valid assert not credentials.expired + @pytest.mark.parametrize("time_skew", [100, -100]) + def test_refresh_source_credentials(self, time_skew): + credentials = self.make_credentials(lifetime=None) + + # Source credentials is refreshed only if it is expired within + # _helpers.CLOCK_SKEW from now. We add a time_skew to the expiry, so + # source credentials is refreshed only if time_skew <= 0. + credentials._source_credentials.expiry = ( + _helpers.utcnow() + + _helpers.CLOCK_SKEW + + datetime.timedelta(seconds=time_skew) + ) + credentials._source_credentials.token = "Token" + + with mock.patch( + "google.oauth2.service_account.Credentials.refresh", autospec=True + ) as source_cred_refresh: + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": "token", "expireTime": expire_time} + request = self.make_request( + data=json.dumps(response_body), status=http_client.OK + ) + + credentials.refresh(request) + + assert credentials.valid + assert not credentials.expired + + # Source credentials is refreshed only if it is expired within + # _helpers.CLOCK_SKEW + if time_skew > 0: + source_cred_refresh.assert_not_called() + else: + source_cred_refresh.assert_called_once() + def test_refresh_failure_malformed_expire_time(self, mock_donor_credentials): credentials = self.make_credentials(lifetime=None) token = "token"