diff --git a/src/mount_efs/__init__.py b/src/mount_efs/__init__.py index c694d7b0..4df8e275 100755 --- a/src/mount_efs/__init__.py +++ b/src/mount_efs/__init__.py @@ -320,6 +320,18 @@ AWS_CONTAINER_CREDS_FULL_URI_ENV = "AWS_CONTAINER_CREDENTIALS_FULL_URI" AWS_CONTAINER_AUTH_TOKEN_FILE_ENV = "AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE" +# CloudShell credentials endpoint +CLOUDSHELL_METADATA_ENDPOINT = "http://localhost:1338/latest" +CLOUDSHELL_TOKEN_URL = CLOUDSHELL_METADATA_ENDPOINT + "/api/token" +CLOUDSHELL_CREDS_URL = CLOUDSHELL_METADATA_ENDPOINT + "/meta-data/container/security-credentials" +CLOUDSHELL_TOKEN_HEADER = "X-aws-ec2-metadata-token" +CLOUDSHELL_TOKEN_TTL_HEADER = "X-aws-ec2-metadata-token-ttl-seconds" + +# Environment variables for AWS credentials +AWS_ACCESS_KEY_ID_ENV = "AWS_ACCESS_KEY_ID" +AWS_SECRET_ACCESS_KEY_ENV = "AWS_SECRET_ACCESS_KEY" +AWS_SESSION_TOKEN_ENV = "AWS_SESSION_TOKEN" + def is_ipv6_address(ip_address): try: @@ -780,6 +792,18 @@ def get_aws_security_credentials( if credentials and credentials_source: return credentials, credentials_source + # attempt to lookup AWS security credentials from CloudShell metadata endpoint + credentials, credentials_source = get_aws_security_credentials_from_cloudshell( + config, False + ) + if credentials and credentials_source: + return credentials, credentials_source + + # attempt to lookup AWS security credentials from environment variables + credentials, credentials_source = get_aws_security_credentials_from_env_vars() + if credentials and credentials_source: + return credentials, credentials_source + # attempt to lookup AWS security credentials with IAM role name attached to instance # through IAM role name security credentials lookup uri iam_role_name = get_iam_role_name(config) @@ -793,6 +817,7 @@ def get_aws_security_credentials( error_msg = ( "AWS Access Key ID and Secret Access Key are not found in AWS credentials file (%s), config file (%s), " + "environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY), CloudShell metadata endpoint, " "from ECS credentials relative uri, or from the instance security credentials service" % (AWS_CREDENTIALS_FILE, AWS_CONFIG_FILE) ) @@ -1004,6 +1029,63 @@ def get_aws_security_credentials_from_instance_metadata(config, iam_role_name): return None, None +def get_aws_security_credentials_from_cloudshell(config, is_fatal=False): + """ + Attempt to retrieve AWS security credentials from CloudShell metadata endpoint. + CloudShell provides a limited IMDSv2-like endpoint at localhost:1338. + """ + try: + # First get the token + token_headers = {CLOUDSHELL_TOKEN_TTL_HEADER: "60"} + token_request = Request(CLOUDSHELL_TOKEN_URL, headers=token_headers) + token_request.get_method = lambda: "PUT" + + token_response = urlopen(token_request, timeout=DEFAULT_TIMEOUT) + token = token_response.read().decode("utf-8") + + # Then get credentials using the token + creds_headers = {CLOUDSHELL_TOKEN_HEADER: token} + creds_response = url_request_helper( + config, + CLOUDSHELL_CREDS_URL, + "Unsuccessful retrieval of AWS security credentials from CloudShell", + "Unable to reach CloudShell metadata endpoint", + headers=creds_headers + ) + + if creds_response and all(k in creds_response for k in CREDENTIALS_KEYS): + logging.debug("Retrieved credentials from CloudShell metadata endpoint") + return creds_response, "cloudshell" + + except Exception as e: + logging.debug("Failed to retrieve CloudShell credentials: %s" % str(e)) + + if is_fatal: + fatal_error("Failed to retrieve credentials from CloudShell metadata endpoint") + return None, None + + +def get_aws_security_credentials_from_env_vars(): + """ + Attempt to retrieve AWS security credentials from environment variables. + Supports AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN. + """ + access_key = os.environ.get(AWS_ACCESS_KEY_ID_ENV) + secret_key = os.environ.get(AWS_SECRET_ACCESS_KEY_ENV) + session_token = os.environ.get(AWS_SESSION_TOKEN_ENV) + + if access_key and secret_key: + credentials = { + "AccessKeyId": access_key, + "SecretAccessKey": secret_key, + "Token": session_token # Can be None + } + logging.debug("Retrieved credentials from environment variables") + return credentials, "environment" + + return None, None + + def get_iam_role_name(config): iam_role_unsuccessful_resp = ( "Unsuccessful retrieval of IAM role name at %s." % INSTANCE_IAM_URL diff --git a/test/mount_efs_test/test_get_aws_security_credentials.py b/test/mount_efs_test/test_get_aws_security_credentials.py index 3e75014e..b9ca5a7d 100644 --- a/test/mount_efs_test/test_get_aws_security_credentials.py +++ b/test/mount_efs_test/test_get_aws_security_credentials.py @@ -156,6 +156,9 @@ def _test_get_aws_security_credentials_get_ecs_from_env_url(mocker): config = get_fake_config() mocker.patch.dict(os.environ, {}) mocker.patch("os.path.exists", return_value=False) + # Mock new credential functions to return None so they don't interfere + mocker.patch("mount_efs.get_aws_security_credentials_from_cloudshell", return_value=(None, None)) + mocker.patch("mount_efs.get_aws_security_credentials_from_env_vars", return_value=(None, None)) response = json.dumps( { "AccessKeyId": ACCESS_KEY_ID_VAL, @@ -261,6 +264,9 @@ def _test_get_aws_security_credentials_get_instance_metadata_role_name( config = get_fake_config() mocker.patch.dict(os.environ, {}) mocker.patch("os.path.exists", return_value=False) + # Mock new credential functions to return None so they don't interfere + mocker.patch("mount_efs.get_aws_security_credentials_from_cloudshell", return_value=(None, None)) + mocker.patch("mount_efs.get_aws_security_credentials_from_env_vars", return_value=(None, None)) response = json.dumps( { "Code": "Success", @@ -299,6 +305,9 @@ def test_get_aws_security_credentials_no_credentials_found(mocker, capsys): config = get_fake_config() mocker.patch.dict(os.environ, {}) mocker.patch("os.path.exists", return_value=False) + # Mock new credential functions to return None so they don't interfere + mocker.patch("mount_efs.get_aws_security_credentials_from_cloudshell", return_value=(None, None)) + mocker.patch("mount_efs.get_aws_security_credentials_from_env_vars", return_value=(None, None)) mocker.patch("mount_efs.urlopen") with pytest.raises(SystemExit) as ex: @@ -586,3 +595,219 @@ def test_get_aws_security_credentials_pod_identity_invalid_token_file(mocker): mount_efs.get_aws_security_credentials(config, True, "us-east-1") assert ex.value.code == 1 + + +# Tests for CloudShell credential support +def test_get_aws_security_credentials_from_cloudshell_success(mocker): + """Test successful CloudShell credential retrieval""" + config = get_fake_config() + token = "mock-token" + credentials_response = { + "AccessKeyId": ACCESS_KEY_ID_VAL, + "SecretAccessKey": SECRET_ACCESS_KEY_VAL, + "Token": SESSION_TOKEN_VAL, + "LastUpdated": "1970-01-01T00:00:00Z", + "Type": "", + "Expiration": "2025-05-15T17:06:59Z", + "Code": "Success" + } + + # Mock token request + mock_token_response = MockUrlLibResponse(data=token.encode('utf-8')) + mocker.patch("mount_efs.urlopen", return_value=mock_token_response) + + # Mock credentials request + mocker.patch("mount_efs.url_request_helper", return_value=credentials_response) + + credentials, credentials_source = mount_efs.get_aws_security_credentials_from_cloudshell(config, False) + + assert credentials == credentials_response + assert credentials_source == "cloudshell" + + +def test_get_aws_security_credentials_from_cloudshell_token_failure(mocker): + """Test CloudShell token request failure""" + config = get_fake_config() + + # Mock token request failure + mocker.patch("mount_efs.urlopen", side_effect=Exception("Connection refused")) + + credentials, credentials_source = mount_efs.get_aws_security_credentials_from_cloudshell(config, False) + + assert credentials is None + assert credentials_source is None + + +def test_get_aws_security_credentials_from_cloudshell_credentials_failure(mocker): + """Test CloudShell credentials request failure""" + config = get_fake_config() + token = "mock-token" + + # Mock successful token request + mock_token_response = MockUrlLibResponse(data=token.encode('utf-8')) + mocker.patch("mount_efs.urlopen", return_value=mock_token_response) + + # Mock credentials request failure + mocker.patch("mount_efs.url_request_helper", return_value=None) + + credentials, credentials_source = mount_efs.get_aws_security_credentials_from_cloudshell(config, False) + + assert credentials is None + assert credentials_source is None + + +def test_get_aws_security_credentials_from_cloudshell_invalid_credentials(mocker): + """Test CloudShell with invalid credentials response""" + config = get_fake_config() + token = "mock-token" + invalid_credentials = { + "AccessKeyId": ACCESS_KEY_ID_VAL, + # Missing SecretAccessKey and Token + "LastUpdated": "1970-01-01T00:00:00Z", + } + + # Mock token request + mock_token_response = MockUrlLibResponse(data=token.encode('utf-8')) + mocker.patch("mount_efs.urlopen", return_value=mock_token_response) + + # Mock credentials request with invalid response + mocker.patch("mount_efs.url_request_helper", return_value=invalid_credentials) + + credentials, credentials_source = mount_efs.get_aws_security_credentials_from_cloudshell(config, False) + + assert credentials is None + assert credentials_source is None + + +def test_get_aws_security_credentials_from_cloudshell_fatal_error(mocker): + """Test CloudShell with fatal error flag""" + config = get_fake_config() + + # Mock token request failure + mocker.patch("mount_efs.urlopen", side_effect=Exception("Connection refused")) + + with pytest.raises(SystemExit) as ex: + mount_efs.get_aws_security_credentials_from_cloudshell(config, True) + + assert ex.value.code == 1 + + +# Tests for environment variable credential support +def test_get_aws_security_credentials_from_env_vars_success_with_token(mocker): + """Test successful environment variable credential retrieval with session token""" + mocker.patch.dict(os.environ, { + "AWS_ACCESS_KEY_ID": ACCESS_KEY_ID_VAL, + "AWS_SECRET_ACCESS_KEY": SECRET_ACCESS_KEY_VAL, + "AWS_SESSION_TOKEN": SESSION_TOKEN_VAL + }) + + credentials, credentials_source = mount_efs.get_aws_security_credentials_from_env_vars() + + assert credentials["AccessKeyId"] == ACCESS_KEY_ID_VAL + assert credentials["SecretAccessKey"] == SECRET_ACCESS_KEY_VAL + assert credentials["Token"] == SESSION_TOKEN_VAL + assert credentials_source == "environment" + + +def test_get_aws_security_credentials_from_env_vars_success_without_token(mocker): + """Test successful environment variable credential retrieval without session token""" + mocker.patch.dict(os.environ, { + "AWS_ACCESS_KEY_ID": ACCESS_KEY_ID_VAL, + "AWS_SECRET_ACCESS_KEY": SECRET_ACCESS_KEY_VAL + }) + + credentials, credentials_source = mount_efs.get_aws_security_credentials_from_env_vars() + + assert credentials["AccessKeyId"] == ACCESS_KEY_ID_VAL + assert credentials["SecretAccessKey"] == SECRET_ACCESS_KEY_VAL + assert credentials["Token"] is None + assert credentials_source == "environment" + + +def test_get_aws_security_credentials_from_env_vars_missing_access_key(mocker): + """Test environment variable credentials with missing access key""" + mocker.patch.dict(os.environ, { + "AWS_SECRET_ACCESS_KEY": SECRET_ACCESS_KEY_VAL, + "AWS_SESSION_TOKEN": SESSION_TOKEN_VAL + }, clear=True) + + credentials, credentials_source = mount_efs.get_aws_security_credentials_from_env_vars() + + assert credentials is None + assert credentials_source is None + + +def test_get_aws_security_credentials_from_env_vars_missing_secret_key(mocker): + """Test environment variable credentials with missing secret key""" + mocker.patch.dict(os.environ, { + "AWS_ACCESS_KEY_ID": ACCESS_KEY_ID_VAL, + "AWS_SESSION_TOKEN": SESSION_TOKEN_VAL + }, clear=True) + + credentials, credentials_source = mount_efs.get_aws_security_credentials_from_env_vars() + + assert credentials is None + assert credentials_source is None + + +def test_get_aws_security_credentials_from_env_vars_no_env_vars(mocker): + """Test environment variable credentials when no environment variables are set""" + mocker.patch.dict(os.environ, {}, clear=True) + + credentials, credentials_source = mount_efs.get_aws_security_credentials_from_env_vars() + + assert credentials is None + assert credentials_source is None + + +# Integration tests for the main credential function +def test_get_aws_security_credentials_cloudshell_integration(mocker): + """Test that CloudShell credentials are used in the main credential chain""" + config = get_fake_config() + token = "mock-token" + credentials_response = { + "AccessKeyId": ACCESS_KEY_ID_VAL, + "SecretAccessKey": SECRET_ACCESS_KEY_VAL, + "Token": SESSION_TOKEN_VAL, + } + + # Mock all preceding credential methods to fail + mocker.patch.dict(os.environ, {}, clear=True) + mocker.patch("os.path.exists", return_value=False) + + # Mock CloudShell success + mock_token_response = MockUrlLibResponse(data=token.encode('utf-8')) + mocker.patch("mount_efs.urlopen", return_value=mock_token_response) + mocker.patch("mount_efs.url_request_helper", return_value=credentials_response) + + credentials, credentials_source = mount_efs.get_aws_security_credentials( + config, True, "us-east-1" + ) + + assert credentials == credentials_response + assert credentials_source == "cloudshell" + + +def test_get_aws_security_credentials_env_vars_integration(mocker): + """Test that environment variable credentials are used in the main credential chain""" + config = get_fake_config() + + # Mock all preceding credential methods to fail + mocker.patch.dict(os.environ, { + "AWS_ACCESS_KEY_ID": ACCESS_KEY_ID_VAL, + "AWS_SECRET_ACCESS_KEY": SECRET_ACCESS_KEY_VAL, + "AWS_SESSION_TOKEN": SESSION_TOKEN_VAL + }) + mocker.patch("os.path.exists", return_value=False) + + # Mock CloudShell failure + mocker.patch("mount_efs.urlopen", side_effect=Exception("Connection refused")) + + credentials, credentials_source = mount_efs.get_aws_security_credentials( + config, True, "us-east-1" + ) + + assert credentials["AccessKeyId"] == ACCESS_KEY_ID_VAL + assert credentials["SecretAccessKey"] == SECRET_ACCESS_KEY_VAL + assert credentials["Token"] == SESSION_TOKEN_VAL + assert credentials_source == "environment"