From 6df76c8e091ae8cb60155d0cd8bc8c75a2bab3e7 Mon Sep 17 00:00:00 2001 From: Dalitso Banda Date: Fri, 12 Apr 2024 10:34:10 -0700 Subject: [PATCH] rust: add more rust logging (#351) * add info logging to show when processor shuts down * log member id in python * log member id in rust * simplify rust logging * linting --- arroyo/backends/abstract.py | 9 +++++++++ arroyo/backends/kafka/consumer.py | 5 +++++ arroyo/backends/local/backend.py | 4 ++++ arroyo/processing/processor.py | 4 +++- rust-arroyo/src/backends/kafka/mod.rs | 11 +++++++++++ rust-arroyo/src/processing/mod.rs | 4 ++++ 6 files changed, 36 insertions(+), 1 deletion(-) diff --git a/arroyo/backends/abstract.py b/arroyo/backends/abstract.py index 00483dad..4859c220 100644 --- a/arroyo/backends/abstract.py +++ b/arroyo/backends/abstract.py @@ -165,6 +165,15 @@ def close(self, timeout: Optional[float] = None) -> None: def closed(self) -> bool: raise NotImplementedError + @property + @abstractmethod + def member_id(self) -> str: + """ + Return the member ID of the consumer as supplied by the broker. + This is useful for debugging purposes. + """ + raise NotImplementedError + class Producer(Generic[TStrategyPayload], ABC): @abstractmethod diff --git a/arroyo/backends/kafka/consumer.py b/arroyo/backends/kafka/consumer.py index 68bc1bc8..c9264e3f 100644 --- a/arroyo/backends/kafka/consumer.py +++ b/arroyo/backends/kafka/consumer.py @@ -628,6 +628,11 @@ def close(self, timeout: Optional[float] = None) -> None: def closed(self) -> bool: return self.__state is KafkaConsumerState.CLOSED + @property + def member_id(self) -> str: + member_id: str = self.__consumer.memberid() + return member_id + class KafkaProducer(Producer[KafkaPayload]): def __init__(self, configuration: Mapping[str, Any]) -> None: diff --git a/arroyo/backends/local/backend.py b/arroyo/backends/local/backend.py index a7c438b5..1976c051 100644 --- a/arroyo/backends/local/backend.py +++ b/arroyo/backends/local/backend.py @@ -359,6 +359,10 @@ def close(self, timeout: Optional[float] = None) -> None: def closed(self) -> bool: return self.__closed + @property + def member_id(self) -> str: + return "local-consumer" + class LocalProducer(Producer[TStrategyPayload]): def __init__(self, broker: LocalBroker[TStrategyPayload]) -> None: diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index 6966e30d..631d3396 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -233,6 +233,7 @@ def _create_strategy(partitions: Mapping[Partition, int]) -> None: @_rdkafka_callback(metrics=self.__metrics_buffer) def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None: logger.info("New partitions assigned: %r", partitions) + logger.info("Member id: %r", self.__consumer.member_id) self.__metrics_buffer.metrics.increment( "arroyo.consumer.partitions_assigned.count", len(partitions) ) @@ -422,7 +423,8 @@ def _run_once(self) -> None: self.__backpressure_timestamp = time.time() elif not self.__is_paused and ( - time.time() - self.__backpressure_timestamp > BACKPRESSURE_THRESHOLD + time.time() - self.__backpressure_timestamp + > BACKPRESSURE_THRESHOLD ): self.__metrics_buffer.incr_counter("arroyo.consumer.pause", 1) logger.debug( diff --git a/rust-arroyo/src/backends/kafka/mod.rs b/rust-arroyo/src/backends/kafka/mod.rs index d30790a3..f146046f 100644 --- a/rust-arroyo/src/backends/kafka/mod.rs +++ b/rust-arroyo/src/backends/kafka/mod.rs @@ -8,6 +8,7 @@ use crate::gauge; use crate::types::{BrokerMessage, Partition, Topic}; use chrono::{DateTime, NaiveDateTime, Utc}; use parking_lot::Mutex; +use rdkafka::bindings::rd_kafka_memberid; use rdkafka::client::ClientContext; use rdkafka::config::{ClientConfig, RDKafkaLogLevel}; use rdkafka::consumer::base_consumer::BaseConsumer; @@ -240,6 +241,16 @@ impl ConsumerContext for CustomContext { HashMap::with_capacity(committed_offsets.count()); let mut tpl = TopicPartitionList::with_capacity(committed_offsets.count()); + // TODO: this will log member id every time we get an assignment, which is not ideal + // it would be better to log it only when it changes. We log for debugging purposes. + unsafe { + let member_id = rd_kafka_memberid(base_consumer.client().native_ptr()); + tracing::info!( + "Kafka consumer member id: {:?}", + std::ffi::CStr::from_ptr(member_id).to_str().unwrap() + ); + }; + for partition in committed_offsets.elements() { let raw_offset = partition.offset().to_raw().unwrap(); diff --git a/rust-arroyo/src/processing/mod.rs b/rust-arroyo/src/processing/mod.rs index 510ac541..f6319b7b 100644 --- a/rust-arroyo/src/processing/mod.rs +++ b/rust-arroyo/src/processing/mod.rs @@ -105,6 +105,7 @@ pub struct ProcessorHandle { impl ProcessorHandle { pub fn signal_shutdown(&mut self) { + tracing::info!("Shutdown requested"); self.shutdown_requested.store(true, Ordering::Relaxed); } } @@ -145,6 +146,7 @@ impl AssignmentCallbacks for Callbacks StreamProcessor { if let Err(e) = self.run_once() { let mut trait_callbacks = self.consumer_state.locked_state(); + tracing::info!("Caught error, terminating strategy"); if let Some(strategy) = trait_callbacks.strategy.as_mut() { strategy.terminate(); } @@ -418,6 +421,7 @@ impl StreamProcessor { return Err(e); } } + tracing::info!("Shutdown processor"); Ok(()) }