diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index a4164b99..f0311dbe 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -27,6 +27,7 @@ use crate::config::Config;
 use crate::error::Error::UnsupportedOperation;
 use crate::error::{ApiError, Error, FlussError, Result};
 use crate::metadata::{LogFormat, PhysicalTablePath, RowType, TableBucket, TableInfo, TablePath};
+use crate::metrics::{SCANNER_POLL_IDLE_RATIO, SCANNER_TIME_BETWEEN_POLL_MS};
 use crate::proto::{
     ErrorResponse, FetchLogRequest, FetchLogResponse, PbFetchLogReqForBucket, PbFetchLogReqForTable,
 };
@@ -277,6 +278,58 @@ struct LogScannerInner {
     /// Guards against subscription changes while a
     /// [`crate::client::RecordBatchLogReader`] is iterating.
     reader_active: std::sync::atomic::AtomicBool,
+    /// Holds the snapshot fields used by [`PollGuard`] to derive the
+    /// scanner poll-timing metrics. The mutex makes each individual
+    /// `record_poll_start` / `record_poll_end` call atomic, but the
+    /// start↔end pairing depends on the single-consumer contract
+    /// documented on [`LogScanner::poll`] and
+    /// [`RecordBatchLogScanner::poll`] (mirrors Java's
+    /// `LogScannerImpl.acquire()`). Overlapping polls on the same
+    /// scanner trip a `debug_assert!` in `record_poll_start`.
+    poll_state: Mutex<PollState>,
+}
+
+/// Snapshot state used to derive the scanner poll-timing metrics.
+///
+/// The mutex makes each `record_poll_start` / `record_poll_end` call
+/// individually atomic. It does **not** by itself preserve
+/// start↔end pairing across overlapping `poll()` calls — that invariant
+/// relies on the single-consumer contract that mirrors Java's
+/// `LogScannerImpl.acquire()`. Concurrent polls on the same scanner are
+/// detected by a `debug_assert!` in `record_poll_start` to surface
+/// contract violations in tests; release builds favor low overhead and
+/// assume callers honor the contract.
+#[derive(Default, Debug)]
+struct PollState {
+    /// Instant captured at the most recent `record_poll_start()`. `None`
+    /// before the first poll.
+    last_poll_at: Option<Instant>,
+    /// Instant captured at the start of the in-flight poll. `None` after
+    /// the last `record_poll_end()`.
+    poll_start_at: Option<Instant>,
+    /// Cached ms between the two most recent poll starts, used to compute
+    /// `poll_idle_ratio` in `record_poll_end`.
+    time_between_poll_ms: f64,
+}
+
+/// Pairs `record_poll_start` with `record_poll_end`. Created
+/// at the top of `poll_records` / `poll_batches`; `record_poll_end` runs on
+/// drop, including the cancellation path (caller drops the future).
+struct PollGuard<'a> {
+    inner: &'a LogScannerInner,
+}
+
+impl<'a> PollGuard<'a> {
+    fn new(inner: &'a LogScannerInner) -> Self {
+        inner.record_poll_start();
+        Self { inner }
+    }
+}
+
+impl Drop for PollGuard<'_> {
+    fn drop(&mut self) {
+        self.inner.record_poll_end();
+    }
 }
 
 impl LogScannerInner {
@@ -318,6 +371,7 @@ impl LogScannerInner {
             )?,
             arrow_schema,
             reader_active: std::sync::atomic::AtomicBool::new(false),
+            poll_state: Mutex::new(PollState::default()),
         })
     }
 
@@ -336,6 +390,10 @@ impl LogScannerInner {
     }
 
     async fn poll_records(&self, timeout: Duration) -> Result {
+        // Pairs record_poll_start (now) with record_poll_end
+        // (drop). Runs on every exit, including the cancellation path
+        // where the caller drops this future.
+        let _poll_guard = PollGuard::new(self);
         let start = Instant::now();
         let deadline = start + timeout;
 
@@ -374,6 +432,51 @@ impl LogScannerInner {
         }
     }
 
