Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions src/mount_efs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,18 @@
AWS_CONTAINER_CREDS_FULL_URI_ENV = "AWS_CONTAINER_CREDENTIALS_FULL_URI"
AWS_CONTAINER_AUTH_TOKEN_FILE_ENV = "AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE"

# CloudShell credentials endpoint
CLOUDSHELL_METADATA_ENDPOINT = "http://localhost:1338/latest"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just quick question: Is this endpoint created on the host and playing as a fake endpoint to get security-credentials ?

CLOUDSHELL_TOKEN_URL = CLOUDSHELL_METADATA_ENDPOINT + "/api/token"
CLOUDSHELL_CREDS_URL = CLOUDSHELL_METADATA_ENDPOINT + "/meta-data/container/security-credentials"
CLOUDSHELL_TOKEN_HEADER = "X-aws-ec2-metadata-token"
CLOUDSHELL_TOKEN_TTL_HEADER = "X-aws-ec2-metadata-token-ttl-seconds"

# Environment variables for AWS credentials
AWS_ACCESS_KEY_ID_ENV = "AWS_ACCESS_KEY_ID"
AWS_SECRET_ACCESS_KEY_ENV = "AWS_SECRET_ACCESS_KEY"
AWS_SESSION_TOKEN_ENV = "AWS_SESSION_TOKEN"


