Skip to content

Commit

Permalink
rust: add more rust logging (#351)
Browse files Browse the repository at this point in the history
* add info logging to show when processor shuts down

* log member id in python

* log member id in rust

* simplify rust logging

* linting
  • Loading branch information
dbanda committed Apr 12, 2024
1 parent 1a076a5 commit 6df76c8
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 1 deletion.
9 changes: 9 additions & 0 deletions arroyo/backends/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,15 @@ def close(self, timeout: Optional[float] = None) -> None:
def closed(self) -> bool:
raise NotImplementedError

@property
@abstractmethod
def member_id(self) -> str:
"""
Return the member ID of the consumer as supplied by the broker.
This is useful for debugging purposes.
"""
raise NotImplementedError


class Producer(Generic[TStrategyPayload], ABC):
@abstractmethod
Expand Down
5 changes: 5 additions & 0 deletions arroyo/backends/kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,11 @@ def close(self, timeout: Optional[float] = None) -> None:
def closed(self) -> bool:
return self.__state is KafkaConsumerState.CLOSED

@property
def member_id(self) -> str:
member_id: str = self.__consumer.memberid()
return member_id


class KafkaProducer(Producer[KafkaPayload]):
def __init__(self, configuration: Mapping[str, Any]) -> None:
Expand Down
4 changes: 4 additions & 0 deletions arroyo/backends/local/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,10 @@ def close(self, timeout: Optional[float] = None) -> None:
def closed(self) -> bool:
return self.__closed

@property
def member_id(self) -> str:
return "local-consumer"


class LocalProducer(Producer[TStrategyPayload]):
def __init__(self, broker: LocalBroker[TStrategyPayload]) -> None:
Expand Down
4 changes: 3 additions & 1 deletion arroyo/processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ def _create_strategy(partitions: Mapping[Partition, int]) -> None:
@_rdkafka_callback(metrics=self.__metrics_buffer)
def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None:
logger.info("New partitions assigned: %r", partitions)
logger.info("Member id: %r", self.__consumer.member_id)
self.__metrics_buffer.metrics.increment(
"arroyo.consumer.partitions_assigned.count", len(partitions)
)
Expand Down Expand Up @@ -422,7 +423,8 @@ def _run_once(self) -> None:
self.__backpressure_timestamp = time.time()

elif not self.__is_paused and (
time.time() - self.__backpressure_timestamp > BACKPRESSURE_THRESHOLD
time.time() - self.__backpressure_timestamp
> BACKPRESSURE_THRESHOLD
):
self.__metrics_buffer.incr_counter("arroyo.consumer.pause", 1)
logger.debug(
Expand Down
11 changes: 11 additions & 0 deletions rust-arroyo/src/backends/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::gauge;
use crate::types::{BrokerMessage, Partition, Topic};
use chrono::{DateTime, NaiveDateTime, Utc};
use parking_lot::Mutex;
use rdkafka::bindings::rd_kafka_memberid;
use rdkafka::client::ClientContext;
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
use rdkafka::consumer::base_consumer::BaseConsumer;
Expand Down Expand Up @@ -240,6 +241,16 @@ impl<C: AssignmentCallbacks> ConsumerContext for CustomContext<C> {
HashMap::with_capacity(committed_offsets.count());
let mut tpl = TopicPartitionList::with_capacity(committed_offsets.count());

// TODO: this will log member id every time we get an assignment, which is not ideal
// it would be better to log it only when it changes. We log for debugging purposes.
unsafe {
let member_id = rd_kafka_memberid(base_consumer.client().native_ptr());
tracing::info!(
"Kafka consumer member id: {:?}",
std::ffi::CStr::from_ptr(member_id).to_str().unwrap()
);
};

for partition in committed_offsets.elements() {
let raw_offset = partition.offset().to_raw().unwrap();

Expand Down
4 changes: 4 additions & 0 deletions rust-arroyo/src/processing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ pub struct ProcessorHandle {

impl ProcessorHandle {
pub fn signal_shutdown(&mut self) {
tracing::info!("Shutdown requested");
self.shutdown_requested.store(true, Ordering::Relaxed);
}
}
Expand Down Expand Up @@ -145,6 +146,7 @@ impl<TPayload: Send + Sync + 'static> AssignmentCallbacks for Callbacks<TPayload
let mut state = self.0.locked_state();
if let Some(s) = state.strategy.as_mut() {
let result = panic::catch_unwind(AssertUnwindSafe(|| {
tracing::info!("Closing and joining strategy");
s.close();
s.join(None)
}));
Expand Down Expand Up @@ -410,6 +412,7 @@ impl<TPayload: Clone + Send + Sync + 'static> StreamProcessor<TPayload> {
if let Err(e) = self.run_once() {
let mut trait_callbacks = self.consumer_state.locked_state();

tracing::info!("Caught error, terminating strategy");
if let Some(strategy) = trait_callbacks.strategy.as_mut() {
strategy.terminate();
}
Expand All @@ -418,6 +421,7 @@ impl<TPayload: Clone + Send + Sync + 'static> StreamProcessor<TPayload> {
return Err(e);
}
}
tracing::info!("Shutdown processor");
Ok(())
}

Expand Down

0 comments on commit 6df76c8

Please sign in to comment.