Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions crates/examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ version = { workspace = true }
fluss = { workspace = true }
tokio = { workspace = true }
clap = { workspace = true}


[[example]]
name = "example-table"
path = "src/example_table.rs"
115 changes: 104 additions & 11 deletions crates/fluss/src/client/table/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@

use crate::client::connection::FlussConnection;
use crate::client::metadata::Metadata;
use crate::error::Result;
use crate::error::{Error, Result};
use crate::metadata::{TableBucket, TableInfo, TablePath};
use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable};
use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord, ScanRecords, to_arrow_schema};
use crate::rpc::RpcClient;
use crate::util::FairBucketStatusMap;
use arrow_schema::SchemaRef;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::slice::from_ref;
Expand All @@ -39,6 +40,8 @@ pub struct TableScan<'a> {
conn: &'a FlussConnection,
table_info: TableInfo,
metadata: Arc<Metadata>,
/// Column indices to project. None means all columns, Some(vec) means only the specified columns (non-empty).
projected_fields: Option<Vec<usize>>,
}

impl<'a> TableScan<'a> {
Expand All @@ -47,14 +50,82 @@ impl<'a> TableScan<'a> {
conn,
table_info,
metadata,
projected_fields: None,
}
}

pub fn create_log_scanner(&self) -> LogScanner {
/// Projects the scan to only include specified columns by their indices.
///
/// # Arguments
/// * `column_indices` - Zero-based indices of columns to include in the scan
///
/// # Errors
/// Returns an error if `column_indices` is empty or if any column index is out of range.
///
/// # Example
/// ```
/// let scanner = table.new_scan().project(&[0, 2, 3])?.create_log_scanner();
/// ```
pub fn project(mut self, column_indices: &[usize]) -> Result<Self> {
if column_indices.is_empty() {
return Err(Error::IllegalArgument(
"Column indices cannot be empty".to_string(),
));
}
let field_count = self.table_info.row_type().fields().len();
for &idx in column_indices {
if idx >= field_count {
return Err(Error::IllegalArgument(format!(
"Column index {} out of range (max: {})",
idx,
field_count - 1
)));
}
}
self.projected_fields = Some(column_indices.to_vec());
Ok(self)
}

/// Projects the scan to only include specified columns by their names.
///
/// # Arguments
/// * `column_names` - Names of columns to include in the scan
///
/// # Errors
/// Returns an error if `column_names` is empty or if any column name is not found in the table schema.
///
/// # Example
/// ```
/// let scanner = table.new_scan().project_by_name(&["col1", "col3"])?.create_log_scanner();
/// ```
pub fn project_by_name(mut self, column_names: &[&str]) -> Result<Self> {
if column_names.is_empty() {
return Err(Error::IllegalArgument(
"Column names cannot be empty".to_string(),
));
}
let row_type = self.table_info.row_type();
let mut indices = Vec::new();

for name in column_names {
let idx = row_type
.fields()
.iter()
.position(|f| f.name() == *name)
.ok_or_else(|| Error::IllegalArgument(format!("Column '{name}' not found")))?;
indices.push(idx);
}

self.projected_fields = Some(indices);
Ok(self)
}

/// Consumes the scan and builds a [`LogScanner`] for the table.
///
/// The scanner receives the table info, the shared metadata handle, the
/// connection's RPC client and any projection configured on this scan
/// (`None` means all columns).
pub fn create_log_scanner(self) -> LogScanner {
    LogScanner::new(
        &self.table_info,
        // `self` is consumed here, so the `Arc` is moved rather than cloned.
        self.metadata,
        self.conn.get_connections(),
        self.projected_fields,
    )
}
}
Expand All @@ -72,6 +143,7 @@ impl LogScanner {
table_info: &TableInfo,
metadata: Arc<Metadata>,
connections: Arc<RpcClient>,
projected_fields: Option<Vec<usize>>,
) -> Self {
let log_scanner_status = Arc::new(LogScannerStatus::new());
Self {
Expand All @@ -84,6 +156,7 @@ impl LogScanner {
connections.clone(),
metadata.clone(),
log_scanner_status.clone(),
projected_fields,
),
}
}
Expand Down Expand Up @@ -114,6 +187,7 @@ struct LogFetcher {
table_info: TableInfo,
metadata: Arc<Metadata>,
log_scanner_status: Arc<LogScannerStatus>,
read_context: ReadContext,
}

impl LogFetcher {
Expand All @@ -122,13 +196,27 @@ impl LogFetcher {
conns: Arc<RpcClient>,
metadata: Arc<Metadata>,
log_scanner_status: Arc<LogScannerStatus>,
projected_fields: Option<Vec<usize>>,
) -> Self {
let full_arrow_schema = to_arrow_schema(table_info.get_row_type());
let read_context = Self::create_read_context(full_arrow_schema, projected_fields);
LogFetcher {
table_path: table_info.table_path.clone(),
conns: conns.clone(),
table_info: table_info.clone(),
metadata: metadata.clone(),
log_scanner_status: log_scanner_status.clone(),
conns,
table_info,
metadata,
log_scanner_status,
read_context,
}
}

/// Builds the [`ReadContext`] for a scan.
///
/// When a projection is present, the full schema and the selected column
/// indices are passed to `ReadContext::with_projection_pushdown`; otherwise
/// the context is created from the full schema alone.
fn create_read_context(
    full_arrow_schema: SchemaRef,
    projected_fields: Option<Vec<usize>>,
) -> ReadContext {
    if let Some(selected) = projected_fields {
        ReadContext::with_projection_pushdown(full_arrow_schema, selected)
    } else {
        ReadContext::new(full_arrow_schema)
    }
}

Expand All @@ -149,7 +237,7 @@ impl LogFetcher {
for pb_fetch_log_resp in fetch_response.tables_resp {
let table_id = pb_fetch_log_resp.table_id;
let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
let arrow_schema = to_arrow_schema(self.table_info.get_row_type());

for fetch_log_for_bucket in fetch_log_for_buckets {
let mut fetch_records = vec![];
let bucket: i32 = fetch_log_for_bucket.bucket_id;
Expand All @@ -158,8 +246,7 @@ impl LogFetcher {
let data = fetch_log_for_bucket.records.unwrap();
for log_record in &mut LogRecordsBatchs::new(&data) {
let last_offset = log_record.last_log_offset();
fetch_records
.extend(log_record.records(ReadContext::new(arrow_schema.clone())));
fetch_records.extend(log_record.records(&self.read_context)?);
self.log_scanner_status
.update_offset(&table_bucket, last_offset + 1);
}
Expand Down Expand Up @@ -209,13 +296,19 @@ impl LogFetcher {
if ready_for_fetch_count == 0 {
HashMap::new()
} else {
let (projection_enabled, projected_fields) =
match self.read_context.project_fields_in_order() {
None => (false, vec![]),
Some(fields) => (true, fields.iter().map(|&i| i as i32).collect()),
};

fetch_log_req_for_buckets
.into_iter()
.map(|(leader_id, feq_for_buckets)| {
let req_for_table = PbFetchLogReqForTable {
table_id: table_id.unwrap(),
projection_pushdown_enabled: false,
projected_fields: vec![],
projection_pushdown_enabled: projection_enabled,
projected_fields: projected_fields.clone(),
buckets_req: feq_for_buckets,
};

Expand Down
Loading
Loading