Skip to content

Commit

Permalink
fix: fix impersonated cred for gcloud (#516)
Browse files Browse the repository at this point in the history
* fix: fix impersonated cred for gcloud

(1) For IDTokenCredentials.refresh(self.request), use the provided 
request instead of creating a new one
(2) For Credentials, only refresh the source credentials if it is not 
valid

* update expired func
  • Loading branch information
arithmetic1728 committed May 28, 2020
1 parent 178e691 commit eb7be3f
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 7 deletions.
13 changes: 6 additions & 7 deletions google/auth/impersonated_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,6 @@ def __init__(
def refresh(self, request):
self._update_token(request)

@property
def expired(self):
return _helpers.utcnow() >= self.expiry

def _update_token(self, request):
"""Updates credentials with a new access_token representing
the impersonated account.
Expand All @@ -239,8 +235,9 @@ def _update_token(self, request):
to use for refreshing credentials.
"""

# Refresh our source credentials.
self._source_credentials.refresh(request)
# Refresh our source credentials if it is not valid.
if not self._source_credentials.valid:
self._source_credentials.refresh(request)

body = {
"delegates": self._delegates,
Expand Down Expand Up @@ -347,7 +344,9 @@ def refresh(self, request):

headers = {"Content-Type": "application/json"}

authed_session = AuthorizedSession(self._target_credentials._source_credentials)
authed_session = AuthorizedSession(
self._target_credentials._source_credentials, auth_request=request
)

response = authed_session.post(
url=iam_sign_endpoint,
Expand Down
38 changes: 38 additions & 0 deletions tests/test_impersonated_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,44 @@ def test_refresh_success(self, use_data_bytes, mock_donor_credentials):
assert credentials.valid
assert not credentials.expired

@pytest.mark.parametrize("time_skew", [100, -100])
def test_refresh_source_credentials(self, time_skew):
credentials = self.make_credentials(lifetime=None)

# Source credentials is refreshed only if it is expired within
# _helpers.CLOCK_SKEW from now. We add a time_skew to the expiry, so
# source credentials is refreshed only if time_skew <= 0.
credentials._source_credentials.expiry = (
_helpers.utcnow()
+ _helpers.CLOCK_SKEW
+ datetime.timedelta(seconds=time_skew)
)
credentials._source_credentials.token = "Token"

with mock.patch(
"google.oauth2.service_account.Credentials.refresh", autospec=True
) as source_cred_refresh:
expire_time = (
_helpers.utcnow().replace(microsecond=0)
+ datetime.timedelta(seconds=500)
).isoformat("T") + "Z"
response_body = {"accessToken": "token", "expireTime": expire_time}
request = self.make_request(
data=json.dumps(response_body), status=http_client.OK
)

credentials.refresh(request)

assert credentials.valid
assert not credentials.expired

# Source credentials is refreshed only if it is expired within
# _helpers.CLOCK_SKEW
if time_skew > 0:
source_cred_refresh.assert_not_called()
else:
source_cred_refresh.assert_called_once()

def test_refresh_failure_malformed_expire_time(self, mock_donor_credentials):
credentials = self.make_credentials(lifetime=None)
token = "token"
Expand Down

0 comments on commit eb7be3f

Please sign in to comment.