From cce8b535553ce92d4028a8f9b3b6a12680f2d872 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?= Date: Sat, 22 Nov 2025 17:01:17 +0800 Subject: [PATCH 01/21] test example_table --- crates/examples/Cargo.toml | 7 ++++++- crates/examples/src/example_table.rs | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml index 82d864f..1cf3079 100644 --- a/crates/examples/Cargo.toml +++ b/crates/examples/Cargo.toml @@ -27,8 +27,13 @@ version = { workspace = true } fluss = { workspace = true } tokio = { workspace = true } clap = { workspace = true} +env_logger = "0.11" [[example]] name = "example-table" -path = "src/example_table.rs" \ No newline at end of file +path = "src/example_table.rs" + +[[example]] +name = "projection-example" +path = "src/projection_example.rs" \ No newline at end of file diff --git a/crates/examples/src/example_table.rs b/crates/examples/src/example_table.rs index 3eb8dd8..ead257a 100644 --- a/crates/examples/src/example_table.rs +++ b/crates/examples/src/example_table.rs @@ -27,7 +27,7 @@ use tokio::try_join; #[tokio::main] pub async fn main() -> Result<()> { let mut config = Config::parse(); - config.bootstrap_server = Some("127.0.0.1:56405".to_string()); + config.bootstrap_server = Some("127.0.0.1:9123".to_string()); let conn = FlussConnection::new(config).await?; From 590e476370c62dfdcbe8a1049f4d02b26f9dbd94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?= Date: Sat, 22 Nov 2025 21:18:48 +0800 Subject: [PATCH 02/21] projection example --- Cargo.toml | 2 +- crates/examples/Cargo.toml | 6 +- crates/examples/src/projection_example.rs | 145 ++++++++++++ .../src/projection_example_no_compression.rs | 130 ++++++++++ crates/fluss/src/client/table/scanner.rs | 72 +++++- crates/fluss/src/record/arrow.rs | 224 ++++++++++++++++-- 6 files changed, 554 insertions(+), 25 deletions(-) create mode 100644 crates/examples/src/projection_example.rs create mode 100644 crates/examples/src/projection_example_no_compression.rs diff --git a/Cargo.toml b/Cargo.toml index 54436ac..beb6c88 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,5 +34,5 @@ members = ["crates/fluss", "crates/examples", "bindings/python"] fluss = { version = "0.1.0", path = "./crates/fluss" } tokio = { version = "1.44.2", features = ["full"] } clap = { version = "4.5.37", features = ["derive"] } -arrow = "55.1.0" +arrow = { version = "55.1.0", features = ["ipc_compression"] } chrono = { version = "0.4", features = ["clock", "std", "wasmbind"] } diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml index 1cf3079..5bc6060 100644 --- a/crates/examples/Cargo.toml +++ b/crates/examples/Cargo.toml @@ -36,4 +36,8 @@ path = "src/example_table.rs" [[example]] name = "projection-example" -path = "src/projection_example.rs" \ No newline at end of file +path = "src/projection_example.rs" + +[[example]] +name = "projection-example-no-compression" +path = "src/projection_example_no_compression.rs" \ No newline at end of file diff --git a/crates/examples/src/projection_example.rs b/crates/examples/src/projection_example.rs new file mode 100644 index 0000000..f16e5ad --- /dev/null +++ b/crates/examples/src/projection_example.rs @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use clap::Parser; +use fluss::client::FlussConnection; +use fluss::config::Config; +use fluss::error::Result; +use fluss::metadata::{DataTypes, LogFormat, Schema, TableDescriptor, TablePath}; +use fluss::row::{GenericRow, InternalRow}; +use std::time::Duration; +use tokio::try_join; + +#[tokio::main] +pub async fn main() -> Result<()> { + env_logger::init(); + + let mut config = Config::parse(); + config.bootstrap_server = Some("127.0.0.1:9123".to_string()); + + let conn = FlussConnection::new(config).await?; + + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .column("age", DataTypes::int()) + .column("email", DataTypes::string()) + .column("phone", DataTypes::string()) + .build()?, + ) + .log_format(LogFormat::ARROW) + .build()?; + + let table_path = TablePath::new("fluss".to_owned(), "projection_test".to_owned()); + + let admin = conn.get_admin().await?; + + admin + .create_table(&table_path, &table_descriptor, true) + .await?; + + let table_info = admin.get_table(&table_path).await?; + println!("Created table:\n{}\n", table_info); + + let table = conn.get_table(&table_path).await?; + let append_writer = table.new_append()?.create_writer(); + + println!("Writing test data..."); + let mut row = GenericRow::new(); + row.set_field(0, 1); + row.set_field(1, "Alice"); + row.set_field(2, 25); + row.set_field(3, "alice@example.com"); + row.set_field(4, "123-456-7890"); + let f1 = append_writer.append(row); + + row = GenericRow::new(); + row.set_field(0, 2); + row.set_field(1, "Bob"); + row.set_field(2, 30); + row.set_field(3, "bob@example.com"); + row.set_field(4, "098-765-4321"); + let f2 = append_writer.append(row); + + try_join!(f1, f2, append_writer.flush())?; + println!("Data written successfully\n"); + + println!("=== Test 1: Full scan (no projection) ==="); + let log_scanner = table.new_scan().create_log_scanner(); + log_scanner.subscribe(0, 0).await?; + + let scan_records = log_scanner.poll(Duration::from_secs(5)).await?; + println!("Fetched {} records", scan_records.count()); + for record in scan_records { + let row = record.row(); + println!( + "Record @{}: id={}, name={}, age={}, email={}, phone={}", + record.offset(), + row.get_int(0), + row.get_string(1), + row.get_int(2), + row.get_string(3), + row.get_string(4) + ); + } + println!(); + + println!("=== Test 2: Project by column indices (id, name, age) ==="); + let log_scanner = table.new_scan() + .project(&[0, 1, 2])? + .create_log_scanner(); + log_scanner.subscribe(0, 0).await?; + + let scan_records = log_scanner.poll(Duration::from_secs(5)).await?; + println!("Fetched {} records with projection", scan_records.count()); + for record in scan_records { + let row = record.row(); + println!( + "Record @{}: id={}, name={}, age={}", + record.offset(), + row.get_int(0), + row.get_string(1), + row.get_int(2) + ); + } + println!(); + + println!("=== Test 3: Project by column names (name, phone) ==="); + let log_scanner = table.new_scan() + .project_by_name(&["name", "phone"])? + .create_log_scanner(); + log_scanner.subscribe(0, 0).await?; + + let scan_records = log_scanner.poll(Duration::from_secs(5)).await?; + println!("Fetched {} records with projection", scan_records.count()); + for record in scan_records { + let row = record.row(); + println!( + "Record @{}: name={}, phone={}", + record.offset(), + row.get_string(0), + row.get_string(1) + ); + } + println!(); + + println!("All tests completed successfully!"); + Ok(()) +} + diff --git a/crates/examples/src/projection_example_no_compression.rs b/crates/examples/src/projection_example_no_compression.rs new file mode 100644 index 0000000..29bbc8c --- /dev/null +++ b/crates/examples/src/projection_example_no_compression.rs @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use clap::Parser; +use fluss::client::FlussConnection; +use fluss::config::Config; +use fluss::error::Result; +use fluss::metadata::{DataTypes, LogFormat, Schema, TableDescriptor, TablePath}; +use fluss::row::{GenericRow, InternalRow}; +use std::time::Duration; + +#[tokio::main] +pub async fn main() -> Result<()> { + env_logger::init(); + + let mut config = Config::parse(); + config.bootstrap_server = Some("127.0.0.1:9123".to_string()); + + let conn = FlussConnection::new(config).await?; + + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .column("age", DataTypes::int()) + .column("email", DataTypes::string()) + .column("phone", DataTypes::string()) + .build()?, + ) + .log_format(LogFormat::ARROW) + .property("table.log.arrow.compression.type", "NONE") + .build()?; + + let table_path = TablePath::new("fluss".to_owned(), "projection_test_no_compression_rust".to_owned()); + + let admin = conn.get_admin().await?; + + // Drop table if exists, then create + let _ = admin.drop_table(&table_path, true).await; + admin + .create_table(&table_path, &table_descriptor, true) + .await?; + + let table_info = admin.get_table(&table_path).await?; + println!("Created table:\n{}\n", table_info); + + let table = conn.get_table(&table_path).await?; + let append_writer = table.new_append()?.create_writer(); + + println!("Writing test data..."); + for i in 0..10 { + let mut row = GenericRow::new(); + row.set_field(0, i); + let name = format!("User{}", i); + row.set_field(1, name.as_str()); + row.set_field(2, 20 + i); + let email = format!("user{}@example.com", i); + row.set_field(3, email.as_str()); + let phone = format!("123-456-{:04}", i); + row.set_field(4, phone.as_str()); + append_writer.append(row).await?; + } + append_writer.flush().await?; + println!("Data written successfully\n"); + + println!("=== Test 1: Full scan (no projection) ==="); + let log_scanner = table.new_scan().create_log_scanner(); + log_scanner.subscribe(0, 0).await?; + + let scan_records = log_scanner.poll(Duration::from_secs(5)).await?; + println!("Fetched {} records", scan_records.count()); + for record in scan_records { + let row = record.row(); + println!( + "Record @{}: id={}, name={}, age={}, email={}, phone={}", + record.offset(), + row.get_int(0), + row.get_string(1), + row.get_int(2), + row.get_string(3), + row.get_string(4) + ); + } + println!(); + + println!("=== Test 2: Project by column indices (id, name, age) - NO COMPRESSION ==="); + println!("This test verifies projection without compression to isolate compression-related issues."); + let log_scanner = table.new_scan() + .project(&[0, 1, 2])? + .create_log_scanner(); + log_scanner.subscribe(0, 0).await?; + + let scan_records = log_scanner.poll(Duration::from_secs(5)).await?; + println!("Fetched {} records with projection", scan_records.count()); + for record in scan_records { + let row = record.row(); + println!( + "Record @{}: id={}, name={}, age={}", + record.offset(), + row.get_int(0), + row.get_string(1), + row.get_int(2) + ); + } + println!(); + + println!("✓ Rust client projection test (NO COMPRESSION) PASSED"); + println!(" This indicates projection works correctly without compression."); + println!(" If compression version fails, the issue is in compression handling."); + println!(); + + println!("All tests completed successfully!"); + Ok(()) +} + diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index e1ab59f..b2af2b7 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -17,7 +17,7 @@ use crate::client::connection::FlussConnection; use crate::client::metadata::Metadata; -use crate::error::Result; +use crate::error::{Error, Result}; use crate::metadata::{TableBucket, TableInfo, TablePath}; use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable}; use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord, ScanRecords, to_arrow_schema}; @@ -39,6 +39,7 @@ pub struct TableScan<'a> { conn: &'a FlussConnection, table_info: TableInfo, metadata: Arc, + projected_fields: Option>, } impl<'a> TableScan<'a> { @@ -47,14 +48,46 @@ impl<'a> TableScan<'a> { conn, table_info, metadata, + projected_fields: None, } } - pub fn create_log_scanner(&self) -> LogScanner { + pub fn project(mut self, column_indices: &[usize]) -> Result { + eprintln!("DEBUG TableScan::project - column_indices: {:?}", column_indices); + let field_count = self.table_info.row_type().fields().len(); + for &idx in column_indices { + if idx >= field_count { + return Err(Error::IllegalArgument(format!("Column index {} out of range (max: {})", idx, field_count - 1))); + } + } + self.projected_fields = Some(column_indices.to_vec()); + eprintln!("DEBUG TableScan::project - after setting: {:?}", self.projected_fields); + Ok(self) + } + + pub fn project_by_name(mut self, column_names: &[&str]) -> Result { + let row_type = self.table_info.row_type(); + let mut indices = Vec::new(); + + for name in column_names { + let idx = row_type.fields() + .iter() + .position(|f| f.name() == *name) + .ok_or_else(|| Error::IllegalArgument(format!("Column '{}' not found", name)))?; + indices.push(idx); + } + + self.projected_fields = Some(indices); + Ok(self) + } + + pub fn create_log_scanner(self) -> LogScanner { + eprintln!("DEBUG TableScan::create_log_scanner - projected_fields: {:?}", self.projected_fields); LogScanner::new( &self.table_info, self.metadata.clone(), self.conn.get_connections(), + self.projected_fields, ) } } @@ -72,7 +105,9 @@ impl LogScanner { table_info: &TableInfo, metadata: Arc, connections: Arc, + projected_fields: Option>, ) -> Self { + eprintln!("DEBUG LogScanner::new - projected_fields: {:?}", projected_fields); let log_scanner_status = Arc::new(LogScannerStatus::new()); Self { table_path: table_info.table_path.clone(), @@ -84,6 +119,7 @@ impl LogScanner { connections.clone(), metadata.clone(), log_scanner_status.clone(), + projected_fields, ), } } @@ -114,6 +150,7 @@ struct LogFetcher { table_info: TableInfo, metadata: Arc, log_scanner_status: Arc, + projected_fields: Option>, } impl LogFetcher { @@ -122,17 +159,21 @@ impl LogFetcher { conns: Arc, metadata: Arc, log_scanner_status: Arc, + projected_fields: Option>, ) -> Self { + eprintln!("DEBUG LogFetcher::new - projected_fields: {:?}", projected_fields); LogFetcher { table_path: table_info.table_path.clone(), conns: conns.clone(), table_info: table_info.clone(), metadata: metadata.clone(), log_scanner_status: log_scanner_status.clone(), + projected_fields, } } async fn send_fetches_and_collect(&self) -> Result>> { + eprintln!("DEBUG send_fetches_and_collect - self.projected_fields: {:?}", self.projected_fields); let fetch_request = self.prepare_fetch_log_requests().await; let mut result: HashMap> = HashMap::new(); for (leader, fetch_request) in fetch_request { @@ -149,17 +190,26 @@ impl LogFetcher { for pb_fetch_log_resp in fetch_response.tables_resp { let table_id = pb_fetch_log_resp.table_id; let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp; - let arrow_schema = to_arrow_schema(self.table_info.get_row_type()); + + let full_arrow_schema = to_arrow_schema(self.table_info.get_row_type()); + eprintln!("DEBUG Creating ReadContext with projected_fields: {:?}", self.projected_fields); + let read_context = ReadContext::with_projection( + full_arrow_schema, + self.projected_fields.clone(), + ); + for fetch_log_for_bucket in fetch_log_for_buckets { let mut fetch_records = vec![]; let bucket: i32 = fetch_log_for_bucket.bucket_id; let table_bucket = TableBucket::new(table_id, bucket); if fetch_log_for_bucket.records.is_some() { let data = fetch_log_for_bucket.records.unwrap(); + eprintln!("DEBUG: Server returned data size: {} bytes, projected_fields: {:?}", data.len(), self.projected_fields); + eprintln!("DEBUG: First 64 bytes of server data: {:?}", &data[..data.len().min(64)]); for log_record in &mut LogRecordsBatchs::new(&data) { let last_offset = log_record.last_log_offset(); fetch_records - .extend(log_record.records(ReadContext::new(arrow_schema.clone()))); + .extend(log_record.records(read_context.clone())); self.log_scanner_status .update_offset(&table_bucket, last_offset + 1); } @@ -209,13 +259,23 @@ impl LogFetcher { if ready_for_fetch_count == 0 { HashMap::new() } else { + let (projection_enabled, projected_fields) = if let Some(fields) = &self.projected_fields { + if fields.is_empty() { + (false, vec![]) + } else { + (true, fields.iter().map(|&i| i as i32).collect()) + } + } else { + (false, vec![]) + }; + fetch_log_req_for_buckets .into_iter() .map(|(leader_id, feq_for_buckets)| { let req_for_table = PbFetchLogReqForTable { table_id: table_id.unwrap(), - projection_pushdown_enabled: false, - projected_fields: vec![], + projection_pushdown_enabled: projection_enabled, + projected_fields: projected_fields.clone(), buckets_req: feq_for_buckets, }; diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 487f50c..a8affda 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -27,7 +27,12 @@ use arrow::array::{ }; use arrow::{ array::RecordBatch, - ipc::{reader::StreamReader, writer::StreamWriter}, + buffer::Buffer, + ipc::{ + reader::{read_record_batch, StreamReader}, + writer::StreamWriter, + root_as_message, MessageHeader, + }, }; use arrow_schema::SchemaRef; use arrow_schema::{DataType as ArrowDataType, Field}; @@ -478,25 +483,183 @@ impl<'a> LogRecordBatch<'a> { return LogRecordIterator::empty(); } - // get arrow_metadata - let arrow_metadata_bytes = read_context.to_arrow_metadata().unwrap(); - // arrow_batch_data + // Reference: Java client uses deserializeRecordBatch which reads RecordBatch message directly + // with a pre-configured VectorSchemaRoot (schema). In Rust, we use read_record_batch + // which is the equivalent API - it reads RecordBatch message directly with a pre-configured schema. + + // Server returns RecordBatch message (without Schema message) + // Format: [continuation: 4 bytes][metadata_size: 4 bytes][RecordBatch metadata][body] let data = &self.data[RECORDS_OFFSET..]; - - // need to combine arrow_metadata_bytes + arrow_batch_data - let cursor = Cursor::new([&arrow_metadata_bytes, data].concat()); - let mut stream_reader = StreamReader::try_new(cursor, None).unwrap(); - - let mut record_batch = None; - if let Some(bath) = stream_reader.next() { - record_batch = Some(bath.unwrap()); + + if data.len() < 8 { + eprintln!("Data too short: {} bytes", data.len()); + return LogRecordIterator::empty(); } - - if record_batch.is_none() { + + // Parse continuation marker and metadata size + let continuation = u32::from_le_bytes([data[0], data[1], data[2], data[3]]); + let metadata_size = u32::from_le_bytes([data[4], data[5], data[6], data[7]]) as usize; + + if continuation != 0xFFFFFFFF { + eprintln!("Invalid continuation marker: 0x{:08X}", continuation); + return LogRecordIterator::empty(); + } + + if data.len() < 8 + metadata_size { + eprintln!("Data too short for metadata: {} < {}", data.len(), 8 + metadata_size); + return LogRecordIterator::empty(); + } + + // Parse RecordBatch metadata + let metadata_bytes = &data[8..8 + metadata_size]; + let message = match root_as_message(metadata_bytes) { + Ok(msg) => msg, + Err(e) => { + eprintln!("Failed to parse RecordBatch metadata: {:?}", e); + return LogRecordIterator::empty(); + } + }; + + // Verify it's a RecordBatch message + if message.header_type() != MessageHeader::RecordBatch { + eprintln!("Expected RecordBatch message, got: {:?}", message.header_type()); return LogRecordIterator::empty(); } + + let batch_metadata = match message.header_as_record_batch() { + Some(batch) => batch, + None => { + eprintln!("Failed to get RecordBatch from message"); + return LogRecordIterator::empty(); + } + }; + + // Extract body data + let body_start = 8 + metadata_size; + let body_data = &data[body_start..]; + let body_buffer = Buffer::from(body_data); + + // Debug: Check buffer information from metadata and compare with actual data + if let Some(buffers) = batch_metadata.buffers() { + eprintln!("DEBUG: ========== Buffer Analysis =========="); + eprintln!("DEBUG: RecordBatch has {} buffers, body size: {} bytes", buffers.len(), body_data.len()); + + let has_compression = batch_metadata.compression().is_some(); + if has_compression { + if let Some(compression) = batch_metadata.compression() { + eprintln!("DEBUG: RecordBatch uses compression: {:?}", compression.codec()); + } + } else { + eprintln!("DEBUG: RecordBatch has no compression"); + } + + // Collect all buffers to a vector for easier access + let buffers_vec: Vec<_> = buffers.iter().collect(); + + for (i, buf) in buffers_vec.iter().enumerate() { + let offset = buf.offset() as usize; + let length = buf.length() as usize; + + // Calculate actual buffer size (from current offset to next buffer offset or end of body) + let actual_size = if i + 1 < buffers_vec.len() { + let next_offset = buffers_vec[i + 1].offset() as usize; + next_offset - offset + } else { + body_data.len() - offset + }; + + eprintln!("DEBUG: --- Buffer {} ---", i); + eprintln!("DEBUG: Metadata: offset={}, length={}", offset, length); + eprintln!("DEBUG: Actual size (calculated): {} bytes", actual_size); + + if offset + length > body_data.len() { + eprintln!("DEBUG: ERROR - Buffer extends beyond body data! (offset + length = {} > {})", + offset + length, body_data.len()); + } + + if actual_size != length { + eprintln!("DEBUG: WARNING - Metadata length ({}) != Actual size ({})", length, actual_size); + } + + // Check if we can read the buffer data + if offset < body_data.len() { + let available = body_data.len() - offset; + let read_len = length.min(available).min(32); + eprintln!("DEBUG: Available data: {} bytes (requested: {} bytes)", available, length); + + if read_len > 0 { + eprintln!("DEBUG: First {} bytes: {:?}", read_len, &body_data[offset..offset + read_len]); + } + + // For compressed buffers, check if we can read the uncompressed length header + if has_compression { + if length > 0 && length < 8 { + eprintln!("DEBUG: ERROR - Compressed buffer length ({}) < 8 bytes (uncompressed length header required)", length); + } else if length >= 8 { + // Try to read uncompressed length + if available >= 8 { + let uncompressed_len_bytes = &body_data[offset..offset + 8]; + let uncompressed_len = i64::from_le_bytes([ + uncompressed_len_bytes[0], + uncompressed_len_bytes[1], + uncompressed_len_bytes[2], + uncompressed_len_bytes[3], + uncompressed_len_bytes[4], + uncompressed_len_bytes[5], + uncompressed_len_bytes[6], + uncompressed_len_bytes[7], + ]); + eprintln!("DEBUG: Uncompressed length (from header): {} bytes", uncompressed_len); + + if uncompressed_len == -1 { + eprintln!("DEBUG: Note: Buffer is not compressed (uncompressed length = -1)"); + } else if uncompressed_len == 0 { + eprintln!("DEBUG: Note: Buffer is empty (uncompressed length = 0)"); + } + } + } + } + } else { + eprintln!("DEBUG: ERROR - Buffer offset ({}) >= body data size ({})", offset, body_data.len()); + } + } + eprintln!("DEBUG: ====================================="); + } + + // Determine the schema to use (projected or full) + let schema_to_use = if let Some(projected_fields) = read_context.projected_fields() { + // Server has already projected the data, so use projected schema + let projected_schema = arrow_schema::Schema::new( + projected_fields + .iter() + .map(|&idx| read_context.arrow_schema.field(idx).clone()) + .collect::>(), + ); + Arc::new(projected_schema) + } else { + read_context.arrow_schema.clone() + }; + + // Read RecordBatch using read_record_batch (equivalent to Java's deserializeRecordBatch) + // This automatically handles compression (ipc_compression feature enabled) + let record_batch = match read_record_batch( + &body_buffer, + batch_metadata, + schema_to_use, + &std::collections::HashMap::new(), // dictionaries (empty for now) + None, // projection (already handled by server, so None here) + &message.version(), + ) { + Ok(batch) => batch, + Err(e) => { + eprintln!("Failed to read RecordBatch: {}", e); + eprintln!("Error details: {:?}", e); + return LogRecordIterator::empty(); + } + }; - let arrow_reader = ArrowReader::new(Arc::new(record_batch.unwrap())); + let record_batch = Arc::new(record_batch); + let arrow_reader = ArrowReader::new(record_batch); LogRecordIterator::Arrow(ArrowLogRecordIterator { reader: arrow_reader, base_offset: self.base_log_offset(), @@ -554,20 +717,47 @@ pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType { } } +#[derive(Clone)] pub struct ReadContext { arrow_schema: SchemaRef, + projected_fields: Option>, } impl ReadContext { pub fn new(arrow_schema: SchemaRef) -> ReadContext { - ReadContext { arrow_schema } + ReadContext { + arrow_schema, + projected_fields: None, + } + } + + pub fn with_projection(arrow_schema: SchemaRef, projected_fields: Option>) -> ReadContext { + ReadContext { + arrow_schema, + projected_fields, + } } pub fn to_arrow_metadata(&self) -> Result> { let mut arrow_schema_bytes = vec![]; - let _writer = StreamWriter::try_new(&mut arrow_schema_bytes, &self.arrow_schema)?; + let schema_to_use = if let Some(projected_fields) = &self.projected_fields { + let projected_schema = arrow_schema::Schema::new( + projected_fields + .iter() + .map(|&idx| self.arrow_schema.field(idx).clone()) + .collect::>(), + ); + Arc::new(projected_schema) + } else { + self.arrow_schema.clone() + }; + let _writer = StreamWriter::try_new(&mut arrow_schema_bytes, &schema_to_use)?; Ok(arrow_schema_bytes) } + + pub fn projected_fields(&self) -> Option<&[usize]> { + self.projected_fields.as_deref() + } } pub enum LogRecordIterator { From b13203110b915a3b6b2bb2670af028f37ca61908 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?= Date: Sat, 22 Nov 2025 21:50:06 +0800 Subject: [PATCH 03/21] chore refactor --- crates/fluss/src/client/table/scanner.rs | 9 - crates/fluss/src/record/arrow.rs | 220 +++++++---------------- 2 files changed, 64 insertions(+), 165 deletions(-) diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index b2af2b7..8d450a6 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -53,7 +53,6 @@ impl<'a> TableScan<'a> { } pub fn project(mut self, column_indices: &[usize]) -> Result { - eprintln!("DEBUG TableScan::project - column_indices: {:?}", column_indices); let field_count = self.table_info.row_type().fields().len(); for &idx in column_indices { if idx >= field_count { @@ -61,7 +60,6 @@ impl<'a> TableScan<'a> { } } self.projected_fields = Some(column_indices.to_vec()); - eprintln!("DEBUG TableScan::project - after setting: {:?}", self.projected_fields); Ok(self) } @@ -82,7 +80,6 @@ impl<'a> TableScan<'a> { } pub fn create_log_scanner(self) -> LogScanner { - eprintln!("DEBUG TableScan::create_log_scanner - projected_fields: {:?}", self.projected_fields); LogScanner::new( &self.table_info, self.metadata.clone(), @@ -107,7 +104,6 @@ impl LogScanner { connections: Arc, projected_fields: Option>, ) -> Self { - eprintln!("DEBUG LogScanner::new - projected_fields: {:?}", projected_fields); let log_scanner_status = Arc::new(LogScannerStatus::new()); Self { table_path: table_info.table_path.clone(), @@ -161,7 +157,6 @@ impl LogFetcher { log_scanner_status: Arc, projected_fields: Option>, ) -> Self { - eprintln!("DEBUG LogFetcher::new - projected_fields: {:?}", projected_fields); LogFetcher { table_path: table_info.table_path.clone(), conns: conns.clone(), @@ -173,7 +168,6 @@ impl LogFetcher { } async fn send_fetches_and_collect(&self) -> Result>> { - eprintln!("DEBUG send_fetches_and_collect - self.projected_fields: {:?}", self.projected_fields); let fetch_request = self.prepare_fetch_log_requests().await; let mut result: HashMap> = HashMap::new(); for (leader, fetch_request) in fetch_request { @@ -192,7 +186,6 @@ impl LogFetcher { let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp; let full_arrow_schema = to_arrow_schema(self.table_info.get_row_type()); - eprintln!("DEBUG Creating ReadContext with projected_fields: {:?}", self.projected_fields); let read_context = ReadContext::with_projection( full_arrow_schema, self.projected_fields.clone(), @@ -204,8 +197,6 @@ impl LogFetcher { let table_bucket = TableBucket::new(table_id, bucket); if fetch_log_for_bucket.records.is_some() { let data = fetch_log_for_bucket.records.unwrap(); - eprintln!("DEBUG: Server returned data size: {} bytes, projected_fields: {:?}", data.len(), self.projected_fields); - eprintln!("DEBUG: First 64 bytes of server data: {:?}", &data[..data.len().min(64)]); for log_record in &mut LogRecordsBatchs::new(&data) { let last_offset = log_record.last_log_offset(); fetch_records diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index a8affda..f70bf04 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -20,6 +20,7 @@ use crate::error::Result; use crate::metadata::DataType; use crate::record::{ChangeType, ScanRecord}; use crate::row::{ColumnarRow, GenericRow}; +use tracing::error; use arrow::array::{ ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Float32Builder, Float64Builder, Int8Builder, Int16Builder, Int32Builder, Int64Builder, StringBuilder, UInt8Builder, @@ -31,7 +32,7 @@ use arrow::{ ipc::{ reader::{read_record_batch, StreamReader}, writer::StreamWriter, - root_as_message, MessageHeader, + root_as_message, }, }; use arrow_schema::SchemaRef; @@ -478,157 +479,17 @@ impl<'a> LogRecordBatch<'a> { } pub fn records(&self, read_context: ReadContext) -> LogRecordIterator { - let count = self.record_count(); - if count == 0 { + if self.record_count() == 0 { return LogRecordIterator::empty(); } - // Reference: Java client uses deserializeRecordBatch which reads RecordBatch message directly - // with a pre-configured VectorSchemaRoot (schema). In Rust, we use read_record_batch - // which is the equivalent API - it reads RecordBatch message directly with a pre-configured schema. - - // Server returns RecordBatch message (without Schema message) - // Format: [continuation: 4 bytes][metadata_size: 4 bytes][RecordBatch metadata][body] let data = &self.data[RECORDS_OFFSET..]; - - if data.len() < 8 { - eprintln!("Data too short: {} bytes", data.len()); - return LogRecordIterator::empty(); - } - - // Parse continuation marker and metadata size - let continuation = u32::from_le_bytes([data[0], data[1], data[2], data[3]]); - let metadata_size = u32::from_le_bytes([data[4], data[5], data[6], data[7]]) as usize; - - if continuation != 0xFFFFFFFF { - eprintln!("Invalid continuation marker: 0x{:08X}", continuation); - return LogRecordIterator::empty(); - } - - if data.len() < 8 + metadata_size { - eprintln!("Data too short for metadata: {} < {}", data.len(), 8 + metadata_size); - return LogRecordIterator::empty(); - } - - // Parse RecordBatch metadata - let metadata_bytes = &data[8..8 + metadata_size]; - let message = match root_as_message(metadata_bytes) { - Ok(msg) => msg, - Err(e) => { - eprintln!("Failed to parse RecordBatch metadata: {:?}", e); - return LogRecordIterator::empty(); - } + let (batch_metadata, body_buffer, version) = match Self::parse_ipc_message(data) { + Some(result) => result, + None => return LogRecordIterator::empty(), }; - - // Verify it's a RecordBatch message - if message.header_type() != MessageHeader::RecordBatch { - eprintln!("Expected RecordBatch message, got: {:?}", message.header_type()); - return LogRecordIterator::empty(); - } - - let batch_metadata = match message.header_as_record_batch() { - Some(batch) => batch, - None => { - eprintln!("Failed to get RecordBatch from message"); - return LogRecordIterator::empty(); - } - }; - - // Extract body data - let body_start = 8 + metadata_size; - let body_data = &data[body_start..]; - let body_buffer = Buffer::from(body_data); - - // Debug: Check buffer information from metadata and compare with actual data - if let Some(buffers) = batch_metadata.buffers() { - eprintln!("DEBUG: ========== Buffer Analysis =========="); - eprintln!("DEBUG: RecordBatch has {} buffers, body size: {} bytes", buffers.len(), body_data.len()); - - let has_compression = batch_metadata.compression().is_some(); - if has_compression { - if let Some(compression) = batch_metadata.compression() { - eprintln!("DEBUG: RecordBatch uses compression: {:?}", compression.codec()); - } - } else { - eprintln!("DEBUG: RecordBatch has no compression"); - } - - // Collect all buffers to a vector for easier access - let buffers_vec: Vec<_> = buffers.iter().collect(); - - for (i, buf) in buffers_vec.iter().enumerate() { - let offset = buf.offset() as usize; - let length = buf.length() as usize; - - // Calculate actual buffer size (from current offset to next buffer offset or end of body) - let actual_size = if i + 1 < buffers_vec.len() { - let next_offset = buffers_vec[i + 1].offset() as usize; - next_offset - offset - } else { - body_data.len() - offset - }; - - eprintln!("DEBUG: --- Buffer {} ---", i); - eprintln!("DEBUG: Metadata: offset={}, length={}", offset, length); - eprintln!("DEBUG: Actual size (calculated): {} bytes", actual_size); - - if offset + length > body_data.len() { - eprintln!("DEBUG: ERROR - Buffer extends beyond body data! (offset + length = {} > {})", - offset + length, body_data.len()); - } - - if actual_size != length { - eprintln!("DEBUG: WARNING - Metadata length ({}) != Actual size ({})", length, actual_size); - } - - // Check if we can read the buffer data - if offset < body_data.len() { - let available = body_data.len() - offset; - let read_len = length.min(available).min(32); - eprintln!("DEBUG: Available data: {} bytes (requested: {} bytes)", available, length); - - if read_len > 0 { - eprintln!("DEBUG: First {} bytes: {:?}", read_len, &body_data[offset..offset + read_len]); - } - - // For compressed buffers, check if we can read the uncompressed length header - if has_compression { - if length > 0 && length < 8 { - eprintln!("DEBUG: ERROR - Compressed buffer length ({}) < 8 bytes (uncompressed length header required)", length); - } else if length >= 8 { - // Try to read uncompressed length - if available >= 8 { - let uncompressed_len_bytes = &body_data[offset..offset + 8]; - let uncompressed_len = i64::from_le_bytes([ - uncompressed_len_bytes[0], - uncompressed_len_bytes[1], - uncompressed_len_bytes[2], - uncompressed_len_bytes[3], - uncompressed_len_bytes[4], - uncompressed_len_bytes[5], - uncompressed_len_bytes[6], - uncompressed_len_bytes[7], - ]); - eprintln!("DEBUG: Uncompressed length (from header): {} bytes", uncompressed_len); - - if uncompressed_len == -1 { - eprintln!("DEBUG: Note: Buffer is not compressed (uncompressed length = -1)"); - } else if uncompressed_len == 0 { - eprintln!("DEBUG: Note: Buffer is empty (uncompressed length = 0)"); - } - } - } - } - } else { - eprintln!("DEBUG: ERROR - Buffer offset ({}) >= body data size ({})", offset, body_data.len()); - } - } - eprintln!("DEBUG: ====================================="); - } - - // Determine the schema to use (projected or full) + let schema_to_use = if let Some(projected_fields) = read_context.projected_fields() { - // Server has already projected the data, so use projected schema let projected_schema = arrow_schema::Schema::new( projected_fields .iter() @@ -639,27 +500,23 @@ impl<'a> LogRecordBatch<'a> { } else { read_context.arrow_schema.clone() }; - - // Read RecordBatch using read_record_batch (equivalent to Java's deserializeRecordBatch) - // This automatically handles compression (ipc_compression feature enabled) + let record_batch = match read_record_batch( &body_buffer, batch_metadata, schema_to_use, - &std::collections::HashMap::new(), // dictionaries (empty for now) - None, // projection (already handled by server, so None here) - &message.version(), + &std::collections::HashMap::new(), + None, + &version, ) { Ok(batch) => batch, Err(e) => { - eprintln!("Failed to read RecordBatch: {}", e); - eprintln!("Error details: {:?}", e); + error!(error = %e, "Failed to read RecordBatch"); return LogRecordIterator::empty(); } }; - let record_batch = Arc::new(record_batch); - let arrow_reader = ArrowReader::new(record_batch); + let arrow_reader = ArrowReader::new(Arc::new(record_batch)); LogRecordIterator::Arrow(ArrowLogRecordIterator { reader: arrow_reader, base_offset: self.base_log_offset(), @@ -668,6 +525,57 @@ impl<'a> LogRecordBatch<'a> { change_type: ChangeType::AppendOnly, }) } + + /// Parse an Arrow IPC message from a byte slice. + /// + /// Similar to `arrow-ipc::reader::parse_message` (reader.rs:743-753), but also extracts + /// the body buffer. The implementation follows the same pattern: skip continuation marker + /// and metadata size, then parse the metadata message. + /// + /// Returns the RecordBatch metadata, body buffer, and metadata version. + fn parse_ipc_message( + data: &'a [u8], + ) -> Option<(arrow::ipc::RecordBatch<'a>, Buffer, arrow::ipc::MetadataVersion)> { + const CONTINUATION_MARKER: [u8; 4] = [0xff; 4]; + + let metadata_buf = if data.len() >= 4 && data[..4] == CONTINUATION_MARKER { + if data.len() < 8 { + return None; + } + &data[8..] + } else { + if data.len() < 4 { + return None; + } + &data[4..] + }; + + let metadata_size = u32::from_le_bytes([ + metadata_buf[0], + metadata_buf[1], + metadata_buf[2], + metadata_buf[3], + ]) as usize; + + if metadata_buf.len() < 4 + metadata_size { + return None; + } + + let metadata_bytes = &metadata_buf[4..4 + metadata_size]; + let message = root_as_message(metadata_bytes).ok()?; + let batch_metadata = message.header_as_record_batch()?; + + let body_start = 4 + metadata_size; + let body_length = message.bodyLength() as usize; + if metadata_buf.len() < body_start + body_length { + return None; + } + + let body_data = &metadata_buf[body_start..body_start + body_length]; + let body_buffer = Buffer::from(body_data); + + Some((batch_metadata, body_buffer, message.version())) + } } pub fn to_arrow_schema(fluss_schema: &DataType) -> SchemaRef { From c955fd5282d0850a65213d815341b8ce10fa1990 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?= Date: Sat, 22 Nov 2025 22:04:17 +0800 Subject: [PATCH 04/21] support non-projection --- .../src/projection_example_no_compression.rs | 10 ++++-- crates/fluss/src/client/table/scanner.rs | 31 ++++++++++++++++--- crates/fluss/src/record/arrow.rs | 19 +++++++++++- 3 files changed, 53 insertions(+), 7 deletions(-) diff --git a/crates/examples/src/projection_example_no_compression.rs b/crates/examples/src/projection_example_no_compression.rs index 29bbc8c..8579689 100644 --- a/crates/examples/src/projection_example_no_compression.rs +++ b/crates/examples/src/projection_example_no_compression.rs @@ -80,7 +80,10 @@ pub async fn main() -> Result<()> { println!("=== Test 1: Full scan (no projection) ==="); let log_scanner = table.new_scan().create_log_scanner(); - log_scanner.subscribe(0, 0).await?; + let num_buckets = table.table_info().get_num_buckets(); + for bucket_id in 0..num_buckets { + log_scanner.subscribe(bucket_id, 0).await?; + } let scan_records = log_scanner.poll(Duration::from_secs(5)).await?; println!("Fetched {} records", scan_records.count()); @@ -103,7 +106,10 @@ pub async fn main() -> Result<()> { let log_scanner = table.new_scan() .project(&[0, 1, 2])? .create_log_scanner(); - log_scanner.subscribe(0, 0).await?; + let num_buckets = table.table_info().get_num_buckets(); + for bucket_id in 0..num_buckets { + log_scanner.subscribe(bucket_id, 0).await?; + } let scan_records = log_scanner.poll(Duration::from_secs(5)).await?; println!("Fetched {} records with projection", scan_records.count()); diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 8d450a6..323d1ee 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -186,10 +186,33 @@ impl LogFetcher { let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp; let full_arrow_schema = to_arrow_schema(self.table_info.get_row_type()); - let read_context = ReadContext::with_projection( - full_arrow_schema, - self.projected_fields.clone(), - ); + let read_context = if let Some(projected_fields) = &self.projected_fields { + let (projection_enabled, _) = if !projected_fields.is_empty() { + (true, projected_fields.clone()) + } else { + (false, vec![]) + }; + + if projection_enabled { + let projected_schema = arrow_schema::Schema::new( + projected_fields + .iter() + .map(|&idx| full_arrow_schema.field(idx).clone()) + .collect::>(), + ); + ReadContext::with_projection_pushdown( + Arc::new(projected_schema), + Some(projected_fields.clone()), + ) + } else { + ReadContext::with_projection( + full_arrow_schema, + self.projected_fields.clone(), + ) + } + } else { + ReadContext::new(full_arrow_schema) + }; for fetch_log_for_bucket in fetch_log_for_buckets { let mut fetch_records = vec![]; diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index f70bf04..5c4b3cf 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -489,7 +489,9 @@ impl<'a> LogRecordBatch<'a> { None => return LogRecordIterator::empty(), }; - let schema_to_use = if let Some(projected_fields) = read_context.projected_fields() { + let schema_to_use = if read_context.is_projection_pushdown() { + read_context.arrow_schema.clone() + } else if let Some(projected_fields) = read_context.projected_fields() { let projected_schema = arrow_schema::Schema::new( projected_fields .iter() @@ -629,6 +631,7 @@ pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType { pub struct ReadContext { arrow_schema: SchemaRef, projected_fields: Option>, + projection_pushdown: bool, } impl ReadContext { @@ -636,6 +639,7 @@ impl ReadContext { ReadContext { arrow_schema, projected_fields: None, + projection_pushdown: false, } } @@ -643,9 +647,22 @@ impl ReadContext { ReadContext { arrow_schema, projected_fields, + projection_pushdown: false, } } + pub fn with_projection_pushdown(arrow_schema: SchemaRef, projected_fields: Option>) -> ReadContext { + ReadContext { + arrow_schema, + projected_fields, + projection_pushdown: true, + } + } + + pub fn is_projection_pushdown(&self) -> bool { + self.projection_pushdown + } + pub fn to_arrow_metadata(&self) -> Result> { let mut arrow_schema_bytes = vec![]; let schema_to_use = if let Some(projected_fields) = &self.projected_fields { From 439f0717f75558f47acb00f31ba411db384d4136 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?= Date: Sun, 23 Nov 2025 00:34:33 +0800 Subject: [PATCH 05/21] chore refactor --- Cargo.toml | 2 +- crates/examples/Cargo.toml | 8 +- ...o_compression.rs => example_projection.rs} | 10 +- crates/examples/src/projection_example.rs | 145 ------------------ crates/fluss/src/record/arrow.rs | 49 +++--- 5 files changed, 24 insertions(+), 190 deletions(-) rename crates/examples/src/{projection_example_no_compression.rs => example_projection.rs} (91%) delete mode 100644 crates/examples/src/projection_example.rs diff --git a/Cargo.toml b/Cargo.toml index beb6c88..54436ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,5 +34,5 @@ members = ["crates/fluss", "crates/examples", "bindings/python"] fluss = { version = "0.1.0", path = "./crates/fluss" } tokio = { version = "1.44.2", features = ["full"] } clap = { version = "4.5.37", features = ["derive"] } -arrow = { version = "55.1.0", features = ["ipc_compression"] } +arrow = "55.1.0" chrono = { version = "0.4", features = ["clock", "std", "wasmbind"] } diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml index 5bc6060..3b53465 100644 --- a/crates/examples/Cargo.toml +++ b/crates/examples/Cargo.toml @@ -35,9 +35,5 @@ name = "example-table" path = "src/example_table.rs" [[example]] -name = "projection-example" -path = "src/projection_example.rs" - -[[example]] -name = "projection-example-no-compression" -path = "src/projection_example_no_compression.rs" \ No newline at end of file +name = "example-projection" +path = "src/example_projection.rs" \ No newline at end of file diff --git a/crates/examples/src/projection_example_no_compression.rs b/crates/examples/src/example_projection.rs similarity index 91% rename from crates/examples/src/projection_example_no_compression.rs rename to crates/examples/src/example_projection.rs index 8579689..193a64e 100644 --- a/crates/examples/src/projection_example_no_compression.rs +++ b/crates/examples/src/example_projection.rs @@ -46,7 +46,7 @@ pub async fn main() -> Result<()> { .property("table.log.arrow.compression.type", "NONE") .build()?; - let table_path = TablePath::new("fluss".to_owned(), "projection_test_no_compression_rust".to_owned()); + let table_path = TablePath::new("fluss".to_owned(), "projection_test".to_owned()); let admin = conn.get_admin().await?; @@ -101,8 +101,7 @@ pub async fn main() -> Result<()> { } println!(); - println!("=== Test 2: Project by column indices (id, name, age) - NO COMPRESSION ==="); - println!("This test verifies projection without compression to isolate compression-related issues."); + println!("=== Test 2: Project by column indices (id, name, age) ==="); let log_scanner = table.new_scan() .project(&[0, 1, 2])? .create_log_scanner(); @@ -125,11 +124,6 @@ pub async fn main() -> Result<()> { } println!(); - println!("✓ Rust client projection test (NO COMPRESSION) PASSED"); - println!(" This indicates projection works correctly without compression."); - println!(" If compression version fails, the issue is in compression handling."); - println!(); - println!("All tests completed successfully!"); Ok(()) } diff --git a/crates/examples/src/projection_example.rs b/crates/examples/src/projection_example.rs deleted file mode 100644 index f16e5ad..0000000 --- a/crates/examples/src/projection_example.rs +++ /dev/null @@ -1,145 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use clap::Parser; -use fluss::client::FlussConnection; -use fluss::config::Config; -use fluss::error::Result; -use fluss::metadata::{DataTypes, LogFormat, Schema, TableDescriptor, TablePath}; -use fluss::row::{GenericRow, InternalRow}; -use std::time::Duration; -use tokio::try_join; - -#[tokio::main] -pub async fn main() -> Result<()> { - env_logger::init(); - - let mut config = Config::parse(); - config.bootstrap_server = Some("127.0.0.1:9123".to_string()); - - let conn = FlussConnection::new(config).await?; - - let table_descriptor = TableDescriptor::builder() - .schema( - Schema::builder() - .column("id", DataTypes::int()) - .column("name", DataTypes::string()) - .column("age", DataTypes::int()) - .column("email", DataTypes::string()) - .column("phone", DataTypes::string()) - .build()?, - ) - .log_format(LogFormat::ARROW) - .build()?; - - let table_path = TablePath::new("fluss".to_owned(), "projection_test".to_owned()); - - let admin = conn.get_admin().await?; - - admin - .create_table(&table_path, &table_descriptor, true) - .await?; - - let table_info = admin.get_table(&table_path).await?; - println!("Created table:\n{}\n", table_info); - - let table = conn.get_table(&table_path).await?; - let append_writer = table.new_append()?.create_writer(); - - println!("Writing test data..."); - let mut row = GenericRow::new(); - row.set_field(0, 1); - row.set_field(1, "Alice"); - row.set_field(2, 25); - row.set_field(3, "alice@example.com"); - row.set_field(4, "123-456-7890"); - let f1 = append_writer.append(row); - - row = GenericRow::new(); - row.set_field(0, 2); - row.set_field(1, "Bob"); - row.set_field(2, 30); - row.set_field(3, "bob@example.com"); - row.set_field(4, "098-765-4321"); - let f2 = append_writer.append(row); - - try_join!(f1, f2, append_writer.flush())?; - println!("Data written successfully\n"); - - println!("=== Test 1: Full scan (no projection) ==="); - let log_scanner = table.new_scan().create_log_scanner(); - log_scanner.subscribe(0, 0).await?; - - let scan_records = log_scanner.poll(Duration::from_secs(5)).await?; - println!("Fetched {} records", scan_records.count()); - for record in scan_records { - let row = record.row(); - println!( - "Record @{}: id={}, name={}, age={}, email={}, phone={}", - record.offset(), - row.get_int(0), - row.get_string(1), - row.get_int(2), - row.get_string(3), - row.get_string(4) - ); - } - println!(); - - println!("=== Test 2: Project by column indices (id, name, age) ==="); - let log_scanner = table.new_scan() - .project(&[0, 1, 2])? - .create_log_scanner(); - log_scanner.subscribe(0, 0).await?; - - let scan_records = log_scanner.poll(Duration::from_secs(5)).await?; - println!("Fetched {} records with projection", scan_records.count()); - for record in scan_records { - let row = record.row(); - println!( - "Record @{}: id={}, name={}, age={}", - record.offset(), - row.get_int(0), - row.get_string(1), - row.get_int(2) - ); - } - println!(); - - println!("=== Test 3: Project by column names (name, phone) ==="); - let log_scanner = table.new_scan() - .project_by_name(&["name", "phone"])? - .create_log_scanner(); - log_scanner.subscribe(0, 0).await?; - - let scan_records = log_scanner.poll(Duration::from_secs(5)).await?; - println!("Fetched {} records with projection", scan_records.count()); - for record in scan_records { - let row = record.row(); - println!( - "Record @{}: name={}, phone={}", - record.offset(), - row.get_string(0), - row.get_string(1) - ); - } - println!(); - - println!("All tests completed successfully!"); - Ok(()) -} - diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 5c4b3cf..4b48976 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -530,50 +530,39 @@ impl<'a> LogRecordBatch<'a> { /// Parse an Arrow IPC message from a byte slice. /// - /// Similar to `arrow-ipc::reader::parse_message` (reader.rs:743-753), but also extracts - /// the body buffer. The implementation follows the same pattern: skip continuation marker - /// and metadata size, then parse the metadata message. + /// Server returns RecordBatch message (without Schema message) in the encapsulated message format. + /// Format: [continuation: 4 bytes (0xFFFFFFFF)][metadata_size: 4 bytes][RecordBatch metadata][body] + /// + /// This format is documented at: + /// https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format /// /// Returns the RecordBatch metadata, body buffer, and metadata version. fn parse_ipc_message( data: &'a [u8], ) -> Option<(arrow::ipc::RecordBatch<'a>, Buffer, arrow::ipc::MetadataVersion)> { - const CONTINUATION_MARKER: [u8; 4] = [0xff; 4]; + const CONTINUATION_MARKER: u32 = 0xFFFFFFFF; - let metadata_buf = if data.len() >= 4 && data[..4] == CONTINUATION_MARKER { - if data.len() < 8 { - return None; - } - &data[8..] - } else { - if data.len() < 4 { - return None; - } - &data[4..] - }; + if data.len() < 8 { + return None; + } - let metadata_size = u32::from_le_bytes([ - metadata_buf[0], - metadata_buf[1], - metadata_buf[2], - metadata_buf[3], - ]) as usize; + let continuation = u32::from_le_bytes([data[0], data[1], data[2], data[3]]); + let metadata_size = u32::from_le_bytes([data[4], data[5], data[6], data[7]]) as usize; - if metadata_buf.len() < 4 + metadata_size { + if continuation != CONTINUATION_MARKER { + return None; + } + + if data.len() < 8 + metadata_size { return None; } - let metadata_bytes = &metadata_buf[4..4 + metadata_size]; + let metadata_bytes = &data[8..8 + metadata_size]; let message = root_as_message(metadata_bytes).ok()?; let batch_metadata = message.header_as_record_batch()?; - let body_start = 4 + metadata_size; - let body_length = message.bodyLength() as usize; - if metadata_buf.len() < body_start + body_length { - return None; - } - - let body_data = &metadata_buf[body_start..body_start + body_length]; + let body_start = 8 + metadata_size; + let body_data = &data[body_start..]; let body_buffer = Buffer::from(body_data); Some((batch_metadata, body_buffer, message.version())) From 8d19bad3c81a1fe37c8001979115ac3b353530b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?= Date: Sun, 23 Nov 2025 00:41:34 +0800 Subject: [PATCH 06/21] address comments --- crates/fluss/src/record/arrow.rs | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 4b48976..84aac51 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -489,18 +489,12 @@ impl<'a> LogRecordBatch<'a> { None => return LogRecordIterator::empty(), }; - let schema_to_use = if read_context.is_projection_pushdown() { - read_context.arrow_schema.clone() + let (schema_to_use, projection) = if read_context.is_projection_pushdown() { + (read_context.arrow_schema.clone(), None) } else if let Some(projected_fields) = read_context.projected_fields() { - let projected_schema = arrow_schema::Schema::new( - projected_fields - .iter() - .map(|&idx| read_context.arrow_schema.field(idx).clone()) - .collect::>(), - ); - Arc::new(projected_schema) + (read_context.arrow_schema.clone(), Some(projected_fields)) } else { - read_context.arrow_schema.clone() + (read_context.arrow_schema.clone(), None) }; let record_batch = match read_record_batch( @@ -508,7 +502,7 @@ impl<'a> LogRecordBatch<'a> { batch_metadata, schema_to_use, &std::collections::HashMap::new(), - None, + projection, &version, ) { Ok(batch) => batch, From 1a92c48b6648d629f56f9c112fa0991aba5b9287 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?= Date: Sun, 23 Nov 2025 19:28:55 +0800 Subject: [PATCH 07/21] address comments --- crates/fluss/src/client/table/scanner.rs | 48 ++++++++++++------------ crates/fluss/src/record/arrow.rs | 11 +++--- 2 files changed, 28 insertions(+), 31 deletions(-) diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 323d1ee..ff27f89 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -186,32 +186,23 @@ impl LogFetcher { let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp; let full_arrow_schema = to_arrow_schema(self.table_info.get_row_type()); - let read_context = if let Some(projected_fields) = &self.projected_fields { - let (projection_enabled, _) = if !projected_fields.is_empty() { - (true, projected_fields.clone()) - } else { - (false, vec![]) - }; - - if projection_enabled { - let projected_schema = arrow_schema::Schema::new( - projected_fields - .iter() - .map(|&idx| full_arrow_schema.field(idx).clone()) - .collect::>(), - ); - ReadContext::with_projection_pushdown( - Arc::new(projected_schema), - Some(projected_fields.clone()), - ) - } else { - ReadContext::with_projection( - full_arrow_schema, - self.projected_fields.clone(), - ) - } + let read_context = if self.is_projection_enabled() { + let projected_fields = self.projected_fields.as_ref().unwrap(); + let projected_schema = arrow_schema::Schema::new( + projected_fields + .iter() + .map(|&idx| full_arrow_schema.field(idx).clone()) + .collect::>(), + ); + ReadContext::with_projection_pushdown( + Arc::new(projected_schema), + Some(projected_fields.clone()), + ) } else { - ReadContext::new(full_arrow_schema) + ReadContext::with_projection( + full_arrow_schema, + self.projected_fields.clone(), + ) }; for fetch_log_for_bucket in fetch_log_for_buckets { @@ -306,6 +297,13 @@ impl LogFetcher { } } + fn is_projection_enabled(&self) -> bool { + self.projected_fields + .as_ref() + .map(|fields| !fields.is_empty()) + .unwrap_or(false) + } + fn fetchable_buckets(&self) -> Vec { // always available now self.log_scanner_status.fetchable_buckets(|_| true) diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 84aac51..40d5056 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -16,7 +16,7 @@ // under the License. use crate::client::{Record, WriteRecord}; -use crate::error::Result; +use crate::error::{Result}; use crate::metadata::DataType; use crate::record::{ChangeType, ScanRecord}; use crate::row::{ColumnarRow, GenericRow}; @@ -489,13 +489,12 @@ impl<'a> LogRecordBatch<'a> { None => return LogRecordIterator::empty(), }; - let (schema_to_use, projection) = if read_context.is_projection_pushdown() { - (read_context.arrow_schema.clone(), None) - } else if let Some(projected_fields) = read_context.projected_fields() { - (read_context.arrow_schema.clone(), Some(projected_fields)) + let projection = if read_context.is_projection_pushdown() { + None } else { - (read_context.arrow_schema.clone(), None) + read_context.projected_fields() }; + let schema_to_use = read_context.arrow_schema.clone(); let record_batch = match read_record_batch( &body_buffer, From 396d8742bd35e3f1f6f349bbc8e17f79cd874546 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?= Date: Sat, 29 Nov 2025 17:48:03 +0800 Subject: [PATCH 08/21] support project fields reordering --- crates/examples/Cargo.toml | 6 +- crates/examples/src/example_projection.rs | 130 ---------------------- crates/examples/src/example_table.rs | 38 +++++++ crates/fluss/src/client/table/scanner.rs | 13 ++- crates/fluss/src/record/arrow.rs | 67 ++++++++++- 5 files changed, 114 insertions(+), 140 deletions(-) delete mode 100644 crates/examples/src/example_projection.rs diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml index 3b53465..3f6d96a 100644 --- a/crates/examples/Cargo.toml +++ b/crates/examples/Cargo.toml @@ -32,8 +32,4 @@ env_logger = "0.11" [[example]] name = "example-table" -path = "src/example_table.rs" - -[[example]] -name = "example-projection" -path = "src/example_projection.rs" \ No newline at end of file +path = "src/example_table.rs" \ No newline at end of file diff --git a/crates/examples/src/example_projection.rs b/crates/examples/src/example_projection.rs deleted file mode 100644 index 193a64e..0000000 --- a/crates/examples/src/example_projection.rs +++ /dev/null @@ -1,130 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use clap::Parser; -use fluss::client::FlussConnection; -use fluss::config::Config; -use fluss::error::Result; -use fluss::metadata::{DataTypes, LogFormat, Schema, TableDescriptor, TablePath}; -use fluss::row::{GenericRow, InternalRow}; -use std::time::Duration; - -#[tokio::main] -pub async fn main() -> Result<()> { - env_logger::init(); - - let mut config = Config::parse(); - config.bootstrap_server = Some("127.0.0.1:9123".to_string()); - - let conn = FlussConnection::new(config).await?; - - let table_descriptor = TableDescriptor::builder() - .schema( - Schema::builder() - .column("id", DataTypes::int()) - .column("name", DataTypes::string()) - .column("age", DataTypes::int()) - .column("email", DataTypes::string()) - .column("phone", DataTypes::string()) - .build()?, - ) - .log_format(LogFormat::ARROW) - .property("table.log.arrow.compression.type", "NONE") - .build()?; - - let table_path = TablePath::new("fluss".to_owned(), "projection_test".to_owned()); - - let admin = conn.get_admin().await?; - - // Drop table if exists, then create - let _ = admin.drop_table(&table_path, true).await; - admin - .create_table(&table_path, &table_descriptor, true) - .await?; - - let table_info = admin.get_table(&table_path).await?; - println!("Created table:\n{}\n", table_info); - - let table = conn.get_table(&table_path).await?; - let append_writer = table.new_append()?.create_writer(); - - println!("Writing test data..."); - for i in 0..10 { - let mut row = GenericRow::new(); - row.set_field(0, i); - let name = format!("User{}", i); - row.set_field(1, name.as_str()); - row.set_field(2, 20 + i); - let email = format!("user{}@example.com", i); - row.set_field(3, email.as_str()); - let phone = format!("123-456-{:04}", i); - row.set_field(4, phone.as_str()); - append_writer.append(row).await?; - } - append_writer.flush().await?; - println!("Data written successfully\n"); - - println!("=== Test 1: Full scan (no projection) ==="); - let log_scanner = table.new_scan().create_log_scanner(); - let num_buckets = table.table_info().get_num_buckets(); - for bucket_id in 0..num_buckets { - log_scanner.subscribe(bucket_id, 0).await?; - } - - let scan_records = log_scanner.poll(Duration::from_secs(5)).await?; - println!("Fetched {} records", scan_records.count()); - for record in scan_records { - let row = record.row(); - println!( - "Record @{}: id={}, name={}, age={}, email={}, phone={}", - record.offset(), - row.get_int(0), - row.get_string(1), - row.get_int(2), - row.get_string(3), - row.get_string(4) - ); - } - println!(); - - println!("=== Test 2: Project by column indices (id, name, age) ==="); - let log_scanner = table.new_scan() - .project(&[0, 1, 2])? - .create_log_scanner(); - let num_buckets = table.table_info().get_num_buckets(); - for bucket_id in 0..num_buckets { - log_scanner.subscribe(bucket_id, 0).await?; - } - - let scan_records = log_scanner.poll(Duration::from_secs(5)).await?; - println!("Fetched {} records with projection", scan_records.count()); - for record in scan_records { - let row = record.row(); - println!( - "Record @{}: id={}, name={}, age={}", - record.offset(), - row.get_int(0), - row.get_string(1), - row.get_int(2) - ); - } - println!(); - - println!("All tests completed successfully!"); - Ok(()) -} - diff --git a/crates/examples/src/example_table.rs b/crates/examples/src/example_table.rs index ead257a..8ca809b 100644 --- a/crates/examples/src/example_table.rs +++ b/crates/examples/src/example_table.rs @@ -83,4 +83,42 @@ pub async fn main() -> Result<()> { ); } } + + // Example: Projection scan (commented out as it's after the infinite loop) + // Uncomment and place before the loop to test projection: + // + // println!("=== Projection scan: only c2 column ==="); + // let log_scanner_projected = table + // .new_scan() + // .project(&[1])? + // .create_log_scanner(); + // log_scanner_projected.subscribe(0, 0).await?; + // + // let scan_records_projected = log_scanner_projected + // .poll(Duration::from_secs(10)) + // .await?; + // for record in scan_records_projected { + // let row = record.row(); + // println!("{{c2={}}}@{}", row.get_string(0), record.offset()); + // } + // + // println!("=== Projection scan: reordered columns (c2, c1) ==="); + // let log_scanner_reordered = table + // .new_scan() + // .project(&[1, 0])? + // .create_log_scanner(); + // log_scanner_reordered.subscribe(0, 0).await?; + // + // let scan_records_reordered = log_scanner_reordered + // .poll(Duration::from_secs(10)) + // .await?; + // for record in scan_records_reordered { + // let row = record.row(); + // println!( + // "{{{}, {}}}@{}", + // row.get_string(0), + // row.get_int(1), + // record.offset() + // ); + // } } diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index ff27f89..8328745 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -188,15 +188,19 @@ impl LogFetcher { let full_arrow_schema = to_arrow_schema(self.table_info.get_row_type()); let read_context = if self.is_projection_enabled() { let projected_fields = self.projected_fields.as_ref().unwrap(); + // Server returns columns in sorted order, build schema accordingly + let mut sorted_fields = projected_fields.clone(); + sorted_fields.sort(); let projected_schema = arrow_schema::Schema::new( - projected_fields + sorted_fields .iter() .map(|&idx| full_arrow_schema.field(idx).clone()) .collect::>(), ); ReadContext::with_projection_pushdown( Arc::new(projected_schema), - Some(projected_fields.clone()), + projected_fields.clone(), + sorted_fields, ) } else { ReadContext::with_projection( @@ -268,7 +272,10 @@ impl LogFetcher { if fields.is_empty() { (false, vec![]) } else { - (true, fields.iter().map(|&i| i as i32).collect()) + // Server requires projected_fields to be in ascending order + let mut sorted_fields = fields.clone(); + sorted_fields.sort(); + (true, sorted_fields.iter().map(|&i| i as i32).collect()) } } else { (false, vec![]) diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 40d5056..dae4517 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -489,7 +489,14 @@ impl<'a> LogRecordBatch<'a> { None => return LogRecordIterator::empty(), }; + let reordering_indexes_opt = if read_context.is_projection_pushdown() { + read_context.reordering_indexes() + } else { + None + }; let projection = if read_context.is_projection_pushdown() { + // When projection pushdown is enabled, server already returns projected columns + // in sorted order. We read all columns then reorder them. None } else { read_context.projected_fields() @@ -511,6 +518,23 @@ impl<'a> LogRecordBatch<'a> { } }; + // Reorder columns if needed (when projection pushdown with non-sorted order) + let record_batch = if let Some(reordering_indexes) = &reordering_indexes_opt { + let reordered_columns: Vec<_> = reordering_indexes + .iter() + .map(|&idx| record_batch.column(idx).clone()) + .collect(); + let reordered_fields: Vec<_> = reordering_indexes + .iter() + .map(|&idx| record_batch.schema().field(idx).clone()) + .collect(); + let reordered_schema = Arc::new(arrow_schema::Schema::new(reordered_fields)); + RecordBatch::try_new(reordered_schema, reordered_columns) + .expect("Failed to create reordered RecordBatch") + } else { + record_batch + }; + let arrow_reader = ArrowReader::new(Arc::new(record_batch)); LogRecordIterator::Arrow(ArrowLogRecordIterator { reader: arrow_reader, @@ -614,6 +638,7 @@ pub struct ReadContext { arrow_schema: SchemaRef, projected_fields: Option>, projection_pushdown: bool, + projection_in_order: Option>, } impl ReadContext { @@ -622,6 +647,7 @@ impl ReadContext { arrow_schema, projected_fields: None, projection_pushdown: false, + projection_in_order: None, } } @@ -630,14 +656,20 @@ impl ReadContext { arrow_schema, projected_fields, projection_pushdown: false, + projection_in_order: None, } } - pub fn with_projection_pushdown(arrow_schema: SchemaRef, projected_fields: Option>) -> ReadContext { + pub fn with_projection_pushdown( + arrow_schema: SchemaRef, + projected_fields: Vec, + projection_in_order: Vec, + ) -> ReadContext { ReadContext { arrow_schema, - projected_fields, + projected_fields: Some(projected_fields), projection_pushdown: true, + projection_in_order: Some(projection_in_order), } } @@ -665,6 +697,37 @@ impl ReadContext { pub fn projected_fields(&self) -> Option<&[usize]> { self.projected_fields.as_deref() } + + pub fn reordering_indexes(&self) -> Option> { + if !self.projection_pushdown { + return None; + } + + let projected_fields = match &self.projected_fields { + Some(fields) => fields, + None => return None, + }; + + let projection_in_order = match &self.projection_in_order { + Some(order) => order, + None => return None, + }; + + if projected_fields.is_empty() { + return None; + } + + // Calculate reordering indexes to transform from sorted order to user-requested order + let mut reordering_indexes = Vec::with_capacity(projected_fields.len()); + for &original_idx in projected_fields { + let pos = projection_in_order + .binary_search(&original_idx) + .expect("projection index should exist in sorted list"); + reordering_indexes.push(pos); + } + + Some(reordering_indexes) + } } pub enum LogRecordIterator { From a0c1ce23dd86fd89fdea67b46a8889d9056b8319 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?= Date: Wed, 3 Dec 2025 14:32:13 +0800 Subject: [PATCH 09/21] address comments --- crates/examples/src/example_table.rs | 38 ---------------------------- 1 file changed, 38 deletions(-) diff --git a/crates/examples/src/example_table.rs b/crates/examples/src/example_table.rs index 8ca809b..ead257a 100644 --- a/crates/examples/src/example_table.rs +++ b/crates/examples/src/example_table.rs @@ -83,42 +83,4 @@ pub async fn main() -> Result<()> { ); } } - - // Example: Projection scan (commented out as it's after the infinite loop) - // Uncomment and place before the loop to test projection: - // - // println!("=== Projection scan: only c2 column ==="); - // let log_scanner_projected = table - // .new_scan() - // .project(&[1])? - // .create_log_scanner(); - // log_scanner_projected.subscribe(0, 0).await?; - // - // let scan_records_projected = log_scanner_projected - // .poll(Duration::from_secs(10)) - // .await?; - // for record in scan_records_projected { - // let row = record.row(); - // println!("{{c2={}}}@{}", row.get_string(0), record.offset()); - // } - // - // println!("=== Projection scan: reordered columns (c2, c1) ==="); - // let log_scanner_reordered = table - // .new_scan() - // .project(&[1, 0])? - // .create_log_scanner(); - // log_scanner_reordered.subscribe(0, 0).await?; - // - // let scan_records_reordered = log_scanner_reordered - // .poll(Duration::from_secs(10)) - // .await?; - // for record in scan_records_reordered { - // let row = record.row(); - // println!( - // "{{{}, {}}}@{}", - // row.get_string(0), - // row.get_int(1), - // record.offset() - // ); - // } } From 25f2c993e5c6ac4933f51a0e5380510b5bcadfd2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?= Date: Wed, 3 Dec 2025 14:35:00 +0800 Subject: [PATCH 10/21] address comments --- crates/fluss/src/client/table/scanner.rs | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 8328745..878e89c 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -53,6 +53,9 @@ impl<'a> TableScan<'a> { } pub fn project(mut self, column_indices: &[usize]) -> Result { + if column_indices.is_empty() { + return Err(Error::IllegalArgument("Column indices cannot be empty".to_string())); + } let field_count = self.table_info.row_type().fields().len(); for &idx in column_indices { if idx >= field_count { @@ -64,6 +67,9 @@ impl<'a> TableScan<'a> { } pub fn project_by_name(mut self, column_names: &[&str]) -> Result { + if column_names.is_empty() { + return Err(Error::IllegalArgument("Column names cannot be empty".to_string())); + } let row_type = self.table_info.row_type(); let mut indices = Vec::new(); @@ -269,14 +275,9 @@ impl LogFetcher { HashMap::new() } else { let (projection_enabled, projected_fields) = if let Some(fields) = &self.projected_fields { - if fields.is_empty() { - (false, vec![]) - } else { - // Server requires projected_fields to be in ascending order - let mut sorted_fields = fields.clone(); - sorted_fields.sort(); - (true, sorted_fields.iter().map(|&i| i as i32).collect()) - } + let mut sorted_fields = fields.clone(); + sorted_fields.sort(); + (true, sorted_fields.iter().map(|&i| i as i32).collect()) } else { (false, vec![]) }; @@ -305,10 +306,7 @@ impl LogFetcher { } fn is_projection_enabled(&self) -> bool { - self.projected_fields - .as_ref() - .map(|fields| !fields.is_empty()) - .unwrap_or(false) + self.projected_fields.is_some() } fn fetchable_buckets(&self) -> Vec { From 361a23a0d6e394239e5cfe9b258455573628162f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?= Date: Wed, 3 Dec 2025 14:54:00 +0800 Subject: [PATCH 11/21] address comments --- crates/fluss/src/client/table/scanner.rs | 107 ++++++++++++++--------- crates/fluss/src/record/arrow.rs | 53 +++-------- 2 files changed, 79 insertions(+), 81 deletions(-) diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 878e89c..4364000 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -22,6 +22,7 @@ use crate::metadata::{TableBucket, TableInfo, TablePath}; use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable}; use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord, ScanRecords, to_arrow_schema}; use crate::rpc::RpcClient; +use arrow_schema::SchemaRef; use crate::util::FairBucketStatusMap; use parking_lot::RwLock; use std::collections::HashMap; @@ -39,6 +40,7 @@ pub struct TableScan<'a> { conn: &'a FlussConnection, table_info: TableInfo, metadata: Arc, + /// Column indices to project. None means all columns, Some(vec) means only the specified columns (non-empty). projected_fields: Option>, } @@ -52,6 +54,18 @@ impl<'a> TableScan<'a> { } } + /// Projects the scan to only include specified columns by their indices. + /// + /// # Arguments + /// * `column_indices` - Zero-based indices of columns to include in the scan + /// + /// # Errors + /// Returns an error if `column_indices` is empty or if any column index is out of range. + /// + /// # Example + /// ``` + /// let scanner = table.new_scan().project(&[0, 2, 3])?.create_log_scanner(); + /// ``` pub fn project(mut self, column_indices: &[usize]) -> Result { if column_indices.is_empty() { return Err(Error::IllegalArgument("Column indices cannot be empty".to_string())); @@ -66,6 +80,18 @@ impl<'a> TableScan<'a> { Ok(self) } + /// Projects the scan to only include specified columns by their names. + /// + /// # Arguments + /// * `column_names` - Names of columns to include in the scan + /// + /// # Errors + /// Returns an error if `column_names` is empty or if any column name is not found in the table schema. + /// + /// # Example + /// ``` + /// let scanner = table.new_scan().project_by_name(&["col1", "col3"])?.create_log_scanner(); + /// ``` pub fn project_by_name(mut self, column_names: &[&str]) -> Result { if column_names.is_empty() { return Err(Error::IllegalArgument("Column names cannot be empty".to_string())); @@ -153,6 +179,7 @@ struct LogFetcher { metadata: Arc, log_scanner_status: Arc, projected_fields: Option>, + read_context: ReadContext, } impl LogFetcher { @@ -163,13 +190,40 @@ impl LogFetcher { log_scanner_status: Arc, projected_fields: Option>, ) -> Self { + let full_arrow_schema = to_arrow_schema(table_info.get_row_type()); + let read_context = Self::create_read_context(full_arrow_schema, &projected_fields); LogFetcher { table_path: table_info.table_path.clone(), - conns: conns.clone(), - table_info: table_info.clone(), - metadata: metadata.clone(), - log_scanner_status: log_scanner_status.clone(), + conns, + table_info, + metadata, + log_scanner_status, projected_fields, + read_context, + } + } + + fn create_read_context( + full_arrow_schema: SchemaRef, + projected_fields: &Option>, + ) -> ReadContext { + match projected_fields { + None => ReadContext::new(full_arrow_schema), + Some(fields) => { + let mut sorted_fields = fields.clone(); + sorted_fields.sort(); + let projected_schema = arrow_schema::Schema::new( + sorted_fields + .iter() + .map(|&idx| full_arrow_schema.field(idx).clone()) + .collect::>(), + ); + ReadContext::with_projection_pushdown( + Arc::new(projected_schema), + fields.clone(), + sorted_fields, + ) + } } } @@ -190,31 +244,7 @@ impl LogFetcher { for pb_fetch_log_resp in fetch_response.tables_resp { let table_id = pb_fetch_log_resp.table_id; let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp; - - let full_arrow_schema = to_arrow_schema(self.table_info.get_row_type()); - let read_context = if self.is_projection_enabled() { - let projected_fields = self.projected_fields.as_ref().unwrap(); - // Server returns columns in sorted order, build schema accordingly - let mut sorted_fields = projected_fields.clone(); - sorted_fields.sort(); - let projected_schema = arrow_schema::Schema::new( - sorted_fields - .iter() - .map(|&idx| full_arrow_schema.field(idx).clone()) - .collect::>(), - ); - ReadContext::with_projection_pushdown( - Arc::new(projected_schema), - projected_fields.clone(), - sorted_fields, - ) - } else { - ReadContext::with_projection( - full_arrow_schema, - self.projected_fields.clone(), - ) - }; - + for fetch_log_for_bucket in fetch_log_for_buckets { let mut fetch_records = vec![]; let bucket: i32 = fetch_log_for_bucket.bucket_id; @@ -224,7 +254,7 @@ impl LogFetcher { for log_record in &mut LogRecordsBatchs::new(&data) { let last_offset = log_record.last_log_offset(); fetch_records - .extend(log_record.records(read_context.clone())); + .extend(log_record.records(&self.read_context)?); self.log_scanner_status .update_offset(&table_bucket, last_offset + 1); } @@ -274,12 +304,13 @@ impl LogFetcher { if ready_for_fetch_count == 0 { HashMap::new() } else { - let (projection_enabled, projected_fields) = if let Some(fields) = &self.projected_fields { - let mut sorted_fields = fields.clone(); - sorted_fields.sort(); - (true, sorted_fields.iter().map(|&i| i as i32).collect()) - } else { - (false, vec![]) + let (projection_enabled, projected_fields) = match self.read_context.projected_fields() { + None => { + (false, vec![]) + } + Some(fields) => { + (true, fields.iter().map(|&i| i as i32).collect()) + } }; fetch_log_req_for_buckets @@ -305,10 +336,6 @@ impl LogFetcher { } } - fn is_projection_enabled(&self) -> bool { - self.projected_fields.is_some() - } - fn fetchable_buckets(&self) -> Vec { // always available now self.log_scanner_status.fetchable_buckets(|_| true) diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index dae4517..6919ffa 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -20,7 +20,6 @@ use crate::error::{Result}; use crate::metadata::DataType; use crate::record::{ChangeType, ScanRecord}; use crate::row::{ColumnarRow, GenericRow}; -use tracing::error; use arrow::array::{ ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Float32Builder, Float64Builder, Int8Builder, Int16Builder, Int32Builder, Int64Builder, StringBuilder, UInt8Builder, @@ -478,15 +477,15 @@ impl<'a> LogRecordBatch<'a> { LittleEndian::read_i32(&self.data[offset..offset + RECORDS_COUNT_LENGTH]) } - pub fn records(&self, read_context: ReadContext) -> LogRecordIterator { + pub fn records(&self, read_context: &ReadContext) -> Result { if self.record_count() == 0 { - return LogRecordIterator::empty(); + return Ok(LogRecordIterator::empty()); } let data = &self.data[RECORDS_OFFSET..]; let (batch_metadata, body_buffer, version) = match Self::parse_ipc_message(data) { Some(result) => result, - None => return LogRecordIterator::empty(), + None => return Ok(LogRecordIterator::empty()), }; let reordering_indexes_opt = if read_context.is_projection_pushdown() { @@ -494,29 +493,18 @@ impl<'a> LogRecordBatch<'a> { } else { None }; - let projection = if read_context.is_projection_pushdown() { - // When projection pushdown is enabled, server already returns projected columns - // in sorted order. We read all columns then reorder them. - None - } else { - read_context.projected_fields() - }; + + let projection = read_context.projected_fields(); let schema_to_use = read_context.arrow_schema.clone(); - let record_batch = match read_record_batch( + let record_batch = read_record_batch( &body_buffer, batch_metadata, schema_to_use, &std::collections::HashMap::new(), projection, &version, - ) { - Ok(batch) => batch, - Err(e) => { - error!(error = %e, "Failed to read RecordBatch"); - return LogRecordIterator::empty(); - } - }; + )?; // Reorder columns if needed (when projection pushdown with non-sorted order) let record_batch = if let Some(reordering_indexes) = &reordering_indexes_opt { @@ -529,20 +517,19 @@ impl<'a> LogRecordBatch<'a> { .map(|&idx| record_batch.schema().field(idx).clone()) .collect(); let reordered_schema = Arc::new(arrow_schema::Schema::new(reordered_fields)); - RecordBatch::try_new(reordered_schema, reordered_columns) - .expect("Failed to create reordered RecordBatch") + RecordBatch::try_new(reordered_schema, reordered_columns)? } else { record_batch }; let arrow_reader = ArrowReader::new(Arc::new(record_batch)); - LogRecordIterator::Arrow(ArrowLogRecordIterator { + Ok(LogRecordIterator::Arrow(ArrowLogRecordIterator { reader: arrow_reader, base_offset: self.base_log_offset(), timestamp: self.commit_timestamp(), row_id: 0, change_type: ChangeType::AppendOnly, - }) + })) } /// Parse an Arrow IPC message from a byte slice. @@ -563,8 +550,8 @@ impl<'a> LogRecordBatch<'a> { return None; } - let continuation = u32::from_le_bytes([data[0], data[1], data[2], data[3]]); - let metadata_size = u32::from_le_bytes([data[4], data[5], data[6], data[7]]) as usize; + let continuation = LittleEndian::read_u32(&data[0..4]); + let metadata_size = LittleEndian::read_u32(&data[4..8]) as usize; if continuation != CONTINUATION_MARKER { return None; @@ -677,22 +664,6 @@ impl ReadContext { self.projection_pushdown } - pub fn to_arrow_metadata(&self) -> Result> { - let mut arrow_schema_bytes = vec![]; - let schema_to_use = if let Some(projected_fields) = &self.projected_fields { - let projected_schema = arrow_schema::Schema::new( - projected_fields - .iter() - .map(|&idx| self.arrow_schema.field(idx).clone()) - .collect::>(), - ); - Arc::new(projected_schema) - } else { - self.arrow_schema.clone() - }; - let _writer = StreamWriter::try_new(&mut arrow_schema_bytes, &schema_to_use)?; - Ok(arrow_schema_bytes) - } pub fn projected_fields(&self) -> Option<&[usize]> { self.projected_fields.as_deref() From 29e1dd6c63cd771447dfd3e3e2894224a171d5c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?= Date: Wed, 3 Dec 2025 15:05:18 +0800 Subject: [PATCH 12/21] address comments --- crates/fluss/src/client/table/scanner.rs | 12 +++--------- crates/fluss/src/record/arrow.rs | 7 +++++-- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 4364000..ac4e569 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -178,7 +178,6 @@ struct LogFetcher { table_info: TableInfo, metadata: Arc, log_scanner_status: Arc, - projected_fields: Option>, read_context: ReadContext, } @@ -198,7 +197,6 @@ impl LogFetcher { table_info, metadata, log_scanner_status, - projected_fields, read_context, } } @@ -304,13 +302,9 @@ impl LogFetcher { if ready_for_fetch_count == 0 { HashMap::new() } else { - let (projection_enabled, projected_fields) = match self.read_context.projected_fields() { - None => { - (false, vec![]) - } - Some(fields) => { - (true, fields.iter().map(|&i| i as i32).collect()) - } + let (projection_enabled, projected_fields) = match self.read_context.projection_in_order() { + None => (false, vec![]), + Some(fields) => (true, fields.iter().map(|&i| i as i32).collect()), }; fetch_log_req_for_buckets diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 6919ffa..dc84239 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -494,7 +494,6 @@ impl<'a> LogRecordBatch<'a> { None }; - let projection = read_context.projected_fields(); let schema_to_use = read_context.arrow_schema.clone(); let record_batch = read_record_batch( @@ -502,7 +501,7 @@ impl<'a> LogRecordBatch<'a> { batch_metadata, schema_to_use, &std::collections::HashMap::new(), - projection, + None, &version, )?; @@ -669,6 +668,10 @@ impl ReadContext { self.projected_fields.as_deref() } + pub fn projection_in_order(&self) -> Option<&[usize]> { + self.projection_in_order.as_deref() + } + pub fn reordering_indexes(&self) -> Option> { if !self.projection_pushdown { return None; From 281cc64f82eb17cff2dd2eb28f8a541e849d9868 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?= Date: Wed, 3 Dec 2025 15:16:39 +0800 Subject: [PATCH 13/21] fix fmt check --- crates/fluss/src/client/table/scanner.rs | 37 +++++++++++++++--------- crates/fluss/src/record/arrow.rs | 30 +++++++++++-------- 2 files changed, 41 insertions(+), 26 deletions(-) diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index ac4e569..0649b87 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -22,8 +22,8 @@ use crate::metadata::{TableBucket, TableInfo, TablePath}; use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable}; use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord, ScanRecords, to_arrow_schema}; use crate::rpc::RpcClient; -use arrow_schema::SchemaRef; use crate::util::FairBucketStatusMap; +use arrow_schema::SchemaRef; use parking_lot::RwLock; use std::collections::HashMap; use std::slice::from_ref; @@ -68,18 +68,24 @@ impl<'a> TableScan<'a> { /// ``` pub fn project(mut self, column_indices: &[usize]) -> Result { if column_indices.is_empty() { - return Err(Error::IllegalArgument("Column indices cannot be empty".to_string())); + return Err(Error::IllegalArgument( + "Column indices cannot be empty".to_string(), + )); } let field_count = self.table_info.row_type().fields().len(); for &idx in column_indices { if idx >= field_count { - return Err(Error::IllegalArgument(format!("Column index {} out of range (max: {})", idx, field_count - 1))); + return Err(Error::IllegalArgument(format!( + "Column index {} out of range (max: {})", + idx, + field_count - 1 + ))); } } self.projected_fields = Some(column_indices.to_vec()); Ok(self) } - + /// Projects the scan to only include specified columns by their names. /// /// # Arguments @@ -94,19 +100,22 @@ impl<'a> TableScan<'a> { /// ``` pub fn project_by_name(mut self, column_names: &[&str]) -> Result { if column_names.is_empty() { - return Err(Error::IllegalArgument("Column names cannot be empty".to_string())); + return Err(Error::IllegalArgument( + "Column names cannot be empty".to_string(), + )); } let row_type = self.table_info.row_type(); let mut indices = Vec::new(); - + for name in column_names { - let idx = row_type.fields() + let idx = row_type + .fields() .iter() .position(|f| f.name() == *name) .ok_or_else(|| Error::IllegalArgument(format!("Column '{}' not found", name)))?; indices.push(idx); } - + self.projected_fields = Some(indices); Ok(self) } @@ -251,8 +260,7 @@ impl LogFetcher { let data = fetch_log_for_bucket.records.unwrap(); for log_record in &mut LogRecordsBatchs::new(&data) { let last_offset = log_record.last_log_offset(); - fetch_records - .extend(log_record.records(&self.read_context)?); + fetch_records.extend(log_record.records(&self.read_context)?); self.log_scanner_status .update_offset(&table_bucket, last_offset + 1); } @@ -302,10 +310,11 @@ impl LogFetcher { if ready_for_fetch_count == 0 { HashMap::new() } else { - let (projection_enabled, projected_fields) = match self.read_context.projection_in_order() { - None => (false, vec![]), - Some(fields) => (true, fields.iter().map(|&i| i as i32).collect()), - }; + let (projection_enabled, projected_fields) = + match self.read_context.projection_in_order() { + None => (false, vec![]), + Some(fields) => (true, fields.iter().map(|&i| i as i32).collect()), + }; fetch_log_req_for_buckets .into_iter() diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index dc84239..fc0150b 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -16,7 +16,7 @@ // under the License. use crate::client::{Record, WriteRecord}; -use crate::error::{Result}; +use crate::error::Result; use crate::metadata::DataType; use crate::record::{ChangeType, ScanRecord}; use crate::row::{ColumnarRow, GenericRow}; @@ -29,9 +29,9 @@ use arrow::{ array::RecordBatch, buffer::Buffer, ipc::{ - reader::{read_record_batch, StreamReader}, - writer::StreamWriter, + reader::{StreamReader, read_record_batch}, root_as_message, + writer::StreamWriter, }, }; use arrow_schema::SchemaRef; @@ -542,20 +542,24 @@ impl<'a> LogRecordBatch<'a> { /// Returns the RecordBatch metadata, body buffer, and metadata version. fn parse_ipc_message( data: &'a [u8], - ) -> Option<(arrow::ipc::RecordBatch<'a>, Buffer, arrow::ipc::MetadataVersion)> { + ) -> Option<( + arrow::ipc::RecordBatch<'a>, + Buffer, + arrow::ipc::MetadataVersion, + )> { const CONTINUATION_MARKER: u32 = 0xFFFFFFFF; - + if data.len() < 8 { return None; } - + let continuation = LittleEndian::read_u32(&data[0..4]); let metadata_size = LittleEndian::read_u32(&data[4..8]) as usize; - + if continuation != CONTINUATION_MARKER { return None; } - + if data.len() < 8 + metadata_size { return None; } @@ -637,7 +641,10 @@ impl ReadContext { } } - pub fn with_projection(arrow_schema: SchemaRef, projected_fields: Option>) -> ReadContext { + pub fn with_projection( + arrow_schema: SchemaRef, + projected_fields: Option>, + ) -> ReadContext { ReadContext { arrow_schema, projected_fields, @@ -663,7 +670,6 @@ impl ReadContext { self.projection_pushdown } - pub fn projected_fields(&self) -> Option<&[usize]> { self.projected_fields.as_deref() } @@ -676,7 +682,7 @@ impl ReadContext { if !self.projection_pushdown { return None; } - + let projected_fields = match &self.projected_fields { Some(fields) => fields, None => return None, @@ -699,7 +705,7 @@ impl ReadContext { .expect("projection index should exist in sorted list"); reordering_indexes.push(pos); } - + Some(reordering_indexes) } } From 2dd9e37b1abfd4cd24c7394cc202116936feb2fa Mon Sep 17 00:00:00 2001 From: AlexZhao Date: Wed, 3 Dec 2025 15:21:37 +0800 Subject: [PATCH 14/21] Update crates/examples/Cargo.toml Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- crates/examples/Cargo.toml | 3 --- 1 file changed, 3 deletions(-) diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml index 3f6d96a..dab85b6 100644 --- a/crates/examples/Cargo.toml +++ b/crates/examples/Cargo.toml @@ -27,9 +27,6 @@ version = { workspace = true } fluss = { workspace = true } tokio = { workspace = true } clap = { workspace = true} -env_logger = "0.11" - - [[example]] name = "example-table" path = "src/example_table.rs" \ No newline at end of file From e260f89457765949c35a723f76121435580b8e0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?= Date: Wed, 3 Dec 2025 15:30:37 +0800 Subject: [PATCH 15/21] address comments --- crates/fluss/src/client/table/scanner.rs | 8 ++++---- crates/fluss/src/record/arrow.rs | 16 ---------------- 2 files changed, 4 insertions(+), 20 deletions(-) diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 0649b87..33994c9 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -199,7 +199,7 @@ impl LogFetcher { projected_fields: Option>, ) -> Self { let full_arrow_schema = to_arrow_schema(table_info.get_row_type()); - let read_context = Self::create_read_context(full_arrow_schema, &projected_fields); + let read_context = Self::create_read_context(full_arrow_schema, projected_fields); LogFetcher { table_path: table_info.table_path.clone(), conns, @@ -212,13 +212,13 @@ impl LogFetcher { fn create_read_context( full_arrow_schema: SchemaRef, - projected_fields: &Option>, + projected_fields: Option>, ) -> ReadContext { match projected_fields { None => ReadContext::new(full_arrow_schema), Some(fields) => { let mut sorted_fields = fields.clone(); - sorted_fields.sort(); + sorted_fields.sort_unstable(); let projected_schema = arrow_schema::Schema::new( sorted_fields .iter() @@ -227,7 +227,7 @@ impl LogFetcher { ); ReadContext::with_projection_pushdown( Arc::new(projected_schema), - fields.clone(), + fields, sorted_fields, ) } diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index fc0150b..c8ef225 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -641,18 +641,6 @@ impl ReadContext { } } - pub fn with_projection( - arrow_schema: SchemaRef, - projected_fields: Option>, - ) -> ReadContext { - ReadContext { - arrow_schema, - projected_fields, - projection_pushdown: false, - projection_in_order: None, - } - } - pub fn with_projection_pushdown( arrow_schema: SchemaRef, projected_fields: Vec, @@ -670,10 +658,6 @@ impl ReadContext { self.projection_pushdown } - pub fn projected_fields(&self) -> Option<&[usize]> { - self.projected_fields.as_deref() - } - pub fn projection_in_order(&self) -> Option<&[usize]> { self.projection_in_order.as_deref() } From 2d98dc7c3ec7c03cfb1a22b0d160080989b62553 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Thu, 4 Dec 2025 00:36:40 +0800 Subject: [PATCH 16/21] improve reorder logic --- crates/fluss/src/client/table/scanner.rs | 27 +-- crates/fluss/src/record/arrow.rs | 294 +++++++++++++---------- 2 files changed, 171 insertions(+), 150 deletions(-) diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 33994c9..91036cb 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -112,7 +112,7 @@ impl<'a> TableScan<'a> { .fields() .iter() .position(|f| f.name() == *name) - .ok_or_else(|| Error::IllegalArgument(format!("Column '{}' not found", name)))?; + .ok_or_else(|| Error::IllegalArgument(format!("Column '{name}' not found")))?; indices.push(idx); } @@ -216,21 +216,7 @@ impl LogFetcher { ) -> ReadContext { match projected_fields { None => ReadContext::new(full_arrow_schema), - Some(fields) => { - let mut sorted_fields = fields.clone(); - sorted_fields.sort_unstable(); - let projected_schema = arrow_schema::Schema::new( - sorted_fields - .iter() - .map(|&idx| full_arrow_schema.field(idx).clone()) - .collect::>(), - ); - ReadContext::with_projection_pushdown( - Arc::new(projected_schema), - fields, - sorted_fields, - ) - } + Some(fields) => ReadContext::with_projection_pushdown(full_arrow_schema, fields), } } @@ -310,11 +296,10 @@ impl LogFetcher { if ready_for_fetch_count == 0 { HashMap::new() } else { - let (projection_enabled, projected_fields) = - match self.read_context.projection_in_order() { - None => (false, vec![]), - Some(fields) => (true, fields.iter().map(|&i| i as i32).collect()), - }; + let (projection_enabled, projected_fields) = match self.read_context.project_fields() { + None => (false, vec![]), + Some(fields) => (true, fields.iter().map(|&i| i as i32).collect()), + }; fetch_log_req_for_buckets .into_iter() diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index c8ef225..0d0e8bd 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -483,99 +483,78 @@ impl<'a> LogRecordBatch<'a> { } let data = &self.data[RECORDS_OFFSET..]; - let (batch_metadata, body_buffer, version) = match Self::parse_ipc_message(data) { - Some(result) => result, - None => return Ok(LogRecordIterator::empty()), - }; - - let reordering_indexes_opt = if read_context.is_projection_pushdown() { - read_context.reordering_indexes() - } else { - None - }; - let schema_to_use = read_context.arrow_schema.clone(); - - let record_batch = read_record_batch( - &body_buffer, - batch_metadata, - schema_to_use, - &std::collections::HashMap::new(), - None, - &version, - )?; - - // Reorder columns if needed (when projection pushdown with non-sorted order) - let record_batch = if let Some(reordering_indexes) = &reordering_indexes_opt { - let reordered_columns: Vec<_> = reordering_indexes - .iter() - .map(|&idx| record_batch.column(idx).clone()) - .collect(); - let reordered_fields: Vec<_> = reordering_indexes - .iter() - .map(|&idx| record_batch.schema().field(idx).clone()) - .collect(); - let reordered_schema = Arc::new(arrow_schema::Schema::new(reordered_fields)); - RecordBatch::try_new(reordered_schema, reordered_columns)? - } else { - record_batch + let record_batch = read_context.record_batch(data)?; + let log_record_iterator = match record_batch { + None => LogRecordIterator::empty(), + Some(record_batch) => { + let arrow_reader = ArrowReader::new(Arc::new(record_batch)); + LogRecordIterator::Arrow(ArrowLogRecordIterator { + reader: arrow_reader, + base_offset: self.base_log_offset(), + timestamp: self.commit_timestamp(), + row_id: 0, + change_type: ChangeType::AppendOnly, + }) + } }; - - let arrow_reader = ArrowReader::new(Arc::new(record_batch)); - Ok(LogRecordIterator::Arrow(ArrowLogRecordIterator { - reader: arrow_reader, - base_offset: self.base_log_offset(), - timestamp: self.commit_timestamp(), - row_id: 0, - change_type: ChangeType::AppendOnly, - })) - } - - /// Parse an Arrow IPC message from a byte slice. - /// - /// Server returns RecordBatch message (without Schema message) in the encapsulated message format. - /// Format: [continuation: 4 bytes (0xFFFFFFFF)][metadata_size: 4 bytes][RecordBatch metadata][body] - /// - /// This format is documented at: - /// https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format - /// - /// Returns the RecordBatch metadata, body buffer, and metadata version. - fn parse_ipc_message( - data: &'a [u8], - ) -> Option<( - arrow::ipc::RecordBatch<'a>, - Buffer, - arrow::ipc::MetadataVersion, - )> { - const CONTINUATION_MARKER: u32 = 0xFFFFFFFF; - - if data.len() < 8 { - return None; - } - - let continuation = LittleEndian::read_u32(&data[0..4]); - let metadata_size = LittleEndian::read_u32(&data[4..8]) as usize; - - if continuation != CONTINUATION_MARKER { - return None; - } - - if data.len() < 8 + metadata_size { - return None; - } - - let metadata_bytes = &data[8..8 + metadata_size]; - let message = root_as_message(metadata_bytes).ok()?; - let batch_metadata = message.header_as_record_batch()?; - - let body_start = 8 + metadata_size; - let body_data = &data[body_start..]; - let body_buffer = Buffer::from(body_data); - - Some((batch_metadata, body_buffer, message.version())) + Ok(log_record_iterator) } } +/// Parse an Arrow IPC message from a byte slice. +/// +/// Server returns RecordBatch message (without Schema message) in the encapsulated message format. +/// Format: [continuation: 4 bytes (0xFFFFFFFF)][metadata_size: 4 bytes][RecordBatch metadata][body] +/// +/// This format is documented at: +/// https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format +/// +/// # Arguments +/// * `data` - The byte slice containing the IPC message. +/// +/// # Returns +/// Returns `Some((batch_metadata, body_buffer, version))` on success: +/// - `batch_metadata`: The RecordBatch metadata from the IPC message. +/// - `body_buffer`: The buffer containing the record batch body data. +/// - `version`: The Arrow IPC metadata version. +/// +/// Returns `None` if the data is malformed or too short. +fn parse_ipc_message( + data: &[u8], +) -> Option<( + arrow::ipc::RecordBatch<'_>, + Buffer, + arrow::ipc::MetadataVersion, +)> { + const CONTINUATION_MARKER: u32 = 0xFFFFFFFF; + + if data.len() < 8 { + return None; + } + + let continuation = LittleEndian::read_u32(&data[0..4]); + let metadata_size = LittleEndian::read_u32(&data[4..8]) as usize; + + if continuation != CONTINUATION_MARKER { + return None; + } + + if data.len() < 8 + metadata_size { + return None; + } + + let metadata_bytes = &data[8..8 + metadata_size]; + let message = root_as_message(metadata_bytes).ok()?; + let batch_metadata = message.header_as_record_batch()?; + + let body_start = 8 + metadata_size; + let body_data = &data[body_start..]; + let body_buffer = Buffer::from(body_data); + + Some((batch_metadata, body_buffer, message.version())) +} + pub fn to_arrow_schema(fluss_schema: &DataType) -> SchemaRef { match &fluss_schema { DataType::Row(row_type) => { @@ -625,72 +604,129 @@ pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType { #[derive(Clone)] pub struct ReadContext { - arrow_schema: SchemaRef, - projected_fields: Option>, - projection_pushdown: bool, - projection_in_order: Option>, + target_schema: SchemaRef, + + projection: Option, +} + +#[derive(Clone)] +struct Projection { + ordered_schema: SchemaRef, + projected_fields: Vec, + + reordering_indexes: Vec, + reordering_needed: bool, } impl ReadContext { pub fn new(arrow_schema: SchemaRef) -> ReadContext { ReadContext { - arrow_schema, - projected_fields: None, - projection_pushdown: false, - projection_in_order: None, + target_schema: arrow_schema, + projection: None, } } pub fn with_projection_pushdown( arrow_schema: SchemaRef, projected_fields: Vec, - projection_in_order: Vec, ) -> ReadContext { + let target_schema = Self::project_schema(arrow_schema.clone(), projected_fields.as_slice()); + let mut sorted_fields = projected_fields.clone(); + sorted_fields.sort_unstable(); + + let project = { + if !sorted_fields.eq(&projected_fields) { + // reordering is required + // Calculate reordering indexes to transform from sorted order to user-requested order + let mut reordering_indexes = Vec::with_capacity(projected_fields.len()); + for &original_idx in &projected_fields { + let pos = sorted_fields + .binary_search(&original_idx) + .expect("projection index should exist in sorted list"); + reordering_indexes.push(pos); + } + Projection { + ordered_schema: Self::project_schema( + arrow_schema.clone(), + sorted_fields.as_slice(), + ), + projected_fields, + reordering_indexes, + reordering_needed: true, + } + } else { + Projection { + ordered_schema: Self::project_schema(arrow_schema, projected_fields.as_slice()), + projected_fields, + reordering_indexes: vec![], + reordering_needed: false, + } + } + }; + ReadContext { - arrow_schema, - projected_fields: Some(projected_fields), - projection_pushdown: true, - projection_in_order: Some(projection_in_order), + target_schema, + projection: Some(project), } } - pub fn is_projection_pushdown(&self) -> bool { - self.projection_pushdown + pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) -> SchemaRef { + // todo: handle the exception + SchemaRef::new( + schema + .project(projected_fields) + .expect("can't project schema"), + ) } - pub fn projection_in_order(&self) -> Option<&[usize]> { - self.projection_in_order.as_deref() + pub fn project_fields(&self) -> Option<&[usize]> { + self.projection + .as_ref() + .map(|p| p.projected_fields.as_slice()) } - pub fn reordering_indexes(&self) -> Option> { - if !self.projection_pushdown { - return None; - } - - let projected_fields = match &self.projected_fields { - Some(fields) => fields, - None => return None, + pub fn record_batch(&self, data: &[u8]) -> Result> { + let (batch_metadata, body_buffer, version) = match parse_ipc_message(data) { + Some(result) => result, + None => return Ok(None), }; - let projection_in_order = match &self.projection_in_order { - Some(order) => order, - None => return None, + // the record batch from server must be ordered by field pos, + // according to project to decide what arrow schema to use + // to parse the record batch + let resolve_schema = match self.projection { + Some(ref projection) => { + // projection, should use ordered schema by project field pos + projection.ordered_schema.clone() + } + None => { + // no projection, use target output schema + self.target_schema.clone() + } }; - if projected_fields.is_empty() { - return None; - } - - // Calculate reordering indexes to transform from sorted order to user-requested order - let mut reordering_indexes = Vec::with_capacity(projected_fields.len()); - for &original_idx in projected_fields { - let pos = projection_in_order - .binary_search(&original_idx) - .expect("projection index should exist in sorted list"); - reordering_indexes.push(pos); - } + let record_batch = read_record_batch( + &body_buffer, + batch_metadata, + resolve_schema, + &std::collections::HashMap::new(), + None, + &version, + )?; - Some(reordering_indexes) + let record_batch = match &self.projection { + Some(projection) if projection.reordering_needed => { + // Reorder columns if needed (when projection pushdown with non-sorted order) + let reordered_columns: Vec<_> = projection + .reordering_indexes + .iter() + .map(|&idx| record_batch.column(idx).clone()) + .collect(); + RecordBatch::try_new(self.target_schema.clone(), reordered_columns)? + } + _ => record_batch, + }; + Ok(Some(record_batch)) } } From bb53f97d5d1f8012876bfc49ba383875e6487d3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?= Date: Thu, 4 Dec 2025 09:27:20 +0800 Subject: [PATCH 17/21] fix failed case --- crates/fluss/src/client/table/scanner.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 91036cb..775c388 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -298,7 +298,11 @@ impl LogFetcher { } else { let (projection_enabled, projected_fields) = match self.read_context.project_fields() { None => (false, vec![]), - Some(fields) => (true, fields.iter().map(|&i| i as i32).collect()), + Some(fields) => { + let mut sorted: Vec = fields.iter().map(|&i| i as i32).collect(); + sorted.sort_unstable(); + (true, sorted) + } }; fetch_log_req_for_buckets From e34a1f862d6438c488d7a22b81864b9c9bd3602f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?= Date: Thu, 4 Dec 2025 09:29:43 +0800 Subject: [PATCH 18/21] add test case change --- crates/fluss/tests/integration/table.rs | 97 +++++++++++++++++++++++-- 1 file changed, 89 insertions(+), 8 deletions(-) diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs index a1a6cb2..8e92d4f 100644 --- a/crates/fluss/tests/integration/table.rs +++ b/crates/fluss/tests/integration/table.rs @@ -37,10 +37,16 @@ mod table_test { use crate::integration::utils::create_table; use arrow::array::record_batch; use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; + use fluss::row::InternalRow; use std::sync::Arc; use std::sync::atomic::AtomicUsize; use std::thread; fn before_all() { + let use_local = std::env::var("FLUSS_USE_LOCAL") + .unwrap_or_else(|_| "false".to_string()) + .parse::() + .unwrap_or(false); + // Create a new tokio runtime in a separate thread let cluster_guard = SHARED_FLUSS_CLUSTER.clone(); std::thread::spawn(move || { @@ -53,9 +59,15 @@ mod table_test { }) .join() .expect("Failed to create cluster"); - // wait for 20 seconds to avoid the error like - // CoordinatorEventProcessor is not initialized yet - thread::sleep(std::time::Duration::from_secs(20)); + + if !use_local { + // wait for 20 seconds to avoid the error like + // CoordinatorEventProcessor is not initialized yet + thread::sleep(std::time::Duration::from_secs(20)); + } else { + // For local cluster, just wait a short time for connection + thread::sleep(std::time::Duration::from_secs(2)); + } } fn get_fluss_cluster() -> Arc { @@ -83,14 +95,14 @@ mod table_test { } #[tokio::test] - async fn append_record_batch() { + async fn append_record_batch_and_scan() { let cluster = get_fluss_cluster(); let connection = cluster.get_fluss_connection().await; let admin = connection.get_admin().await.expect("Failed to get admin"); let table_path = - TablePath::new("fluss".to_string(), "test_append_record_batch".to_string()); + TablePath::new("fluss".to_string(), "test_append_record_batch_and_scan".to_string()); let table_descriptor = TableDescriptor::builder() .schema( @@ -100,15 +112,18 @@ mod table_test { .build() .expect("Failed to build schema"), ) + .property("table.log.arrow.compression.type", "NONE") .build() .expect("Failed to build table"); create_table(&admin, &table_path, &table_descriptor).await; - let append_writer = connection + let table = connection .get_table(&table_path) .await - .expect("Failed to get table") + .expect("Failed to get table"); + + let append_writer = table .new_append() .expect("Failed to create append") .create_writer(); @@ -127,6 +142,72 @@ mod table_test { .await .expect("Failed to append batch"); - // todo: add scan code to verify the records appended in #30 + append_writer.flush().await.expect("Failed to flush"); + + let num_buckets = table.table_info().get_num_buckets(); + let log_scanner = table.new_scan().create_log_scanner(); + for bucket_id in 0..num_buckets { + log_scanner.subscribe(bucket_id, 0).await.expect("Failed to subscribe"); + } + + let scan_records = log_scanner + .poll(std::time::Duration::from_secs(5)) + .await + .expect("Failed to poll"); + + let mut records: Vec<_> = scan_records.into_iter().collect(); + records.sort_by_key(|r| r.offset()); + + assert_eq!(records.len(), 6, "Should have 6 records"); + for (i, record) in records.iter().enumerate() { + let row = record.row(); + let expected_c1 = (i + 1) as i32; + let expected_c2 = format!("a{}", i + 1); + assert_eq!(row.get_int(0), expected_c1, "c1 mismatch at index {}", i); + assert_eq!(row.get_string(1), expected_c2, "c2 mismatch at index {}", i); + } + + let log_scanner_projected = table + .new_scan() + .project(&[1, 0]) + .expect("Failed to project") + .create_log_scanner(); + for bucket_id in 0..num_buckets { + log_scanner_projected + .subscribe(bucket_id, 0) + .await + .expect("Failed to subscribe"); + } + + let scan_records_projected = log_scanner_projected + .poll(std::time::Duration::from_secs(5)) + .await + .expect("Failed to poll"); + + let mut records_projected: Vec<_> = scan_records_projected.into_iter().collect(); + records_projected.sort_by_key(|r| r.offset()); + + assert_eq!( + records_projected.len(), + 6, + "Should have 6 records with projection" + ); + for (i, record) in records_projected.iter().enumerate() { + let row = record.row(); + let expected_c2 = format!("a{}", i + 1); + let expected_c1 = (i + 1) as i32; + assert_eq!( + row.get_string(0), + expected_c2, + "Projected c2 (first column) mismatch at index {}", + i + ); + assert_eq!( + row.get_int(1), + expected_c1, + "Projected c1 (second column) mismatch at index {}", + i + ); + } } } From 03c9b0366e1acfa1e7ee8ddd9b6809ffcd1ccdba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?= Date: Thu, 4 Dec 2025 10:00:39 +0800 Subject: [PATCH 19/21] address comments --- crates/fluss/tests/integration/table.rs | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs index 8e92d4f..5452ad9 100644 --- a/crates/fluss/tests/integration/table.rs +++ b/crates/fluss/tests/integration/table.rs @@ -42,10 +42,6 @@ mod table_test { use std::sync::atomic::AtomicUsize; use std::thread; fn before_all() { - let use_local = std::env::var("FLUSS_USE_LOCAL") - .unwrap_or_else(|_| "false".to_string()) - .parse::() - .unwrap_or(false); // Create a new tokio runtime in a separate thread let cluster_guard = SHARED_FLUSS_CLUSTER.clone(); @@ -60,14 +56,9 @@ mod table_test { .join() .expect("Failed to create cluster"); - if !use_local { - // wait for 20 seconds to avoid the error like - // CoordinatorEventProcessor is not initialized yet - thread::sleep(std::time::Duration::from_secs(20)); - } else { - // For local cluster, just wait a short time for connection - thread::sleep(std::time::Duration::from_secs(2)); - } + // wait for 20 seconds to avoid the error like + // CoordinatorEventProcessor is not initialized yet + thread::sleep(std::time::Duration::from_secs(20)); } fn get_fluss_cluster() -> Arc { From afc048adf3cb0798fcdeb847ad4509c4245368c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?= Date: Thu, 4 Dec 2025 10:07:02 +0800 Subject: [PATCH 20/21] add project_fields_in_order --- crates/fluss/src/client/table/scanner.rs | 13 +++++-------- crates/fluss/src/record/arrow.rs | 9 +++++++++ 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 775c388..13372ef 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -296,14 +296,11 @@ impl LogFetcher { if ready_for_fetch_count == 0 { HashMap::new() } else { - let (projection_enabled, projected_fields) = match self.read_context.project_fields() { - None => (false, vec![]), - Some(fields) => { - let mut sorted: Vec = fields.iter().map(|&i| i as i32).collect(); - sorted.sort_unstable(); - (true, sorted) - } - }; + let (projection_enabled, projected_fields) = + match self.read_context.project_fields_in_order() { + None => (false, vec![]), + Some(fields) => (true, fields.iter().map(|&i| i as i32).collect()), + }; fetch_log_req_for_buckets .into_iter() diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 0d0e8bd..29bfe41 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -613,6 +613,7 @@ pub struct ReadContext { struct Projection { ordered_schema: SchemaRef, projected_fields: Vec, + ordered_fields: Vec, reordering_indexes: Vec, reordering_needed: bool, @@ -651,12 +652,14 @@ impl ReadContext { sorted_fields.as_slice(), ), projected_fields, + ordered_fields: sorted_fields, reordering_indexes, reordering_needed: true, } } else { Projection { ordered_schema: Self::project_schema(arrow_schema, projected_fields.as_slice()), + ordered_fields: projected_fields.clone(), projected_fields, reordering_indexes: vec![], reordering_needed: false, @@ -685,6 +688,12 @@ impl ReadContext { .map(|p| p.projected_fields.as_slice()) } + pub fn project_fields_in_order(&self) -> Option<&[usize]> { + self.projection + .as_ref() + .map(|p| p.ordered_fields.as_slice()) + } + pub fn record_batch(&self, data: &[u8]) -> Result> { let (batch_metadata, body_buffer, version) = match parse_ipc_message(data) { Some(result) => result, From c3981610d176ea8f006a4962455a845ee8318168 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E6=B5=B7=E6=BA=90?= Date: Thu, 4 Dec 2025 10:50:57 +0800 Subject: [PATCH 21/21] fix fmt check --- crates/fluss/tests/integration/table.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs index 6fd8ffc..e14b852 100644 --- a/crates/fluss/tests/integration/table.rs +++ b/crates/fluss/tests/integration/table.rs @@ -42,7 +42,6 @@ mod table_test { use std::sync::atomic::AtomicUsize; use std::thread; fn before_all() { - // Create a new tokio runtime in a separate thread let cluster_guard = SHARED_FLUSS_CLUSTER.clone(); std::thread::spawn(move || { @@ -55,7 +54,7 @@ mod table_test { }) .join() .expect("Failed to create cluster"); - + // wait for 20 seconds to avoid the error like // CoordinatorEventProcessor is not initialized yet thread::sleep(std::time::Duration::from_secs(20)); @@ -92,8 +91,10 @@ mod table_test { let admin = connection.get_admin().await.expect("Failed to get admin"); - let table_path = - TablePath::new("fluss".to_string(), "test_append_record_batch_and_scan".to_string()); + let table_path = TablePath::new( + "fluss".to_string(), + "test_append_record_batch_and_scan".to_string(), + ); let table_descriptor = TableDescriptor::builder() .schema( @@ -138,7 +139,10 @@ mod table_test { let num_buckets = table.table_info().get_num_buckets(); let log_scanner = table.new_scan().create_log_scanner(); for bucket_id in 0..num_buckets { - log_scanner.subscribe(bucket_id, 0).await.expect("Failed to subscribe"); + log_scanner + .subscribe(bucket_id, 0) + .await + .expect("Failed to subscribe"); } let scan_records = log_scanner