Skip to content

Commit

Permalink
feat: AwsCredentials should not call metadata server if security cred…
Browse files Browse the repository at this point in the history
…s and region are retrievable through the environment variables (#1195)

* feat: AwsCredentials should not call metadata server if security creds and region are retrievable through the environment variables

* add verification of metadata token request not called

* Adding new coverage of test and address comments

* add previous missing period

Co-authored-by: Leo <39062083+lsirac@users.noreply.github.com>
  • Loading branch information
BigTailWolf and lsirac committed Dec 7, 2022
1 parent 84895cf commit 5e27c8f
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 3 deletions.
27 changes: 25 additions & 2 deletions google/auth/aws.py
Expand Up @@ -466,8 +466,12 @@ def retrieve_subject_token(self, request):
Returns:
str: The retrieved subject token.
"""
# Fetch the session token required to make meta data endpoint calls to aws
if request is not None and self._imdsv2_session_token_url is not None:
# Fetch the session token required to make meta data endpoint calls to aws.
if (
request is not None
and self._imdsv2_session_token_url is not None
and self._should_use_metadata_server()
):
headers = {"X-aws-ec2-metadata-token-ttl-seconds": "300"}

imdsv2_session_token_response = request(
Expand Down Expand Up @@ -738,6 +742,25 @@ def _get_metadata_role_name(self, request, imdsv2_session_token):

return response_body

def _should_use_metadata_server(self):
# The AWS region can be provided through AWS_REGION or AWS_DEFAULT_REGION.
# The metadata server should be used if it cannot be retrieved from one of
# these environment variables.
if not os.environ.get(environment_vars.AWS_REGION) and not os.environ.get(
environment_vars.AWS_DEFAULT_REGION
):
return True

# AWS security credentials can be retrieved from the AWS_ACCESS_KEY_ID
# and AWS_SECRET_ACCESS_KEY environment variables. The metadata server
# should be used if either of these are not available.
if not os.environ.get(environment_vars.AWS_ACCESS_KEY_ID) or not os.environ.get(
environment_vars.AWS_SECRET_ACCESS_KEY
):
return True

return False

@classmethod
def from_info(cls, info, **kwargs):
"""Creates an AWS Credentials instance from parsed external account info.
Expand Down
208 changes: 207 additions & 1 deletion tests/test_aws.py
Expand Up @@ -14,6 +14,7 @@

import datetime
import json
import os

import mock
import pytest # type: ignore
Expand Down Expand Up @@ -1224,6 +1225,7 @@ def test_retrieve_subject_token_success_temp_creds_no_environment_vars(
)

@mock.patch("google.auth._helpers.utcnow")
@mock.patch.dict(os.environ, {})
def test_retrieve_subject_token_success_temp_creds_no_environment_vars_idmsv2(
self, utcnow
):
Expand Down Expand Up @@ -1300,7 +1302,7 @@ def test_retrieve_subject_token_success_temp_creds_no_environment_vars_idmsv2(

# Only 3 requests should be sent as the region is cached.
assert len(new_request.call_args_list) == 3
# Assert session token request
# Assert session token request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[0][1],
IMDSV2_SESSION_TOKEN_URL,
Expand All @@ -1323,6 +1325,210 @@ def test_retrieve_subject_token_success_temp_creds_no_environment_vars_idmsv2(
},
)

@mock.patch("google.auth._helpers.utcnow")
@mock.patch.dict(
os.environ,
{
environment_vars.AWS_REGION: AWS_REGION,
environment_vars.AWS_ACCESS_KEY_ID: ACCESS_KEY_ID,
},
)
def test_retrieve_subject_token_success_temp_creds_environment_vars_missing_secret_access_key_idmsv2(
self, utcnow
):
utcnow.return_value = datetime.datetime.strptime(
self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
)
request = self.make_mock_request(
role_status=http_client.OK,
role_name=self.AWS_ROLE,
security_credentials_status=http_client.OK,
security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
imdsv2_session_token_status=http_client.OK,
imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN,
)
credential_source_token_url = self.CREDENTIAL_SOURCE.copy()
credential_source_token_url[
"imdsv2_session_token_url"
] = IMDSV2_SESSION_TOKEN_URL
credentials = self.make_credentials(
credential_source=credential_source_token_url
)

subject_token = credentials.retrieve_subject_token(request)
assert subject_token == self.make_serialized_aws_signed_request(
{
"access_key_id": ACCESS_KEY_ID,
"secret_access_key": SECRET_ACCESS_KEY,
"security_token": TOKEN,
}
)
# Assert session token request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[0][1],
IMDSV2_SESSION_TOKEN_URL,
{"X-aws-ec2-metadata-token-ttl-seconds": "300"},
"PUT",
)
# Assert role request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[1][1],
SECURITY_CREDS_URL,
{"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
)
# Assert security credentials request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[2][1],
"{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
{
"Content-Type": "application/json",
"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN,
},
)

