diff --git a/src/storage-client/src/sink.rs b/src/storage-client/src/sink.rs
index baeaf2a0dd8a..e4f41bba628f 100644
--- a/src/storage-client/src/sink.rs
+++ b/src/storage-client/src/sink.rs
@@ -10,9 +10,10 @@ use std::collections::BTreeMap;
 use std::time::Duration;
 
-use anyhow::{anyhow, Context};
+use anyhow::{anyhow, bail, Context};
 use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, ResourceSpecifier, TopicReplication};
 use rdkafka::ClientContext;
+use tracing::warn;
 
 use mz_kafka_util::client::{MzClientContext, DEFAULT_FETCH_METADATA_TIMEOUT};
 use mz_ore::collections::CollectionExt;
@@ -36,96 +37,125 @@ pub async fn build_sink_connection(
     }
 }
 
-async fn ensure_kafka_topic<C>(
+struct TopicConfigs {
+    partition_count: i32,
+    replication_factor: i32,
+}
+
+async fn discover_topic_configs<C: ClientContext>(
     client: &AdminClient<C>,
     topic: &str,
-    mut partition_count: i32,
-    mut replication_factor: i32,
-    retention: KafkaSinkConnectionRetention,
-) -> Result<(), anyhow::Error>
-where
-    C: ClientContext,
-{
-    // if either partition count or replication factor should be defaulted to the broker's config
-    // (signaled by a value of -1), explicitly poll the broker to discover the defaults.
-    // Newer versions of Kafka can instead send create topic requests with -1 and have this happen
-    // behind the scenes, but this is unsupported and will result in errors on pre-2.4 Kafka
-    if partition_count == -1 || replication_factor == -1 {
-        let metadata = client
-            .inner()
-            .fetch_metadata(None, DEFAULT_FETCH_METADATA_TIMEOUT)
-            .with_context(|| {
-                format!(
-                    "error fetching metadata when creating new topic {} for sink",
-                    topic
-                )
-            })?;
+) -> Result<TopicConfigs, anyhow::Error> {
+    let mut partition_count = -1;
+    let mut replication_factor = -1;
 
-        if metadata.brokers().len() == 0 {
-            Err(anyhow!("zero brokers discovered in metadata request"))?;
-        }
+    let metadata = client
+        .inner()
+        .fetch_metadata(None, DEFAULT_FETCH_METADATA_TIMEOUT)
+        .with_context(|| {
+            format!(
+                "error fetching metadata when creating new topic {} for sink",
+                topic
+            )
+        })?;
 
-        let broker = metadata.brokers()[0].id();
-        let configs = client
-            .describe_configs(
-                &[ResourceSpecifier::Broker(broker)],
-                &AdminOptions::new().request_timeout(Some(Duration::from_secs(5))),
+    if metadata.brokers().len() == 0 {
+        Err(anyhow!("zero brokers discovered in metadata request"))?;
+    }
+
+    let broker = metadata.brokers()[0].id();
+    let configs = client
+        .describe_configs(
+            &[ResourceSpecifier::Broker(broker)],
+            &AdminOptions::new().request_timeout(Some(Duration::from_secs(5))),
+        )
+        .await
+        .with_context(|| {
+            format!(
+                "error fetching configuration from broker {} when creating new topic {} for sink",
+                broker, topic
             )
-            .await
-            .with_context(|| {
-                format!(
-                    "error fetching configuration from broker {} when creating new topic {} for sink",
-                    broker,
-                    topic
-                )
-            })?;
+        })?;
 
-        if configs.len() != 1 {
-            Err(anyhow!(
+    if configs.len() != 1 {
+        Err(anyhow!(
             "error creating topic {} for sink: broker {} returned {} config results, but one was expected",
             topic,
             broker,
            configs.len()
         ))?;
-        }
+    }
 
-        let config = configs.into_element().map_err(|e| {
-            anyhow!(
-                "error reading broker configuration when creating topic {} for sink: {}",
-                topic,
-                e
-            )
-        })?;
+    let config = configs.into_element().map_err(|e| {
+        anyhow!(
+            "error reading broker configuration when creating topic {} for sink: {}",
+            topic,
+            e
+        )
+    })?;
 
-    for entry in config.entries {
-        if entry.name == "num.partitions" && partition_count == -1 {
-            if let Some(s) = entry.value {
-                partition_count = s.parse::<i32>().with_context(|| {
-                    format!(
-                        "default partition count {} cannot be parsed into an integer",
-                        s
-                    )
-                })?;
-            }
-        } else if entry.name == "default.replication.factor" && replication_factor == -1 {
-            if let Some(s) = entry.value {
-                replication_factor = s.parse::<i32>().with_context(|| {
-                    format!(
-                        "default replication factor {} cannot be parsed into an integer",
-                        s
-                    )
-                })?;
-            }
+    if config.entries.is_empty() {
+        bail!("read empty cluster configuration; do we have DescribeConfigs permissions?")
+    }
+
+    for entry in config.entries {
+        if entry.name == "num.partitions" && partition_count == -1 {
+            if let Some(s) = entry.value {
+                partition_count = s.parse::<i32>().with_context(|| {
+                    format!(
+                        "default partition count {} cannot be parsed into an integer",
+                        s
+                    )
+                })?;
+            }
+        } else if entry.name == "default.replication.factor" && replication_factor == -1 {
+            if let Some(s) = entry.value {
+                replication_factor = s.parse::<i32>().with_context(|| {
+                    format!(
+                        "default replication factor {} cannot be parsed into an integer",
+                        s
+                    )
+                })?;
+            }
         }
     }
+    }
 
-        if partition_count == -1 {
-            Err(anyhow!("default was requested for partition_count, but num.partitions was not found in broker config"))?;
-        }
+    Ok(TopicConfigs {
+        partition_count,
+        replication_factor,
+    })
+}
 
-        if replication_factor == -1 {
-            Err(anyhow!("default was requested for replication_factor, but default.replication.factor was not found in broker config"))?;
-        }
+async fn ensure_kafka_topic<C>(
+    client: &AdminClient<C>,
+    topic: &str,
+    mut partition_count: i32,
+    mut replication_factor: i32,
+    retention: KafkaSinkConnectionRetention,
+) -> Result<(), anyhow::Error>
+where
+    C: ClientContext,
+{
+    // if either partition count or replication factor should be defaulted to the broker's config
+    // (signaled by a value of -1), explicitly poll the broker to discover the defaults.
+    // Newer versions of Kafka can instead send create topic requests with -1 and have this happen
+    // behind the scenes, but this is unsupported and will result in errors on pre-2.4 Kafka.
+    if partition_count == -1 || replication_factor == -1 {
+        match discover_topic_configs(client, topic).await {
+            Ok(configs) => {
+                if partition_count == -1 {
+                    partition_count = configs.partition_count;
+                }
+                if replication_factor == -1 {
+                    replication_factor = configs.replication_factor;
+                }
+            }
+            Err(e) => {
+                // Since recent versions of Kafka can handle an explicit -1 config, this
+                // request will probably still succeed. Logging anyway for visibility.
+                warn!("Failed to discover default values for topic configs: {e}");
+            }
+        };
     }
 
     let mut kafka_topic = NewTopic::new(
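
Note (editor's sketch, not part of the patch): the `Err` arm above leans on Kafka 2.4+ brokers resolving `-1` server-side into their own `num.partitions` and `default.replication.factor`. A minimal rdkafka illustration of that contract, assuming a reachable broker at a placeholder address:

```rust
use anyhow::Context;
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;

// On Kafka >= 2.4, passing -1 for partition count and replication factor asks
// the broker to apply its own defaults; this is what the warn-and-continue
// fallback above relies on when DescribeConfigs is unavailable.
async fn create_topic_with_broker_defaults(topic: &str) -> anyhow::Result<()> {
    let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092") // placeholder address
        .create()
        .context("building admin client")?;
    let new_topic = NewTopic::new(topic, -1, TopicReplication::Fixed(-1));
    admin
        .create_topics(&[new_topic], &AdminOptions::new())
        .await
        .context("create_topics request failed")?;
    Ok(())
}
```
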
diff --git a/test/kafka-sasl-plain/mzcompose.py b/test/kafka-sasl-plain/mzcompose.py
index b8c5e4009200..c7d9fa77a2e1 100644
--- a/test/kafka-sasl-plain/mzcompose.py
+++ b/test/kafka-sasl-plain/mzcompose.py
@@ -42,6 +42,10 @@
             "KAFKA_SSL_CLIENT_AUTH=required",
             "KAFKA_SECURITY_INTER_BROKER_PROTOCOL=SASL_SSL",
             "KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/sasl.jaas.config",
+            # Equivalent to -Dauthorizer.class.name=kafka.security.authorizer.AclAuthorizer
+            "KAFKA_AUTHORIZER_CLASS_NAME=kafka.security.authorizer.AclAuthorizer",
+            "KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND=true",
+            "KAFKA_SUPER_USERS=User:materialize;User:broker",
             # Standard options we don't want to overwrite!
             "KAFKA_MIN_INSYNC_REPLICAS=1",
             "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1",
@@ -51,6 +55,7 @@
         volumes=[
             "secrets:/etc/kafka/secrets",
             "./sasl.jaas.config:/etc/kafka/sasl.jaas.config",
+            "./no-describe-configs.properties:/etc/kafka/no-describe-configs.properties",
         ],
     ),
     SchemaRegistry(
@@ -118,4 +123,22 @@ def workflow_default(c: Composition) -> None:
     c.up("test-certs")
     c.up("zookeeper", "kafka", "schema-registry")
     c.up("materialized")
-    c.run("testdrive", "*.td")
+
+    c.run("testdrive", "smoketest.td")
+
+    # Deny the DescribeConfigs cluster privilege to user no_describe_configs
+    c.exec(
+        "kafka",
+        "kafka-acls",
+        "--bootstrap-server",
+        "kafka:9092",
+        "--add",
+        "--deny-principal",
+        "User:CN=no_describe_configs",
+        "--operation",
+        "DescribeConfigs",
+        "--cluster",
+        "--command-config",
+        "/etc/kafka/no-describe-configs.properties",
+    )
+    c.run("testdrive", "no-describe-configs.td")
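
Note (editor's sketch, not part of the patch): for the denied principal, the `describe_configs` probe in `discover_topic_configs` comes back as an error rather than broker defaults. Depending on the broker, the denial can surface either as a request-level error or as a per-resource error inside an `Ok` response, so a robust probe checks both; `Broker(1)` is a placeholder id:

```rust
use rdkafka::admin::{AdminClient, AdminOptions, ResourceSpecifier};
use rdkafka::ClientContext;
use tracing::warn;

// Probe whether the authenticated principal may read broker configs. Real
// code would first discover a broker id from metadata instead of hardcoding 1.
async fn can_describe_configs<C: ClientContext>(admin: &AdminClient<C>) -> bool {
    match admin
        .describe_configs(&[ResourceSpecifier::Broker(1)], &AdminOptions::new())
        .await
    {
        // The request succeeded, but each resource carries its own result.
        Ok(results) => results.iter().all(|r| r.is_ok()),
        Err(e) => {
            warn!("DescribeConfigs request failed: {e}");
            false
        }
    }
}
```
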
"KAFKA_MIN_INSYNC_REPLICAS=1", "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1", @@ -51,6 +55,7 @@ volumes=[ "secrets:/etc/kafka/secrets", "./sasl.jaas.config:/etc/kafka/sasl.jaas.config", + "./no-describe-configs.properties:/etc/kafka/no-describe-configs.properties", ], ), SchemaRegistry( @@ -118,4 +123,22 @@ def workflow_default(c: Composition) -> None: c.up("test-certs") c.up("zookeeper", "kafka", "schema-registry") c.up("materialized") - c.run("testdrive", "*.td") + + c.run("testdrive", "smoketest.td") + + # Deny the DescribeConfigs cluster privilege to user no_describe_configs + c.exec( + "kafka", + "kafka-acls", + "--bootstrap-server", + "kafka:9092", + "--add", + "--deny-principal", + "User:CN=no_describe_configs", + "--operation", + "DescribeConfigs", + "--cluster", + "--command-config", + "/etc/kafka/no-describe-configs.properties", + ) + c.run("testdrive", "no-describe-configs.td") diff --git a/test/kafka-sasl-plain/no-describe-configs.properties b/test/kafka-sasl-plain/no-describe-configs.properties new file mode 100644 index 000000000000..5ffc7d42d5f1 --- /dev/null +++ b/test/kafka-sasl-plain/no-describe-configs.properties @@ -0,0 +1,18 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \ + required username="materialize" password="sekurity"; +security.protocol=SASL_SSL +sasl.mechanism=PLAIN +ssl.truststore.location=/etc/kafka/secrets/kafka.truststore.jks +ssl.truststore.password=mzmzmz +ssl.keystore.location=/etc/kafka/secrets/kafka.keystore.jks +ssl.keystore.password=mzmzmz +ssl.key.password=mzmzmz diff --git a/test/kafka-sasl-plain/no-describe-configs.td b/test/kafka-sasl-plain/no-describe-configs.td new file mode 100644 index 000000000000..242a89bf7b97 --- /dev/null +++ b/test/kafka-sasl-plain/no-describe-configs.td @@ -0,0 +1,54 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. 
diff --git a/test/kafka-sasl-plain/no-describe-configs.td b/test/kafka-sasl-plain/no-describe-configs.td
new file mode 100644
index 000000000000..242a89bf7b97
--- /dev/null
+++ b/test/kafka-sasl-plain/no-describe-configs.td
@@ -0,0 +1,54 @@
+# Copyright Materialize, Inc. and contributors. All rights reserved.
+#
+# Use of this software is governed by the Business Source License
+# included in the LICENSE file at the root of this repository.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0.
+
+#
+# Test that sinks can be created even in the absence of the DescribeConfigs permission
+#
+
+> CREATE MATERIALIZED VIEW no_describe_configs_view AS SELECT 1;
+
+> CREATE SECRET no_describe_configs_password AS 'password'
+
+> CREATE CONNECTION no_describe_configs_kafka_conn TO KAFKA (
+    BROKER 'kafka:9092',
+    SASL MECHANISMS = 'PLAIN',
+    SASL USERNAME = 'no_describe_config',
+    SASL PASSWORD = SECRET no_describe_configs_password,
+    SSL CERTIFICATE AUTHORITY = '${arg.ca}'
+  );
+
+> CREATE CONNECTION no_describe_configs_csr_conn
+  FOR CONFLUENT SCHEMA REGISTRY
+  URL '${testdrive.schema-registry-url}',
+  SSL CERTIFICATE AUTHORITY = '${arg.ca}'
+
+$ kafka-create-topic topic=no-describe-configs1
+
+> CREATE SINK no_describe_configs_sink1
+  FROM no_describe_configs_view
+  INTO KAFKA CONNECTION no_describe_configs_kafka_conn (TOPIC 'testdrive-no-describe-configs1-${testdrive.seed}')
+  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION no_describe_configs_csr_conn
+  ENVELOPE DEBEZIUM;
+
+$ kafka-create-topic topic=no-describe-configs2
+
+> CREATE SINK no_describe_configs_sink2
+  FROM no_describe_configs_view
+  INTO KAFKA CONNECTION no_describe_configs_kafka_conn (REPLICATION FACTOR = -1, TOPIC 'testdrive-no-describe-configs2-${testdrive.seed}')
+  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION no_describe_configs_csr_conn
+  ENVELOPE DEBEZIUM;
+
+$ kafka-create-topic topic=no-describe-configs3
+
+> CREATE SINK no_describe_configs_sink3
+  FROM no_describe_configs_view
+  INTO KAFKA CONNECTION no_describe_configs_kafka_conn (PARTITION COUNT = -1, TOPIC 'testdrive-no-describe-configs3-${testdrive.seed}')
+  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION no_describe_configs_csr_conn
+  ENVELOPE DEBEZIUM;
diff --git a/test/kafka-sasl-plain/sasl.jaas.config b/test/kafka-sasl-plain/sasl.jaas.config
index 5ffc2d568e4d..682508da54fe 100644
--- a/test/kafka-sasl-plain/sasl.jaas.config
+++ b/test/kafka-sasl-plain/sasl.jaas.config
@@ -25,7 +25,8 @@ KafkaServer {
     password="broker"
     user_broker="broker"
     user_schemaregistry="schemaregistry"
-    user_materialize="sekurity";
+    user_materialize="sekurity"
+    user_no_describe_config="password";
 };
 
 // Zookeeper client, despite the generic name.
diff --git a/test/test-certs/create-certs.sh b/test/test-certs/create-certs.sh
index fe5ec653c87c..50d3f9921367 100755
--- a/test/test-certs/create-certs.sh
+++ b/test/test-certs/create-certs.sh
@@ -128,7 +128,7 @@ create_cert() {
     -passout pass:$SSL_SECRET
 }
 
-for i in materialized producer postgres certuser
+for i in materialized producer postgres certuser no_describe_configs
 do
   create_cert $i "ca" $i
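
Note (editor's sketch, not part of the patch): the three sinks in no-describe-configs.td exercise topic creation without DescribeConfigs — with explicit settings, with `REPLICATION FACTOR = -1`, and with `PARTITION COUNT = -1`. The decision the patched `ensure_kafka_topic` makes for each value condenses to the following; names are illustrative:

```rust
// Explicit values pass through unchanged; -1 takes a discovered broker default
// when available, and otherwise stays -1 so that a Kafka 2.4+ broker can
// resolve it server-side (the path taken when DescribeConfigs is denied).
fn resolve_topic_config(requested: i32, discovered: Option<i32>) -> i32 {
    match (requested, discovered) {
        (-1, Some(default)) => default, // discovery succeeded
        (-1, None) => -1,               // discovery failed; defer to broker
        (explicit, _) => explicit,      // user asked for a specific value
    }
}
```
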