+    /// Records the start of a `poll()` call and emits
+    /// `SCANNER_TIME_BETWEEN_POLL_MS`. The first poll emits `0.0`,
+    /// matching Java's `ScannerMetricGroup.recordPollStart`
+    /// (`timeMsBetweenPoll = lastPollMs != 0L ? pollStartMs - lastPollMs : 0L`).
+    ///
+    /// In debug builds, panics if a previous poll has not yet recorded
+    /// its end — that indicates a concurrent `poll()` on the same scanner,
+    /// which violates the single-consumer contract (Java enforces this
+    /// with `LogScannerImpl.acquire()` and throws
+    /// `ConcurrentModificationException`).
+    fn record_poll_start(&self) {
+        let now = Instant::now();
+        let mut state = self.poll_state.lock();
+        debug_assert!(
+            state.poll_start_at.is_none(),
+            "concurrent poll() detected on the same scanner; \
+             LogScanner / RecordBatchLogScanner are single-consumer \
+             (see LogScannerImpl.acquire() for Java parity)"
+        );
+        let between_ms = match state.last_poll_at {
+            Some(prev) => now.duration_since(prev).as_secs_f64() * 1000.0,
+            None => 0.0,
+        };
+        state.time_between_poll_ms = between_ms;
+        metrics::gauge!(SCANNER_TIME_BETWEEN_POLL_MS).set(between_ms);
+        state.last_poll_at = Some(now);
+        state.poll_start_at = Some(now);
+    }
+
+    /// Emits `poll_idle_ratio = poll_time / (poll_time + between_time)`.
+    /// On the first poll `between_time` is 0, so the ratio is 1.0 (poll-bound).
+    /// No-op when no poll is in flight or when the denominator is 0.
+    fn record_poll_end(&self) {
+        let now = Instant::now();
+        let mut state = self.poll_state.lock();
+        let Some(start) = state.poll_start_at.take() else {
+            return;
+        };
+        let poll_time_ms = now.duration_since(start).as_secs_f64() * 1000.0;
+        let total = poll_time_ms + state.time_between_poll_ms;
+        if total > 0.0 {
+            metrics::gauge!(SCANNER_POLL_IDLE_RATIO).set(poll_time_ms / total);
+        }
+    }
+
     async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
         self.check_no_active_reader()?;
         if self.is_partitioned_table {
@@ -516,6 +619,7 @@ impl LogScannerInner {
     }
 
     async fn poll_batches(&self, timeout: Duration) -> Result> {
+        let _poll_guard = PollGuard::new(self);
         let start = Instant::now();
         let deadline = start + timeout;
 
@@ -2204,4 +2308,142 @@ mod tests {
         }
         Ok(())
     }
+
+    /// Builds a self-contained `LogScannerInner` for poll-timing tests
+    /// inside a `current_thread` runtime so callers can drive `PollGuard`
+    /// lifecycles synchronously.
+    fn with_test_log_scanner_inner<F: FnOnce(&LogScannerInner)>(body: F) {
+        let rt = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .expect("build current_thread runtime");
+        rt.block_on(async {
+            let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+            let table_info = build_table_info(table_path.clone(), 1, 1);
+            let cluster = build_cluster_arc(&table_path, 1, 1);
+            let metadata = Arc::new(Metadata::new_for_test(cluster));
+            let inner = LogScannerInner::new(
+                &table_info,
+                metadata,
+                Arc::new(RpcClient::new()),
+                &Config::default(),
+                None,
+            )
+            .expect("build LogScannerInner");
+            body(&inner);
+        });
+    }
+
+    fn snapshot_gauge(
+        snapshotter: &metrics_util::debugging::Snapshotter,
+        name: &str,
+    ) -> Option<f64> {
+        use metrics_util::debugging::DebugValue;
+        snapshotter
+            .snapshot()
+            .into_vec()
+            .into_iter()
+            .find_map(|(key, _, _, val)| {
+                if key.key().name() == name {
+                    if let DebugValue::Gauge(g) = val {
+                        return Some(g.into_inner());
+                    }
+                }
+                None
+            })
+    }
+
+    /// Exercises the `PollGuard` lifecycle across two consecutive
+    /// `record_poll_start` calls. Asserts both poll-timing gauges are
+    /// emitted at the right moments and `record_poll_end` runs on guard
+    /// drop (also the cancellation-safety path, since dropping the
+    /// `poll()` future drops the guard).
+    #[test]
+    fn poll_guard_emits_time_between_poll_and_idle_ratio() {
+        use crate::metrics::{SCANNER_POLL_IDLE_RATIO, SCANNER_TIME_BETWEEN_POLL_MS};
+        use metrics_util::debugging::DebuggingRecorder;
+
+        let recorder = DebuggingRecorder::new();
+        let snapshotter = recorder.snapshotter();
+
+        metrics::with_local_recorder(&recorder, || {
+            with_test_log_scanner_inner(|inner| {
+                // First poll: emits time_between_poll_ms=0 (Java parity:
+                // ScannerMetricGroup.recordPollStart emits 0 when there is
+                // no previous poll). Idle ratio is also emitted as 1.0
+                // on drop (poll_time / (poll_time + 0) = 1.0).
+                {
+                    let _g = PollGuard::new(inner);
+                    std::thread::sleep(std::time::Duration::from_millis(5));
+                }
+
+                // Brief gap so time_between_poll_ms is observably > 0.
+                std::thread::sleep(std::time::Duration::from_millis(5));
+
+                // Second poll: refreshes both time_between_poll_ms (>0)
+                // and a fresh idle ratio.
+                {
+                    let _g = PollGuard::new(inner);
+                    std::thread::sleep(std::time::Duration::from_millis(5));
+                }
+            });
+        });
+
+        let between = snapshot_gauge(&snapshotter, SCANNER_TIME_BETWEEN_POLL_MS)
+            .expect("time_between_poll_ms must be emitted on every poll");
+        assert!(
+            between > 0.0,
+            "second-poll time_between_poll_ms must be positive, got {between}"
+        );
+
+        let ratio = snapshot_gauge(&snapshotter, SCANNER_POLL_IDLE_RATIO)
+            .expect("poll_idle_ratio must be emitted on poll end");
+        assert!(
+            ratio > 0.0 && ratio < 1.0,
+            "second-poll poll_idle_ratio must be in (0, 1), got {ratio}"
+        );
+    }
+
+    /// Java parity: `ScannerMetricGroup.recordPollStart` emits
+    /// `timeMsBetweenPoll = 0` on the very first poll. The Rust gauge
+    /// must do the same so dashboards see the metric series from poll #1.
+    #[test]
+    fn time_between_poll_ms_emits_zero_on_first_poll() {
+        use crate::metrics::SCANNER_TIME_BETWEEN_POLL_MS;
+        use metrics_util::debugging::DebuggingRecorder;
+
+        let recorder = DebuggingRecorder::new();
+        let snapshotter = recorder.snapshotter();
+
+        metrics::with_local_recorder(&recorder, || {
+            with_test_log_scanner_inner(|inner| {
+                let _g = PollGuard::new(inner);
+                // Drop at end of scope completes the poll; the value of
+                // SCANNER_TIME_BETWEEN_POLL_MS was emitted at start, not end.
+            });
+        });
+
+        let between = snapshot_gauge(&snapshotter, SCANNER_TIME_BETWEEN_POLL_MS)
+            .expect("time_between_poll_ms must be emitted on the first poll");
+        assert_eq!(
+            between, 0.0,
+            "first-poll time_between_poll_ms must be 0.0 (Java parity), got {between}"
+        );
+    }
+
+    /// Pins the single-consumer contract: overlapping `PollGuard`s on the
+    /// same scanner trip the `debug_assert!` in `record_poll_start`.
+    /// Release builds skip the check, so the test is gated on
+    /// `debug_assertions`.
+    #[cfg(debug_assertions)]
+    #[test]
+    #[should_panic(expected = "concurrent poll() detected")]
+    fn overlapping_polls_panic_in_debug_builds() {
+        with_test_log_scanner_inner(|inner| {
+            let _g1 = PollGuard::new(inner);
+            // _g1 has not been dropped → poll_start_at is still Some,
+            // so the second start must panic.
+            let _g2 = PollGuard::new(inner);
+        });
+    }
 }
