diff --git a/crates/fluss/src/client/table/batch_scanner.rs b/crates/fluss/src/client/table/batch_scanner.rs new file mode 100644 index 00000000..cc0585f3 --- /dev/null +++ b/crates/fluss/src/client/table/batch_scanner.rs @@ -0,0 +1,767 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Bounded batch scanner backed by a single `LimitScanRequest`, polled with +//! `next_batch` until it returns `None` (like `RecordBatchLogReader`). +//! +//! The KV branch decodes a [`ValueRecordBatch`], decoding each record against +//! its own schema id via [`FixedSchemaDecoder`] so older records are projected +//! onto the current schema (the same path as lookup). + +use crate::client::ClientSchemaGetter; +use crate::client::metadata::Metadata; +use crate::error::{ApiError, Error, FlussError, Result}; +use crate::metadata::{KvFormat, RowType, Schema, TableBucket, TableInfo}; +use crate::proto::ErrorResponse; +use crate::record::kv::{SCHEMA_ID_LENGTH, ValueRecordBatch}; +use crate::record::{ + LogRecordsBatches, ReadContext as ArrowReadContext, RowAppendRecordBatchBuilder, ScanBatch, + to_arrow_schema, +}; +use crate::row::FixedSchemaDecoder; +use crate::rpc::RpcClient; +use crate::rpc::message::LimitScanRequest; +use arrow::array::RecordBatch; +use arrow::compute::concat_batches; +use arrow_schema::SchemaRef; +use byteorder::{ByteOrder, LittleEndian}; +use bytes::Bytes; +use std::collections::HashMap; +use std::ops::Range; +use std::sync::Arc; + +/// One-shot bounded scanner: a single `LimitScanRequest` yielded as one +/// [`ScanBatch`]. Creation is cheap; the request runs on the first +/// [`next_batch`](Self::next_batch), which returns the batch once, then `None`. +pub struct LimitBatchScanner { + bucket: TableBucket, + /// Taken on the first `next_batch` to run the scan; `None` afterward. + pending: Option, +} + +/// Request inputs captured at creation, consumed by the first `next_batch`. +struct PendingScan { + rpc_client: Arc, + metadata: Arc, + table_info: TableInfo, + schema_getter: Arc, + projected_fields: Option>, + limit: i32, +} + +impl LimitBatchScanner { + pub(super) fn new( + rpc_client: Arc, + metadata: Arc, + table_info: TableInfo, + schema_getter: Arc, + projected_fields: Option>, + bucket: TableBucket, + limit: i32, + ) -> Self { + Self { + bucket, + pending: Some(PendingScan { + rpc_client, + metadata, + table_info, + schema_getter, + projected_fields, + limit, + }), + } + } + + /// Runs the scan on the first call and returns its batch, then `None`. Not + /// retried — an error leaves the scanner spent; create a new one to retry. + pub async fn next_batch(&mut self) -> Result> { + let Some(pending) = self.pending.take() else { + return Ok(None); + }; + run_limit_scan(&pending, &self.bucket).await.map(Some) + } + + /// Drains the scanner into all of its batches. + pub async fn collect_all_batches(&mut self) -> Result> { + let mut batches = Vec::new(); + while let Some(batch) = self.next_batch().await? { + batches.push(batch); + } + Ok(batches) + } + + /// The bucket scanned by this `LimitBatchScanner`. + pub fn bucket(&self) -> &TableBucket { + &self.bucket + } +} + +/// Resolves the leader, sends the `LimitScanRequest`, and decodes the response +/// into one [`ScanBatch`]. +async fn run_limit_scan(pending: &PendingScan, bucket: &TableBucket) -> Result { + let leader = pending + .metadata + .leader_for(&pending.table_info.table_path, bucket) + .await? + .ok_or_else(|| { + Error::leader_not_available(format!("No leader found for table bucket: {bucket}")) + })?; + let connection = pending.rpc_client.get_connection(&leader).await?; + + let request = LimitScanRequest::new( + pending.table_info.table_id, + bucket.partition_id(), + bucket.bucket_id(), + pending.limit, + ); + let response = connection.request(request).await?; + + if let Some(error_code) = response.error_code + && error_code != FlussError::None.code() + { + let err: ApiError = ErrorResponse { + error_code, + error_message: response.error_message.clone(), + } + .into(); + return Err(Error::FlussAPIError { api_error: err }); + } + + let raw = response.records.unwrap_or_default(); + // `limit` is validated positive by `TableScan::limit`. + let limit = pending.limit.max(0) as usize; + let projected = pending.projected_fields.as_deref(); + + // Choose the payload format from table metadata, not the response's advisory + // `is_log_table` flag. + let (batch, base_offset) = if !pending.table_info.has_primary_key() { + decode_log_batch(&pending.table_info, projected, raw, limit)? + } else { + // KV (primary-key) limit scan: no log offset, so base_offset is 0. + let batch = decode_kv_batch( + &pending.table_info, + &pending.schema_getter, + projected, + raw, + limit, + ) + .await?; + (batch, 0) + }; + + Ok(ScanBatch::new(bucket.clone(), batch, base_offset)) +} + +/// Decode the log payload into a single Arrow `RecordBatch`, concatenating any +/// inner batches. If more than `limit` rows are returned, the last `limit` are +/// kept and `base_offset` is advanced by the number dropped. +fn decode_log_batch( + table_info: &TableInfo, + projected_fields: Option<&[usize]>, + raw: Vec, + limit: usize, +) -> Result<(RecordBatch, i64)> { + let row_type = Arc::new(table_info.get_row_type().clone()); + let full_schema = to_arrow_schema(table_info.get_row_type())?; + let read_context = match projected_fields { + None => ArrowReadContext::new(full_schema.clone(), row_type.clone(), false), + Some(fields) => ArrowReadContext::with_projection_pushdown( + full_schema.clone(), + row_type.clone(), + fields.to_vec(), + false, + )?, + }; + + let target_schema: SchemaRef = match projected_fields { + None => full_schema, + Some(fields) => { + ArrowReadContext::project_schema(to_arrow_schema(table_info.get_row_type())?, fields)? + } + }; + + if raw.is_empty() { + return Ok((RecordBatch::new_empty(target_schema), 0)); + } + + let mut batches: Vec = Vec::new(); + let mut base_offset: Option = None; + for log_batch in LogRecordsBatches::new(raw) { + let log_batch = log_batch?; + if base_offset.is_none() { + base_offset = Some(log_batch.base_log_offset()); + } + let rb = log_batch.record_batch(&read_context)?; + batches.push(rb); + } + + let base_offset = base_offset.unwrap_or(0); + let merged = if batches.is_empty() { + RecordBatch::new_empty(target_schema) + } else if batches.len() == 1 { + batches.into_iter().next().unwrap() + } else { + concat_batches(&target_schema, batches.iter()).map_err(|e| Error::UnexpectedError { + message: format!("Failed to concatenate log record batches: {e}"), + source: None, + })? + }; + + Ok(take_last_rows(merged, base_offset, limit)) +} + +/// Decode a KV limit-scan [`ValueRecordBatch`] into a single Arrow +/// `RecordBatch`, decoding each record by its own schema id and projecting onto +/// the current schema. +async fn decode_kv_batch( + table_info: &TableInfo, + schema_getter: &ClientSchemaGetter, + projected_fields: Option<&[usize]>, + raw: Vec, + limit: usize, +) -> Result { + // No records: return an empty (projected) batch. + if raw.is_empty() { + return empty_record_batch(table_info.get_row_type(), projected_fields); + } + + let kv_format = table_info.table_config.get_kv_format()?; + let target_schema = table_info.get_schema(); + let target_schema_id = + i16::try_from(table_info.get_schema_id()).map_err(|_| Error::UnexpectedError { + message: format!( + "Schema id {} does not fit in 16 bits — wire format violated", + table_info.get_schema_id() + ), + source: None, + })?; + + let batch = ValueRecordBatch::new(Bytes::from(raw)); + let ranges = batch.value_ranges()?; + + // Collect the distinct schema ids present, then build one decoder per id + // (fetching older schemas via the coordinator as needed). + let mut schema_ids: Vec = Vec::new(); + for range in &ranges { + let id = read_schema_id(&batch.data()[range.clone()])?; + if !schema_ids.contains(&id) { + schema_ids.push(id); + } + } + let decoders = build_kv_decoders( + schema_getter, + target_schema, + target_schema_id, + kv_format, + &schema_ids, + ) + .await?; + + value_records_to_record_batch( + &batch, + &ranges, + &decoders, + table_info.get_row_type(), + projected_fields, + limit, + ) +} + +/// Build one [`FixedSchemaDecoder`] per distinct schema id. The current schema +/// decodes without projection; older schemas are fetched and projected onto the +/// current schema. +async fn build_kv_decoders( + schema_getter: &ClientSchemaGetter, + target_schema: &Schema, + target_schema_id: i16, + kv_format: KvFormat, + schema_ids: &[i16], +) -> Result> { + let mut decoders = HashMap::with_capacity(schema_ids.len()); + for &id in schema_ids { + if decoders.contains_key(&id) { + continue; + } + let decoder = if id == target_schema_id { + FixedSchemaDecoder::new_no_projection(kv_format, target_schema)? + } else { + let source = schema_getter.get_schema(id as i32).await?; + FixedSchemaDecoder::new(kv_format, source.as_ref(), target_schema)? + }; + decoders.insert(id, decoder); + } + Ok(decoders) +} + +/// Decode every value record into a row shaped by `target_row_type`, build a +/// single Arrow batch, keep the last `limit` rows, then apply column projection. +fn value_records_to_record_batch( + batch: &ValueRecordBatch, + ranges: &[Range], + decoders: &HashMap, + target_row_type: &RowType, + projected_fields: Option<&[usize]>, + limit: usize, +) -> Result { + let mut builder = RowAppendRecordBatchBuilder::new(target_row_type)?; + for range in ranges { + let payload = &batch.data()[range.clone()]; + let schema_id = read_schema_id(payload)?; + let decoder = decoders + .get(&schema_id) + .ok_or_else(|| Error::UnexpectedError { + message: format!("No decoder built for schema id {schema_id}"), + source: None, + })?; + let row = decoder.decode(payload)?; + builder.append(&row)?; + } + + let full = Arc::unwrap_or_clone(builder.build_arrow_record_batch()?); + let (full, _) = take_last_rows(full, 0, limit); + project_batch(full, target_row_type, projected_fields) +} + +/// Read the leading little-endian schema id from a `[schema_id | row]` payload. +fn read_schema_id(payload: &[u8]) -> Result { + if payload.len() < SCHEMA_ID_LENGTH { + return Err(Error::UnexpectedError { + message: format!( + "Value record payload too short: {} bytes, need {} for schema id", + payload.len(), + SCHEMA_ID_LENGTH + ), + source: None, + }); + } + let schema_id = LittleEndian::read_i16(&payload[..SCHEMA_ID_LENGTH]); + if schema_id < 0 { + return Err(Error::UnexpectedError { + message: format!("Invalid negative schema id {schema_id}; payload is corrupt"), + source: None, + }); + } + Ok(schema_id) +} + +/// Keep the last `limit` rows of `batch`, advancing `base_offset` by the number +/// of dropped leading rows. A `batch` at or under the limit is returned as-is. +fn take_last_rows(batch: RecordBatch, base_offset: i64, limit: usize) -> (RecordBatch, i64) { + let rows = batch.num_rows(); + if rows > limit { + let dropped = rows - limit; + (batch.slice(dropped, limit), base_offset + dropped as i64) + } else { + (batch, base_offset) + } +} + +/// An empty `RecordBatch` with the (optionally projected) target schema. +fn empty_record_batch( + target_row_type: &RowType, + projected_fields: Option<&[usize]>, +) -> Result { + let empty = RecordBatch::new_empty(to_arrow_schema(target_row_type)?); + project_batch(empty, target_row_type, projected_fields) +} + +/// Project `batch` (shaped by `target_row_type`) onto the requested columns. +fn project_batch( + batch: RecordBatch, + target_row_type: &RowType, + projected_fields: Option<&[usize]>, +) -> Result { + match projected_fields { + None => Ok(batch), + Some(fields) => { + let projected_schema = + ArrowReadContext::project_schema(to_arrow_schema(target_row_type)?, fields)?; + let columns: Vec<_> = fields + .iter() + .map(|&idx| batch.column(idx).clone()) + .collect(); + Ok(RecordBatch::try_new(projected_schema, columns)?) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::client::WriteRecord; + use crate::compression::{ + ArrowCompressionInfo, ArrowCompressionRatioEstimator, ArrowCompressionType, + DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, + }; + use crate::metadata::{ + Column, DataField, DataType, DataTypes, PhysicalTablePath, Schema, TableDescriptor, + TableInfo, TablePath, + }; + use crate::record::MemoryLogRecordsArrowBuilder; + use crate::row::GenericRow; + use crate::row::binary::BinaryWriter; + use crate::row::compacted::CompactedRowWriter; + use arrow::array::{Array, Int32Array, Int64Array}; + + fn build_two_col_table_info() -> TableInfo { + let row_type = DataTypes::row(vec![ + DataField::new("id", DataTypes::int(), None), + DataField::new("name", DataTypes::string(), None), + ]); + let schema = Schema::builder() + .with_row_type(&row_type) + .build() + .expect("schema build"); + let descriptor = TableDescriptor::builder() + .schema(schema) + .distributed_by(Some(1), vec![]) + .build() + .expect("descriptor build"); + TableInfo::of( + TablePath::new("db".to_string(), "tbl".to_string()), + 42, + 1, + descriptor, + 0, + 0, + ) + } + + fn build_log_records( + table_info: &TableInfo, + base_offset: i64, + rows: &[(i32, &str)], + ) -> Vec { + let row_type = table_info.get_row_type(); + let table_path = table_info.table_path.clone(); + let table_info_arc = Arc::new(table_info.clone()); + let physical = Arc::new(PhysicalTablePath::of(Arc::new(table_path))); + let mut builder = MemoryLogRecordsArrowBuilder::new( + 1, + row_type, + false, + ArrowCompressionInfo { + compression_type: ArrowCompressionType::None, + compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, + }, + usize::MAX, + Arc::new(ArrowCompressionRatioEstimator::default()), + ) + .expect("builder"); + + for (i, (id, name)) in rows.iter().enumerate() { + let mut row = GenericRow::new(2); + row.set_field(0, *id); + row.set_field(1, *name); + let record = WriteRecord::for_append( + Arc::clone(&table_info_arc), + physical.clone(), + (i + 1) as i32, + &row, + ); + builder.append(&record).expect("append"); + } + let mut data = builder.build().expect("build log batch"); + // Builder always writes base_log_offset=0; patch it so tests can verify + // BatchScanner faithfully propagates whatever offset the server returned. + let bytes = base_offset.to_le_bytes(); + data[..bytes.len()].copy_from_slice(&bytes); + data + } + + // ---- log path ---------------------------------------------------------- + + #[test] + fn decode_log_batch_empty_returns_empty_record_batch() { + let table_info = build_two_col_table_info(); + let (batch, base_offset) = + decode_log_batch(&table_info, None, Vec::new(), usize::MAX).expect("decode empty"); + assert_eq!(batch.num_rows(), 0); + assert_eq!(batch.num_columns(), 2); + assert_eq!(base_offset, 0); + } + + #[test] + fn decode_log_batch_empty_with_projection() { + let table_info = build_two_col_table_info(); + let (batch, base_offset) = + decode_log_batch(&table_info, Some(&[1usize]), Vec::new(), usize::MAX) + .expect("decode empty"); + assert_eq!(batch.num_rows(), 0); + assert_eq!(batch.num_columns(), 1); + assert_eq!(batch.schema().field(0).name(), "name"); + assert_eq!(base_offset, 0); + } + + #[test] + fn decode_log_batch_extracts_base_offset_and_rows() { + let table_info = build_two_col_table_info(); + let raw = build_log_records(&table_info, 17, &[(1, "alice"), (2, "bob"), (3, "carol")]); + + let (batch, base_offset) = + decode_log_batch(&table_info, None, raw, usize::MAX).expect("decode populated"); + assert_eq!(batch.num_rows(), 3); + assert_eq!(batch.num_columns(), 2); + assert_eq!(base_offset, 17); + } + + #[test] + fn decode_log_batch_projection_keeps_requested_columns() { + let table_info = build_two_col_table_info(); + let raw = build_log_records(&table_info, 0, &[(7, "x"), (8, "y")]); + + let (batch, _) = decode_log_batch(&table_info, Some(&[0usize]), raw, usize::MAX) + .expect("decode projected"); + assert_eq!(batch.num_rows(), 2); + assert_eq!(batch.num_columns(), 1); + assert_eq!(batch.schema().field(0).name(), "id"); + } + + #[test] + fn decode_log_batch_truncates_to_last_limit_rows() { + let table_info = build_two_col_table_info(); + // Server returned 4 rows starting at offset 100, but limit is 2. + let raw = build_log_records(&table_info, 100, &[(1, "a"), (2, "b"), (3, "c"), (4, "d")]); + + let (batch, base_offset) = decode_log_batch(&table_info, None, raw, 2).expect("decode"); + assert_eq!(batch.num_rows(), 2); + // The last two rows are kept, so the base offset advances by 2. + assert_eq!(base_offset, 102); + let ids = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(ids.value(0), 3); + assert_eq!(ids.value(1), 4); + } + + // ---- KV path ----------------------------------------------------------- + + fn schema_with_ids(columns: &[(i32, &str, DataType)]) -> Schema { + let cols: Vec = columns + .iter() + .map(|(id, name, dt)| Column::new(*name, dt.clone()).with_id(*id)) + .collect(); + Schema::builder().with_columns(cols).build().unwrap() + } + + /// Encode a value-record batch from `(schema_id, compacted-row-bytes)` + /// pairs, matching the Java `DefaultValueRecordBatch` wire layout. + fn value_batch(records: &[(i16, Vec)]) -> ValueRecordBatch { + let mut body = Vec::new(); + for (schema_id, row) in records { + let rec_len = (SCHEMA_ID_LENGTH + row.len()) as i32; + body.extend_from_slice(&rec_len.to_le_bytes()); + body.extend_from_slice(&schema_id.to_le_bytes()); + body.extend_from_slice(row); + } + let mut out = Vec::new(); + out.extend_from_slice(&((1 + 4 + body.len()) as i32).to_le_bytes()); // Length + out.push(0); // Magic + out.extend_from_slice(&(records.len() as i32).to_le_bytes()); // RecordCount + out.extend_from_slice(&body); + ValueRecordBatch::new(Bytes::from(out)) + } + + fn compacted(field_count: usize, write: impl FnOnce(&mut CompactedRowWriter)) -> Vec { + let mut w = CompactedRowWriter::new(field_count); + write(&mut w); + w.to_bytes().as_ref().to_vec() + } + + fn id_name_schema() -> Schema { + schema_with_ids(&[ + (0, "id", DataTypes::int()), + (1, "name", DataTypes::string()), + ]) + } + + #[test] + fn value_records_empty_returns_empty_batch() { + let schema = id_name_schema(); + let batch = value_batch(&[]); + let ranges = batch.value_ranges().unwrap(); + let rb = value_records_to_record_batch( + &batch, + &ranges, + &HashMap::new(), + schema.row_type(), + None, + usize::MAX, + ) + .expect("decode empty kv"); + assert_eq!(rb.num_rows(), 0); + assert_eq!(rb.num_columns(), 2); + } + + #[test] + fn empty_kv_payload_returns_empty_batch() { + let schema = id_name_schema(); + // Full schema. + let rb = empty_record_batch(schema.row_type(), None).expect("empty"); + assert_eq!(rb.num_rows(), 0); + assert_eq!(rb.num_columns(), 2); + // Projected. + let rb = empty_record_batch(schema.row_type(), Some(&[1usize])).expect("empty projected"); + assert_eq!(rb.num_rows(), 0); + assert_eq!(rb.num_columns(), 1); + assert_eq!(rb.schema().field(0).name(), "name"); + } + + #[test] + fn value_records_decode_rows() { + let schema = id_name_schema(); + let decoder = FixedSchemaDecoder::new_no_projection(KvFormat::COMPACTED, &schema).unwrap(); + let mut decoders = HashMap::new(); + decoders.insert(0i16, decoder); + + let r0 = compacted(2, |w| { + w.write_int(1); + w.write_string("alice"); + }); + let r1 = compacted(2, |w| { + w.write_int(2); + w.write_string("bob"); + }); + let batch = value_batch(&[(0, r0), (0, r1)]); + let ranges = batch.value_ranges().unwrap(); + + let rb = value_records_to_record_batch( + &batch, + &ranges, + &decoders, + schema.row_type(), + None, + usize::MAX, + ) + .expect("decode kv rows"); + assert_eq!(rb.num_rows(), 2); + let ids = rb.column(0).as_any().downcast_ref::().unwrap(); + assert_eq!(ids.value(0), 1); + assert_eq!(ids.value(1), 2); + } + + #[test] + fn value_records_limit_keeps_last_rows() { + let schema = id_name_schema(); + let decoder = FixedSchemaDecoder::new_no_projection(KvFormat::COMPACTED, &schema).unwrap(); + let mut decoders = HashMap::new(); + decoders.insert(0i16, decoder); + + let records: Vec<(i16, Vec)> = (1..=5) + .map(|i| { + ( + 0i16, + compacted(2, |w| { + w.write_int(i); + w.write_string("x"); + }), + ) + }) + .collect(); + let batch = value_batch(&records); + let ranges = batch.value_ranges().unwrap(); + + let rb = + value_records_to_record_batch(&batch, &ranges, &decoders, schema.row_type(), None, 3) + .expect("decode kv rows"); + assert_eq!(rb.num_rows(), 3); + let ids = rb.column(0).as_any().downcast_ref::().unwrap(); + // Last 3 of [1,2,3,4,5]. + assert_eq!(ids.values(), &[3, 4, 5]); + } + + #[test] + fn value_records_projection_keeps_requested_columns() { + let schema = id_name_schema(); + let decoder = FixedSchemaDecoder::new_no_projection(KvFormat::COMPACTED, &schema).unwrap(); + let mut decoders = HashMap::new(); + decoders.insert(0i16, decoder); + + let r0 = compacted(2, |w| { + w.write_int(9); + w.write_string("nine"); + }); + let batch = value_batch(&[(0, r0)]); + let ranges = batch.value_ranges().unwrap(); + + let rb = value_records_to_record_batch( + &batch, + &ranges, + &decoders, + schema.row_type(), + Some(&[1usize]), + usize::MAX, + ) + .expect("decode projected kv"); + assert_eq!(rb.num_columns(), 1); + assert_eq!(rb.schema().field(0).name(), "name"); + } + + #[test] + fn value_records_decode_across_schema_evolution() { + // Source schema (older): [id, name]. Target (current): added `age`. + let source = id_name_schema(); + let target = schema_with_ids(&[ + (0, "id", DataTypes::int()), + (1, "name", DataTypes::string()), + (2, "age", DataTypes::bigint()), + ]); + + let mut decoders = HashMap::new(); + // Records with schema id 0 were written under the old schema. + decoders.insert( + 0i16, + FixedSchemaDecoder::new(KvFormat::COMPACTED, &source, &target).unwrap(), + ); + // Records with schema id 1 carry the current schema. + decoders.insert( + 1i16, + FixedSchemaDecoder::new_no_projection(KvFormat::COMPACTED, &target).unwrap(), + ); + + let old_row = compacted(2, |w| { + w.write_int(1); + w.write_string("alice"); + }); + let new_row = compacted(3, |w| { + w.write_int(2); + w.write_string("bob"); + w.write_long(30); + }); + let batch = value_batch(&[(0, old_row), (1, new_row)]); + let ranges = batch.value_ranges().unwrap(); + + let rb = value_records_to_record_batch( + &batch, + &ranges, + &decoders, + target.row_type(), + None, + usize::MAX, + ) + .expect("decode mixed-schema kv"); + + assert_eq!(rb.num_rows(), 2); + assert_eq!(rb.num_columns(), 3); + let age = rb.column(2).as_any().downcast_ref::().unwrap(); + // Old record has no `age` column -> null; new record carries 30. + assert!(age.is_null(0), "old-schema record must read age as null"); + assert_eq!(age.value(1), 30); + } +} diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs index e116bbb4..657a44bf 100644 --- a/crates/fluss/src/client/table/mod.rs +++ b/crates/fluss/src/client/table/mod.rs @@ -25,6 +25,7 @@ use std::sync::Arc; pub const EARLIEST_OFFSET: i64 = -2; mod append; +mod batch_scanner; mod lookup; mod log_fetch_buffer; @@ -35,6 +36,7 @@ mod scanner; mod upsert; pub use append::{AppendWriter, TableAppend}; +pub use batch_scanner::LimitBatchScanner; pub use lookup::{LookupResult, Lookuper, PrefixKeyLookuper, TableLookup, TablePrefixLookup}; pub use reader::{RecordBatchLogReader, SyncRecordBatchLogReader}; pub use remote_log::{ diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index d7e2dd2c..35cc52e3 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -15,9 +15,11 @@ // specific language governing permissions and limitations // under the License. +use crate::client::ClientSchemaGetter; use crate::client::connection::FlussConnection; use crate::client::credentials::SecurityTokenManager; use crate::client::metadata::Metadata; +use crate::client::table::batch_scanner::LimitBatchScanner; use crate::client::table::log_fetch_buffer::{ CompletedFetch, DefaultCompletedFetch, FetchErrorAction, FetchErrorContext, FetchErrorLogLevel, LogFetchBuffer, RemotePendingFetch, @@ -26,7 +28,9 @@ use crate::client::table::remote_log::{RemoteLogDownloader, RemoteLogFetchInfo}; use crate::config::Config; use crate::error::Error::UnsupportedOperation; use crate::error::{ApiError, Error, FlussError, Result}; -use crate::metadata::{LogFormat, PhysicalTablePath, RowType, TableBucket, TableInfo, TablePath}; +use crate::metadata::{ + LogFormat, PhysicalTablePath, RowType, SchemaInfo, TableBucket, TableInfo, TablePath, +}; use crate::metrics::ScannerMetrics; use crate::proto::{ ErrorResponse, FetchLogRequest, FetchLogResponse, PbFetchLogReqForBucket, PbFetchLogReqForTable, @@ -55,6 +59,8 @@ pub struct TableScan<'a> { metadata: Arc, /// Column indices to project. None means all columns, Some(vec) means only the specified columns (non-empty). projected_fields: Option>, + /// Optional row limit. When set, callers may construct a [`BatchScanner`] for a one-shot bounded scan. + limit: Option, } impl<'a> TableScan<'a> { @@ -64,9 +70,96 @@ impl<'a> TableScan<'a> { table_info, metadata, projected_fields: None, + limit: None, } } + /// Sets a row limit for the scan, enabling [`Self::create_bucket_batch_scanner`]. + /// + /// The limit must be positive. A limit is incompatible with the log + /// scanners, which reject it. + pub fn limit(mut self, n: i32) -> Result { + if n <= 0 { + return Err(Error::IllegalArgument { + message: format!("Scan limit must be positive, got {n}"), + }); + } + self.limit = Some(n); + Ok(self) + } + + /// Log scanners don't support limit pushdown; reject a configured limit + /// rather than silently ignoring it. + fn reject_limit(&self, scanner: &str) -> Result<()> { + if let Some(limit) = self.limit { + return Err(Error::UnsupportedOperation { + message: format!( + "{scanner} doesn't support limit pushdown. Table: {}, requested limit: {limit}", + self.table_info.table_path + ), + }); + } + Ok(()) + } + + /// Creates a one-shot bounded scan of `table_bucket`. + /// + /// Requires a previously-configured limit via [`Self::limit`]. Creation is + /// cheap; the `LimitScanRequest` runs on the first + /// [`LimitBatchScanner::next_batch`]. + pub fn create_bucket_batch_scanner( + self, + table_bucket: TableBucket, + ) -> Result { + let limit = self.limit.ok_or_else(|| Error::IllegalArgument { + message: "create_bucket_batch_scanner requires a limit configured via .limit(n)" + .to_string(), + })?; + if table_bucket.table_id() != self.table_info.table_id { + return Err(Error::IllegalArgument { + message: format!( + "Bucket table_id {} does not match scan table_id {}", + table_bucket.table_id(), + self.table_info.table_id + ), + }); + } + let num_buckets = self.table_info.get_num_buckets(); + if table_bucket.bucket_id() < 0 || table_bucket.bucket_id() >= num_buckets { + return Err(Error::IllegalArgument { + message: format!( + "Bucket id {} out of range for table with {num_buckets} buckets", + table_bucket.bucket_id() + ), + }); + } + // Log tables decode as Arrow IPC, so only ARROW format is supported (KV + // tables use the value-record path and are exempt). + if !self.table_info.has_primary_key() { + validate_scan_support(&self.table_info.table_path, &self.table_info)?; + } + // Pre-seed the current schema; older versions are fetched lazily during + // KV decode. Mirrors `Table::new_lookup`. + let latest = SchemaInfo::new( + self.table_info.get_schema().clone(), + self.table_info.get_schema_id(), + ); + let schema_getter = Arc::new(ClientSchemaGetter::new( + self.table_info.table_path.clone(), + self.conn.get_admin()?, + latest, + )); + Ok(LimitBatchScanner::new( + self.conn.get_connections(), + self.metadata.clone(), + self.table_info, + schema_getter, + self.projected_fields, + table_bucket, + limit, + )) + } + /// Projects the scan to only include specified columns by their indices. /// /// # Arguments @@ -219,6 +312,7 @@ impl<'a> TableScan<'a> { } pub fn create_log_scanner(self) -> Result { + self.reject_limit("LogScanner")?; validate_scan_support(&self.table_info.table_path, &self.table_info)?; let inner = LogScannerInner::new( &self.table_info, @@ -233,6 +327,7 @@ impl<'a> TableScan<'a> { } pub fn create_record_batch_log_scanner(self) -> Result { + self.reject_limit("RecordBatchLogScanner")?; validate_scan_support(&self.table_info.table_path, &self.table_info)?; let inner = LogScannerInner::new( &self.table_info, diff --git a/crates/fluss/src/record/kv/mod.rs b/crates/fluss/src/record/kv/mod.rs index 857c5e5f..8f7750a8 100644 --- a/crates/fluss/src/record/kv/mod.rs +++ b/crates/fluss/src/record/kv/mod.rs @@ -22,6 +22,7 @@ mod kv_record_batch; mod kv_record_batch_builder; mod kv_record_read_context; mod read_context; +mod value_record_batch; #[cfg(test)] mod test_util; @@ -31,6 +32,7 @@ pub use kv_record_batch::*; pub use kv_record_batch_builder::*; pub use kv_record_read_context::{KvRecordReadContext, SchemaGetter}; pub use read_context::ReadContext; +pub use value_record_batch::ValueRecordBatch; /// Current KV magic value pub const CURRENT_KV_MAGIC_VALUE: u8 = 0; diff --git a/crates/fluss/src/record/kv/value_record_batch.rs b/crates/fluss/src/record/kv/value_record_batch.rs new file mode 100644 index 00000000..cfcb4a6d --- /dev/null +++ b/crates/fluss/src/record/kv/value_record_batch.rs @@ -0,0 +1,188 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Reader for the value-record batch returned by a KV (primary-key) limit +//! scan. This is a distinct wire format from [`super::KvRecordBatch`]: it +//! carries value-only records (no keys, no CRC/writer-id header) and a schema +//! id *per record* rather than per batch. +//! +//! Batch layout (little-endian): +//! - Length => Int32 (size of everything after this field) +//! - Magic => Int8 +//! - RecordCount => Int32 +//! - Records => [ValueRecord] +//! +//! Each `ValueRecord`: +//! - Length => Int32 (size after this field: SchemaId + Value) +//! - SchemaId => Int16 +//! - Value => row bytes +//! +//! Reference: `org.apache.fluss.record.DefaultValueRecordBatch` and +//! `org.apache.fluss.record.DefaultValueRecord`. + +use crate::error::{Error, Result}; +use byteorder::{ByteOrder, LittleEndian}; +use bytes::Bytes; +use std::ops::Range; + +const LENGTH_LENGTH: usize = 4; +const MAGIC_LENGTH: usize = 1; +const RECORD_COUNT_LENGTH: usize = 4; +/// Offset of the record count within the batch header. +const RECORD_COUNT_OFFSET: usize = LENGTH_LENGTH + MAGIC_LENGTH; +/// Size of the batch header (`Length + Magic + RecordCount`). +const RECORD_BATCH_HEADER_SIZE: usize = LENGTH_LENGTH + MAGIC_LENGTH + RECORD_COUNT_LENGTH; +/// Size of a `ValueRecord`'s leading length field. +const RECORD_LENGTH_LENGTH: usize = 4; + +/// Read-only view over a serialized value-record batch. +pub struct ValueRecordBatch { + data: Bytes, +} + +impl ValueRecordBatch { + /// Wraps raw batch bytes. The batch is expected to start at offset 0. + pub fn new(data: Bytes) -> Self { + Self { data } + } + + /// Number of records declared in the batch header. + pub fn record_count(&self) -> Result { + if self.data.len() < RECORD_BATCH_HEADER_SIZE { + return Err(corrupt(format!( + "value-record batch too short: {} bytes, need {} for header", + self.data.len(), + RECORD_BATCH_HEADER_SIZE + ))); + } + Ok(LittleEndian::read_i32( + &self.data[RECORD_COUNT_OFFSET..RECORD_COUNT_OFFSET + RECORD_COUNT_LENGTH], + )) + } + + /// Returns one byte range per record, each spanning `[SchemaId | Value]`: + /// the payload [`crate::row::FixedSchemaDecoder::decode`] expects. Index + /// [`Self::data`] with a returned range to get it without copying. + pub fn value_ranges(&self) -> Result>> { + let count = self.record_count()?; + if count < 0 { + return Err(corrupt(format!("invalid record count {count}"))); + } + let mut ranges = Vec::with_capacity(count as usize); + let mut pos = RECORD_BATCH_HEADER_SIZE; + for i in 0..count as usize { + if pos + RECORD_LENGTH_LENGTH > self.data.len() { + return Err(corrupt(format!( + "truncated value-record batch: record {i} length field runs past end" + ))); + } + let rec_len = LittleEndian::read_i32(&self.data[pos..pos + RECORD_LENGTH_LENGTH]); + if rec_len < 0 { + return Err(corrupt(format!("record {i} has negative length {rec_len}"))); + } + let start = pos + RECORD_LENGTH_LENGTH; + let end = start + rec_len as usize; + if end > self.data.len() { + return Err(corrupt(format!( + "truncated value-record batch: record {i} payload runs past end" + ))); + } + ranges.push(start..end); + pos = end; + } + Ok(ranges) + } + + /// The underlying batch bytes. + pub fn data(&self) -> &Bytes { + &self.data + } +} + +fn corrupt(message: String) -> Error { + Error::UnexpectedError { + message, + source: None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::record::kv::SCHEMA_ID_LENGTH; + + /// Build a value-record batch from `(schema_id, row_bytes)` pairs, mirroring + /// the Java `DefaultValueRecordBatch.Builder` wire layout. + fn build_batch(records: &[(i16, &[u8])]) -> Vec { + let mut body = Vec::new(); + for (schema_id, row) in records { + let rec_len = (SCHEMA_ID_LENGTH + row.len()) as i32; + body.extend_from_slice(&rec_len.to_le_bytes()); + body.extend_from_slice(&schema_id.to_le_bytes()); + body.extend_from_slice(row); + } + let mut out = Vec::new(); + // Length covers Magic + RecordCount + body. + let length = (MAGIC_LENGTH + RECORD_COUNT_LENGTH + body.len()) as i32; + out.extend_from_slice(&length.to_le_bytes()); + out.push(0); // magic + out.extend_from_slice(&(records.len() as i32).to_le_bytes()); + out.extend_from_slice(&body); + out + } + + #[test] + fn parses_record_count_and_ranges() { + let raw = build_batch(&[(7, &[1, 2, 3]), (7, &[4, 5])]); + let batch = ValueRecordBatch::new(Bytes::from(raw)); + assert_eq!(batch.record_count().unwrap(), 2); + + let ranges = batch.value_ranges().unwrap(); + assert_eq!(ranges.len(), 2); + // First record payload = [schema_id(2) | row(3)] = 5 bytes. + let r0 = &batch.data()[ranges[0].clone()]; + assert_eq!(r0.len(), 5); + assert_eq!(LittleEndian::read_i16(&r0[..2]), 7); + assert_eq!(&r0[2..], &[1, 2, 3]); + // Second record payload = [schema_id(2) | row(2)] = 4 bytes. + let r1 = &batch.data()[ranges[1].clone()]; + assert_eq!(r1.len(), 4); + assert_eq!(&r1[2..], &[4, 5]); + } + + #[test] + fn empty_batch_has_no_ranges() { + let raw = build_batch(&[]); + let batch = ValueRecordBatch::new(Bytes::from(raw)); + assert_eq!(batch.record_count().unwrap(), 0); + assert!(batch.value_ranges().unwrap().is_empty()); + } + + #[test] + fn truncated_payload_errors() { + let mut raw = build_batch(&[(7, &[1, 2, 3])]); + raw.truncate(raw.len() - 2); // chop into the row payload + let batch = ValueRecordBatch::new(Bytes::from(raw)); + assert!(batch.value_ranges().is_err()); + } + + #[test] + fn short_header_errors() { + let batch = ValueRecordBatch::new(Bytes::from(vec![0u8, 1, 2])); + assert!(batch.record_count().is_err()); + } +} diff --git a/crates/fluss/tests/integration/batch_scanner.rs b/crates/fluss/tests/integration/batch_scanner.rs new file mode 100644 index 00000000..75434174 --- /dev/null +++ b/crates/fluss/tests/integration/batch_scanner.rs @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#[cfg(test)] +mod batch_scanner_test { + use crate::integration::utils::{create_table, get_shared_cluster}; + use arrow::array::{Int32Array, StringArray, record_batch}; + use fluss::metadata::{DataTypes, LogFormat, Schema, TableBucket, TableDescriptor, TablePath}; + use fluss::row::GenericRow; + use std::collections::HashMap; + use std::time::Duration; + + /// End-to-end check that the scanner yields the appended rows once and then + /// `None`, honoring the configured limit. + #[tokio::test] + async fn batch_scanner_returns_appended_rows_then_none() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("admin"); + + let table_path = TablePath::new("fluss", "test_batch_scanner_log"); + let descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("c1", DataTypes::int()) + .column("c2", DataTypes::string()) + .build() + .expect("schema"), + ) + // Single bucket so a single BatchScanner sees every row. + .distributed_by(Some(1), vec!["c1".to_string()]) + .build() + .expect("descriptor"); + create_table(&admin, &table_path, &descriptor).await; + + let table = connection.get_table(&table_path).await.expect("table"); + let writer = table + .new_append() + .expect("append") + .create_writer() + .expect("writer"); + + let batch = record_batch!( + ("c1", Int32, [1, 2, 3, 4, 5]), + ("c2", Utf8, ["a", "b", "c", "d", "e"]) + ) + .unwrap(); + writer.append_arrow_batch(batch).expect("append batch"); + writer.flush().await.expect("flush"); + + // Give the server a moment to commit and make the records readable. + tokio::time::sleep(Duration::from_secs(1)).await; + + let table_info = table.get_table_info(); + let bucket = TableBucket::new(table_info.table_id, 0); + + let mut scanner = table + .new_scan() + .limit(3) + .expect("limit") + .create_bucket_batch_scanner(bucket.clone()) + .expect("create batch scanner"); + + let first = scanner + .next_batch() + .await + .expect("poll") + .expect("first batch should be Some"); + + assert_eq!(first.bucket(), &bucket); + // The server may return fewer rows than the limit on the first call, + // but must never exceed it. + assert!( + first.num_records() > 0 && first.num_records() <= 3, + "expected 1..=3 records, got {}", + first.num_records() + ); + + assert!( + scanner.next_batch().await.expect("poll").is_none(), + "scanner must end after one batch" + ); + } + + /// Limit scan on a primary-key table: decodes the value-record batch and + /// honors the limit. Exercises the KV wire path (distinct from the log one). + #[tokio::test] + async fn batch_scanner_reads_primary_key_table() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("admin"); + + let table_path = TablePath::new("fluss", "test_batch_scanner_pk"); + let descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .primary_key(vec!["id"]) + .build() + .expect("schema"), + ) + // Single bucket so one BatchScanner sees every row. + .distributed_by(Some(1), vec!["id".to_string()]) + .build() + .expect("descriptor"); + create_table(&admin, &table_path, &descriptor).await; + + let table = connection.get_table(&table_path).await.expect("table"); + let writer = table + .new_upsert() + .expect("upsert") + .create_writer() + .expect("writer"); + + let expected: HashMap = + [(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e")].into(); + for (id, name) in &expected { + let mut row = GenericRow::new(2); + row.set_field(0, *id); + row.set_field(1, *name); + writer.upsert(&row).expect("upsert row"); + } + writer.flush().await.expect("flush"); + + // Give the server a moment to commit and make the records readable. + tokio::time::sleep(Duration::from_secs(1)).await; + + let table_info = table.get_table_info(); + let bucket = TableBucket::new(table_info.table_id, 0); + + let mut scanner = table + .new_scan() + .limit(3) + .expect("limit") + .create_bucket_batch_scanner(bucket.clone()) + .expect("create batch scanner"); + + let first = scanner + .next_batch() + .await + .expect("poll") + .expect("first batch should be Some"); + + assert_eq!(first.bucket(), &bucket); + let rows = first.batch(); + assert_eq!(rows.num_columns(), 2, "id + name"); + assert!( + rows.num_rows() > 0 && rows.num_rows() <= 3, + "expected 1..=3 records, got {}", + rows.num_rows() + ); + + // Every returned (id, name) must match what we upserted. + let ids = rows + .column(0) + .as_any() + .downcast_ref::() + .expect("id column Int32"); + let names = rows + .column(1) + .as_any() + .downcast_ref::() + .expect("name column Utf8"); + for i in 0..rows.num_rows() { + let id = ids.value(i); + let name = names.value(i); + assert_eq!( + expected.get(&id), + Some(&name), + "decoded row ({id}, {name}) does not match upserted data" + ); + } + + assert!( + scanner.next_batch().await.expect("poll").is_none(), + "scanner must end after one batch" + ); + } + + /// A bucket with the wrong table_id or an out-of-range bucket_id must be + /// rejected before any RPC is made. + #[tokio::test] + async fn batch_scanner_rejects_invalid_bucket() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("admin"); + + let table_path = TablePath::new("fluss", "test_batch_scanner_table_id"); + let descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("c1", DataTypes::int()) + .build() + .expect("schema"), + ) + .distributed_by(Some(1), vec!["c1".to_string()]) + .build() + .expect("descriptor"); + create_table(&admin, &table_path, &descriptor).await; + + let table = connection.get_table(&table_path).await.expect("table"); + let table_id = table.get_table_info().table_id; + + // Wrong table_id. + assert!( + table + .new_scan() + .limit(1) + .expect("limit") + .create_bucket_batch_scanner(TableBucket::new(table_id + 9999, 0)) + .is_err(), + "must reject mismatched table_id" + ); + + // Bucket id past the single bucket of this table. + assert!( + table + .new_scan() + .limit(1) + .expect("limit") + .create_bucket_batch_scanner(TableBucket::new(table_id, 99)) + .is_err(), + "must reject out-of-range bucket_id" + ); + } + + /// A limit scan over a non-ARROW log table must be rejected (the log path + /// decodes Arrow IPC). + #[tokio::test] + async fn batch_scanner_rejects_non_arrow_log_format() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("admin"); + + let table_path = TablePath::new("fluss", "test_batch_scanner_indexed"); + let descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("c1", DataTypes::int()) + .build() + .expect("schema"), + ) + .log_format(LogFormat::INDEXED) + .distributed_by(Some(1), vec!["c1".to_string()]) + .build() + .expect("descriptor"); + create_table(&admin, &table_path, &descriptor).await; + + let table = connection.get_table(&table_path).await.expect("table"); + let bucket = TableBucket::new(table.get_table_info().table_id, 0); + + assert!( + table + .new_scan() + .limit(1) + .expect("limit") + .create_bucket_batch_scanner(bucket) + .is_err(), + "must reject INDEXED log format" + ); + } + + /// `.limit(n)` must reject non-positive values before any scanner is built. + #[tokio::test] + async fn batch_scanner_rejects_non_positive_limit() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("admin"); + + let table_path = TablePath::new("fluss", "test_batch_scanner_bad_limit"); + let descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("c1", DataTypes::int()) + .build() + .expect("schema"), + ) + .distributed_by(Some(1), vec!["c1".to_string()]) + .build() + .expect("descriptor"); + create_table(&admin, &table_path, &descriptor).await; + + let table = connection.get_table(&table_path).await.expect("table"); + assert!(table.new_scan().limit(0).is_err()); + assert!(table.new_scan().limit(-5).is_err()); + } + + /// A configured limit must be rejected by the log scanners rather than + /// silently ignored. + #[tokio::test] + async fn limit_is_rejected_by_log_scanners() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("admin"); + + let table_path = TablePath::new("fluss", "test_batch_scanner_limit_logscan"); + let descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("c1", DataTypes::int()) + .build() + .expect("schema"), + ) + .distributed_by(Some(1), vec!["c1".to_string()]) + .build() + .expect("descriptor"); + create_table(&admin, &table_path, &descriptor).await; + + let table = connection.get_table(&table_path).await.expect("table"); + assert!( + table + .new_scan() + .limit(5) + .expect("limit") + .create_log_scanner() + .is_err(), + "create_log_scanner must reject a configured limit" + ); + assert!( + table + .new_scan() + .limit(5) + .expect("limit") + .create_record_batch_log_scanner() + .is_err(), + "create_record_batch_log_scanner must reject a configured limit" + ); + } +} diff --git a/crates/fluss/tests/test_fluss.rs b/crates/fluss/tests/test_fluss.rs index 792b68f0..2d2bd152 100644 --- a/crates/fluss/tests/test_fluss.rs +++ b/crates/fluss/tests/test_fluss.rs @@ -21,6 +21,7 @@ extern crate fluss; #[cfg(feature = "integration_tests")] mod integration { mod admin; + mod batch_scanner; mod fluss_cluster; mod kv_table; mod log_table; diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md index 5d983030..bb2ec3e8 100644 --- a/website/docs/user-guide/rust/api-reference.md +++ b/website/docs/user-guide/rust/api-reference.md @@ -134,8 +134,10 @@ Complete API reference for the Fluss Rust client. |-----------------------------------------------------------------------------|-----------------------------------------| | `fn project(self, indices: &[usize]) -> Result` | Project columns by index | | `fn project_by_name(self, names: &[&str]) -> Result` | Project columns by name | +| `fn limit(self, n: i32) -> Result` | Set a row limit (enables `create_bucket_batch_scanner`; rejected by log scanners) | | `fn create_log_scanner(self) -> Result` | Create a record-based log scanner | | `fn create_record_batch_log_scanner(self) -> Result` | Create an Arrow batch-based log scanner | +| `fn create_bucket_batch_scanner(self, bucket: TableBucket) -> Result` | Bounded scan of one bucket (requires `limit`; runs on first `next_batch`) | ## `LogScanner` @@ -211,6 +213,19 @@ bucket identity per batch, use `next_batch` instead. | `fn next(&mut self) -> Option>` | Iterator: next batch, or `None` when caught up | | `fn schema(&self) -> SchemaRef` | Arrow schema for produced batches | +## `LimitBatchScanner` + +One-shot bounded scanner from `TableScan::limit(n).create_bucket_batch_scanner(bucket)`. +Poll it with `next_batch` until it returns `None` (mirrors `RecordBatchLogReader`). +Supports both log and primary-key tables (the latter returns the current, +server-deduplicated state); yields a single batch of at most `n` rows. + +| Method | Description | +|---------------------------------------------------------------|--------------------------------------| +| `async fn next_batch(&mut self) -> Result>` | Rows on the first call, `None` after | +| `async fn collect_all_batches(&mut self) -> Result>` | Drain into all batches | +| `fn bucket(&self) -> &TableBucket` | The scanned bucket | + ## `ScanRecord` | Method | Description | diff --git a/website/docs/user-guide/rust/example/log-tables.md b/website/docs/user-guide/rust/example/log-tables.md index 04857796..e77c8c6c 100644 --- a/website/docs/user-guide/rust/example/log-tables.md +++ b/website/docs/user-guide/rust/example/log-tables.md @@ -153,3 +153,20 @@ let scanner = table.new_scan() .project_by_name(&["event_id", "timestamp"])? .create_log_scanner()?; ``` + +## Limit Scan + +For a bounded read of up to `n` rows from a single bucket, use a batch scanner +instead of subscribing. It issues one request; poll it with `next_batch` until +it returns `None`. + +```rust +let bucket = TableBucket::new(table.get_table_info().table_id, 0); +let mut scanner = table.new_scan().limit(10)?.create_bucket_batch_scanner(bucket)?; + +while let Some(batch) = scanner.next_batch().await? { + println!("rows: {}", batch.batch().num_rows()); +} +``` + +Limit applies per bucket; scan each bucket to cover a multi-bucket table. diff --git a/website/docs/user-guide/rust/example/primary-key-tables.md b/website/docs/user-guide/rust/example/primary-key-tables.md index 82a07c4c..01836e29 100644 --- a/website/docs/user-guide/rust/example/primary-key-tables.md +++ b/website/docs/user-guide/rust/example/primary-key-tables.md @@ -124,3 +124,18 @@ println!("Rows: {}", batch.num_rows()); ## Prefix Lookup To fetch all rows sharing a common primary-key prefix (by choosing a bucket key that's a strict prefix of the primary key), see [Prefix Lookup](./prefix-lookup.md). + +## Limit Scan + +To read up to `n` rows of a bucket's current state without supplying keys, use a batch scanner. The server returns the deduplicated current rows as Arrow batches, which is convenient for previews or DataFusion sources. + +```rust +let bucket = TableBucket::new(table.get_table_info().table_id, 0); +let mut scanner = table.new_scan().limit(10)?.create_bucket_batch_scanner(bucket)?; + +while let Some(batch) = scanner.next_batch().await? { + println!("rows: {}", batch.batch().num_rows()); +} +``` + +Limit applies per bucket; scan each bucket to cover a multi-bucket table.