[sinks] Fail open when we can't fetch Kafka config #18853

Merged · 4 commits · Apr 21, 2023

Changes from 3 commits
176 changes: 103 additions & 73 deletions src/storage-client/src/sink.rs
@@ -10,9 +10,10 @@
 use std::collections::BTreeMap;
 use std::time::Duration;
 
-use anyhow::{anyhow, Context};
+use anyhow::{anyhow, bail, Context};
 use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, ResourceSpecifier, TopicReplication};
 use rdkafka::ClientContext;
+use tracing::warn;
 
 use mz_kafka_util::client::{MzClientContext, DEFAULT_FETCH_METADATA_TIMEOUT};
 use mz_ore::collections::CollectionExt;
@@ -36,96 +37,125 @@ pub async fn build_sink_connection(
     }
 }
 
-async fn ensure_kafka_topic<C>(
+struct TopicConfigs {
+    partition_count: i32,
+    replication_factor: i32,
+}
+
+async fn discover_topic_configs<C: ClientContext>(
     client: &AdminClient<C>,
     topic: &str,
-    mut partition_count: i32,
-    mut replication_factor: i32,
-    retention: KafkaSinkConnectionRetention,
-) -> Result<(), anyhow::Error>
-where
-    C: ClientContext,
-{
-    // if either partition count or replication factor should be defaulted to the broker's config
-    // (signaled by a value of -1), explicitly poll the broker to discover the defaults.
-    // Newer versions of Kafka can instead send create topic requests with -1 and have this happen
-    // behind the scenes, but this is unsupported and will result in errors on pre-2.4 Kafka
-    if partition_count == -1 || replication_factor == -1 {
-        let metadata = client
-            .inner()
-            .fetch_metadata(None, DEFAULT_FETCH_METADATA_TIMEOUT)
-            .with_context(|| {
-                format!(
-                    "error fetching metadata when creating new topic {} for sink",
-                    topic
-                )
-            })?;
+) -> Result<TopicConfigs, anyhow::Error> {
+    let mut partition_count = -1;
+    let mut replication_factor = -1;
+
+    let metadata = client
+        .inner()
+        .fetch_metadata(None, DEFAULT_FETCH_METADATA_TIMEOUT)
+        .with_context(|| {
+            format!(
+                "error fetching metadata when creating new topic {} for sink",
+                topic
+            )
+        })?;
 
-        if metadata.brokers().len() == 0 {
-            Err(anyhow!("zero brokers discovered in metadata request"))?;
-        }
+    if metadata.brokers().len() == 0 {
+        Err(anyhow!("zero brokers discovered in metadata request"))?;
+    }
 
-        let broker = metadata.brokers()[0].id();
-        let configs = client
-            .describe_configs(
-                &[ResourceSpecifier::Broker(broker)],
-                &AdminOptions::new().request_timeout(Some(Duration::from_secs(5))),
-            )
-            .await
-            .with_context(|| {
-                format!(
-                    "error fetching configuration from broker {} when creating new topic {} for sink",
-                    broker,
-                    topic
-                )
-            })?;
+    let broker = metadata.brokers()[0].id();
+    let configs = client
+        .describe_configs(
+            &[ResourceSpecifier::Broker(broker)],
+            &AdminOptions::new().request_timeout(Some(Duration::from_secs(5))),
+        )
+        .await
+        .with_context(|| {
+            format!(
+                "error fetching configuration from broker {} when creating new topic {} for sink",
+                broker, topic
+            )
+        })?;
 
-        if configs.len() != 1 {
-            Err(anyhow!(
-                "error creating topic {} for sink: broker {} returned {} config results, but one was expected",
-                topic,
-                broker,
-                configs.len()
-            ))?;
-        }
+    if configs.len() != 1 {
+        Err(anyhow!(
+            "error creating topic {} for sink: broker {} returned {} config results, but one was expected",
+            topic,
+            broker,
+            configs.len()
+        ))?;
+    }
 
-        let config = configs.into_element().map_err(|e| {
-            anyhow!(
-                "error reading broker configuration when creating topic {} for sink: {}",
-                topic,
-                e
-            )
-        })?;
+    let config = configs.into_element().map_err(|e| {
+        anyhow!(
+            "error reading broker configuration when creating topic {} for sink: {}",
+            topic,
+            e
+        )
+    })?;
 
-        for entry in config.entries {
-            if entry.name == "num.partitions" && partition_count == -1 {
-                if let Some(s) = entry.value {
-                    partition_count = s.parse::<i32>().with_context(|| {
-                        format!(
-                            "default partition count {} cannot be parsed into an integer",
-                            s
-                        )
-                    })?;
-                }
-            } else if entry.name == "default.replication.factor" && replication_factor == -1 {
-                if let Some(s) = entry.value {
-                    replication_factor = s.parse::<i32>().with_context(|| {
-                        format!(
-                            "default replication factor {} cannot be parsed into an integer",
-                            s
-                        )
-                    })?;
-                }
-            }
-        }
+    if config.entries.is_empty() {
+        bail!("read empty cluster configuration; do we have DescribeConfigs permissions?")
+    }
 
-        if partition_count == -1 {
-            Err(anyhow!("default was requested for partition_count, but num.partitions was not found in broker config"))?;
-        }
+    for entry in config.entries {
+        if entry.name == "num.partitions" && partition_count == -1 {
+            if let Some(s) = entry.value {
+                partition_count = s.parse::<i32>().with_context(|| {
+                    format!(
+                        "default partition count {} cannot be parsed into an integer",
+                        s
+                    )
+                })?;
+            }
+        } else if entry.name == "default.replication.factor" && replication_factor == -1 {
+            if let Some(s) = entry.value {
+                replication_factor = s.parse::<i32>().with_context(|| {
+                    format!(
+                        "default replication factor {} cannot be parsed into an integer",
+                        s
+                    )
+                })?;
+            }
+        }
+    }
 
-        if replication_factor == -1 {
-            Err(anyhow!("default was requested for replication_factor, but default.replication.factor was not found in broker config"))?;
-        }
+    Ok(TopicConfigs {
+        partition_count,
+        replication_factor,
+    })
+}
+
+async fn ensure_kafka_topic<C>(
+    client: &AdminClient<C>,
+    topic: &str,
+    mut partition_count: i32,
+    mut replication_factor: i32,
+    retention: KafkaSinkConnectionRetention,
+) -> Result<(), anyhow::Error>
+where
+    C: ClientContext,
+{
+    // if either partition count or replication factor should be defaulted to the broker's config
+    // (signaled by a value of -1), explicitly poll the broker to discover the defaults.
+    // Newer versions of Kafka can instead send create topic requests with -1 and have this happen
+    // behind the scenes, but this is unsupported and will result in errors on pre-2.4 Kafka.
+    if partition_count == -1 || replication_factor == -1 {
+        match discover_topic_configs(client, topic).await {
+            Ok(configs) => {
+                if partition_count == -1 {
+                    partition_count = configs.partition_count;
+                }
+                if replication_factor == -1 {
+                    replication_factor = configs.replication_factor;
+                }
+            }
+            Err(e) => {
+                // Since recent versions of Kafka can handle an explicit -1 config, this
+                // request will probably still succeed. Logging anyways for visibility.
+                warn!("Failed to discover default values for topic configs: {e}");
+            }
+        };
+    }
 
     let mut kafka_topic = NewTopic::new(
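The fail-open path above relies on broker-side defaulting: on Kafka 2.4+, a create-topics request may carry -1 for both partition count and replication factor, and the broker substitutes its own num.partitions and default.replication.factor. A minimal, self-contained sketch of that request via rdkafka's admin API — illustrative only, not code from this PR, and the bootstrap address is a placeholder:

use std::time::Duration;

use anyhow::Context;
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::ClientConfig;

/// Create a topic while deferring partition count and replication factor to
/// the broker's configured defaults. Requires Kafka 2.4+; older brokers
/// reject explicit -1 values.
async fn create_topic_with_broker_defaults(
    bootstrap_servers: &str,
    topic: &str,
) -> Result<(), anyhow::Error> {
    let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
        .set("bootstrap.servers", bootstrap_servers)
        .create()
        .context("building Kafka admin client")?;

    // -1 for both fields asks the broker to apply num.partitions and
    // default.replication.factor itself, so no DescribeConfigs call (and no
    // DescribeConfigs permission) is needed.
    let new_topic = NewTopic::new(topic, -1, TopicReplication::Fixed(-1));
    let opts = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));

    // create_topics reports a per-topic Result; surface the first failure.
    for result in admin.create_topics(&[new_topic], &opts).await? {
        result.map_err(|(name, code)| {
            anyhow::anyhow!("creating topic {name} failed: {code}")
        })?;
    }
    Ok(())
}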
25 changes: 24 additions & 1 deletion test/kafka-sasl-plain/mzcompose.py
@@ -42,6 +42,10 @@
             "KAFKA_SSL_CLIENT_AUTH=required",
             "KAFKA_SECURITY_INTER_BROKER_PROTOCOL=SASL_SSL",
             "KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/sasl.jaas.config",
+            # -Dauthorizer.class.name=kafka.security.authorizer.AclAuthorizer",
+            "KAFKA_AUTHORIZER_CLASS_NAME=kafka.security.authorizer.AclAuthorizer",
+            "KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND=true",
+            "KAFKA_SUPER_USERS=User:materialize;User:broker",
             # Standard options we don't want to overwrite!
             "KAFKA_MIN_INSYNC_REPLICAS=1",
             "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1",
@@ -51,6 +55,7 @@
         volumes=[
             "secrets:/etc/kafka/secrets",
             "./sasl.jaas.config:/etc/kafka/sasl.jaas.config",
+            "./no-describe-configs.properties:/etc/kafka/no-describe-configs.properties",
         ],
     ),
     SchemaRegistry(
@@ -118,4 +123,22 @@ def workflow_default(c: Composition) -> None:
     c.up("test-certs")
     c.up("zookeeper", "kafka", "schema-registry")
     c.up("materialized")
-    c.run("testdrive", "*.td")
+
+    c.run("testdrive", "smoketest.td")
+
+    # Deny the DescribeConfigs cluster privilege to user no_describe_configs
+    c.exec(
+        "kafka",
+        "kafka-acls",
+        "--bootstrap-server",
+        "kafka:9092",
+        "--add",
+        "--deny-principal",
+        "User:CN=no_describe_configs",
+        "--operation",
+        "DescribeConfigs",
+        "--cluster",
+        "--command-config",
+        "/etc/kafka/no-describe-configs.properties",
+    )
+    c.run("testdrive", "no-describe-configs.td")
18 changes: 18 additions & 0 deletions test/kafka-sasl-plain/no-describe-configs.properties
@@ -0,0 +1,18 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="materialize" password="sekurity";
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.truststore.location=/etc/kafka/secrets/kafka.truststore.jks
ssl.truststore.password=mzmzmz
ssl.keystore.location=/etc/kafka/secrets/kafka.keystore.jks
ssl.keystore.password=mzmzmz
ssl.key.password=mzmzmz
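These are Java-client properties (the command config handed to kafka-acls). For comparison, a sketch of the same SASL/SSL settings expressed for an rdkafka/librdkafka client — the kind of client the sink code above builds. The PEM CA path is an assumed stand-in, since librdkafka reads PEM files via ssl.ca.location rather than a JKS truststore:

use rdkafka::ClientConfig;

// Equivalent librdkafka client settings; credentials mirror the properties
// file above. The CA path is an assumption (librdkafka does not read JKS).
fn sasl_ssl_client_config() -> ClientConfig {
    let mut config = ClientConfig::new();
    config
        .set("bootstrap.servers", "kafka:9092")
        .set("security.protocol", "SASL_SSL")
        .set("sasl.mechanism", "PLAIN")
        .set("sasl.username", "materialize")
        .set("sasl.password", "sekurity")
        .set("ssl.ca.location", "/etc/kafka/secrets/ca.crt"); // assumed path
    config
}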
54 changes: 54 additions & 0 deletions test/kafka-sasl-plain/no-describe-configs.td
@@ -0,0 +1,54 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

#
# Test that sinks can be created even in the absence of the DescribeConfigs permission
#

> CREATE MATERIALIZED VIEW no_describe_configs_view AS SELECT 1;

> CREATE SECRET no_describe_configs_password AS 'password'

> CREATE CONNECTION no_describe_configs_kafka_conn TO KAFKA (
    BROKER 'kafka:9092',
    SASL MECHANISMS = 'PLAIN',
    SASL USERNAME = 'no_describe_config',
    SASL PASSWORD = SECRET no_describe_configs_password,
    SSL CERTIFICATE AUTHORITY = '${arg.ca}'
  );

> CREATE CONNECTION no_describe_configs_csr_conn
  FOR CONFLUENT SCHEMA REGISTRY
  URL '${testdrive.schema-registry-url}',
  SSL CERTIFICATE AUTHORITY = '${arg.ca}'

$ kafka-create-topic topic=no-describe-configs1

> CREATE SINK no_describe_configs_sink1
  FROM no_describe_configs_view
  INTO KAFKA CONNECTION no_describe_configs_kafka_conn (TOPIC 'testdrive-no-describe-configs1-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION no_describe_configs_csr_conn
  ENVELOPE DEBEZIUM;

$ kafka-create-topic topic=no-describe-configs2

> CREATE SINK no_describe_configs_sink2
  FROM no_describe_configs_view
  INTO KAFKA CONNECTION no_describe_configs_kafka_conn (REPLICATION FACTOR = -1, TOPIC 'testdrive-no-describe-configs2-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION no_describe_configs_csr_conn
  ENVELOPE DEBEZIUM;

$ kafka-create-topic topic=no-describe-configs3

> CREATE SINK no_describe_configs_sink3
  FROM no_describe_configs_view
  INTO KAFKA CONNECTION no_describe_configs_kafka_conn (PARTITION COUNT = -1, TOPIC 'testdrive-no-describe-configs3-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION no_describe_configs_csr_conn
  ENVELOPE DEBEZIUM;
3 changes: 2 additions & 1 deletion test/kafka-sasl-plain/sasl.jaas.config
@@ -25,7 +25,8 @@ KafkaServer {
    password="broker"
    user_broker="broker"
    user_schemaregistry="schemaregistry"
-   user_materialize="sekurity";
+   user_materialize="sekurity"
+   user_no_describe_config="password";
 };
 
 // Zookeeper client, despite the generic name.
2 changes: 1 addition & 1 deletion test/test-certs/create-certs.sh
@@ -128,7 +128,7 @@ create_cert() {
     -passout pass:$SSL_SECRET
 }
 
-for i in materialized producer postgres certuser
+for i in materialized producer postgres certuser no_describe_configs
 do
     create_cert $i "ca" $i