Skip to content

Commit

Permalink
ref: Use coarsetime consistently (#366)
Browse files Browse the repository at this point in the history
* ref: Use coarsetime consistently

coarsetime is a crate that lets you read the current time from an
in-memory cache instead of querying the clock. This tends to be faster
than actually getting the current time.

Previously we were using Instant::now_without_cache_update, which does not
actually give us any perf benefits. Let's use Instant::recent everywhere and
keep the cached timestamp updated via a coarsetime::Updater held by the
StreamProcessor.

* fix tests and examples

* fix tests
  • Loading branch information
untitaker committed May 17, 2024
1 parent 8c67d09 commit 41457bf
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 29 deletions.
5 changes: 3 additions & 2 deletions rust-arroyo/examples/base_processor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
extern crate rust_arroyo;

use chrono::Duration;
use std::time::Duration;

use rust_arroyo::backends::kafka::config::KafkaConfig;
use rust_arroyo::backends::kafka::types::KafkaPayload;
use rust_arroyo::backends::kafka::InitialOffset;
Expand All @@ -12,7 +13,7 @@ use rust_arroyo::types::Topic;
struct TestFactory {}
impl ProcessingStrategyFactory<KafkaPayload> for TestFactory {
fn create(&self) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
Box::new(CommitOffsets::new(Duration::seconds(1)))
Box::new(CommitOffsets::new(Duration::from_secs(1)))
}
}

Expand Down
3 changes: 2 additions & 1 deletion rust-arroyo/src/metrics/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl Display for MetricValue {
}

