diff --git a/native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs b/native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs
index f2d5d602f..1d579234b 100644
--- a/native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs
@@ -141,6 +141,44 @@ impl ExecutionPlan for FFIReaderExec {
     }
 }
 
+/// Reads Arrow data from a Java-side exporter via FFI (Foreign Function
+/// Interface).
+///
+/// This function establishes a bridge between Java and Rust to read Arrow data
+/// efficiently using the Arrow C Data Interface. It continuously fetches record
+/// batches from the Java exporter, converts them from FFI format to native Rust
+/// Arrow format, and streams them back to the execution engine.
+///
+/// # Arguments
+///
+/// * `schema` - The Arrow schema reference defining the structure of the data
+/// * `exporter` - A JNI global reference to the Java-side Arrow FFI exporter
+///   object
+/// * `exec_ctx` - The execution context for metrics collection and stream
+///   management
+///
+/// # Returns
+///
+/// Returns a `Result` containing a stream of record
+/// batches on success, or an error if the FFI operation fails.
+///
+/// # Behavior
+///
+/// - Continuously polls the Java exporter for new batches until no more data is
+///   available
+/// - Converts FFI Arrow arrays to native Rust Arrow data structures
+/// - Tracks memory usage and output row counts via metrics
+/// - Automatically closes the Java exporter resource when the stream ends or
+///   fails
+/// - Processes batches asynchronously using tokio's blocking task spawning
+///
+/// # FFI Safety
+///
+/// This function uses unsafe operations to convert FFI Arrow data.
+/// The safety is ensured by:
+/// - Proper FFI Arrow array initialization and cleanup
+/// - Correct data type matching between Java and Rust sides
+/// - Automatic resource management through RAII patterns
 fn read_ffi(
     schema: SchemaRef,
     exporter: GlobalRef,
diff --git a/native-engine/datafusion-ext-plans/src/filter_exec.rs b/native-engine/datafusion-ext-plans/src/filter_exec.rs
index fa6489380..6f76a7c32 100644
--- a/native-engine/datafusion-ext-plans/src/filter_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/filter_exec.rs
@@ -171,6 +171,32 @@ impl ExecuteWithColumnPruning for FilterExec {
     }
 }
 
+/// Executes the filtering operation on the input record batch stream.
+///
+/// This function applies the provided predicates to filter records from the
+/// input stream. It uses a cached expression evaluator for efficient predicate
+/// evaluation and returns a filtered stream containing only records that
+/// satisfy all predicates.
+///
+/// # Arguments
+///
+/// * `input` - The input record batch stream to be filtered
+/// * `predicates` - A vector of physical expressions representing filter
+///   predicates
+/// * `exec_ctx` - The execution context containing metrics and runtime
+///   information
+///
+/// # Returns
+///
+/// Returns a `Result` containing the filtered record
+/// batch stream on success, or an error if the filtering operation fails.
+///
+/// # Behavior
+///
+/// - Evaluates all predicates against each record batch in the input stream
+/// - Records metrics for computation time and output record counts
+/// - Processes batches asynchronously while maintaining order
+/// - Only includes records that satisfy all provided predicates
 fn execute_filter(
     mut input: SendableRecordBatchStream,
     predicates: Vec<PhysicalExprRef>,