Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/iceberg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub mod transaction;
pub mod transform;

mod runtime;
pub use runtime::{Runtime, RuntimeHandle};
pub use runtime::{Runtime, RuntimeHandle, RuntimeTracer};

pub mod arrow;
pub(crate) mod delete_file_index;
Expand Down
159 changes: 155 additions & 4 deletions crates/iceberg/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@

// This module contains the async runtime abstraction for iceberg.

mod tracer;

use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use tokio::task;
pub use tracer::RuntimeTracer;

use crate::{Error, ErrorKind, Result};

Expand Down Expand Up @@ -57,17 +61,23 @@ impl<T: Send + 'static> Future for JoinHandle<T> {
#[derive(Clone)]
pub struct RuntimeHandle {
handle: tokio::runtime::Handle,
tracer: Option<Arc<dyn RuntimeTracer>>,
}

impl fmt::Debug for RuntimeHandle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RuntimeHandle").finish()
f.debug_struct("RuntimeHandle")
.field("has_tracer", &self.tracer.is_some())
.finish()
}
}

impl RuntimeHandle {
fn from_tokio_handle(handle: tokio::runtime::Handle) -> Self {
Self { handle }
Self {
handle,
tracer: None,
}
}

/// Spawn an async task.
Expand All @@ -76,7 +86,13 @@ impl RuntimeHandle {
F: Future + Send + 'static,
F::Output: Send + 'static,
{
JoinHandle(self.handle.spawn(future))
match &self.tracer {
Some(tracer) => {
let traced = tracer::trace_future(tracer.as_ref(), future);
JoinHandle(self.handle.spawn(traced))
}
None => JoinHandle(self.handle.spawn(future)),
}
}

/// Spawn a blocking task.
Expand All @@ -85,7 +101,13 @@ impl RuntimeHandle {
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
JoinHandle(self.handle.spawn_blocking(f))
match &self.tracer {
Some(tracer) => {
let traced = tracer::trace_block(tracer.as_ref(), f);
JoinHandle(self.handle.spawn_blocking(traced))
}
None => JoinHandle(self.handle.spawn_blocking(f)),
}
}
}

Expand Down Expand Up @@ -137,6 +159,17 @@ impl Runtime {
}
}

/// Attach a [`RuntimeTracer`] to both IO and CPU handles.
///
/// Every future or blocking closure spawned through this runtime will be
/// passed through the tracer, allowing callers to inject instrumentation
/// (e.g. tracing spans, metrics) without modifying spawn sites.
pub fn with_tracer(mut self, tracer: Arc<dyn RuntimeTracer>) -> Self {
self.io.tracer = Some(tracer.clone());
self.cpu.tracer = Some(tracer);
self
}

/// Borrows the tokio runtime the caller is currently running in.
///
/// Panics if called outside a tokio runtime context. Use
Expand Down Expand Up @@ -309,4 +342,122 @@ mod tests {
let result = driver.block_on(handle);
assert!(result.is_err(), "expected error after runtime shutdown");
}

mod tracer_tests {
use std::any::Any;
use std::sync::atomic::{AtomicUsize, Ordering};

use futures::future::BoxFuture;

use super::*;

struct CountingTracer {
futures_count: AtomicUsize,
blocks_count: AtomicUsize,
}

impl CountingTracer {
fn new() -> Self {
Self {
futures_count: AtomicUsize::new(0),
blocks_count: AtomicUsize::new(0),
}
}
}

impl RuntimeTracer for CountingTracer {
fn trace_future(
&self,
fut: BoxFuture<'static, Box<dyn Any + Send>>,
) -> BoxFuture<'static, Box<dyn Any + Send>> {
self.futures_count.fetch_add(1, Ordering::Relaxed);
fut
}

fn trace_block(
&self,
f: Box<dyn FnOnce() -> Box<dyn Any + Send> + Send>,
) -> Box<dyn FnOnce() -> Box<dyn Any + Send> + Send> {
self.blocks_count.fetch_add(1, Ordering::Relaxed);
f
}
}