macro_rules! into_metric_value {
($($from:ident),+ => $variant:ident) => {
($($from:path),+ => $variant:ident) => {
$(
impl From<$from> for MetricValue {
#[inline(always)]
Expand All @@ -82,6 +82,7 @@ into_metric_value!(i8, i16, i32, i64 => I64);
into_metric_value!(u8, u16, u32, u64 => U64);
into_metric_value!(f32, f64 => F64);
into_metric_value!(Duration => Duration);
into_metric_value!(coarsetime::Duration => Duration);

/// An alias for a list of Metric tags.
pub type MetricTags<'a> = &'a [(Option<&'a dyn Display>, &'a dyn Display)];
Expand Down
29 changes: 18 additions & 11 deletions rust-arroyo/src/processing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::panic::{self, AssertUnwindSafe};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;

use parking_lot::{Mutex, MutexGuard};
use thiserror::Error;
Expand Down Expand Up @@ -124,14 +124,17 @@ impl<TPayload: Send + Sync + 'static> AssignmentCallbacks for Callbacks<TPayload
partitions.len() as i64
);

let start = Instant::now();
let start = coarsetime::Instant::recent();

let mut state = self.0.locked_state();
state.processing_factory.update_partitions(&partitions);
state.strategy = Some(state.processing_factory.create());
state.dlq_policy.reset_dlq_limits(&partitions);

timer!("arroyo.consumer.create_strategy.time", start.elapsed());
timer!(
"arroyo.consumer.create_strategy.time",
start.elapsed_since_recent()
);
}

fn on_revoke<C: CommitOffsets>(&self, commit_offsets: C, partitions: Vec<Partition>) {
Expand All @@ -141,7 +144,7 @@ impl<TPayload: Send + Sync + 'static> AssignmentCallbacks for Callbacks<TPayload
partitions.len() as i64,
);

let start = Instant::now();
let start = coarsetime::Instant::recent();

let mut state = self.0.locked_state();
if let Some(s) = state.strategy.as_mut() {
Expand Down Expand Up @@ -173,7 +176,7 @@ impl<TPayload: Send + Sync + 'static> AssignmentCallbacks for Callbacks<TPayload
self.0.set_paused(false);
state.clear_backpressure();

timer!("arroyo.consumer.join.time", start.elapsed());
timer!("arroyo.consumer.join.time", start.elapsed_since_recent());

tracing::info!("Partition revocation complete.");

Expand All @@ -192,6 +195,7 @@ pub struct StreamProcessor<TPayload: Clone> {
processor_handle: ProcessorHandle,
buffered_messages: BufferedMessages<TPayload>,
metrics_buffer: metrics_buffer::MetricsBuffer,
_time_updater: coarsetime::Updater,
}

impl StreamProcessor<KafkaPayload> {
Expand Down Expand Up @@ -230,6 +234,7 @@ impl<TPayload: Clone + Send + Sync + 'static> StreamProcessor<TPayload> {
},
buffered_messages: BufferedMessages::new(max_buffered_messages_per_partition),
metrics_buffer: metrics_buffer::MetricsBuffer::new(),
_time_updater: coarsetime::Updater::new(10).start().unwrap(),
}
}

Expand All @@ -254,12 +259,14 @@ impl<TPayload: Clone + Send + Sync + 'static> StreamProcessor<TPayload> {
} else if self.message.is_none() {
// Otherwise, we need to try fetch a new message from the consumer,
// even if there is no active assignment and/or processing strategy.
let poll_start = Instant::now();
let poll_start = coarsetime::Instant::recent();
//TODO: Support errors properly
match self.consumer.poll(Some(Duration::from_secs(1))) {
Ok(msg) => {
self.metrics_buffer
.incr_timing("arroyo.consumer.poll.time", poll_start.elapsed());
self.metrics_buffer.incr_timing(
"arroyo.consumer.poll.time",
poll_start.elapsed_since_recent().into(),
);

if let Some(broker_msg) = msg {
self.message = Some(Message {
Expand Down Expand Up @@ -289,7 +296,7 @@ impl<TPayload: Clone + Send + Sync + 'static> StreamProcessor<TPayload> {
Some(_) => return Err(RunError::InvalidState),
}
};
let processing_start = Instant::now();
let processing_start = coarsetime::Instant::recent();

match strategy.poll() {
Ok(None) => {}
Expand Down Expand Up @@ -321,15 +328,15 @@ impl<TPayload: Clone + Send + Sync + 'static> StreamProcessor<TPayload> {
let Some(msg_s) = self.message.take() else {
self.metrics_buffer.incr_timing(
"arroyo.consumer.processing.time",
processing_start.elapsed(),
processing_start.elapsed_since_recent().into(),
);
return Ok(());
};

let ret = strategy.submit(msg_s);
self.metrics_buffer.incr_timing(
"arroyo.consumer.processing.time",
processing_start.elapsed(),
processing_start.elapsed_since_recent().into(),
);

match ret {
Expand Down
34 changes: 20 additions & 14 deletions rust-arroyo/src/processing/strategies/commit_offsets.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::time::Duration;

use chrono::{DateTime, Duration, Utc};
use chrono::Utc;

use crate::processing::strategies::{CommitRequest, ProcessingStrategy, SubmitError};
use crate::timer;
Expand All @@ -10,35 +11,38 @@ use super::StrategyError;

pub struct CommitOffsets {
partitions: HashMap<Partition, u64>,
last_commit_time: DateTime<Utc>,
last_record_time: DateTime<Utc>,
commit_frequency: Duration,
last_commit_time: coarsetime::Instant,
last_record_time: coarsetime::Instant,
commit_frequency: coarsetime::Duration,
}

impl CommitOffsets {
pub fn new(commit_frequency: Duration) -> Self {
CommitOffsets {
partitions: Default::default(),
last_commit_time: Utc::now(),
last_record_time: Utc::now(),
commit_frequency,
last_commit_time: coarsetime::Instant::recent(),
last_record_time: coarsetime::Instant::recent(),
commit_frequency: commit_frequency.into(),
}
}

fn commit(&mut self, force: bool) -> Option<CommitRequest> {
if Utc::now() - self.last_commit_time <= self.commit_frequency && !force {
// check if there is anything to commit first, since this is much cheaper than getting the
// current time
if self.partitions.is_empty() {
return None;
}

if self.partitions.is_empty() {
if coarsetime::Instant::recent() - self.last_commit_time <= self.commit_frequency && !force
{
return None;
}

let ret = Some(CommitRequest {
positions: self.partitions.clone(),
});
self.partitions.clear();
self.last_commit_time = Utc::now();
self.last_commit_time = coarsetime::Instant::recent();
ret
}
}
Expand All @@ -49,13 +53,13 @@ impl<T> ProcessingStrategy<T> for CommitOffsets {
}

fn submit(&mut self, message: Message<T>) -> Result<(), SubmitError<T>> {
let now = Utc::now();
if now - self.last_record_time > Duration::seconds(1) {
let now = coarsetime::Instant::recent();
if now - self.last_record_time > coarsetime::Duration::from_secs(1) {
if let Some(timestamp) = message.timestamp() {
// FIXME: this used to be in seconds
timer!(
"arroyo.consumer.latency",
(now - timestamp).to_std().unwrap_or_default()
(Utc::now() - timestamp).to_std().unwrap_or_default()
);
self.last_record_time = now;
}
Expand Down Expand Up @@ -91,6 +95,8 @@ mod tests {
#[test]
fn test_commit_offsets() {
tracing_subscriber::fmt().with_test_writer().init();
let updater = coarsetime::Updater::new(10).start().unwrap();

let partition1 = Partition::new(Topic::new("noop-commit"), 0);
let partition2 = Partition::new(Topic::new("noop-commit"), 1);
let timestamp = DateTime::from(SystemTime::now());
Expand All @@ -113,7 +119,7 @@ mod tests {
}),
};

let mut noop = CommitOffsets::new(chrono::Duration::seconds(1));
let mut noop = CommitOffsets::new(Duration::from_secs(1));

let mut commit_req1 = CommitRequest {
positions: Default::default(),
Expand Down
2 changes: 1 addition & 1 deletion rust-arroyo/src/utils/timing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub struct Deadline {

#[inline(always)]
fn now() -> coarsetime::Instant {
coarsetime::Instant::now_without_cache_update()
coarsetime::Instant::recent()
}

impl Deadline {
Expand Down

0 comments on commit 41457bf

Please sign in to comment.