diff --git a/Cargo.lock b/Cargo.lock index 60c5a2325d4fe..b1c7c205fe52e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5745,6 +5745,7 @@ dependencies = [ "anyhow", "aws-config", "aws-credential-types", + "aws-sdk-sts", "aws-types", "base64 0.13.1", "bytes", @@ -5783,6 +5784,7 @@ dependencies = [ "rdkafka", "scopeguard", "serde", + "serde_json", "thiserror", "timely", "tokio", diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml index 0c167117c4d15..ae4c80981b4ce 100644 --- a/ci/nightly/pipeline.template.yml +++ b/ci/nightly/pipeline.template.yml @@ -381,18 +381,18 @@ steps: composition: zippy args: [--scenario=AlterConnectionWithKafkaSources, --actions=10000, --max-execution-time=30m] - - group: Secrets - key: secrets + - group: AWS + key: aws steps: - - id: secrets-aws-secrets-manager - label: "Secrets AWS" + - id: aws + label: AWS timeout_in_minutes: 30 agents: queue: linux-x86_64 artifact_paths: junit_*.xml plugins: - ./ci/plugins/mzcompose: - composition: secrets-aws-secrets-manager + composition: aws - id: secrets-local-file label: "Secrets Local File" diff --git a/doc/developer/design/20231110_aws_connections.md b/doc/developer/design/20231110_aws_connections.md index 0c8ca2f0c1867..33d326b6181ae 100644 --- a/doc/developer/design/20231110_aws_connections.md +++ b/doc/developer/design/20231110_aws_connections.md @@ -127,7 +127,7 @@ CREATE TABLE mz_internal.mz_aws_connections ( -- The value of the `AWS ACCESS KEY ID` option, if specified as a string. -- `NULL` otherwise. access_key_id text, - -- The value of the `AWS ACCESS KEY ID` option, if specified as a secret. + -- The ID of the `AWS ACCESS KEY ID` secret in `mz_secrets`, if specified as a secret. -- `NULL` otherwise. access_key_id_secret_id text, -- The value of the `ASSUME ROLE ARN` option, if specified. `NULL` otherwise. 
diff --git a/doc/user/content/sql/system-catalog/mz_internal.md b/doc/user/content/sql/system-catalog/mz_internal.md index def903ef8e843..ccb23893cede7 100644 --- a/doc/user/content/sql/system-catalog/mz_internal.md +++ b/doc/user/content/sql/system-catalog/mz_internal.md @@ -1222,6 +1222,7 @@ The `mz_scheduling_parks_histogram` view describes a histogram of [dataflow] wor + diff --git a/misc/python/materialize/checks/all_checks/aws.py b/misc/python/materialize/checks/all_checks/aws.py new file mode 100644 index 0000000000000..65bbcf476bab3 --- /dev/null +++ b/misc/python/materialize/checks/all_checks/aws.py @@ -0,0 +1,64 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. 
+from __future__ import annotations + +from textwrap import dedent + +from materialize.checks.actions import Testdrive +from materialize.checks.checks import Check, externally_idempotent + + +@externally_idempotent(False) +class AwsConnection(Check): + def initialize(self) -> Testdrive: + return Testdrive( + dedent( + """ + $[version>=8000] postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} + ALTER SYSTEM SET enable_aws_connection = true + ALTER SYSTEM SET enable_connection_validation_syntax = true + + > CREATE CONNECTION aws_assume_role + TO AWS (ASSUME ROLE ARN 'assume-role', ASSUME ROLE SESSION NAME 'session-name'); + + > CREATE SECRET aws_secret_access_key as '...'; + + > CREATE CONNECTION aws_credentials + TO AWS (ACCESS KEY ID = 'access_key', SECRET ACCESS KEY = SECRET aws_secret_access_key); + """ + ) + ) + + def manipulate(self) -> list[Testdrive]: + return [ + Testdrive(dedent(s)) + for s in [ + """ + > ALTER CONNECTION aws_assume_role SET (ASSUME ROLE ARN 'assume-role-2'); + """, + """ + > ALTER CONNECTION aws_credentials SET (ACCESS KEY ID 'access_key_2'); + """, + ] + ] + + def validate(self) -> Testdrive: + # We just check that the connections are still safe to reference. + # The error is inconsistent depending on the way the check is being run. + return Testdrive( + dedent( + """ + ! VALIDATE CONNECTION aws_assume_role; + regex:.* + + ! 
VALIDATE CONNECTION aws_credentials; + regex:.* + """ + ) + ) diff --git a/misc/python/materialize/cli/ci_logged_errors_detect.py b/misc/python/materialize/cli/ci_logged_errors_detect.py index 2fe94d7203934..8cd3243bad02f 100644 --- a/misc/python/materialize/cli/ci_logged_errors_detect.py +++ b/misc/python/materialize/cli/ci_logged_errors_detect.py @@ -94,6 +94,8 @@ | (platform-checks|legacy-upgrade|upgrade-matrix|feature-benchmark)-materialized-.* \| .*cannot\ load\ unknown\ system\ parameter\ from\ catalog\ storage # For platform-checks upgrade tests | cannot\ load\ unknown\ system\ parameter\ from\ catalog\ storage(\ to\ set\ (default|configured)\ parameter)?\ name=enable_dangerous_functions + | internal\ error:\ no\ AWS\ external\ ID\ prefix\ configured + | failed\ writing\ row\ to\ mz_aws_connections.*no\ AWS\ external\ ID\ prefix\ configured ) """, re.VERBOSE | re.MULTILINE, diff --git a/src/adapter/src/catalog/builtin_table_updates.rs b/src/adapter/src/catalog/builtin_table_updates.rs index b1853976a5de2..0529c88d29de5 100644 --- a/src/adapter/src/catalog/builtin_table_updates.rs +++ b/src/adapter/src/catalog/builtin_table_updates.rs @@ -13,10 +13,11 @@ use bytesize::ByteSize; use chrono::{DateTime, Utc}; use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage}; use mz_catalog::builtin::{ - MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, - MZ_CLUSTERS, MZ_CLUSTER_LINKS, MZ_CLUSTER_REPLICAS, MZ_CLUSTER_REPLICA_METRICS, - MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICA_STATUSES, MZ_COLUMNS, MZ_COMMENTS, MZ_CONNECTIONS, - MZ_DATABASES, MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS, MZ_FUNCTIONS, MZ_INDEXES, MZ_INDEX_COLUMNS, + MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS, + MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTERS, MZ_CLUSTER_LINKS, + MZ_CLUSTER_REPLICAS, MZ_CLUSTER_REPLICA_METRICS, MZ_CLUSTER_REPLICA_SIZES, + MZ_CLUSTER_REPLICA_STATUSES, MZ_COLUMNS, 
MZ_COMMENTS, MZ_CONNECTIONS, MZ_DATABASES, + MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS, MZ_FUNCTIONS, MZ_INDEXES, MZ_INDEX_COLUMNS, MZ_INTERNAL_CLUSTER_REPLICAS, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCES, MZ_LIST_TYPES, MZ_MAP_TYPES, MZ_MATERIALIZED_VIEWS, MZ_OBJECT_DEPENDENCIES, MZ_OPERATORS, MZ_POSTGRES_SOURCES, MZ_PSEUDO_TYPES, MZ_ROLES, MZ_ROLE_MEMBERS, MZ_SCHEMAS, MZ_SECRETS, @@ -46,8 +47,9 @@ use mz_sql::catalog::{CatalogCluster, CatalogDatabase, CatalogSchema, CatalogTyp use mz_sql::func::FuncImplCatalogDetails; use mz_sql::names::{CommentObjectId, ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier}; use mz_sql_parser::ast::display::AstDisplay; +use mz_storage_types::connections::aws::{AwsAuth, AwsConnection}; use mz_storage_types::connections::inline::ReferencedConnection; -use mz_storage_types::connections::KafkaConnection; +use mz_storage_types::connections::{KafkaConnection, StringOrSecret}; use mz_storage_types::sinks::{KafkaSinkConnection, StorageSinkConnection}; use mz_storage_types::sources::{ GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, @@ -636,26 +638,35 @@ impl CatalogState { diff, )); } else { - tracing::error!("does this even happen?"); + tracing::error!(%id, "missing SSH public key; cannot write row to mz_ssh_connections table"); } } mz_storage_types::connections::Connection::Kafka(ref kafka) => { updates.extend(self.pack_kafka_connection_update(id, kafka, diff)); } + mz_storage_types::connections::Connection::Aws(ref aws_config) => { + match self.pack_aws_connection_update(id, aws_config, diff) { + Ok(update) => { + updates.push(update); + } + Err(e) => { + tracing::error!(%id, %e, "failed writing row to mz_aws_connections table"); + } + } + } mz_storage_types::connections::Connection::AwsPrivatelink(_) => { if let Some(aws_principal_context) = self.aws_principal_context.as_ref() { - updates.extend(self.pack_aws_privatelink_connection_update( + updates.push(self.pack_aws_privatelink_connection_update( id, 
aws_principal_context, diff, )); } else { - tracing::error!("Missing AWS principal context, cannot write to mz_aws_privatelink_connections table"); + tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table"); } } mz_storage_types::connections::Connection::Csr(_) - | mz_storage_types::connections::Connection::Postgres(_) - | mz_storage_types::connections::Connection::Aws(_) => (), + | mz_storage_types::connections::Connection::Postgres(_) => (), }; updates } @@ -714,12 +725,69 @@ impl CatalogState { connection_id: GlobalId, aws_principal_context: &AwsPrincipalContext, diff: Diff, - ) -> Result { + ) -> BuiltinTableUpdate { let id = self.resolve_builtin_table(&MZ_AWS_PRIVATELINK_CONNECTIONS); let row = Row::pack_slice(&[ Datum::String(&connection_id.to_string()), Datum::String(&aws_principal_context.to_principal_string(connection_id)), ]); + BuiltinTableUpdate { id, row, diff } + } + + pub fn pack_aws_connection_update( + &self, + connection_id: GlobalId, + aws_config: &AwsConnection, + diff: Diff, + ) -> Result { + let id = self.resolve_builtin_table(&MZ_AWS_CONNECTIONS); + + let mut access_key_id = None; + let mut access_key_id_secret_id = None; + let mut assume_role_arn = None; + let mut assume_role_session_name = None; + let mut principal = None; + let mut external_id = None; + let mut example_trust_policy = None; + match &aws_config.auth { + AwsAuth::Credentials(credentials) => match &credentials.access_key_id { + StringOrSecret::String(access_key) => access_key_id = Some(access_key.as_str()), + StringOrSecret::Secret(secret_id) => { + access_key_id_secret_id = Some(secret_id.to_string()) + } + }, + AwsAuth::AssumeRole(assume_role) => { + assume_role_arn = Some(assume_role.arn.as_str()); + assume_role_session_name = assume_role.session_name.as_deref(); + principal = self + .config + .connection_context + .aws_connection_role_arn + .as_deref(); + external_id = + 
Some(assume_role.external_id(&self.config.connection_context, connection_id)?); + example_trust_policy = { + let policy = assume_role + .example_trust_policy(&self.config.connection_context, connection_id)?; + let policy = Jsonb::from_serde_json(policy).expect("valid json"); + Some(policy.into_row()) + }; + } + } + + let row = Row::pack_slice(&[ + Datum::String(&connection_id.to_string()), + Datum::from(aws_config.endpoint.as_deref()), + Datum::from(aws_config.region.as_deref()), + Datum::from(access_key_id), + Datum::from(access_key_id_secret_id.as_deref()), + Datum::from(assume_role_arn), + Datum::from(assume_role_session_name), + Datum::from(principal), + Datum::from(external_id.as_deref()), + Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())), + ]); + Ok(BuiltinTableUpdate { id, row, diff }) } diff --git a/src/buf.yaml b/src/buf.yaml index f443f487ebf30..9f9c0b2e31647 100644 --- a/src/buf.yaml +++ b/src/buf.yaml @@ -43,6 +43,8 @@ breaking: - persist-types/src/stats.proto # reason: does currently not require backward-compatibility - storage-client/src/client.proto + # reason: currently does not require backward-compatibility + - storage-types/src/connections/aws.proto lint: use: - DEFAULT diff --git a/src/catalog/src/builtin.rs b/src/catalog/src/builtin.rs index 3d5d4be2a6af8..4468bfc464274 100644 --- a/src/catalog/src/builtin.rs +++ b/src/catalog/src/builtin.rs @@ -2646,6 +2646,27 @@ pub static MZ_AWS_PRIVATELINK_CONNECTIONS: Lazy = Lazy::new(|| Bui sensitivity: DataSensitivity::Public, }); +pub static MZ_AWS_CONNECTIONS: Lazy = Lazy::new(|| BuiltinTable { + name: "mz_aws_connections", + schema: MZ_INTERNAL_SCHEMA, + desc: RelationDesc::empty() + .with_column("id", ScalarType::String.nullable(false)) + .with_column("endpoint", ScalarType::String.nullable(true)) + .with_column("region", ScalarType::String.nullable(true)) + .with_column("access_key_id", ScalarType::String.nullable(true)) + .with_column("access_key_id_secret_id", 
ScalarType::String.nullable(true)) + .with_column("assume_role_arn", ScalarType::String.nullable(true)) + .with_column( + "assume_role_session_name", + ScalarType::String.nullable(true), + ) + .with_column("principal", ScalarType::String.nullable(true)) + .with_column("external_id", ScalarType::String.nullable(true)) + .with_column("example_trust_policy", ScalarType::Jsonb.nullable(true)), + is_retained_metrics_object: false, + sensitivity: DataSensitivity::Public, +}); + pub static MZ_CLUSTER_REPLICA_METRICS: Lazy = Lazy::new(|| BuiltinTable { name: "mz_cluster_replica_metrics", // TODO[btv] - make this public once we work out whether and how to fuse it with @@ -6185,6 +6206,7 @@ pub static BUILTINS_STATIC: Lazy>> = Lazy::new(|| { Builtin::Table(&MZ_STORAGE_USAGE_BY_SHARD), Builtin::Table(&MZ_EGRESS_IPS), Builtin::Table(&MZ_AWS_PRIVATELINK_CONNECTIONS), + Builtin::Table(&MZ_AWS_CONNECTIONS), Builtin::Table(&MZ_SUBSCRIPTIONS), Builtin::Table(&MZ_SESSIONS), Builtin::Table(&MZ_DEFAULT_PRIVILEGES), diff --git a/src/clusterd/src/bin/clusterd.rs b/src/clusterd/src/bin/clusterd.rs index ef5c51b6735f6..d4e4c1e220e9b 100644 --- a/src/clusterd/src/bin/clusterd.rs +++ b/src/clusterd/src/bin/clusterd.rs @@ -174,6 +174,11 @@ struct Args { #[clap(long, env = "AWS_EXTERNAL_ID", value_name = "ID", parse(from_str = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable))] aws_external_id_prefix: Option, + /// The ARN for a Materialize-controlled role to assume before assuming + /// a customer's requested role for an AWS connection. + #[clap(long, env = "AWS_CONNECTION_ROLE_ARN")] + aws_connection_role_arn: Option, + // === Process orchestrator options. === /// Where to write a PID lock file. 
/// @@ -333,6 +338,7 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { args.environment_id, &args.tracing.startup_log_filter, args.aws_external_id_prefix, + args.aws_connection_role_arn, secrets_reader, None, ), diff --git a/src/controller/src/clusters.rs b/src/controller/src/clusters.rs index 0b774689c9fad..d603c87935f17 100644 --- a/src/controller/src/clusters.rs +++ b/src/controller/src/clusters.rs @@ -587,6 +587,7 @@ where }; let environment_id = self.connection_context().environment_id.clone(); let aws_external_id_prefix = self.connection_context().aws_external_id_prefix.clone(); + let aws_connection_role_arn = self.connection_context().aws_connection_role_arn.clone(); let persist_pubsub_url = self.persist_pubsub_url.clone(); let persist_txn_tables = self.persist_txn_tables; let secrets_args = self.secrets_args.to_flags(); @@ -620,6 +621,12 @@ where aws_external_id_prefix )); } + if let Some(aws_connection_role_arn) = &aws_connection_role_arn { + args.push(format!( + "--aws-connection-role-arn={}", + aws_connection_role_arn + )); + } if let Some(memory_limit) = location.allocation.memory_limit { args.push(format!( "--announce-memory-limit={}", diff --git a/src/environmentd/src/bin/environmentd/main.rs b/src/environmentd/src/bin/environmentd/main.rs index 7990241729ae7..5a9197eef09dd 100644 --- a/src/environmentd/src/bin/environmentd/main.rs +++ b/src/environmentd/src/bin/environmentd/main.rs @@ -892,6 +892,7 @@ fn run(mut args: Args) -> Result<(), anyhow::Error> { args.environment_id.to_string(), &args.tracing.startup_log_filter, args.aws_external_id_prefix, + args.aws_connection_role_arn, secrets_reader, cloud_resource_reader, ); diff --git a/src/sql-lexer/src/keywords.txt b/src/sql-lexer/src/keywords.txt index 16c7e31891873..2076b6bd882a7 100644 --- a/src/sql-lexer/src/keywords.txt +++ b/src/sql-lexer/src/keywords.txt @@ -37,6 +37,7 @@ Array As Asc Assert +Assume At Auction Authority diff --git a/src/sql-parser/src/ast/defs/ddl.rs 
b/src/sql-parser/src/ast/defs/ddl.rs index a7f74436ea6fa..b5d662347b46a 100644 --- a/src/sql-parser/src/ast/defs/ddl.rs +++ b/src/sql-parser/src/ast/defs/ddl.rs @@ -669,6 +669,8 @@ impl_display_t!(DbzTxMetadataOption); #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum ConnectionOptionName { AccessKeyId, + AssumeRoleArn, + AssumeRoleSessionName, AvailabilityZones, AwsPrivatelink, Broker, @@ -680,7 +682,6 @@ pub enum ConnectionOptionName { Port, ProgressTopic, Region, - RoleArn, SaslMechanisms, SaslPassword, SaslUsername, @@ -712,7 +713,8 @@ impl AstDisplay for ConnectionOptionName { ConnectionOptionName::Port => "PORT", ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC", ConnectionOptionName::Region => "REGION", - ConnectionOptionName::RoleArn => "ROLE ARN", + ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN", + ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME", ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS", ConnectionOptionName::SaslPassword => "SASL PASSWORD", ConnectionOptionName::SaslUsername => "SASL USERNAME", diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index 96bac7ef4d94a..6083e73832cd4 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -2410,6 +2410,7 @@ impl<'a> Parser<'a> { Ok( match self.expect_one_of_keywords(&[ ACCESS, + ASSUME, AVAILABILITY, AWS, BROKER, @@ -2437,6 +2438,17 @@ impl<'a> Parser<'a> { self.expect_keywords(&[KEY, ID])?; ConnectionOptionName::AccessKeyId } + ASSUME => { + self.expect_keyword(ROLE)?; + match self.expect_one_of_keywords(&[ARN, SESSION])? 
{ + ARN => ConnectionOptionName::AssumeRoleArn, + SESSION => { + self.expect_keyword(NAME)?; + ConnectionOptionName::AssumeRoleSessionName + } + _ => unreachable!(), + } + } AVAILABILITY => { self.expect_keyword(ZONES)?; ConnectionOptionName::AvailabilityZones @@ -2461,10 +2473,6 @@ impl<'a> Parser<'a> { ConnectionOptionName::SecurityProtocol } REGION => ConnectionOptionName::Region, - ROLE => { - self.expect_keyword(ARN)?; - ConnectionOptionName::RoleArn - } SASL => match self.expect_one_of_keywords(&[MECHANISMS, PASSWORD, USERNAME])? { MECHANISMS => ConnectionOptionName::SaslMechanisms, PASSWORD => ConnectionOptionName::SaslPassword, diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl index 79458599c6b65..abba844b9bed6 100644 --- a/src/sql-parser/tests/testdata/ddl +++ b/src/sql-parser/tests/testdata/ddl @@ -387,11 +387,25 @@ CREATE OR REPLACE MATERIALIZED VIEW v WITH (ASSERT NOT NULL a, ASSERT NOT NULL = CREATE OR REPLACE MATERIALIZED VIEW v WITH (ASSERT NOT NULL = a, ASSERT NOT NULL = b) AS SELECT 1 parse-statement -CREATE CONNECTION awsconn TO AWS (ACCESS KEY ID 'id', ENDPOINT 'endpoint', REGION 'region', ROLE ARN 'role-arn', SECRET ACCESS KEY 'key', TOKEN 'token') +CREATE CONNECTION awsconn TO AWS (ACCESS KEY ID 'id', ENDPOINT 'endpoint', REGION 'region', SECRET ACCESS KEY 'key', TOKEN 'token') ---- -CREATE CONNECTION awsconn TO AWS (ACCESS KEY ID = 'id', ENDPOINT = 'endpoint', REGION = 'region', ROLE ARN = 'role-arn', SECRET ACCESS KEY = 'key', TOKEN = 'token') +CREATE CONNECTION awsconn TO AWS (ACCESS KEY ID = 'id', ENDPOINT = 'endpoint', REGION = 'region', SECRET ACCESS KEY = 'key', TOKEN = 'token') => -CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("awsconn")]), connection_type: Aws, if_not_exists: false, values: [ConnectionOption { name: AccessKeyId, value: Some(Value(String("id"))) }, ConnectionOption { name: Endpoint, value: Some(Value(String("endpoint"))) }, ConnectionOption { name: Region, 
value: Some(Value(String("region"))) }, ConnectionOption { name: RoleArn, value: Some(Value(String("role-arn"))) }, ConnectionOption { name: SecretAccessKey, value: Some(Value(String("key"))) }, ConnectionOption { name: Token, value: Some(Value(String("token"))) }], with_options: [] }) +CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("awsconn")]), connection_type: Aws, if_not_exists: false, values: [ConnectionOption { name: AccessKeyId, value: Some(Value(String("id"))) }, ConnectionOption { name: Endpoint, value: Some(Value(String("endpoint"))) }, ConnectionOption { name: Region, value: Some(Value(String("region"))) }, ConnectionOption { name: SecretAccessKey, value: Some(Value(String("key"))) }, ConnectionOption { name: Token, value: Some(Value(String("token"))) }], with_options: [] }) + +parse-statement +CREATE CONNECTION awsconn TO AWS (ASSUME ROLE ARN 'role-arn', ASSUME ROLE SESSION NAME 'session-name') +---- +CREATE CONNECTION awsconn TO AWS (ASSUME ROLE ARN = 'role-arn', ASSUME ROLE SESSION NAME = 'session-name') +=> +CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("awsconn")]), connection_type: Aws, if_not_exists: false, values: [ConnectionOption { name: AssumeRoleArn, value: Some(Value(String("role-arn"))) }, ConnectionOption { name: AssumeRoleSessionName, value: Some(Value(String("session-name"))) }], with_options: [] }) + +parse-statement +CREATE CONNECTION awsconn TO AWS (ACCESS KEY ID 'id', ENDPOINT 'endpoint', REGION 'region', ASSUME ROLE ARN 'role-arn', SECRET ACCESS KEY 'key', TOKEN 'token') +---- +CREATE CONNECTION awsconn TO AWS (ACCESS KEY ID = 'id', ENDPOINT = 'endpoint', REGION = 'region', ASSUME ROLE ARN = 'role-arn', SECRET ACCESS KEY = 'key', TOKEN = 'token') +=> +CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("awsconn")]), connection_type: Aws, if_not_exists: false, values: [ConnectionOption { name: AccessKeyId, value: Some(Value(String("id"))) }, 
ConnectionOption { name: Endpoint, value: Some(Value(String("endpoint"))) }, ConnectionOption { name: Region, value: Some(Value(String("region"))) }, ConnectionOption { name: AssumeRoleArn, value: Some(Value(String("role-arn"))) }, ConnectionOption { name: SecretAccessKey, value: Some(Value(String("key"))) }, ConnectionOption { name: Token, value: Some(Value(String("token"))) }], with_options: [] }) parse-statement CREATE CONNECTION privatelinkconn TO AWS PRIVATELINK (SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc', AVAILABILITY ZONES ('use1-az1', 'use1-az4')) diff --git a/src/sql/src/plan/statement/ddl/connection.rs b/src/sql/src/plan/statement/ddl/connection.rs index cd4a148c4bdd1..f95264c94a13d 100644 --- a/src/sql/src/plan/statement/ddl/connection.rs +++ b/src/sql/src/plan/statement/ddl/connection.rs @@ -20,7 +20,7 @@ use mz_sql_parser::ast::{ ConnectionOption, ConnectionOptionName, CreateConnectionType, KafkaBroker, KafkaBrokerAwsPrivatelinkOption, KafkaBrokerAwsPrivatelinkOptionName, KafkaBrokerTunnel, }; -use mz_storage_types::connections::aws::{AwsAssumeRole, AwsConfig, AwsCredentials}; +use mz_storage_types::connections::aws::{AwsAssumeRole, AwsAuth, AwsConnection, AwsCredentials}; use mz_storage_types::connections::inline::ReferencedConnection; use mz_storage_types::connections::{ AwsPrivatelink, AwsPrivatelinkConnection, CsrConnection, CsrConnectionHttpAuth, @@ -36,6 +36,8 @@ use crate::plan::{PlanError, StatementContext}; generate_extracted_config!( ConnectionOption, (AccessKeyId, StringOrSecret), + (AssumeRoleArn, String), + (AssumeRoleSessionName, String), (AvailabilityZones, Vec), (AwsPrivatelink, with_options::Object), (Broker, Vec>), @@ -47,7 +49,6 @@ generate_extracted_config!( (Port, u16), (ProgressTopic, String), (Region, String), - (RoleArn, String), (SaslMechanisms, String), (SaslPassword, with_options::Secret), (SaslUsername, StringOrSecret), @@ -88,7 +89,8 @@ pub(super) fn validate_options_per_connection_type( Token, 
Endpoint, Region, - RoleArn, + AssumeRoleArn, + AssumeRoleSessionName, ] .as_slice(), CreateConnectionType::AwsPrivatelink => &[AvailabilityZones, Port, ServiceName], @@ -164,17 +166,41 @@ impl ConnectionOptionExtracted { let connection: Connection = match connection_type { CreateConnectionType::Aws => { - Connection::Aws(AwsConfig { - credentials: AwsCredentials { - access_key_id: self - .access_key_id - .ok_or_else(|| sql_err!("ACCESS KEY ID option is required"))?, - secret_access_key: self - .secret_access_key - .ok_or_else(|| sql_err!("SECRET ACCESS KEY option is required"))? - .into(), - session_token: self.token, - }, + let credentials = match (self.access_key_id, self.secret_access_key, self.token) { + (Some(access_key_id), Some(secret_access_key), session_token) => { + Some(AwsCredentials { + access_key_id, + secret_access_key: secret_access_key.into(), + session_token, + }) + } + (None, None, None) => None, + _ => { + sql_bail!("must specify both ACCESS KEY ID and SECRET ACCESS KEY with optional SESSION TOKEN"); + } + }; + + let assume_role = match (self.assume_role_arn, self.assume_role_session_name) { + (Some(arn), session_name) => Some(AwsAssumeRole { arn, session_name }), + (None, Some(_)) => { + sql_bail!( + "must specify ASSUME ROLE ARN with optional ASSUME ROLE SESSION NAME" + ); + } + _ => None, + }; + + let auth = match (credentials, assume_role) { + (None, None) => sql_bail!("must specify either ASSUME ROLE ARN or ACCESS KEY ID and SECRET ACCESS KEY"), + (Some(credentials), None) => AwsAuth::Credentials(credentials), + (None, Some(assume_role)) => AwsAuth::AssumeRole(assume_role), + (Some(_), Some(_)) => { + sql_bail!("cannot specify both ACCESS KEY ID and ASSUME ROLE ARN"); + } + }; + + Connection::Aws(AwsConnection { + auth, endpoint: match self.endpoint { // TODO(benesch): this should not treat an empty endpoint as equivalent to a `NULL` // endpoint, but making that change now would break testdrive. 
AWS connections are @@ -184,7 +210,6 @@ impl ConnectionOptionExtracted { _ => None, }, region: self.region, - role: self.role_arn.map(|arn| AwsAssumeRole { arn }), }) } CreateConnectionType::AwsPrivatelink => { diff --git a/src/storage-types/Cargo.toml b/src/storage-types/Cargo.toml index 1371db0fac62b..7de7f8cf173e5 100644 --- a/src/storage-types/Cargo.toml +++ b/src/storage-types/Cargo.toml @@ -10,6 +10,7 @@ publish = false anyhow = "1.0.66" aws-config = { version = "0.55", default-features = false, features = ["native-tls"] } aws-credential-types = { version = "0.55", features = ["hardcoded-credentials"] } +aws-sdk-sts = { version = "0.26", default-features = false, features = ["native-tls", "rt-tokio"] } aws-types = "0.55" bytes = "1.3.0" dec = "0.4.8" @@ -45,6 +46,7 @@ prost = { version = "0.11.3", features = ["no-recursion-limit"] } rdkafka = { version = "0.29.0", features = ["cmake-build", "ssl-vendored", "libz-static", "zstd"] } scopeguard = "1.1.0" serde = { version = "1.0.152", features = ["derive"] } +serde_json = "1.0.89" thiserror = "1.0.37" timely = { version = "0.12.0", default-features = false, features = ["bincode"] } tokio = { version = "1.24.2", features = ["fs", "rt", "sync", "test-util", "time"] } diff --git a/src/storage-types/src/connections.rs b/src/storage-types/src/connections.rs index febd268df6457..9965d2e1a1d4f 100644 --- a/src/storage-types/src/connections.rs +++ b/src/storage-types/src/connections.rs @@ -44,7 +44,7 @@ use tokio_postgres::config::SslMode; use url::Url; use crate::configuration::StorageConfiguration; -use crate::connections::aws::AwsConfig; +use crate::connections::aws::AwsConnection; use crate::errors::{ContextCreationError, CsrConnectError}; pub mod aws; @@ -131,6 +131,9 @@ pub struct ConnectionContext { pub librdkafka_log_level: tracing::Level, /// A prefix for an external ID to use for all AWS AssumeRole operations. 
pub aws_external_id_prefix: Option, + /// The ARN for a Materialize-controlled role to assume before assuming + /// a customer's requested role for an AWS connection. + pub aws_connection_role_arn: Option, /// A secrets reader. pub secrets_reader: Arc, /// A cloud resource reader, if supported in this configuration. @@ -151,6 +154,7 @@ impl ConnectionContext { environment_id: String, startup_log_level: &CloneableEnvFilter, aws_external_id_prefix: Option, + aws_connection_role_arn: Option, secrets_reader: Arc, cloud_resource_reader: Option>, ) -> ConnectionContext { @@ -161,6 +165,7 @@ impl ConnectionContext { "librdkafka", ), aws_external_id_prefix, + aws_connection_role_arn, secrets_reader, cloud_resource_reader, ssh_tunnel_manager: SshTunnelManager::default(), @@ -173,6 +178,7 @@ impl ConnectionContext { environment_id: "test-environment-id".into(), librdkafka_log_level: tracing::Level::INFO, aws_external_id_prefix: None, + aws_connection_role_arn: None, secrets_reader, cloud_resource_reader: None, ssh_tunnel_manager: SshTunnelManager::default(), @@ -186,7 +192,7 @@ pub enum Connection { Csr(CsrConnection), Postgres(PostgresConnection), Ssh(SshConnection), - Aws(AwsConfig), + Aws(AwsConnection), AwsPrivatelink(AwsPrivatelinkConnection), } diff --git a/src/storage-types/src/connections/aws.proto b/src/storage-types/src/connections/aws.proto index 23509181fde28..0c58a03131ee7 100644 --- a/src/storage-types/src/connections/aws.proto +++ b/src/storage-types/src/connections/aws.proto @@ -7,6 +7,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
+// buf breaking: ignore (currently does not require backward-compatibility) + syntax = "proto3"; import "google/protobuf/empty.proto"; @@ -16,10 +18,12 @@ import "storage-types/src/connections.proto"; package mz_storage_types.connections.aws; -message ProtoAwsConfig { - ProtoAwsCredentials credentials = 1; +message ProtoAwsConnection { + oneof auth { + ProtoAwsCredentials credentials = 1; + ProtoAwsAssumeRole assume_role = 3; + } optional string region = 2; - ProtoAwsAssumeRole role = 3; optional string endpoint = 4; } @@ -31,4 +35,5 @@ message ProtoAwsCredentials { message ProtoAwsAssumeRole { string arn = 1; + optional string session_name = 2; } diff --git a/src/storage-types/src/connections/aws.rs b/src/storage-types/src/connections/aws.rs index f30036d428d02..f15980fedea48 100644 --- a/src/storage-types/src/connections/aws.rs +++ b/src/storage-types/src/connections/aws.rs @@ -9,69 +9,135 @@ //! AWS configuration for sources and sinks. -use anyhow::anyhow; -use mz_cloud_resources::AwsExternalIdPrefix; +use anyhow::{anyhow, bail}; +use aws_config::default_provider::credentials::DefaultCredentialsChain; +use aws_config::default_provider::region; +use aws_config::meta::region::ProvideRegion; +use aws_config::sts::AssumeRoleProvider; +use aws_credential_types::provider::{ProvideCredentials, SharedCredentialsProvider}; +use aws_credential_types::Credentials; +use aws_types::region::Region; +use aws_types::SdkConfig; + use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError}; use mz_repr::GlobalId; -use mz_secrets::SecretsReader; use proptest_derive::Arbitrary; use serde::{Deserialize, Serialize}; +use serde_json::json; -use crate::{configuration::StorageConfiguration, connections::StringOrSecret}; +use crate::{ + configuration::StorageConfiguration, + connections::{ConnectionContext, StringOrSecret}, +}; include!(concat!( env!("OUT_DIR"), "/mz_storage_types.connections.aws.rs" )); -/// AWS configuration overrides for a source or sink. 
-/// -/// This is a distinct type from any of the configuration types built into the -/// AWS SDK so that we can implement `Serialize` and `Deserialize`. +/// AWS connection configuration. #[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)] -pub struct AwsConfig { - /// AWS Credentials, or where to find them - pub credentials: AwsCredentials, +pub struct AwsConnection { + pub auth: AwsAuth, /// The AWS region to use. /// - /// Uses the default region (looking at env vars, config files, etc) if not provided. + /// Uses the default region (looking at env vars, config files, etc) if not + /// provided. pub region: Option, - /// The AWS role to assume. - pub role: Option, /// The custom AWS endpoint to use, if any. pub endpoint: Option, } -impl RustType for AwsConfig { - fn into_proto(&self) -> ProtoAwsConfig { - ProtoAwsConfig { - credentials: Some(self.credentials.into_proto()), +impl RustType for AwsConnection { + fn into_proto(&self) -> ProtoAwsConnection { + let auth = match &self.auth { + AwsAuth::Credentials(credentials) => { + proto_aws_connection::Auth::Credentials(credentials.into_proto()) + } + AwsAuth::AssumeRole(assume_role) => { + proto_aws_connection::Auth::AssumeRole(assume_role.into_proto()) + } + }; + + ProtoAwsConnection { + auth: Some(auth), region: self.region.clone(), - role: self.role.into_proto(), endpoint: self.endpoint.clone(), } } - fn from_proto(proto: ProtoAwsConfig) -> Result { - Ok(AwsConfig { - credentials: proto - .credentials - .into_rust_if_some("ProtoAwsConfig::credentials")?, + fn from_proto(proto: ProtoAwsConnection) -> Result { + let auth = match proto.auth.expect("auth expected") { + proto_aws_connection::Auth::Credentials(credentials) => { + AwsAuth::Credentials(credentials.into_rust()?) + } + proto_aws_connection::Auth::AssumeRole(assume_role) => { + AwsAuth::AssumeRole(assume_role.into_rust()?) 
+            }
+        };
+
+        Ok(AwsConnection {
+            auth,
             region: proto.region,
-            role: proto.role.into_rust()?,
             endpoint: proto.endpoint,
         })
     }
 }
 
-/// AWS credentials for a source or sink.
+/// Describes how to authenticate with AWS.
+#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
+pub enum AwsAuth {
+    /// Authenticate with an access key.
+    Credentials(AwsCredentials),
+    /// Authenticate via assuming an IAM role.
+    AssumeRole(AwsAssumeRole),
+}
+
+/// AWS credentials to access an AWS account using user access keys.
 #[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
 pub struct AwsCredentials {
+    /// The AWS API Access Key required to connect to the AWS account.
     pub access_key_id: StringOrSecret,
+    /// The Secret Access Key required to connect to the AWS account.
     pub secret_access_key: GlobalId,
+    /// Optional session token to connect to the AWS account.
     pub session_token: Option,
 }
 
+impl AwsCredentials {
+    /// Loads a credentials provider with the configured credentials.
+ async fn load_credentials_provider( + &self, + connection_context: &ConnectionContext, + ) -> Result { + let secrets_reader = connection_context.secrets_reader.as_ref(); + Ok(Credentials::from_keys( + self.access_key_id + .get_string(secrets_reader) + .await + .map_err(|_| { + anyhow!("internal error: failed to read access key ID from secret store") + })?, + connection_context + .secrets_reader + .read_string(self.secret_access_key) + .await + .map_err(|_| { + anyhow!("internal error: failed to read secret access key from secret store") + })?, + match &self.session_token { + Some(t) => { + let t = t.get_string(secrets_reader).await.map_err(|_| { + anyhow!("internal error: failed to read session token from secret store") + })?; + Some(t) + } + None => None, + }, + )) + } +} + impl RustType for AwsCredentials { fn into_proto(&self) -> ProtoAwsCredentials { ProtoAwsCredentials { @@ -94,100 +160,256 @@ impl RustType for AwsCredentials { } } -/// A role for Materialize to assume when performing AWS API calls. +/// Describes an AWS IAM role to assume. #[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)] pub struct AwsAssumeRole { /// The Amazon Resource Name of the role to assume. pub arn: String, + /// The optional session name for the session. + pub session_name: Option, +} + +impl AwsAssumeRole { + /// Loads a credentials provider that will assume the specified role + /// with the appropriate external ID. + async fn load_credentials_provider( + &self, + connection_context: &ConnectionContext, + connection_id: GlobalId, + region: Option, + ) -> Result { + let external_id = self.external_id(connection_context, connection_id)?; + // It's okay to use `dangerously_load_credentials_provider` here, as + // this is the method that provides a safe wrapper by forcing use of the + // correct external ID. 
+        self.dangerously_load_credentials_provider(
+            connection_context,
+            connection_id,
+            Some(external_id),
+            region,
+        )
+        .await
+    }
+
+    /// DANGEROUS: only for internal use!
+    ///
+    /// Like `load_credentials_provider`, but accepts an arbitrary external ID.
+    /// Only for use in the internal implementation of AWS connections. Using
+    /// this method incorrectly can result in violating our AWS security
+    /// requirements.
+    async fn dangerously_load_credentials_provider(
+        &self,
+        connection_context: &ConnectionContext,
+        connection_id: GlobalId,
+        external_id: Option,
+        region: Option,
+    ) -> Result {
+        let Some(aws_connection_role_arn) = &connection_context.aws_connection_role_arn else {
+            bail!("internal error: no AWS connection role configured");
+        };
+
+        // The default session name identifies the environment and the
+        // connection.
+        let default_session_name =
+            format!("{}-{}", &connection_context.environment_id, connection_id);
+
+        // First we create a credentials provider that will assume the "jump
+        // role" provided to this Materialize environment. This is the role that
+        // we've told the end user to allow in their role trust policy. No need
+        // to specify the external ID here as we're still within the Materialize
+        // sphere of trust. The ambient AWS credentials provided to this
+        // environment will be provided via the default credentials chain and
+        // allow us to assume the jump role. We always use the default session
+        // name here, so that we can identify the specific environment and
+        // connection ID that initiated the session in our internal CloudTrail
+        // logs. This session isn't visible to the end user.
+ let mut jump_credentials = AssumeRoleProvider::builder(aws_connection_role_arn) + .session_name(default_session_name.clone()); + + if let Some(region) = region.clone() { + jump_credentials = jump_credentials.region(region); + } + let jump_credentials = + jump_credentials.build(DefaultCredentialsChain::builder().build().await); + + // Then we create the provider that will assume the end user's role. + // Here, we *must* install the external ID, as we're using the jump role + // to hop into the end user's AWS account, and the external ID is the + // only thing that allows them to limit their trust of the jump role to + // this specific Materialize environment and AWS connection. We also + // respect the user's configured session name, if any, as this is the + // session that will be visible to them. + let mut credentials = AssumeRoleProvider::builder(&self.arn) + .session_name(self.session_name.clone().unwrap_or(default_session_name)); + if let Some(external_id) = external_id { + credentials = credentials.external_id(external_id); + } + if let Some(region) = region { + credentials = credentials.region(region); + } + Ok(credentials.build(jump_credentials)) + } + + pub fn external_id( + &self, + connection_context: &ConnectionContext, + connection_id: GlobalId, + ) -> Result { + let Some(aws_external_id_prefix) = &connection_context.aws_external_id_prefix else { + bail!("internal error: no AWS external ID prefix configured"); + }; + Ok(format!("mz_{}_{}", aws_external_id_prefix, connection_id)) + } + + pub fn example_trust_policy( + &self, + connection_context: &ConnectionContext, + connection_id: GlobalId, + ) -> Result { + let Some(aws_connection_role_arn) = &connection_context.aws_connection_role_arn else { + bail!("internal error: no AWS connection role configured"); + }; + Ok(json!( + { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "AWS": aws_connection_role_arn + }, + "Action": "sts:AssumeRole", + "Condition": { + 
"StringEquals": { + "sts:ExternalId": self.external_id(connection_context, connection_id)? + } + } + } + ] + } + )) + } } impl RustType for AwsAssumeRole { fn into_proto(&self) -> ProtoAwsAssumeRole { ProtoAwsAssumeRole { arn: self.arn.clone(), + session_name: self.session_name.clone(), } } fn from_proto(proto: ProtoAwsAssumeRole) -> Result { - Ok(AwsAssumeRole { arn: proto.arn }) + Ok(AwsAssumeRole { + arn: proto.arn, + session_name: proto.session_name, + }) } } -impl AwsConfig { - /// Loads the AWS SDK configuration object from the environment, then - /// applies the overrides from this object. - pub async fn load( +impl AwsConnection { + /// Loads the AWS SDK configuration with the configuration specified on this + /// object. + pub async fn load_sdk_config( &self, - external_id_prefix: Option<&AwsExternalIdPrefix>, - external_id_suffix: Option<&GlobalId>, - secrets_reader: &dyn SecretsReader, - ) -> aws_types::SdkConfig { - use aws_config::default_provider::region::DefaultRegionChain; - use aws_config::sts::AssumeRoleProvider; - use aws_credential_types::provider::SharedCredentialsProvider; - use aws_credential_types::Credentials; - use aws_types::region::Region; - - let region = match &self.region { - Some(region) => Some(Region::new(region.clone())), - _ => { - let rc = DefaultRegionChain::builder(); - rc.build().region().await - } + connection_context: &ConnectionContext, + connection_id: GlobalId, + ) -> Result { + let credentials = match &self.auth { + AwsAuth::Credentials(credentials) => SharedCredentialsProvider::new( + credentials + .load_credentials_provider(connection_context) + .await?, + ), + AwsAuth::AssumeRole(assume_role) => SharedCredentialsProvider::new( + assume_role + .load_credentials_provider( + connection_context, + connection_id, + self.region_or_default().await, + ) + .await?, + ), }; + self.load_sdk_config_from_credentials(credentials).await + } - let AwsCredentials { - access_key_id, - secret_access_key, - session_token, - } = 
&self.credentials; + async fn load_sdk_config_from_credentials( + &self, + credentials: impl ProvideCredentials + 'static, + ) -> Result { + let mut loader = aws_config::from_env().credentials_provider(credentials); - let mut cred_provider = SharedCredentialsProvider::new(Credentials::from_keys( - access_key_id.get_string(secrets_reader).await.unwrap(), - secrets_reader - .read_string(*secret_access_key) - .await - .unwrap(), - match session_token { - Some(t) => Some(t.get_string(secrets_reader).await.unwrap()), - None => None, - }, - )); - - if let Some(AwsAssumeRole { arn }) = &self.role { - let mut role = AssumeRoleProvider::builder(arn).session_name("materialize"); - // This affects which region to perform STS on, not where - // anything else happens. - if let Some(region) = ®ion { - role = role.region(region.clone()); - } - if let Some(external_id_prefix) = external_id_prefix { - let external_id = if let Some(suffix) = external_id_suffix { - format!("{}-{}", external_id_prefix, suffix) - } else { - external_id_prefix.to_string() - }; - role = role.external_id(external_id); - } - cred_provider = SharedCredentialsProvider::new(role.build(cred_provider)); + // Technically this is ignored for `AwsAssumeRole`, see `region_or_default` for more + // information. + if let Some(region) = &self.region { + loader = loader.region(Region::new(region.clone())); } - let mut loader = aws_config::from_env() - .region(region) - .credentials_provider(cred_provider); if let Some(endpoint) = &self.endpoint { loader = loader.endpoint_url(endpoint); } - loader.load().await + Ok(loader.load().await) } - #[allow(clippy::unused_async)] pub(crate) async fn validate( &self, - _id: GlobalId, - _storage_configuration: &StorageConfiguration, + id: GlobalId, + storage_configuration: &StorageConfiguration, ) -> Result<(), anyhow::Error> { - Err(anyhow!("Validating SSH connections is not supported yet")) + // TODO(mouli): Update this to return user friendly error in + // case of failure. 
+        let aws_config = self
+            .load_sdk_config(&storage_configuration.connection_context, id)
+            .await?;
+        let sts_client = aws_sdk_sts::Client::new(&aws_config);
+        // TODO(mouli): Update this to return user friendly error in
+        // case of failure.
+        let _ = sts_client.get_caller_identity().send().await?;
+
+        if let AwsAuth::AssumeRole(assume_role) = &self.auth {
+            // Per AWS's recommendation, when validating a connection using
+            // `AssumeRole` authentication, we should ensure that the
+            // role rejects `AssumeRole` requests that don't specify an
+            // external ID.
+            let external_id = None;
+            let credentials = assume_role
+                .dangerously_load_credentials_provider(
+                    &storage_configuration.connection_context,
+                    id,
+                    external_id,
+                    self.region_or_default().await,
+                )
+                .await?;
+            let aws_config = self.load_sdk_config_from_credentials(credentials).await?;
+            let sts_client = aws_sdk_sts::Client::new(&aws_config);
+            if sts_client.get_caller_identity().send().await.is_ok() {
+                // TODO(mouli): Return a structured `ValidateConnectionError`
+                // instead of `anyhow::Error`, so that we can include additional
+                // details and a hint.
+                bail!(
+                    "AWS connection role does not require an external ID; \
+                    this is INSECURE and allows any Materialize customer \
+                    access to your AWS account"
+                );
+            }
+        }
+
+        Ok(())
+    }
+
+    /// `credentials_provider` seemingly is bugged and does not correctly inherit
+    /// the region (which may be filled with a default region) from the aws config.
+    /// This means we have to _manually_ fill it in in the `AssumeRoleProvider`, defaulting
+    /// to the default region. Fetching the default region happens in the aws config's
+    /// `load` function, but we are forced to replicate that logic here.
+    ///
+    /// An upstream fix might remove the need to do this.
+ async fn region_or_default(&self) -> Option { + match &self.region { + Some(region_name) => Some(Region::new(region_name.clone())), + None => region::default_provider().region().await, + } } pub(crate) fn validate_by_default(&self) -> bool { diff --git a/test/aws/aws-connection/aws-connection.td b/test/aws/aws-connection/aws-connection.td new file mode 100644 index 0000000000000..34c7be3ca571c --- /dev/null +++ b/test/aws/aws-connection/aws-connection.td @@ -0,0 +1,90 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# Tests for AWS connections. + +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET enable_aws_connection = true; +ALTER SYSTEM SET enable_connection_validation_syntax = true; + +# Test assume role connections. 
+ +> CREATE CONNECTION aws_assume_role + TO AWS (ASSUME ROLE ARN 'assume-role', ASSUME ROLE SESSION NAME 'session-name'); + +$ set-from-sql var=conn_id +SELECT id FROM mz_connections WHERE name = 'aws_assume_role'; + +> SELECT * FROM mz_internal.mz_aws_connections WHERE id = '${conn_id}'; +id endpoint region access_key_id access_key_id_secret_id assume_role_arn assume_role_session_name principal external_id example_trust_policy +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- +"${conn_id}" assume-role session-name arn:aws:iam::123456789000:role/MaterializeConnection "mz_eb5cb59b-e2fe-41f3-87ca-d2176a495345_${conn_id}" "{\"Statement\":[{\"Action\":\"sts:AssumeRole\",\"Condition\":{\"StringEquals\":{\"sts:ExternalId\":\"mz_eb5cb59b-e2fe-41f3-87ca-d2176a495345_${conn_id}\"}},\"Effect\":\"Allow\",\"Principal\":{\"AWS\":\"arn:aws:iam::123456789000:role/MaterializeConnection\"}}],\"Version\":\"2012-10-17\"}" + +# Test access credentials connections. + +> CREATE SECRET aws_secret_access_key as '...'; + +> CREATE CONNECTION aws_credentials + TO AWS (ACCESS KEY ID = 'access_key', SECRET ACCESS KEY = SECRET aws_secret_access_key); + +$ set-from-sql var=conn_id +SELECT id FROM mz_connections WHERE name = 'aws_credentials'; + +> SELECT * FROM mz_internal.mz_aws_connections WHERE id = '${conn_id}'; +id endpoint region access_key_id access_key_id_secret_id assume_role_arn assume_role_session_name principal external_id example_trust_policy +------------------------------------------------------------------------------------------------------------------------------------------------------------- +"${conn_id}" access_key + +# Test access credentials connections where the access key ID is also a secret. 
+ +> CREATE SECRET aws_access_key_id as '...'; + +> CREATE CONNECTION aws_credentials_with_secret + TO AWS (ACCESS KEY ID = SECRET aws_access_key_id, SECRET ACCESS KEY = SECRET aws_secret_access_key); + +$ set-from-sql var=conn_id +SELECT id FROM mz_connections WHERE name = 'aws_credentials_with_secret'; + +$ set-from-sql var=secret_id +SELECT id FROM mz_secrets WHERE name = 'aws_access_key_id'; + +> SELECT * FROM mz_internal.mz_aws_connections WHERE id = '${conn_id}'; +id endpoint region access_key_id access_key_id_secret_id assume_role_arn assume_role_session_name principal external_id example_trust_policy +------------------------------------------------------------------------------------------------------------------------------------------------------------- +"${conn_id}" "${secret_id}" + +# TODO: tests for validating connections. + +# Test invalid statements. + +! CREATE CONNECTION conn + TO AWS (ACCESS KEY ID = 'access_key'); +contains:must specify both ACCESS KEY ID and SECRET ACCESS KEY with optional SESSION TOKEN + +! CREATE CONNECTION conn + TO AWS (SECRET ACCESS KEY = SECRET aws_secret_access_key); +contains:must specify both ACCESS KEY ID and SECRET ACCESS KEY with optional SESSION TOKEN + +! CREATE CONNECTION conn + TO AWS (TOKEN = 'token'); +contains:must specify both ACCESS KEY ID and SECRET ACCESS KEY with optional SESSION TOKEN + +! CREATE CONNECTION conn + TO AWS (ASSUME ROLE SESSION NAME 'session-name'); +contains:must specify ASSUME ROLE ARN with optional ASSUME ROLE SESSION NAME + +! CREATE CONNECTION conn + TO AWS (ACCESS KEY ID = 'access_key', SECRET ACCESS KEY = SECRET aws_secret_access_key, ASSUME ROLE ARN 'arn-name'); +contains:cannot specify both ACCESS KEY ID and ASSUME ROLE ARN + +! CREATE CONNECTION conn + TO AWS (REGION 'us-east'); +contains:must specify either ASSUME ROLE ARN or ACCESS KEY ID and SECRET ACCESS KEY + +# TODO: tests for invalid IAM configuration. 
diff --git a/test/secrets-aws-secrets-manager/mzcompose b/test/aws/mzcompose similarity index 100% rename from test/secrets-aws-secrets-manager/mzcompose rename to test/aws/mzcompose diff --git a/test/secrets-aws-secrets-manager/mzcompose.py b/test/aws/mzcompose.py similarity index 90% rename from test/secrets-aws-secrets-manager/mzcompose.py rename to test/aws/mzcompose.py index 3b106e3e6f573..8e86b78608b69 100644 --- a/test/secrets-aws-secrets-manager/mzcompose.py +++ b/test/aws/mzcompose.py @@ -28,9 +28,11 @@ NAMESPACE = ENVIRONMENT_NAME SERVICE_ACCOUNT_NAME = ENVIRONMENT_NAME OIDC_SUB = f"system:serviceaccount:{NAMESPACE}:{SERVICE_ACCOUNT_NAME}" -PURPOSE = "Customer Secrets" +PURPOSE = "test-aws" STACK = "mzcompose" KMS_KEY_ALIAS_NAME = f"alias/customer_key_{DEFAULT_MZ_ENVIRONMENT_ID}" +AWS_CONNECTION_ROLE_ARN = "arn:aws:iam::123456789000:role/MaterializeConnection" +AWS_EXTERNAL_ID_PREFIX = "eb5cb59b-e2fe-41f3-87ca-d2176a495345" AWS_ACCESS_KEY_ID = "LSIAQAAAAAAVNCBMPNSG" AWS_SECRET_ACCESS_KEY = "secret" @@ -50,13 +52,21 @@ f"--aws-secrets-controller-tags=Environment={ENVIRONMENT_NAME}", f"--aws-secrets-controller-tags=Purpose={PURPOSE}", f"--aws-secrets-controller-tags=Stack={STACK}", + f"--aws-connection-role-arn={AWS_CONNECTION_ROLE_ARN}", + f"--aws-external-id-prefix={AWS_EXTERNAL_ID_PREFIX}", ], ), - Testdrive(), + Testdrive(default_timeout="5s"), ] def workflow_default(c: Composition) -> None: + for name in ["secrets-manager", "aws-connection"]: + with c.test_case(name): + c.workflow(name) + + +def workflow_secrets_manager(c: Composition) -> None: c.up("localstack") aws_endpoint_url = f"http://localhost:{c.port('localstack', 4566)}" @@ -191,3 +201,8 @@ def get_secret_value(_id: str) -> bytes: # Check that the file has been deleted from Secrets Manager secrets = list_secrets() assert secret_name("u1") not in secrets + + +def workflow_aws_connection(c: Composition) -> None: + c.up("localstack", "materialized") + c.run("testdrive", 
"aws-connection/aws-connection.td") diff --git a/test/sqllogictest/autogenerated/mz_internal.slt b/test/sqllogictest/autogenerated/mz_internal.slt index 7e752273b999c..42229c917c522 100644 --- a/test/sqllogictest/autogenerated/mz_internal.slt +++ b/test/sqllogictest/autogenerated/mz_internal.slt @@ -658,6 +658,7 @@ mz_arrangement_sharing_per_worker mz_arrangement_sharing_raw mz_arrangement_sizes mz_arrangement_sizes_per_worker +mz_aws_connections mz_aws_privatelink_connection_status_history mz_cluster_links mz_cluster_replica_frontiers diff --git a/test/sqllogictest/information_schema_tables.slt b/test/sqllogictest/information_schema_tables.slt index c1939b22bc8bf..862ccb9b37078 100644 --- a/test/sqllogictest/information_schema_tables.slt +++ b/test/sqllogictest/information_schema_tables.slt @@ -317,6 +317,10 @@ mz_arrangement_sizes_per_worker VIEW materialize mz_internal +mz_aws_connections +BASE TABLE +materialize +mz_internal mz_aws_privatelink_connection_status_history SOURCE materialize diff --git a/test/testdrive/catalog.td b/test/testdrive/catalog.td index 6bc4adcb08d12..0fb0aaba8c990 100644 --- a/test/testdrive/catalog.td +++ b/test/testdrive/catalog.td @@ -573,6 +573,7 @@ mz_storage_shards source name ---- mz_aggregates +mz_aws_connections mz_cluster_links mz_cluster_replica_metrics mz_cluster_replica_sizes @@ -682,7 +683,7 @@ test_table # `SHOW TABLES` and `mz_tables` should agree. > SELECT COUNT(*) FROM mz_tables WHERE id LIKE 's%' -47 +48 # There is one entry in mz_indexes for each field_number/expression of the index. > SELECT COUNT(id) FROM mz_indexes WHERE id LIKE 's%'