def is_ipv6_address(ip_address):
try:
Expand Down Expand Up @@ -780,6 +792,18 @@ def get_aws_security_credentials(
if credentials and credentials_source:
return credentials, credentials_source

# attempt to lookup AWS security credentials from CloudShell metadata endpoint
credentials, credentials_source = get_aws_security_credentials_from_cloudshell(
config, False
)
if credentials and credentials_source:
return credentials, credentials_source

# attempt to lookup AWS security credentials from environment variables
credentials, credentials_source = get_aws_security_credentials_from_env_vars()
if credentials and credentials_source:
return credentials, credentials_source

# attempt to lookup AWS security credentials with IAM role name attached to instance
# through IAM role name security credentials lookup uri
iam_role_name = get_iam_role_name(config)
Expand All @@ -793,6 +817,7 @@ def get_aws_security_credentials(

error_msg = (
"AWS Access Key ID and Secret Access Key are not found in AWS credentials file (%s), config file (%s), "
"environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY), CloudShell metadata endpoint, "
"from ECS credentials relative uri, or from the instance security credentials service"
% (AWS_CREDENTIALS_FILE, AWS_CONFIG_FILE)
)
Expand Down Expand Up @@ -1004,6 +1029,63 @@ def get_aws_security_credentials_from_instance_metadata(config, iam_role_name):
return None, None


def get_aws_security_credentials_from_cloudshell(config, is_fatal=False):
"""
Attempt to retrieve AWS security credentials from CloudShell metadata endpoint.
CloudShell provides a limited IMDSv2-like endpoint at localhost:1338.
"""
try:
# First get the token
token_headers = {CLOUDSHELL_TOKEN_TTL_HEADER: "60"}
token_request = Request(CLOUDSHELL_TOKEN_URL, headers=token_headers)
token_request.get_method = lambda: "PUT"

token_response = urlopen(token_request, timeout=DEFAULT_TIMEOUT)
token = token_response.read().decode("utf-8")

# Then get credentials using the token
creds_headers = {CLOUDSHELL_TOKEN_HEADER: token}
creds_response = url_request_helper(
config,
CLOUDSHELL_CREDS_URL,
"Unsuccessful retrieval of AWS security credentials from CloudShell",
"Unable to reach CloudShell metadata endpoint",
headers=creds_headers
)

if creds_response and all(k in creds_response for k in CREDENTIALS_KEYS):
logging.debug("Retrieved credentials from CloudShell metadata endpoint")
return creds_response, "cloudshell"

except Exception as e:
logging.debug("Failed to retrieve CloudShell credentials: %s" % str(e))

if is_fatal:
fatal_error("Failed to retrieve credentials from CloudShell metadata endpoint")
return None, None


def get_aws_security_credentials_from_env_vars():
"""
Attempt to retrieve AWS security credentials from environment variables.
Supports AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN.
"""
access_key = os.environ.get(AWS_ACCESS_KEY_ID_ENV)
secret_key = os.environ.get(AWS_SECRET_ACCESS_KEY_ENV)
session_token = os.environ.get(AWS_SESSION_TOKEN_ENV)

if access_key and secret_key:
credentials = {
"AccessKeyId": access_key,
"SecretAccessKey": secret_key,
"Token": session_token # Can be None
}
logging.debug("Retrieved credentials from environment variables")
return credentials, "environment"

return None, None


def get_iam_role_name(config):
iam_role_unsuccessful_resp = (
"Unsuccessful retrieval of IAM role name at %s." % INSTANCE_IAM_URL
Expand Down
225 changes: 225 additions & 0 deletions test/mount_efs_test/test_get_aws_security_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ def _test_get_aws_security_credentials_get_ecs_from_env_url(mocker):
config = get_fake_config()
mocker.patch.dict(os.environ, {})
mocker.patch("os.path.exists", return_value=False)
# Mock new credential functions to return None so they don't interfere
mocker.patch("mount_efs.get_aws_security_credentials_from_cloudshell", return_value=(None, None))
mocker.patch("mount_efs.get_aws_security_credentials_from_env_vars", return_value=(None, None))
response = json.dumps(
{
"AccessKeyId": ACCESS_KEY_ID_VAL,
Expand Down Expand Up @@ -261,6 +264,9 @@ def _test_get_aws_security_credentials_get_instance_metadata_role_name(
config = get_fake_config()
mocker.patch.dict(os.environ, {})
mocker.patch("os.path.exists", return_value=False)
# Mock new credential functions to return None so they don't interfere
mocker.patch("mount_efs.get_aws_security_credentials_from_cloudshell", return_value=(None, None))
mocker.patch("mount_efs.get_aws_security_credentials_from_env_vars", return_value=(None, None))
response = json.dumps(
{
"Code": "Success",
Expand Down Expand Up @@ -299,6 +305,9 @@ def test_get_aws_security_credentials_no_credentials_found(mocker, capsys):
config = get_fake_config()
mocker.patch.dict(os.environ, {})
mocker.patch("os.path.exists", return_value=False)
# Mock new credential functions to return None so they don't interfere
mocker.patch("mount_efs.get_aws_security_credentials_from_cloudshell", return_value=(None, None))
mocker.patch("mount_efs.get_aws_security_credentials_from_env_vars", return_value=(None, None))
mocker.patch("mount_efs.urlopen")

with pytest.raises(SystemExit) as ex:
Expand Down Expand Up @@ -586,3 +595,219 @@ def test_get_aws_security_credentials_pod_identity_invalid_token_file(mocker):
mount_efs.get_aws_security_credentials(config, True, "us-east-1")

assert ex.value.code == 1


# Tests for CloudShell credential support
def test_get_aws_security_credentials_from_cloudshell_success(mocker):
"""Test successful CloudShell credential retrieval"""
config = get_fake_config()
token = "mock-token"
credentials_response = {
"AccessKeyId": ACCESS_KEY_ID_VAL,
"SecretAccessKey": SECRET_ACCESS_KEY_VAL,
"Token": SESSION_TOKEN_VAL,
"LastUpdated": "1970-01-01T00:00:00Z",
"Type": "",
"Expiration": "2025-05-15T17:06:59Z",
"Code": "Success"
}

# Mock token request
mock_token_response = MockUrlLibResponse(data=token.encode('utf-8'))
mocker.patch("mount_efs.urlopen", return_value=mock_token_response)

# Mock credentials request
mocker.patch("mount_efs.url_request_helper", return_value=credentials_response)

credentials, credentials_source = mount_efs.get_aws_security_credentials_from_cloudshell(config, False)

assert credentials == credentials_response
assert credentials_source == "cloudshell"


def test_get_aws_security_credentials_from_cloudshell_token_failure(mocker):
"""Test CloudShell token request failure"""
config = get_fake_config()

# Mock token request failure
mocker.patch("mount_efs.urlopen", side_effect=Exception("Connection refused"))

credentials, credentials_source = mount_efs.get_aws_security_credentials_from_cloudshell(config, False)

assert credentials is None
assert credentials_source is None


def test_get_aws_security_credentials_from_cloudshell_credentials_failure(mocker):
"""Test CloudShell credentials request failure"""
config = get_fake_config()
token = "mock-token"

# Mock successful token request
mock_token_response = MockUrlLibResponse(data=token.encode('utf-8'))
mocker.patch("mount_efs.urlopen", return_value=mock_token_response)

# Mock credentials request failure
mocker.patch("mount_efs.url_request_helper", return_value=None)

credentials, credentials_source = mount_efs.get_aws_security_credentials_from_cloudshell(config, False)

assert credentials is None
assert credentials_source is None


def test_get_aws_security_credentials_from_cloudshell_invalid_credentials(mocker):
"""Test CloudShell with invalid credentials response"""
config = get_fake_config()
token = "mock-token"
invalid_credentials = {
"AccessKeyId": ACCESS_KEY_ID_VAL,
# Missing SecretAccessKey and Token
"LastUpdated": "1970-01-01T00:00:00Z",
}

# Mock token request
mock_token_response = MockUrlLibResponse(data=token.encode('utf-8'))
mocker.patch("mount_efs.urlopen", return_value=mock_token_response)

# Mock credentials request with invalid response
mocker.patch("mount_efs.url_request_helper", return_value=invalid_credentials)

credentials, credentials_source = mount_efs.get_aws_security_credentials_from_cloudshell(config, False)

assert credentials is None
assert credentials_source is None


def test_get_aws_security_credentials_from_cloudshell_fatal_error(mocker):
"""Test CloudShell with fatal error flag"""
config = get_fake_config()

# Mock token request failure
mocker.patch("mount_efs.urlopen", side_effect=Exception("Connection refused"))

with pytest.raises(SystemExit) as ex:
mount_efs.get_aws_security_credentials_from_cloudshell(config, True)

assert ex.value.code == 1


# Tests for environment variable credential support
def test_get_aws_security_credentials_from_env_vars_success_with_token(mocker):
"""Test successful environment variable credential retrieval with session token"""
mocker.patch.dict(os.environ, {
"AWS_ACCESS_KEY_ID": ACCESS_KEY_ID_VAL,
"AWS_SECRET_ACCESS_KEY": SECRET_ACCESS_KEY_VAL,
"AWS_SESSION_TOKEN": SESSION_TOKEN_VAL
})

credentials, credentials_source = mount_efs.get_aws_security_credentials_from_env_vars()

assert credentials["AccessKeyId"] == ACCESS_KEY_ID_VAL
assert credentials["SecretAccessKey"] == SECRET_ACCESS_KEY_VAL
assert credentials["Token"] == SESSION_TOKEN_VAL
assert credentials_source == "environment"


def test_get_aws_security_credentials_from_env_vars_success_without_token(mocker):
"""Test successful environment variable credential retrieval without session token"""
mocker.patch.dict(os.environ, {
"AWS_ACCESS_KEY_ID": ACCESS_KEY_ID_VAL,
"AWS_SECRET_ACCESS_KEY": SECRET_ACCESS_KEY_VAL
})

credentials, credentials_source = mount_efs.get_aws_security_credentials_from_env_vars()

assert credentials["AccessKeyId"] == ACCESS_KEY_ID_VAL
assert credentials["SecretAccessKey"] == SECRET_ACCESS_KEY_VAL
assert credentials["Token"] is None
assert credentials_source == "environment"


def test_get_aws_security_credentials_from_env_vars_missing_access_key(mocker):
"""Test environment variable credentials with missing access key"""
mocker.patch.dict(os.environ, {
"AWS_SECRET_ACCESS_KEY": SECRET_ACCESS_KEY_VAL,
"AWS_SESSION_TOKEN": SESSION_TOKEN_VAL
}, clear=True)

credentials, credentials_source = mount_efs.get_aws_security_credentials_from_env_vars()

assert credentials is None
assert credentials_source is None


def test_get_aws_security_credentials_from_env_vars_missing_secret_key(mocker):
"""Test environment variable credentials with missing secret key"""
mocker.patch.dict(os.environ, {
"AWS_ACCESS_KEY_ID": ACCESS_KEY_ID_VAL,
"AWS_SESSION_TOKEN": SESSION_TOKEN_VAL
}, clear=True)

credentials, credentials_source = mount_efs.get_aws_security_credentials_from_env_vars()

assert credentials is None
assert credentials_source is None


def test_get_aws_security_credentials_from_env_vars_no_env_vars(mocker):
"""Test environment variable credentials when no environment variables are set"""
mocker.patch.dict(os.environ, {}, clear=True)

credentials, credentials_source = mount_efs.get_aws_security_credentials_from_env_vars()

assert credentials is None
assert credentials_source is None


# Integration tests for the main credential function
def test_get_aws_security_credentials_cloudshell_integration(mocker):
"""Test that CloudShell credentials are used in the main credential chain"""
config = get_fake_config()
token = "mock-token"
credentials_response = {
"AccessKeyId": ACCESS_KEY_ID_VAL,
"SecretAccessKey": SECRET_ACCESS_KEY_VAL,
"Token": SESSION_TOKEN_VAL,
}

# Mock all preceding credential methods to fail
mocker.patch.dict(os.environ, {}, clear=True)
mocker.patch("os.path.exists", return_value=False)

# Mock CloudShell success
mock_token_response = MockUrlLibResponse(data=token.encode('utf-8'))
mocker.patch("mount_efs.urlopen", return_value=mock_token_response)
mocker.patch("mount_efs.url_request_helper", return_value=credentials_response)

credentials, credentials_source = mount_efs.get_aws_security_credentials(
config, True, "us-east-1"
)

assert credentials == credentials_response
assert credentials_source == "cloudshell"


def test_get_aws_security_credentials_env_vars_integration(mocker):
"""Test that environment variable credentials are used in the main credential chain"""
config = get_fake_config()

# Mock all preceding credential methods to fail
mocker.patch.dict(os.environ, {
"AWS_ACCESS_KEY_ID": ACCESS_KEY_ID_VAL,
"AWS_SECRET_ACCESS_KEY": SECRET_ACCESS_KEY_VAL,
"AWS_SESSION_TOKEN": SESSION_TOKEN_VAL
})
mocker.patch("os.path.exists", return_value=False)

# Mock CloudShell failure
mocker.patch("mount_efs.urlopen", side_effect=Exception("Connection refused"))

credentials, credentials_source = mount_efs.get_aws_security_credentials(
config, True, "us-east-1"
)

assert credentials["AccessKeyId"] == ACCESS_KEY_ID_VAL
assert credentials["SecretAccessKey"] == SECRET_ACCESS_KEY_VAL
assert credentials["Token"] == SESSION_TOKEN_VAL
assert credentials_source == "environment"