38 changes: 38 additions & 0 deletions native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs
@@ -141,6 +141,44 @@ impl ExecutionPlan for FFIReaderExec {
    }
}

/// Reads Arrow data from a Java-side exporter via FFI (Foreign Function
/// Interface).
///
/// This function bridges Java and Rust through the Arrow C Data Interface. It
/// repeatedly fetches record batches from the Java exporter, converts them from
/// their FFI representation into native Rust Arrow arrays, and streams them to
/// the execution engine.
///
/// # Arguments
///
/// * `schema` - The Arrow schema reference defining the structure of the data
/// * `exporter` - A JNI global reference to the Java-side Arrow FFI exporter
///   object
/// * `exec_ctx` - The execution context for metrics collection and stream
///   management
///
/// # Returns
///
/// Returns a `Result<SendableRecordBatchStream>` containing a stream of record
/// batches on success, or an error if the FFI operation fails.
///
/// # Behavior
///
/// - Continuously polls the Java exporter for new batches until no more data is
///   available
/// - Converts FFI Arrow arrays to native Rust Arrow data structures
/// - Tracks memory usage and output row counts via metrics
/// - Automatically closes the Java exporter resource when the stream ends or
///   fails
/// - Processes batches asynchronously using tokio's blocking task spawning
///
/// # FFI Safety
///
/// This function uses unsafe operations to import FFI Arrow data. Safety
/// depends on:
/// - Proper FFI Arrow array initialization and cleanup
/// - Matching data types between the Java and Rust sides
/// - Automatic resource management through RAII patterns
fn read_ffi(
    schema: SchemaRef,
    exporter: GlobalRef,
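The behavior documented for `read_ffi` (poll until the exporter is exhausted, count output rows, close the exporter on both success and failure) can be illustrated with a minimal, std-only sketch. Everything here is hypothetical: `BatchExporter`, `CloseOnDrop`, `drain_exporter`, and `VecExporter` are illustrative names, batches are plain `Vec<i64>` values, and no JNI, Arrow, or tokio calls are involved. It is not the code in `ffi_reader_exec.rs`.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical stand-in for the Java-side exporter.
trait BatchExporter {
    /// Returns the next batch, or `None` when no more data is available.
    fn export_next(&mut self) -> Result<Option<Vec<i64>>, String>;
    /// Releases the exporter's resources.
    fn close(&mut self);
}

/// RAII guard: closes the wrapped exporter when dropped, whether the
/// surrounding function returned normally or bailed out with an error.
struct CloseOnDrop<E: BatchExporter>(E);

impl<E: BatchExporter> Drop for CloseOnDrop<E> {
    fn drop(&mut self) {
        self.0.close();
    }
}

/// Polls the exporter until it is exhausted and returns the collected
/// batches together with the total number of output rows.
fn drain_exporter<E: BatchExporter>(exporter: E) -> Result<(Vec<Vec<i64>>, usize), String> {
    let mut guard = CloseOnDrop(exporter);
    let mut batches = Vec::new();
    let mut output_rows = 0;
    // `?` propagates an export error; the guard still closes the exporter.
    while let Some(batch) = guard.0.export_next()? {
        output_rows += batch.len();
        batches.push(batch);
    }
    Ok((batches, output_rows))
}

/// In-memory exporter used only for illustration.
struct VecExporter {
    pending: Vec<Vec<i64>>,
    closed: Rc<Cell<bool>>,
}

impl BatchExporter for VecExporter {
    fn export_next(&mut self) -> Result<Option<Vec<i64>>, String> {
        if self.pending.is_empty() {
            Ok(None)
        } else {
            Ok(Some(self.pending.remove(0)))
        }
    }

    fn close(&mut self) {
        self.closed.set(true);
    }
}
```

Putting the close call in `Drop` rather than at the end of the loop is what makes the "ends or fails" guarantee hold on the early-return path as well.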
26 changes: 26 additions & 0 deletions native-engine/datafusion-ext-plans/src/filter_exec.rs
@@ -171,6 +171,32 @@ impl ExecuteWithColumnPruning for FilterExec {
    }
}

/// Executes a filter operation on the input record batch stream.
///
/// This function applies the provided predicates to the input stream. It uses
/// a cached expression evaluator for efficient predicate evaluation and
/// returns a stream containing only the records that satisfy all predicates.
///
/// # Arguments
///
/// * `input` - The input record batch stream to be filtered
/// * `predicates` - A vector of physical expressions representing filter
///   predicates
/// * `exec_ctx` - The execution context containing metrics and runtime
///   information
///
/// # Returns
///
/// Returns a `Result<SendableRecordBatchStream>` containing the filtered record
/// batch stream on success, or an error if the filtering operation fails.
///
/// # Behavior
///
/// - Evaluates all predicates against each record batch in the input stream
/// - Records metrics for computation time and output record counts
/// - Processes batches asynchronously while maintaining order
/// - Only includes records that satisfy all provided predicates
fn execute_filter(
    mut input: SendableRecordBatchStream,
    predicates: Vec<PhysicalExprRef>,
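The "satisfy all predicates" rule documented for `execute_filter` amounts to AND-ing the per-predicate results into one selection mask and keeping the selected rows in input order. Here is a minimal, std-only sketch of that idea. The `Predicate` alias, `conjunction_mask`, and `filter_batch` are hypothetical names, and a batch is modeled as a slice of `i64` rather than an Arrow record batch. The actual function evaluates physical expressions through its cached evaluator, which this sketch does not attempt to reproduce.

```rust
/// Hypothetical row-level predicate. The real code evaluates physical
/// expressions over whole Arrow batches rather than single values.
type Predicate = Box<dyn Fn(i64) -> bool>;

/// Builds a selection mask that is `true` only where every predicate holds.
/// With no predicates, every row is selected.
fn conjunction_mask(batch: &[i64], predicates: &[Predicate]) -> Vec<bool> {
    batch
        .iter()
        .map(|&value| predicates.iter().all(|p| p(value)))
        .collect()
}

/// Keeps the rows selected by the mask, preserving input order.
fn filter_batch(batch: &[i64], predicates: &[Predicate]) -> Vec<i64> {
    let mask = conjunction_mask(batch, predicates);
    batch
        .iter()
        .zip(mask)
        .filter(|(_, keep)| *keep)
        .map(|(&value, _)| value)
        .collect()
}
```

Because `all` short-circuits, a row that fails an early predicate is never passed to the later ones in this sketch. Whether the real evaluator short-circuits in the same way is not stated in the doc comment.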