storage: consolidate how we handle kafka errors (#18652)
guswynn committed Apr 6, 2023
1 parent 865f219 commit 2191ed5
Showing 11 changed files with 69 additions and 167 deletions.
31 changes: 27 additions & 4 deletions src/kafka-util/src/client.rs
@@ -19,13 +19,16 @@ use mz_ore::collections::CollectionExt;
use rdkafka::client::{BrokerAddr, Client, NativeClient, OAuthToken};
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
use rdkafka::consumer::{ConsumerContext, Rebalance};
use rdkafka::error::{KafkaError, KafkaResult};
use rdkafka::error::{KafkaError, KafkaResult, RDKafkaErrorCode};
use rdkafka::producer::{DefaultProducerContext, DeliveryResult, ProducerContext};
use rdkafka::types::RDKafkaRespErr;
use rdkafka::util::Timeout;
use rdkafka::{ClientContext, Statistics, TopicPartitionList};
use tracing::{debug, error, info, warn, Level};

/// A reasonable default timeout when fetching metadata or partitions.
pub const DEFAULT_FETCH_METADATA_TIMEOUT: Duration = Duration::from_secs(30);

/// A `ClientContext` implementation that uses `tracing` instead of `log`
/// macros.
///
@@ -217,12 +220,15 @@ where
}
}

/// Id of a partition in a topic.
pub type PartitionId = i32;

/// Retrieve number of partitions for a given `topic` using the given `client`
pub fn get_partitions<C: ClientContext>(
client: &Client<C>,
topic: &str,
timeout: Duration,
) -> Result<Vec<i32>, anyhow::Error> {
) -> Result<Vec<PartitionId>, anyhow::Error> {
let meta = client.fetch_metadata(Some(topic), timeout)?;
if meta.topics().len() != 1 {
bail!(
@@ -231,7 +237,17 @@ pub fn get_partitions<C: ClientContext>(
meta.topics().len()
);
}

fn check_err(err: Option<RDKafkaRespErr>) -> anyhow::Result<()> {
if let Some(err) = err {
Err(RDKafkaErrorCode::from(err))?
}
Ok(())
}

let meta_topic = meta.topics().into_element();
check_err(meta_topic.error())?;

if meta_topic.name() != topic {
bail!(
"got results for wrong topic {} (expected {})",
@@ -240,11 +256,18 @@ pub fn get_partitions<C: ClientContext>(
);
}

if meta_topic.partitions().len() == 0 {
let mut partition_ids = Vec::with_capacity(meta_topic.partitions().len());
for partition_meta in meta_topic.partitions() {
check_err(partition_meta.error())?;

partition_ids.push(partition_meta.id());
}

if partition_ids.len() == 0 {
bail!("topic {} does not exist", topic);
}

Ok(meta_topic.partitions().iter().map(|x| x.id()).collect())
Ok(partition_ids)
}

/// A simpler version of [`create_new_client_config`] that defaults
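The consolidated `get_partitions` above now checks the topic- and partition-level error codes in the fetched metadata and returns them as `anyhow` errors instead of silently ignoring them. A minimal sketch of a hypothetical caller (the function and its setup are assumed, not part of this commit):

```rust
use mz_kafka_util::client::{get_partitions, PartitionId, DEFAULT_FETCH_METADATA_TIMEOUT};
use rdkafka::client::{Client, ClientContext};

// Sketch: any RDKafkaErrorCode reported for the topic or one of its
// partitions now surfaces here as an Err instead of being dropped.
fn partition_count<C: ClientContext>(
    client: &Client<C>,
    topic: &str,
) -> Result<usize, anyhow::Error> {
    let ids: Vec<PartitionId> = get_partitions(client, topic, DEFAULT_FETCH_METADATA_TIMEOUT)?;
    Ok(ids.len())
}
```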
69 changes: 10 additions & 59 deletions src/sql/src/kafka_util.rs
@@ -10,16 +10,17 @@
//! Provides parsing and convenience functions for working with Kafka from the `sql` package.

use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use std::sync::Arc;

use anyhow::bail;
use mz_ore::error::ErrorExt;
use rdkafka::client::ClientContext;
use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
use rdkafka::{Offset, TopicPartitionList};
use tokio::time::Duration;

use mz_kafka_util::client::{BrokerRewritingClientContext, MzClientContext};
use mz_kafka_util::client::{
BrokerRewritingClientContext, MzClientContext, DEFAULT_FETCH_METADATA_TIMEOUT,
};
use mz_ore::task;
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{AstInfo, KafkaConfigOption, KafkaConfigOptionName};
@@ -212,18 +213,13 @@ pub async fn create_consumer(
connection_context: &ConnectionContext,
kafka_connection: &KafkaConnection,
topic: &str,
) -> Result<Arc<BaseConsumer<BrokerRewritingClientContext<KafkaErrCheckContext>>>, PlanError> {
) -> Result<Arc<BaseConsumer<BrokerRewritingClientContext<MzClientContext>>>, PlanError> {
let consumer: BaseConsumer<_> = kafka_connection
.create_with_context(
connection_context,
KafkaErrCheckContext::default(),
&BTreeMap::new(),
)
.create_with_context(connection_context, MzClientContext, &BTreeMap::new())
.await
.map_err(|e| sql_err!("{}", e.display_with_causes()))?;
let consumer = Arc::new(consumer);

let context = Arc::clone(consumer.context());
let owned_topic = String::from(topic);
// Wait for a metadata request for up to two seconds. This greatly
// increases the probability that we'll see a connection error if
@@ -235,16 +231,11 @@
// ensure that at least one metadata request succeeds.
task::spawn_blocking(move || format!("kafka_get_metadata:{topic}"), {
let consumer = Arc::clone(&consumer);
move || {
let _ = consumer.fetch_metadata(Some(&owned_topic), Duration::from_secs(2));
}
move || consumer.fetch_metadata(Some(&owned_topic), DEFAULT_FETCH_METADATA_TIMEOUT)
})
.await
.map_err(|e| sql_err!("{}", e))?;
let error = context.inner().error.lock().expect("lock poisoned");
if let Some(error) = &*error {
sql_bail!("librdkafka: {}", error)
}
.map_err(|e| sql_err!("{}", e))?
.map_err(|e| sql_err!("librdkafka: {}", e.display_with_causes()))?;
Ok(consumer)
}

@@ -298,7 +289,7 @@ where
let num_partitions = mz_kafka_util::client::get_partitions(
consumer.as_ref().client(),
&topic,
Duration::from_secs(10),
DEFAULT_FETCH_METADATA_TIMEOUT,
)
.map_err(|e| sql_err!("{}", e))?
.len();
@@ -358,43 +349,3 @@
.map_err(|e| sql_err!("{}", e))?;
Ok(high)
}

/// Gets error strings from `rdkafka` when creating test consumers.
#[derive(Default, Debug, Clone)]
pub struct KafkaErrCheckContext {
pub error: Arc<Mutex<Option<String>>>,
}

impl ConsumerContext for KafkaErrCheckContext {}

impl ClientContext for KafkaErrCheckContext {
// `librdkafka` doesn't seem to propagate all errors up the stack, but does
// log them, so we are currently relying on the `log` callback for error
// handling in some situations.
fn log(&self, level: rdkafka::config::RDKafkaLogLevel, fac: &str, log_message: &str) {
use rdkafka::config::RDKafkaLogLevel::*;
// `INFO` messages with a `fac` of `FAIL` occur when e.g. connecting to
// an SSL-authed broker without credentials.
if fac == "FAIL" || matches!(level, Emerg | Alert | Critical | Error) {
let mut error = self.error.lock().expect("lock poisoned");
// Do not allow logging to overwrite other values if
// present.
if error.is_none() {
*error = Some(format!("error logged: {}", log_message));
}
}
MzClientContext.log(level, fac, log_message)
}
// Refer to the comment on the `log` callback.
fn error(&self, error: rdkafka::error::KafkaError, reason: &str) {
// Allow error to overwrite value irrespective of other conditions
// (i.e. logging).

*self.error.lock().expect("lock poisoned") = Some(format!(
"{}: reason: {}",
error.display_with_causes(),
reason
));
MzClientContext.error(error, reason)
}
}
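With `KafkaErrCheckContext` removed, `create_consumer` now reports failures through the `KafkaResult` returned by the blocking `fetch_metadata` call (propagated out of the spawned task), rather than scraping librdkafka's `log` and `error` callbacks into a shared string. A rough standalone sketch of that pattern, with the broker configuration assumed:

```rust
use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};

// Minimal sketch (assumed config): broker, auth, and metadata problems are
// reported via the KafkaResult returned by fetch_metadata, so no custom
// ClientContext is needed just to capture error strings.
fn probe_topic(brokers: &str, topic: &str) -> anyhow::Result<()> {
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .create()?;
    consumer.fetch_metadata(Some(topic), Duration::from_secs(30))?;
    Ok(())
}
```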
4 changes: 2 additions & 2 deletions src/storage-client/src/sink.rs
@@ -14,7 +14,7 @@ use anyhow::{anyhow, Context};
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, ResourceSpecifier, TopicReplication};
use rdkafka::ClientContext;

use mz_kafka_util::client::MzClientContext;
use mz_kafka_util::client::{MzClientContext, DEFAULT_FETCH_METADATA_TIMEOUT};
use mz_ore::collections::CollectionExt;

use crate::types::connections::ConnectionContext;
@@ -53,7 +53,7 @@
if partition_count == -1 || replication_factor == -1 {
let metadata = client
.inner()
.fetch_metadata(None, Duration::from_secs(5))
.fetch_metadata(None, DEFAULT_FETCH_METADATA_TIMEOUT)
.with_context(|| {
format!(
"error fetching metadata when creating new topic {} for sink",
53 changes: 7 additions & 46 deletions src/storage/src/sink/kafka.rs
@@ -36,13 +36,15 @@ use timely::dataflow::operators::{Enter, Leave, Map};
use timely::dataflow::{Scope, Stream};
use timely::progress::{Antichain, Timestamp as _};
use timely::PartialOrder;
use tokio::sync::{mpsc, Mutex};
use tokio::sync::Mutex;
use tracing::{debug, error, info, warn};

use mz_interchange::avro::{AvroEncoder, AvroSchemaGenerator};
use mz_interchange::encode::Encode;
use mz_interchange::json::JsonEncoder;
use mz_kafka_util::client::{BrokerRewritingClientContext, MzClientContext};
use mz_kafka_util::client::{
BrokerRewritingClientContext, MzClientContext, DEFAULT_FETCH_METADATA_TIMEOUT,
};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
@@ -224,7 +226,6 @@ impl KafkaSinkSendRetryManager {
#[derive(Clone)]
pub struct SinkProducerContext {
metrics: Arc<SinkMetrics>,
status_tx: mpsc::Sender<SinkStatus>,
retry_manager: Arc<Mutex<KafkaSinkSendRetryManager>>,
}

@@ -235,11 +236,6 @@ impl ClientContext for SinkProducerContext {
MzClientContext.log(level, fac, log_message)
}
fn error(&self, error: KafkaError, reason: &str) {
let status = SinkStatus::Stalled {
error: error.to_string(),
hint: None,
};
let _ = self.status_tx.try_send(status);
MzClientContext.error(error, reason)
}
}
@@ -386,7 +382,7 @@ struct KafkaSinkState {

progress_topic: String,
progress_key: String,
progress_client: Option<Arc<BaseConsumer<BrokerRewritingClientContext<SinkConsumerContext>>>>,
progress_client: Option<Arc<BaseConsumer<BrokerRewritingClientContext<MzClientContext>>>>,

healthchecker: Arc<Mutex<Option<Healthchecker>>>,
internal_cmd_tx: Rc<RefCell<dyn InternalCommandSender>>,
Expand All @@ -406,27 +402,6 @@ struct KafkaSinkState {
write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
}

#[derive(Clone)]
struct SinkConsumerContext {
status_tx: mpsc::Sender<SinkStatus>,
}

impl ClientContext for SinkConsumerContext {
fn log(&self, level: rdkafka::config::RDKafkaLogLevel, fac: &str, log_message: &str) {
MzClientContext.log(level, fac, log_message)
}
fn error(&self, error: KafkaError, reason: &str) {
let status = SinkStatus::Stalled {
error: error.to_string(),
hint: None,
};
let _ = self.status_tx.try_send(status);
MzClientContext.error(error, reason)
}
}

impl ConsumerContext for SinkConsumerContext {}

impl KafkaSinkState {
async fn new(
connection: KafkaSinkConnection,
@@ -448,27 +423,13 @@ impl KafkaSinkState {

let retry_manager = Arc::new(Mutex::new(KafkaSinkSendRetryManager::new()));

let (status_tx, mut status_rx) = mpsc::channel(16);

let producer_context = SinkProducerContext {
metrics: Arc::clone(&metrics),
status_tx: status_tx.clone(),
retry_manager: Arc::clone(&retry_manager),
};

let healthchecker: Arc<Mutex<Option<Healthchecker>>> = Arc::new(Mutex::new(None));

{
let healthchecker = Arc::clone(&healthchecker);
task::spawn(|| "producer-error-report", async move {
while let Some(status) = status_rx.recv().await {
if let Some(hc) = healthchecker.lock().await.as_mut() {
hc.update_status(status).await;
}
}
});
}

let producer = connection
.connection
.create_with_context(
@@ -514,7 +475,7 @@ impl KafkaSinkState {
.connection
.create_with_context(
connection_context,
SinkConsumerContext { status_tx },
MzClientContext,
&btreemap! {
"group.id" => format!("materialize-bootstrap-sink-{sink_id}"),
"isolation.level" => "read_committed".into(),
@@ -769,7 +730,7 @@ impl KafkaSinkState {
&progress_topic,
&progress_key,
&progress_client,
Duration::from_secs(10),
DEFAULT_FETCH_METADATA_TIMEOUT,
)
},
)
(Diffs for the remaining 7 changed files are not shown.)