#[test]
fn test_tracer_called_on_spawn() {
let tokio_rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
let tracer = Arc::new(CountingTracer::new());
let rt = Runtime::new(&tokio_rt).with_tracer(tracer.clone());

let handle = rt.io().spawn(async { 42 });
let result = tokio_rt.block_on(handle).unwrap();
assert_eq!(result, 42);
assert_eq!(tracer.futures_count.load(Ordering::Relaxed), 1);
}

#[test]
fn test_tracer_called_on_spawn_blocking() {
let tokio_rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
let tracer = Arc::new(CountingTracer::new());
let rt = Runtime::new(&tokio_rt).with_tracer(tracer.clone());

let handle = rt.cpu().spawn_blocking(|| 99);
let result = tokio_rt.block_on(handle).unwrap();
assert_eq!(result, 99);
assert_eq!(tracer.blocks_count.load(Ordering::Relaxed), 1);
}

#[test]
fn test_tracer_counts_multiple_spawns() {
let tokio_rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
let tracer = Arc::new(CountingTracer::new());
let rt = Runtime::new(&tokio_rt).with_tracer(tracer.clone());

tokio_rt.block_on(async {
rt.io().spawn(async { 1 }).await.unwrap();
rt.io().spawn(async { 2 }).await.unwrap();
rt.cpu().spawn(async { 3 }).await.unwrap();
rt.cpu().spawn_blocking(|| 4).await.unwrap();
rt.cpu().spawn_blocking(|| 5).await.unwrap();
});

assert_eq!(tracer.futures_count.load(Ordering::Relaxed), 3);
assert_eq!(tracer.blocks_count.load(Ordering::Relaxed), 2);
}

#[test]
fn test_no_tracer_passthrough() {
let tokio_rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
let rt = Runtime::new(&tokio_rt);

let handle = rt.io().spawn(async { "no tracer" });
let result = tokio_rt.block_on(handle).unwrap();
assert_eq!(result, "no tracer");
}

#[test]
fn test_with_tracer_debug_output() {
let tokio_rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
let tracer = Arc::new(CountingTracer::new());
let rt = Runtime::new(&tokio_rt).with_tracer(tracer);

let debug_str = format!("{:?}", rt.io());
assert!(debug_str.contains("has_tracer: true"));
}
}
}
86 changes: 86 additions & 0 deletions crates/iceberg/src/runtime/tracer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::future::Future;

use futures::FutureExt;
use futures::future::BoxFuture;

/// A trait for injecting instrumentation into spawned tasks.
///
/// Implementations can wrap futures or blocking closures with tracing spans,
/// metrics, or other observability hooks. The tracer receives type-erased
/// values and must preserve the output without modification.
pub trait RuntimeTracer: Send + Sync + 'static {
/// Wraps a type-erased future with instrumentation.
///
/// The implementation must not alter the future's output value.
fn trace_future(
&self,
fut: BoxFuture<'static, Box<dyn Any + Send>>,
) -> BoxFuture<'static, Box<dyn Any + Send>>;

/// Wraps a type-erased blocking closure with instrumentation.
///
/// The implementation must not alter the closure's return value.
fn trace_block(
&self,
f: Box<dyn FnOnce() -> Box<dyn Any + Send> + Send>,
) -> Box<dyn FnOnce() -> Box<dyn Any + Send> + Send>;
}

/// Wraps a concrete future with the tracer, handling type erasure and
/// restoration internally.
pub(crate) fn trace_future<T, F>(
tracer: &dyn RuntimeTracer,
future: F,
) -> impl Future<Output = T> + Send + 'static
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let erased = async move { Box::new(future.await) as Box<dyn Any + Send> }.boxed();

tracer.trace_future(erased).map(|any_box| {
*any_box
.downcast::<T>()
.expect("RuntimeTracer must preserve the future's output type")
})
}

/// Wraps a concrete blocking closure with the tracer, handling type erasure and
/// restoration internally.
pub(crate) fn trace_block<T, F>(
tracer: &dyn RuntimeTracer,
f: F,
) -> impl FnOnce() -> T + Send + 'static
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let erased: Box<dyn FnOnce() -> Box<dyn Any + Send> + Send> =
Box::new(|| Box::new(f()) as Box<dyn Any + Send>);

let traced = tracer.trace_block(erased);

move || {
*traced()
.downcast::<T>()
.expect("RuntimeTracer must preserve the closure's return type")
}
}
Loading