@mock.patch("google.auth._helpers.utcnow")
@mock.patch.dict(
os.environ,
{
environment_vars.AWS_REGION: AWS_REGION,
environment_vars.AWS_SECRET_ACCESS_KEY: SECRET_ACCESS_KEY,
},
)
def test_retrieve_subject_token_success_temp_creds_environment_vars_missing_access_key_id_idmsv2(
self, utcnow
):
utcnow.return_value = datetime.datetime.strptime(
self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
)
request = self.make_mock_request(
role_status=http_client.OK,
role_name=self.AWS_ROLE,
security_credentials_status=http_client.OK,
security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
imdsv2_session_token_status=http_client.OK,
imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN,
)
credential_source_token_url = self.CREDENTIAL_SOURCE.copy()
credential_source_token_url[
"imdsv2_session_token_url"
] = IMDSV2_SESSION_TOKEN_URL
credentials = self.make_credentials(
credential_source=credential_source_token_url
)

subject_token = credentials.retrieve_subject_token(request)
assert subject_token == self.make_serialized_aws_signed_request(
{
"access_key_id": ACCESS_KEY_ID,
"secret_access_key": SECRET_ACCESS_KEY,
"security_token": TOKEN,
}
)
# Assert session token request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[0][1],
IMDSV2_SESSION_TOKEN_URL,
{"X-aws-ec2-metadata-token-ttl-seconds": "300"},
"PUT",
)
# Assert role request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[1][1],
SECURITY_CREDS_URL,
{"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
)
# Assert security credentials request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[2][1],
"{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
{
"Content-Type": "application/json",
"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN,
},
)

@mock.patch("google.auth._helpers.utcnow")
@mock.patch.dict(os.environ, {environment_vars.AWS_REGION: AWS_REGION})
def test_retrieve_subject_token_success_temp_creds_environment_vars_missing_creds_idmsv2(
self, utcnow
):
utcnow.return_value = datetime.datetime.strptime(
self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
)
request = self.make_mock_request(
role_status=http_client.OK,
role_name=self.AWS_ROLE,
security_credentials_status=http_client.OK,
security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
imdsv2_session_token_status=http_client.OK,
imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN,
)
credential_source_token_url = self.CREDENTIAL_SOURCE.copy()
credential_source_token_url[
"imdsv2_session_token_url"
] = IMDSV2_SESSION_TOKEN_URL
credentials = self.make_credentials(
credential_source=credential_source_token_url
)

subject_token = credentials.retrieve_subject_token(request)
assert subject_token == self.make_serialized_aws_signed_request(
{
"access_key_id": ACCESS_KEY_ID,
"secret_access_key": SECRET_ACCESS_KEY,
"security_token": TOKEN,
}
)
# Assert session token request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[0][1],
IMDSV2_SESSION_TOKEN_URL,
{"X-aws-ec2-metadata-token-ttl-seconds": "300"},
"PUT",
)
# Assert role request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[1][1],
SECURITY_CREDS_URL,
{"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
)
# Assert security credentials request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[2][1],
"{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
{
"Content-Type": "application/json",
"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN,
},
)

@mock.patch("google.auth._helpers.utcnow")
@mock.patch.dict(
os.environ,
{
environment_vars.AWS_REGION: AWS_REGION,
environment_vars.AWS_ACCESS_KEY_ID: ACCESS_KEY_ID,
environment_vars.AWS_SECRET_ACCESS_KEY: SECRET_ACCESS_KEY,
},
)
def test_retrieve_subject_token_success_temp_creds_idmsv2(self, utcnow):
utcnow.return_value = datetime.datetime.strptime(
self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
)
request = self.make_mock_request(
role_status=http_client.OK, role_name=self.AWS_ROLE
)
credential_source_token_url = self.CREDENTIAL_SOURCE.copy()
credential_source_token_url[
"imdsv2_session_token_url"
] = IMDSV2_SESSION_TOKEN_URL
credentials = self.make_credentials(
credential_source=credential_source_token_url
)

credentials.retrieve_subject_token(request)
assert not request.called

def test_validate_metadata_server_url_if_any(self):
aws.Credentials.validate_metadata_server_url_if_any(
"http://[fd00:ec2::254]/latest/meta-data/placement/availability-zone", "url"
Expand Down

0 comments on commit 5e27c8f

Please sign in to comment.