diff --git a/crates/fluss/src/metrics.rs b/crates/fluss/src/metrics.rs
index 756e2db5..040dd162 100644
--- a/crates/fluss/src/metrics.rs
+++ b/crates/fluss/src/metrics.rs
@@ -49,6 +49,34 @@ pub const CLIENT_BYTES_RECEIVED_TOTAL: &str = "fluss.client.bytes_received.total
 pub const CLIENT_REQUEST_LATENCY_MS: &str = "fluss.client.request_latency_ms";
 pub const CLIENT_REQUESTS_IN_FLIGHT: &str = "fluss.client.requests_in_flight";
 
+// ---------------------------------------------------------------------------
+// Scanner poll-timing metrics
+//
+// Java reference: ScannerMetricGroup.java, LogScannerImpl.java
+//
+// These track consumer liveness and processing efficiency at the `poll()`
+// boundary. Java records via `volatile long` fields read by gauge suppliers;
+// Rust snapshots the values at poll start/end.
+//
+// Java's `lastPollSecondsAgo` gauge is intentionally NOT ported. Java
+// implements it as a gauge supplier evaluated at scrape time, which the
+// `metrics` crate facade has no equivalent for. A snapshot-at-poll-start
+// port would just duplicate `time_between_poll_ms / 1000` and would not
+// advance while a consumer is hung — defeating the metric's purpose
+// (detecting a stuck consumer). Revisit if the `metrics` crate gains a
+// supplier abstraction or we add a background liveness task.
+// ---------------------------------------------------------------------------
+
+/// Gauge: milliseconds between the start of consecutive `poll()` calls. A
+/// large value usually means the consumer's downstream processing is slow.
+pub const SCANNER_TIME_BETWEEN_POLL_MS: &str = "fluss.client.scanner.time_between_poll_ms";
+
+/// Gauge: `poll_time_ms / (poll_time_ms + time_between_poll_ms)`. Higher
+/// values mean the scanner is starved for data; lower values mean the
+/// consumer is the bottleneck. `time_between_poll_ms` is start-to-start, so
+/// equal-length back-to-back polls read about 0.5 rather than 1.0.
+pub const SCANNER_POLL_IDLE_RATIO: &str = "fluss.client.scanner.poll_idle_ratio";
+
 /// Returns a label value for reportable API keys, matching Java's
 /// `ConnectionMetrics.REPORT_API_KEYS` filter (`ProduceLog`, `FetchLog`,
 /// `PutKv`, `Lookup`). Returns `None` for admin/metadata/auth calls to
