Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
767 changes: 767 additions & 0 deletions crates/fluss/src/client/table/batch_scanner.rs

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions crates/fluss/src/client/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::sync::Arc;
pub const EARLIEST_OFFSET: i64 = -2;

mod append;
mod batch_scanner;
mod lookup;

mod log_fetch_buffer;
Expand All @@ -35,6 +36,7 @@ mod scanner;
mod upsert;

pub use append::{AppendWriter, TableAppend};
pub use batch_scanner::LimitBatchScanner;
pub use lookup::{LookupResult, Lookuper, PrefixKeyLookuper, TableLookup, TablePrefixLookup};
pub use reader::{RecordBatchLogReader, SyncRecordBatchLogReader};
pub use remote_log::{
Expand Down
97 changes: 96 additions & 1 deletion crates/fluss/src/client/table/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use crate::client::ClientSchemaGetter;
use crate::client::connection::FlussConnection;
use crate::client::credentials::SecurityTokenManager;
use crate::client::metadata::Metadata;
use crate::client::table::batch_scanner::LimitBatchScanner;
use crate::client::table::log_fetch_buffer::{
CompletedFetch, DefaultCompletedFetch, FetchErrorAction, FetchErrorContext, FetchErrorLogLevel,
LogFetchBuffer, RemotePendingFetch,
Expand All @@ -26,7 +28,9 @@ use crate::client::table::remote_log::{RemoteLogDownloader, RemoteLogFetchInfo};
use crate::config::Config;
use crate::error::Error::UnsupportedOperation;
use crate::error::{ApiError, Error, FlussError, Result};
use crate::metadata::{LogFormat, PhysicalTablePath, RowType, TableBucket, TableInfo, TablePath};
use crate::metadata::{
LogFormat, PhysicalTablePath, RowType, SchemaInfo, TableBucket, TableInfo, TablePath,
};
use crate::metrics::ScannerMetrics;
use crate::proto::{
ErrorResponse, FetchLogRequest, FetchLogResponse, PbFetchLogReqForBucket, PbFetchLogReqForTable,
Expand Down Expand Up @@ -55,6 +59,8 @@ pub struct TableScan<'a> {
metadata: Arc<Metadata>,
/// Column indices to project. None means all columns, Some(vec) means only the specified columns (non-empty).
projected_fields: Option<Vec<usize>>,
/// Optional row limit. When set, callers may build a [`LimitBatchScanner`] via [`Self::create_bucket_batch_scanner`] for a one-shot bounded scan.
limit: Option<i32>,
}

impl<'a> TableScan<'a> {
Expand All @@ -64,9 +70,96 @@ impl<'a> TableScan<'a> {
table_info,
metadata,
projected_fields: None,
limit: None,
}
}

/// Sets a row limit for the scan, enabling [`Self::create_bucket_batch_scanner`].
///
/// The limit must be positive. A limit is incompatible with the log
/// scanners, which reject it.
pub fn limit(mut self, n: i32) -> Result<Self> {
if n <= 0 {
return Err(Error::IllegalArgument {
message: format!("Scan limit must be positive, got {n}"),
});
}
self.limit = Some(n);
Ok(self)
}

/// Log scanners don't support limit pushdown; reject a configured limit
/// rather than silently ignoring it.
fn reject_limit(&self, scanner: &str) -> Result<()> {
if let Some(limit) = self.limit {
return Err(Error::UnsupportedOperation {
message: format!(
"{scanner} doesn't support limit pushdown. Table: {}, requested limit: {limit}",
self.table_info.table_path
),
});
}
Ok(())
}

/// Creates a one-shot bounded scan of `table_bucket`.
///
/// Requires a previously-configured limit via [`Self::limit`]. Creation is
/// cheap; the `LimitScanRequest` runs on the first
/// [`LimitBatchScanner::next_batch`].
pub fn create_bucket_batch_scanner(
Comment thread
fresh-borzoni marked this conversation as resolved.
self,
table_bucket: TableBucket,
) -> Result<LimitBatchScanner> {
let limit = self.limit.ok_or_else(|| Error::IllegalArgument {
message: "create_bucket_batch_scanner requires a limit configured via .limit(n)"
.to_string(),
})?;
if table_bucket.table_id() != self.table_info.table_id {
return Err(Error::IllegalArgument {
message: format!(
"Bucket table_id {} does not match scan table_id {}",
table_bucket.table_id(),
self.table_info.table_id
),
});
}
let num_buckets = self.table_info.get_num_buckets();
if table_bucket.bucket_id() < 0 || table_bucket.bucket_id() >= num_buckets {
return Err(Error::IllegalArgument {
message: format!(
"Bucket id {} out of range for table with {num_buckets} buckets",
table_bucket.bucket_id()
),
});
}
// Log tables decode as Arrow IPC, so only ARROW format is supported (KV
// tables use the value-record path and are exempt).
if !self.table_info.has_primary_key() {
validate_scan_support(&self.table_info.table_path, &self.table_info)?;
}
// Pre-seed the current schema; older versions are fetched lazily during
// KV decode. Mirrors `Table::new_lookup`.
let latest = SchemaInfo::new(
self.table_info.get_schema().clone(),
self.table_info.get_schema_id(),
);
let schema_getter = Arc::new(ClientSchemaGetter::new(
self.table_info.table_path.clone(),
self.conn.get_admin()?,
latest,
));
Ok(LimitBatchScanner::new(
self.conn.get_connections(),
self.metadata.clone(),
self.table_info,
schema_getter,
self.projected_fields,
table_bucket,
limit,
))
}

/// Projects the scan to only include specified columns by their indices.
///
/// # Arguments
Expand Down Expand Up @@ -219,6 +312,7 @@ impl<'a> TableScan<'a> {
}

pub fn create_log_scanner(self) -> Result<LogScanner> {
self.reject_limit("LogScanner")?;
validate_scan_support(&self.table_info.table_path, &self.table_info)?;
let inner = LogScannerInner::new(
&self.table_info,
Expand All @@ -233,6 +327,7 @@ impl<'a> TableScan<'a> {
}

pub fn create_record_batch_log_scanner(self) -> Result<RecordBatchLogScanner> {
self.reject_limit("RecordBatchLogScanner")?;
validate_scan_support(&self.table_info.table_path, &self.table_info)?;
let inner = LogScannerInner::new(
&self.table_info,
Expand Down
2 changes: 2 additions & 0 deletions crates/fluss/src/record/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod kv_record_batch;
mod kv_record_batch_builder;
mod kv_record_read_context;
mod read_context;
mod value_record_batch;

#[cfg(test)]
mod test_util;
Expand All @@ -31,6 +32,7 @@ pub use kv_record_batch::*;
pub use kv_record_batch_builder::*;
pub use kv_record_read_context::{KvRecordReadContext, SchemaGetter};
pub use read_context::ReadContext;
pub use value_record_batch::ValueRecordBatch;

/// Current KV magic value
pub const CURRENT_KV_MAGIC_VALUE: u8 = 0;
Expand Down
188 changes: 188 additions & 0 deletions crates/fluss/src/record/kv/value_record_batch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Reader for the value-record batch returned by a KV (primary-key) limit
//! scan. This is a distinct wire format from [`super::KvRecordBatch`]: it
//! carries value-only records (no keys, no CRC/writer-id header) and a schema
//! id *per record* rather than per batch.
//!
//! Batch layout (little-endian):
//! - Length => Int32 (size of everything after this field)
//! - Magic => Int8
//! - RecordCount => Int32
//! - Records => [ValueRecord]
//!
//! Each `ValueRecord`:
//! - Length => Int32 (size after this field: SchemaId + Value)
//! - SchemaId => Int16
//! - Value => row bytes
//!
//! Reference: `org.apache.fluss.record.DefaultValueRecordBatch` and
//! `org.apache.fluss.record.DefaultValueRecord`.

use crate::error::{Error, Result};
use byteorder::{ByteOrder, LittleEndian};
use bytes::Bytes;
use std::ops::Range;

/// Size of the batch's leading `Length` field (Int32).
const LENGTH_LENGTH: usize = 4;
/// Size of the batch's `Magic` field (Int8).
const MAGIC_LENGTH: usize = 1;
/// Size of the batch's `RecordCount` field (Int32).
const RECORD_COUNT_LENGTH: usize = 4;
/// Offset of the record count within the batch header.
const RECORD_COUNT_OFFSET: usize = LENGTH_LENGTH + MAGIC_LENGTH;
/// Size of the batch header (`Length + Magic + RecordCount`).
const RECORD_BATCH_HEADER_SIZE: usize = LENGTH_LENGTH + MAGIC_LENGTH + RECORD_COUNT_LENGTH;
/// Size of a `ValueRecord`'s leading length field.
const RECORD_LENGTH_LENGTH: usize = 4;

/// Read-only view over a serialized value-record batch.
///
/// Constructing the view does no parsing; the header and record framing are
/// checked when [`ValueRecordBatch::record_count`] or
/// [`ValueRecordBatch::value_ranges`] is called.
pub struct ValueRecordBatch {
/// Raw batch bytes, with the batch header expected at offset 0.
data: Bytes,
}

impl ValueRecordBatch {
    /// Wraps raw batch bytes without validating them. The batch is expected to
    /// start at offset 0.
    pub fn new(data: Bytes) -> Self {
        Self { data }
    }

    /// Number of records declared in the batch header.
    ///
    /// The value is returned exactly as read from the header; it is not
    /// checked against the actual contents, and a corrupt batch may declare a
    /// negative count. [`Self::value_ranges`] performs that validation.
    ///
    /// # Errors
    ///
    /// Returns an error when the data is shorter than the batch header.
    pub fn record_count(&self) -> Result<i32> {
        if self.data.len() < RECORD_BATCH_HEADER_SIZE {
            return Err(corrupt(format!(
                "value-record batch too short: {} bytes, need {} for header",
                self.data.len(),
                RECORD_BATCH_HEADER_SIZE
            )));
        }
        Ok(LittleEndian::read_i32(
            &self.data[RECORD_COUNT_OFFSET..RECORD_COUNT_OFFSET + RECORD_COUNT_LENGTH],
        ))
    }

    /// Returns one byte range per record, each spanning `[SchemaId | Value]`:
    /// the payload [`crate::row::FixedSchemaDecoder::decode`] expects. Index
    /// [`Self::data`] with a returned range to get it without copying.
    ///
    /// # Errors
    ///
    /// Returns an error when the header is too short, the declared record
    /// count or a record length is negative, or a record's length prefix or
    /// payload extends past the end of the data.
    pub fn value_ranges(&self) -> Result<Vec<Range<usize>>> {
        let count = self.record_count()?;
        if count < 0 {
            return Err(corrupt(format!("invalid record count {count}")));
        }
        // Non-negative per the check above, so the cast is lossless.
        let count = count as usize;
        let total = self.data.len();

        // The declared count comes off the wire and has not been validated
        // against the payload yet. Every record needs at least its length
        // prefix, so the remaining bytes bound how many records can really be
        // present; cap the pre-allocation with that bound so a corrupt header
        // cannot request a multi-gigabyte allocation before we return an error.
        // `record_count` succeeded, hence `total >= RECORD_BATCH_HEADER_SIZE`.
        let max_records = (total - RECORD_BATCH_HEADER_SIZE) / RECORD_LENGTH_LENGTH;
        let mut ranges = Vec::with_capacity(count.min(max_records));

        let mut pos = RECORD_BATCH_HEADER_SIZE;
        for i in 0..count {
            // Checked arithmetic keeps the bounds tests sound even where
            // `usize` is only 32 bits wide.
            let start = match pos.checked_add(RECORD_LENGTH_LENGTH) {
                Some(start) if start <= total => start,
                _ => {
                    return Err(corrupt(format!(
                        "truncated value-record batch: record {i} length field runs past end"
                    )));
                }
            };
            let rec_len = LittleEndian::read_i32(&self.data[pos..start]);
            if rec_len < 0 {
                return Err(corrupt(format!("record {i} has negative length {rec_len}")));
            }
            let end = match start.checked_add(rec_len as usize) {
                Some(end) if end <= total => end,
                _ => {
                    return Err(corrupt(format!(
                        "truncated value-record batch: record {i} payload runs past end"
                    )));
                }
            };
            ranges.push(start..end);
            pos = end;
        }
        Ok(ranges)
    }

    /// The underlying batch bytes.
    pub fn data(&self) -> &Bytes {
        &self.data
    }
}

fn corrupt(message: String) -> Error {
Error::UnexpectedError {
message,
source: None,
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::kv::SCHEMA_ID_LENGTH;

    /// Build a value-record batch from `(schema_id, row_bytes)` pairs, mirroring
    /// the Java `DefaultValueRecordBatch.Builder` wire layout.
    fn build_batch(records: &[(i16, &[u8])]) -> Vec<u8> {
        let mut body = Vec::new();
        for (schema_id, row) in records {
            let rec_len = (SCHEMA_ID_LENGTH + row.len()) as i32;
            body.extend_from_slice(&rec_len.to_le_bytes());
            body.extend_from_slice(&schema_id.to_le_bytes());
            body.extend_from_slice(row);
        }
        let mut out = Vec::new();
        // Length covers Magic + RecordCount + body.
        let length = (MAGIC_LENGTH + RECORD_COUNT_LENGTH + body.len()) as i32;
        out.extend_from_slice(&length.to_le_bytes());
        out.push(0); // magic
        out.extend_from_slice(&(records.len() as i32).to_le_bytes());
        out.extend_from_slice(&body);
        out
    }

    /// Overwrites the `RecordCount` header field of an already-serialized
    /// batch, to simulate a header that disagrees with the payload.
    fn set_record_count(raw: &mut [u8], count: i32) {
        raw[RECORD_COUNT_OFFSET..RECORD_COUNT_OFFSET + RECORD_COUNT_LENGTH]
            .copy_from_slice(&count.to_le_bytes());
    }

    #[test]
    fn parses_record_count_and_ranges() {
        let raw = build_batch(&[(7, &[1, 2, 3]), (7, &[4, 5])]);
        let batch = ValueRecordBatch::new(Bytes::from(raw));
        assert_eq!(batch.record_count().unwrap(), 2);

        let ranges = batch.value_ranges().unwrap();
        assert_eq!(ranges.len(), 2);
        // First record payload = [schema_id(2) | row(3)] = 5 bytes.
        let r0 = &batch.data()[ranges[0].clone()];
        assert_eq!(r0.len(), 5);
        assert_eq!(LittleEndian::read_i16(&r0[..2]), 7);
        assert_eq!(&r0[2..], &[1, 2, 3]);
        // Second record payload = [schema_id(2) | row(2)] = 4 bytes.
        let r1 = &batch.data()[ranges[1].clone()];
        assert_eq!(r1.len(), 4);
        assert_eq!(&r1[2..], &[4, 5]);
    }

    #[test]
    fn empty_batch_has_no_ranges() {
        let raw = build_batch(&[]);
        let batch = ValueRecordBatch::new(Bytes::from(raw));
        assert_eq!(batch.record_count().unwrap(), 0);
        assert!(batch.value_ranges().unwrap().is_empty());
    }

    #[test]
    fn truncated_payload_errors() {
        let mut raw = build_batch(&[(7, &[1, 2, 3])]);
        raw.truncate(raw.len() - 2); // chop into the row payload
        let batch = ValueRecordBatch::new(Bytes::from(raw));
        assert!(batch.value_ranges().is_err());
    }

    #[test]
    fn truncated_length_field_errors() {
        let mut raw = build_batch(&[(7, &[1, 2, 3])]);
        // Keep the header plus only half of the first record's length prefix.
        raw.truncate(RECORD_BATCH_HEADER_SIZE + RECORD_LENGTH_LENGTH / 2);
        let batch = ValueRecordBatch::new(Bytes::from(raw));
        assert!(batch.value_ranges().is_err());
    }

    #[test]
    fn negative_record_count_errors() {
        let mut raw = build_batch(&[]);
        set_record_count(&mut raw, -1);
        let batch = ValueRecordBatch::new(Bytes::from(raw));
        // The header itself is readable; only the range walk rejects it.
        assert_eq!(batch.record_count().unwrap(), -1);
        assert!(batch.value_ranges().is_err());
    }

    #[test]
    fn overstated_record_count_errors() {
        let mut raw = build_batch(&[(7, &[1, 2, 3])]);
        // Header claims more records than the payload actually holds.
        set_record_count(&mut raw, 3);
        let batch = ValueRecordBatch::new(Bytes::from(raw));
        assert!(batch.value_ranges().is_err());
    }

    #[test]
    fn negative_record_length_errors() {
        let mut raw = build_batch(&[(7, &[1, 2, 3])]);
        raw[RECORD_BATCH_HEADER_SIZE..RECORD_BATCH_HEADER_SIZE + RECORD_LENGTH_LENGTH]
            .copy_from_slice(&(-1i32).to_le_bytes());
        let batch = ValueRecordBatch::new(Bytes::from(raw));
        assert!(batch.value_ranges().is_err());
    }

    #[test]
    fn short_header_errors() {
        let batch = ValueRecordBatch::new(Bytes::from(vec![0u8, 1, 2]));
        assert!(batch.record_count().is_err());
    }
}
Loading
Loading