diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml
index 82d864f..dab85b6 100644
--- a/crates/examples/Cargo.toml
+++ b/crates/examples/Cargo.toml
@@ -27,8 +27,6 @@ version = { workspace = true }
 fluss = { workspace = true }
 tokio = { workspace = true }
 clap = { workspace = true}
-
-
 [[example]]
 name = "example-table"
 path = "src/example_table.rs"
\ No newline at end of file
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index e1ab59f..13372ef 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -17,12 +17,13 @@
 
 use crate::client::connection::FlussConnection;
 use crate::client::metadata::Metadata;
-use crate::error::Result;
+use crate::error::{Error, Result};
 use crate::metadata::{TableBucket, TableInfo, TablePath};
 use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable};
 use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord, ScanRecords, to_arrow_schema};
 use crate::rpc::RpcClient;
 use crate::util::FairBucketStatusMap;
+use arrow_schema::SchemaRef;
 use parking_lot::RwLock;
 use std::collections::HashMap;
 use std::slice::from_ref;
@@ -39,6 +40,8 @@ pub struct TableScan<'a> {
     conn: &'a FlussConnection,
     table_info: TableInfo,
     metadata: Arc<Metadata>,
+    /// Column indices to project. `None` means all columns; `Some` holds the (non-empty) indices of the selected columns.
+    projected_fields: Option<Vec<usize>>,
 }
 
 impl<'a> TableScan<'a> {
@@ -47,14 +50,82 @@ impl<'a> TableScan<'a> {
             conn,
             table_info,
             metadata,
+            projected_fields: None,
         }
     }
 
-    pub fn create_log_scanner(&self) -> LogScanner {
+    /// Projects the scan to only the specified columns, selected by index.
+    ///
+    /// # Arguments
+    /// * `column_indices` - Zero-based indices of the columns to include in the scan
+    ///
+    /// # Errors
+    /// Returns an error if `column_indices` is empty or if any column index is out of range.
+    ///
+    /// # Example
+    /// ```ignore
+    /// let scanner = table.new_scan().project(&[0, 2, 3])?.create_log_scanner();
+    /// ```
+    pub fn project(mut self, column_indices: &[usize]) -> Result<Self> {
+        if column_indices.is_empty() {
+            return Err(Error::IllegalArgument(
+                "Column indices cannot be empty".to_string(),
+            ));
+        }
+        let field_count = self.table_info.row_type().fields().len();
+        for &idx in column_indices {
+            if idx >= field_count {
+                return Err(Error::IllegalArgument(format!(
+                    "Column index {} out of range (max: {})",
+                    idx,
+                    field_count - 1
+                )));
+            }
+        }
+        self.projected_fields = Some(column_indices.to_vec());
+        Ok(self)
+    }
+
+    /// Projects the scan to only the specified columns, selected by name.
+    ///
+    /// # Arguments
+    /// * `column_names` - Names of the columns to include in the scan
+    ///
+    /// # Errors
+    /// Returns an error if `column_names` is empty or if any column name is not found in the table schema.
+    ///
+    /// # Example
+    /// ```ignore
+    /// let scanner = table.new_scan().project_by_name(&["col1", "col3"])?.create_log_scanner();
+    /// ```
+    pub fn project_by_name(mut self, column_names: &[&str]) -> Result<Self> {
+        if column_names.is_empty() {
+            return Err(Error::IllegalArgument(
+                "Column names cannot be empty".to_string(),
+            ));
+        }
+        let row_type = self.table_info.row_type();
+        let mut indices = Vec::new();
+
+        for name in column_names {
+            let idx = row_type
+                .fields()
+                .iter()
+                .position(|f| f.name() == *name)
+                .ok_or_else(|| Error::IllegalArgument(format!("Column '{name}' not found")))?;
+            indices.push(idx);
+        }
+
+        self.projected_fields = Some(indices);
+        Ok(self)
+    }
+
+    pub fn create_log_scanner(self) -> LogScanner {
         LogScanner::new(
             &self.table_info,
             self.metadata.clone(),
             self.conn.get_connections(),
+            self.projected_fields,
         )
     }
 }
@@ -72,6 +143,7 @@ impl LogScanner {
         table_info: &TableInfo,
         metadata: Arc<Metadata>,
         connections: Arc<RpcClient>,
+        projected_fields: Option<Vec<usize>>,
     ) -> Self {
         let log_scanner_status = Arc::new(LogScannerStatus::new());
         Self {
@@ -84,6 +156,7 @@ impl LogScanner {
                 connections.clone(),
                 metadata.clone(),
                 log_scanner_status.clone(),
+                projected_fields,
             ),
         }
     }
@@ -114,6 +187,7 @@ struct LogFetcher {
     table_info: TableInfo,
     metadata: Arc<Metadata>,
     log_scanner_status: Arc<LogScannerStatus>,
+    read_context: ReadContext,
 }
 
 impl LogFetcher {
@@ -122,13 +196,27 @@ impl LogFetcher {
         conns: Arc<RpcClient>,
         metadata: Arc<Metadata>,
         log_scanner_status: Arc<LogScannerStatus>,
+        projected_fields: Option<Vec<usize>>,
     ) -> Self {
+        let full_arrow_schema = to_arrow_schema(table_info.get_row_type());
+        let read_context = Self::create_read_context(full_arrow_schema, projected_fields);
         LogFetcher {
             table_path: table_info.table_path.clone(),
-            conns: conns.clone(),
-            table_info: table_info.clone(),
-            metadata: metadata.clone(),
-            log_scanner_status: log_scanner_status.clone(),
+            conns,
+            table_info,
+            metadata,
+            log_scanner_status,
+            read_context,
+        }
+    }
+
+    fn create_read_context(
+        full_arrow_schema: SchemaRef,
+        projected_fields: Option<Vec<usize>>,
+    ) -> ReadContext {
+        match projected_fields {
+            None => ReadContext::new(full_arrow_schema),
+            Some(fields) => ReadContext::with_projection_pushdown(full_arrow_schema, fields),
         }
     }
@@ -149,7 +237,7 @@ impl LogFetcher {
         for pb_fetch_log_resp in fetch_response.tables_resp {
             let table_id = pb_fetch_log_resp.table_id;
             let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
-            let arrow_schema = to_arrow_schema(self.table_info.get_row_type());
+
             for fetch_log_for_bucket in fetch_log_for_buckets {
                 let mut fetch_records = vec![];
                 let bucket: i32 = fetch_log_for_bucket.bucket_id;
@@ -158,8 +246,7 @@ impl LogFetcher {
                     let data = fetch_log_for_bucket.records.unwrap();
                     for log_record in &mut LogRecordsBatchs::new(&data) {
                         let last_offset = log_record.last_log_offset();
-                        fetch_records
-                            .extend(log_record.records(ReadContext::new(arrow_schema.clone())));
+                        fetch_records.extend(log_record.records(&self.read_context)?);
                         self.log_scanner_status
                             .update_offset(&table_bucket, last_offset + 1);
                     }
@@ -209,13 +296,19 @@ impl LogFetcher {
         if ready_for_fetch_count == 0 {
             HashMap::new()
         } else {
+            let (projection_enabled, projected_fields) =
+                match self.read_context.project_fields_in_order() {
+                    None => (false, vec![]),
+                    Some(fields) => (true, fields.iter().map(|&i| i as i32).collect()),
+                };
+
             fetch_log_req_for_buckets
                 .into_iter()
                 .map(|(leader_id, feq_for_buckets)| {
                    let req_for_table = PbFetchLogReqForTable {
                        table_id: table_id.unwrap(),
-                        projection_pushdown_enabled: false,
-                        projected_fields: vec![],
+                        projection_pushdown_enabled: projection_enabled,
+                        projected_fields: projected_fields.clone(),
                        buckets_req: feq_for_buckets,
                    };
 
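For reviewers, here is a usage sketch of the projection API introduced above. It is not part of the patch: it assumes a `table` handle obtained as in the integration test at the end of this diff, the column layout is illustrative, and errors are simply propagated with `?`.

```rust
// Sketch only: `table` is assumed to come from `connection.get_table(&table_path).await`,
// as in the integration test below; column names and types are illustrative.
let log_scanner = table
    .new_scan()
    .project(&[1, 0])? // or: .project_by_name(&["c2", "c1"])?
    .create_log_scanner();

for bucket_id in 0..table.table_info().get_num_buckets() {
    log_scanner.subscribe(bucket_id, 0).await?;
}

let scan_records = log_scanner.poll(std::time::Duration::from_secs(5)).await?;
for record in scan_records.into_iter() {
    // The row exposes only the projected columns, in the requested order:
    // position 0 is table column 1, position 1 is table column 0.
    let c2 = record.row().get_string(0);
    let c1 = record.row().get_int(1);
    println!("c1={c1}, c2={c2}");
}
```

Note that the fetch request always lists the projected fields in ascending order (`project_fields_in_order`); the client reorders the returned columns back to the requested order inside `ReadContext`, shown in the next file.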
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 487f50c..29bfe41 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -27,7 +27,12 @@ use arrow::array::{
 };
 use arrow::{
     array::RecordBatch,
-    ipc::{reader::StreamReader, writer::StreamWriter},
+    buffer::Buffer,
+    ipc::{
+        reader::{StreamReader, read_record_batch},
+        root_as_message,
+        writer::StreamWriter,
+    },
 };
 use arrow_schema::SchemaRef;
 use arrow_schema::{DataType as ArrowDataType, Field};
@@ -472,41 +477,84 @@ impl<'a> LogRecordBatch<'a> {
         LittleEndian::read_i32(&self.data[offset..offset + RECORDS_COUNT_LENGTH])
     }
 
-    pub fn records(&self, read_context: ReadContext) -> LogRecordIterator {
-        let count = self.record_count();
-        if count == 0 {
-            return LogRecordIterator::empty();
+    pub fn records(&self, read_context: &ReadContext) -> Result<LogRecordIterator> {
+        if self.record_count() == 0 {
+            return Ok(LogRecordIterator::empty());
         }
 
-        // get arrow_metadata
-        let arrow_metadata_bytes = read_context.to_arrow_metadata().unwrap();
-        // arrow_batch_data
         let data = &self.data[RECORDS_OFFSET..];
-        // need to combine arrow_metadata_bytes + arrow_batch_data
-        let cursor = Cursor::new([&arrow_metadata_bytes, data].concat());
-        let mut stream_reader = StreamReader::try_new(cursor, None).unwrap();
-
-        let mut record_batch = None;
-        if let Some(bath) = stream_reader.next() {
-            record_batch = Some(bath.unwrap());
-        }
-
-        if record_batch.is_none() {
-            return LogRecordIterator::empty();
-        }
-
-        let arrow_reader = ArrowReader::new(Arc::new(record_batch.unwrap()));
-        LogRecordIterator::Arrow(ArrowLogRecordIterator {
-            reader: arrow_reader,
-            base_offset: self.base_log_offset(),
-            timestamp: self.commit_timestamp(),
-            row_id: 0,
-            change_type: ChangeType::AppendOnly,
-        })
+        let record_batch = read_context.record_batch(data)?;
+        let log_record_iterator = match record_batch {
+            None => LogRecordIterator::empty(),
+            Some(record_batch) => {
+                let arrow_reader = ArrowReader::new(Arc::new(record_batch));
+                LogRecordIterator::Arrow(ArrowLogRecordIterator {
+                    reader: arrow_reader,
+                    base_offset: self.base_log_offset(),
+                    timestamp: self.commit_timestamp(),
+                    row_id: 0,
+                    change_type: ChangeType::AppendOnly,
+                })
+            }
+        };
+        Ok(log_record_iterator)
     }
 }
 
+/// Parses an Arrow IPC message from a byte slice.
+///
+/// The server returns a RecordBatch message (without a preceding Schema message) in the Arrow
+/// encapsulated message format:
+/// [continuation: 4 bytes (0xFFFFFFFF)][metadata_size: 4 bytes][RecordBatch metadata][body]
+///
+/// This format is documented at:
+/// https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
+///
+/// # Arguments
+/// * `data` - The byte slice containing the IPC message.
+///
+/// # Returns
+/// Returns `Some((batch_metadata, body_buffer, version))` on success:
+/// - `batch_metadata`: The RecordBatch metadata from the IPC message.
+/// - `body_buffer`: The buffer containing the record batch body data.
+/// - `version`: The Arrow IPC metadata version.
+///
+/// Returns `None` if the data is malformed or too short.
+fn parse_ipc_message(
+    data: &[u8],
+) -> Option<(
+    arrow::ipc::RecordBatch<'_>,
+    Buffer,
+    arrow::ipc::MetadataVersion,
+)> {
+    const CONTINUATION_MARKER: u32 = 0xFFFFFFFF;
+
+    if data.len() < 8 {
+        return None;
+    }
+
+    let continuation = LittleEndian::read_u32(&data[0..4]);
+    let metadata_size = LittleEndian::read_u32(&data[4..8]) as usize;
+
+    if continuation != CONTINUATION_MARKER {
+        return None;
+    }
+
+    if data.len() < 8 + metadata_size {
+        return None;
+    }
+
+    let metadata_bytes = &data[8..8 + metadata_size];
+    let message = root_as_message(metadata_bytes).ok()?;
+    let batch_metadata = message.header_as_record_batch()?;
+
+    let body_start = 8 + metadata_size;
+    let body_data = &data[body_start..];
+    let body_buffer = Buffer::from(body_data);
+
+    Some((batch_metadata, body_buffer, message.version()))
+}
+
 pub fn to_arrow_schema(fluss_schema: &DataType) -> SchemaRef {
     match &fluss_schema {
         DataType::Row(row_type) => {
@@ -554,19 +602,140 @@ pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType {
     }
 }
 
+#[derive(Clone)]
 pub struct ReadContext {
-    arrow_schema: SchemaRef,
+    target_schema: SchemaRef,
+
+    projection: Option<Projection>,
 }
 
+#[derive(Clone)]
+struct Projection {
+    ordered_schema: SchemaRef,
+    projected_fields: Vec<usize>,
+    ordered_fields: Vec<usize>,
+
+    reordering_indexes: Vec<usize>,
+    reordering_needed: bool,
+}
+
 impl ReadContext {
     pub fn new(arrow_schema: SchemaRef) -> ReadContext {
-        ReadContext { arrow_schema }
+        ReadContext {
+            target_schema: arrow_schema,
+            projection: None,
+        }
     }
 
-    pub fn to_arrow_metadata(&self) -> Result<Vec<u8>> {
-        let mut arrow_schema_bytes = vec![];
-        let _writer = StreamWriter::try_new(&mut arrow_schema_bytes, &self.arrow_schema)?;
-        Ok(arrow_schema_bytes)
+    pub fn with_projection_pushdown(
+        arrow_schema: SchemaRef,
+        projected_fields: Vec<usize>,
+    ) -> ReadContext {
+        let target_schema = Self::project_schema(arrow_schema.clone(), projected_fields.as_slice());
+        let mut sorted_fields = projected_fields.clone();
+        sorted_fields.sort_unstable();
+
+        let project = {
+            if !sorted_fields.eq(&projected_fields) {
+                // Reordering is required.
+                // Calculate indexes that map from the sorted (server) order back to the user-requested order.
+                let mut reordering_indexes = Vec::with_capacity(projected_fields.len());
+                for &original_idx in &projected_fields {
+                    let pos = sorted_fields
+                        .binary_search(&original_idx)
+                        .expect("projection index should exist in sorted list");
+                    reordering_indexes.push(pos);
+                }
+                Projection {
+                    ordered_schema: Self::project_schema(
+                        arrow_schema.clone(),
+                        sorted_fields.as_slice(),
+                    ),
+                    projected_fields,
+                    ordered_fields: sorted_fields,
+                    reordering_indexes,
+                    reordering_needed: true,
+                }
+            } else {
+                Projection {
+                    ordered_schema: Self::project_schema(arrow_schema, projected_fields.as_slice()),
+                    ordered_fields: projected_fields.clone(),
+                    projected_fields,
+                    reordering_indexes: vec![],
+                    reordering_needed: false,
+                }
+            }
+        };
+
+        ReadContext {
+            target_schema,
+            projection: Some(project),
+        }
+    }
+
+    pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) -> SchemaRef {
+        // TODO: return an error to the caller instead of panicking here.
+        SchemaRef::new(
+            schema
+                .project(projected_fields)
+                .expect("can't project schema"),
+        )
+    }
+
+    pub fn project_fields(&self) -> Option<&[usize]> {
+        self.projection
+            .as_ref()
+            .map(|p| p.projected_fields.as_slice())
+    }
+
+    pub fn project_fields_in_order(&self) -> Option<&[usize]> {
+        self.projection
+            .as_ref()
+            .map(|p| p.ordered_fields.as_slice())
+    }
+
+    pub fn record_batch(&self, data: &[u8]) -> Result<Option<RecordBatch>> {
+        let (batch_metadata, body_buffer, version) = match parse_ipc_message(data) {
+            Some(result) => result,
+            None => return Ok(None),
+        };
+
+        // The record batch from the server is ordered by ascending field position,
+        // so the projection decides which arrow schema is used
+        // to parse the record batch.
+        let resolve_schema = match self.projection {
+            Some(ref projection) => {
+                // Projection: use the schema ordered by field position.
+                projection.ordered_schema.clone()
+            }
+            None => {
+                // No projection: use the target output schema.
+                self.target_schema.clone()
+            }
+        };
+
+        let record_batch = read_record_batch(
+            &body_buffer,
+            batch_metadata,
+            resolve_schema,
+            &std::collections::HashMap::new(),
+            None,
+            &version,
+        )?;
+
+        let record_batch = match &self.projection {
+            Some(projection) if projection.reordering_needed => {
+                // Reorder columns when the projection was requested in a non-ascending order.
+                let reordered_columns: Vec<_> = projection
+                    .reordering_indexes
+                    .iter()
+                    .map(|&idx| record_batch.column(idx).clone())
+                    .collect();
+                RecordBatch::try_new(self.target_schema.clone(), reordered_columns)?
+            }
+            _ => record_batch,
+        };
+        Ok(Some(record_batch))
     }
 }
 
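To make the reordering step above concrete, here is a small standalone arrow-rs sketch. It is not taken from the patch; the schema, data, and variable names are made up. It mirrors what `ReadContext::with_projection_pushdown` and `record_batch` do: the server encodes the projected columns in ascending field order, and the client maps them back to the caller's requested order via reordering indexes.

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Full table schema: c1 INT, c2 STRING, c3 INT (made up for the demo).
    let full = Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Utf8, false),
        Field::new("c3", DataType::Int32, false),
    ]);

    // The caller asks for columns [2, 0]; the server sends them back in
    // ascending field order [0, 2], so the client reorders afterwards.
    let requested = vec![2usize, 0];
    let mut sorted = requested.clone();
    sorted.sort_unstable();

    // Schema of the batch as the server encodes it (ascending order) and the
    // schema the caller expects (requested order).
    let ordered_schema = Arc::new(full.project(&sorted)?);
    let target_schema = Arc::new(full.project(&requested)?);

    // Stand-in for the batch decoded from the fetch response.
    let server_batch = RecordBatch::try_new(
        ordered_schema,
        vec![
            Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef, // c1
            Arc::new(Int32Array::from(vec![10, 20])),           // c3
        ],
    )?;

    // For every requested index, find its position in the sorted layout.
    let reordering_indexes: Vec<usize> = requested
        .iter()
        .map(|idx| sorted.binary_search(idx).expect("index must be present"))
        .collect();

    let reordered_columns: Vec<ArrayRef> = reordering_indexes
        .iter()
        .map(|&i| server_batch.column(i).clone())
        .collect();
    let reordered = RecordBatch::try_new(target_schema, reordered_columns)?;

    assert_eq!(reordered.schema().field(0).name(), "c3");
    assert_eq!(reordered.schema().field(1).name(), "c1");
    Ok(())
}
```

The patch uses the same `binary_search` trick to compute each requested index's position within the sorted projection before rebuilding the batch against the target schema.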
diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs
index aa02724..e14b852 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -54,6 +54,7 @@ mod table_test {
         })
         .join()
         .expect("Failed to create cluster");
+
         // wait for 20 seconds to avoid the error like
         // CoordinatorEventProcessor is not initialized yet
         thread::sleep(std::time::Duration::from_secs(20));
@@ -84,14 +85,16 @@ mod table_test {
     }
 
     #[tokio::test]
-    async fn append_record_batch() {
+    async fn append_record_batch_and_scan() {
         let cluster = get_fluss_cluster();
         let connection = cluster.get_fluss_connection().await;
 
         let admin = connection.get_admin().await.expect("Failed to get admin");
 
-        let table_path =
-            TablePath::new("fluss".to_string(), "test_append_record_batch".to_string());
+        let table_path = TablePath::new(
+            "fluss".to_string(),
+            "test_append_record_batch_and_scan".to_string(),
+        );
 
         let table_descriptor = TableDescriptor::builder()
             .schema(
@@ -101,15 +104,18 @@ mod table_test {
                     .build()
                     .expect("Failed to build schema"),
             )
+            .property("table.log.arrow.compression.type", "NONE")
             .build()
             .expect("Failed to build table");
 
         create_table(&admin, &table_path, &table_descriptor).await;
 
-        let append_writer = connection
+        let table = connection
             .get_table(&table_path)
             .await
-            .expect("Failed to get table")
+            .expect("Failed to get table");
+
+        let append_writer = table
             .new_append()
             .expect("Failed to create append")
             .create_writer();
@@ -128,6 +134,77 @@ mod table_test {
             .await
             .expect("Failed to append batch");
 
+        append_writer.flush().await.expect("Failed to flush");
+
+        let num_buckets = table.table_info().get_num_buckets();
+        let log_scanner = table.new_scan().create_log_scanner();
+        for bucket_id in 0..num_buckets {
+            log_scanner
+                .subscribe(bucket_id, 0)
+                .await
+                .expect("Failed to subscribe");
+        }
+
+        let scan_records = log_scanner
+            .poll(std::time::Duration::from_secs(5))
+            .await
+            .expect("Failed to poll");
+
+        let mut records: Vec<_> = scan_records.into_iter().collect();
+        records.sort_by_key(|r| r.offset());
+
+        assert_eq!(records.len(), 6, "Should have 6 records");
+        for (i, record) in records.iter().enumerate() {
+            let row = record.row();
+            let expected_c1 = (i + 1) as i32;
+            let expected_c2 = format!("a{}", i + 1);
+            assert_eq!(row.get_int(0), expected_c1, "c1 mismatch at index {}", i);
+            assert_eq!(row.get_string(1), expected_c2, "c2 mismatch at index {}", i);
+        }
+
+        let log_scanner_projected = table
+            .new_scan()
+            .project(&[1, 0])
+            .expect("Failed to project")
+            .create_log_scanner();
+        for bucket_id in 0..num_buckets {
+            log_scanner_projected
+                .subscribe(bucket_id, 0)
+                .await
+                .expect("Failed to subscribe");
+        }
+
+        let scan_records_projected = log_scanner_projected
+            .poll(std::time::Duration::from_secs(5))
+            .await
+            .expect("Failed to poll");
+
+        let mut records_projected: Vec<_> = scan_records_projected.into_iter().collect();
+        records_projected.sort_by_key(|r| r.offset());
+
+        assert_eq!(
+            records_projected.len(),
+            6,
+            "Should have 6 records with projection"
+        );
+        for (i, record) in records_projected.iter().enumerate() {
+            let row = record.row();
+            let expected_c2 = format!("a{}", i + 1);
+            let expected_c1 = (i + 1) as i32;
+            assert_eq!(
+                row.get_string(0),
+                expected_c2,
+                "Projected c2 (first column) mismatch at index {}",
+                i
+            );
+            assert_eq!(
+                row.get_int(1),
+                expected_c1,
+                "Projected c1 (second column) mismatch at index {}",
+                i
+            );
+        }
+
         // Create scanner to verify appended records
         let table = connection
             .get_table(&table_path)