@@ -267,4 +295,24 @@ mod tests {
         assert_eq!(counter_by_api_key.get("produce_log"), Some(&5));
         assert_eq!(counter_by_api_key.get("fetch_log"), Some(&3));
     }
+
+    #[test]
+    fn scanner_poll_timing_metrics_emit_correctly() {
+        let recorder = DebuggingRecorder::new();
+        let snapshotter = recorder.snapshotter();
+
+        metrics::with_local_recorder(&recorder, || {
+            metrics::gauge!(SCANNER_TIME_BETWEEN_POLL_MS).set(200.0);
+            metrics::gauge!(SCANNER_POLL_IDLE_RATIO).set(0.8);
+        });
+
+        let snapshot = snapshotter.snapshot();
+        let entries: Vec<_> = snapshot.into_vec();
+
+        assert_eq!(
+            find_gauge!(entries, SCANNER_TIME_BETWEEN_POLL_MS),
+            Some(200.0)
+        );
+        assert_eq!(find_gauge!(entries, SCANNER_POLL_IDLE_RATIO), Some(0.8));
+    }
 }
diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md
index 5d3068b5..90fea1bf 100644
--- a/website/docs/user-guide/rust/api-reference.md
+++ b/website/docs/user-guide/rust/api-reference.md
@@ -139,6 +139,8 @@ Complete API reference for the Fluss Rust client.
 
 ## `LogScanner`
 
+Single-consumer: do not call `poll` concurrently on the same scanner (e.g. from `tokio::join!` or two tasks sharing an `Arc<LogScanner>`). Mirrors Java's `LogScannerImpl.acquire()` guard. Debug builds surface overlapping calls via a `debug_assert!`; release builds skip the check for performance and produce skewed poll-timing metrics (`fluss.client.scanner.time_between_poll_ms`, `fluss.client.scanner.poll_idle_ratio`) if the contract is violated.
+
 | Method | Description |
 |-----------------------------------------------------------------------------------------------------------|----------------------------------------------------------|
 | `async fn subscribe(&self, bucket_id: i32, start_offset: i64) -> Result<()>` | Subscribe to a bucket |
@@ -151,7 +153,7 @@ Complete API reference for the Fluss Rust client.
 
 ## `RecordBatchLogScanner`
 
-Overlapping `poll` calls on clones that share state, or `poll` concurrent with `RecordBatchLogReader::next_batch`, are not supported. Use one active polling/consumption call at a time per underlying scanner state.
+Single-consumer: overlapping `poll` calls on handles that share state, or `poll` concurrent with `RecordBatchLogReader::next_batch`, are not supported — use one active polling/consumption call at a time per underlying scanner state. Mirrors Java's `LogScannerImpl.acquire()` guard. Debug builds surface overlapping calls via a `debug_assert!`; release builds skip the check for performance and produce skewed poll-timing metrics (`fluss.client.scanner.time_between_poll_ms`, `fluss.client.scanner.poll_idle_ratio`) if the contract is violated.
 
 | Method | Description |
 |-----------------------------------------------------------------------------------------------------------|----------------------------------------------------------|