@zhaohaidao
Contributor

@zhaohaidao zhaohaidao commented Nov 22, 2025

Purpose

Support column pruning so that only the requested columns are transferred, which saves significant network cost.

Brief change log

Tests

API and Format

Documentation

@zhaohaidao zhaohaidao changed the title from "(WIP)Support ColumnPruning" to "Support ColumnPruning" on Nov 23, 2025
@zhaohaidao
Contributor Author

@luoyuxia PTAL if u have time.

Copilot AI left a comment

Pull request overview

This PR adds column projection (pruning) support to the Fluss Rust client, enabling users to fetch only specific columns from tables to reduce network transfer costs. The implementation supports both server-side projection pushdown and client-side projection, with the ability to project by column indices or names.

Key Changes:

  • Added projection support to the scanner API with project() and project_by_name() methods
  • Implemented custom Arrow IPC message parsing to support projection at the record batch level
  • Enhanced ReadContext to handle different projection modes (pushdown vs. client-side)
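
To make the two entry points concrete, here is a minimal sketch of how index-based and name-based projection could be validated on a scan builder. The `Schema`, `TableScan`, and `ProjectionError` types below are hypothetical stand-ins written for illustration; they are not the Fluss client's actual definitions, and only the method names `project()` and `project_by_name()` come from this PR.

```rust
/// Hypothetical stand-in for a table schema: an ordered list of column names.
#[derive(Debug, Clone)]
struct Schema {
    columns: Vec<String>,
}

/// Hypothetical error type for rejected projections.
#[derive(Debug, PartialEq)]
enum ProjectionError {
    Empty,
    IndexOutOfBounds { index: usize, column_count: usize },
    UnknownColumn(String),
}

/// Hypothetical scan builder (not the real Fluss `TableScan`).
#[derive(Debug)]
struct TableScan {
    schema: Schema,
    projected_fields: Option<Vec<usize>>,
}

impl TableScan {
    fn new(schema: Schema) -> Self {
        TableScan { schema, projected_fields: None }
    }

    /// Project by column index, rejecting empty or out-of-range projections up front.
    fn project(mut self, indices: &[usize]) -> Result<Self, ProjectionError> {
        if indices.is_empty() {
            return Err(ProjectionError::Empty);
        }
        let column_count = self.schema.columns.len();
        if let Some(&index) = indices.iter().find(|&&i| i >= column_count) {
            return Err(ProjectionError::IndexOutOfBounds { index, column_count });
        }
        self.projected_fields = Some(indices.to_vec());
        Ok(self)
    }

    /// Project by column name: resolve each name to its index, then reuse `project`.
    fn project_by_name(self, names: &[&str]) -> Result<Self, ProjectionError> {
        let indices = names
            .iter()
            .map(|name| {
                self.schema
                    .columns
                    .iter()
                    .position(|c| c == name)
                    .ok_or_else(|| ProjectionError::UnknownColumn(name.to_string()))
            })
            .collect::<Result<Vec<_>, _>>()?;
        self.project(&indices)
    }
}
```

Resolving names to indices first means every later stage only has to deal with one representation of the projection.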

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 12 comments.

Show a summary per file
| File | Description |
| --- | --- |
| crates/fluss/src/record/arrow.rs | Added parse_ipc_message() method for custom IPC parsing, modified records() to support projection, and extended ReadContext with projection capabilities |
| crates/fluss/src/client/table/scanner.rs | Added project() and project_by_name() methods to TableScan, threaded projection fields through LogScanner and LogFetcher, and updated fetch request generation to include projection information |
| crates/examples/src/example_projection.rs | New example demonstrating column projection usage with both full scan and projected scan scenarios |
| crates/examples/src/example_table.rs | Updated bootstrap server address for consistency |
| crates/examples/Cargo.toml | Added env_logger dependency and registered the new projection example |
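
For readers unfamiliar with the framing that a custom IPC parser has to handle, the following sketch splits an Arrow IPC encapsulated message into its metadata and body parts. It relies on the Arrow format's layout of a 4-byte continuation marker (0xFFFFFFFF), a 4-byte little-endian metadata length, the metadata bytes, and then the body. The function name `split_ipc_message` is an assumption made for this example; the PR's `parse_ipc_message()` goes further and decodes the metadata into a record batch, which is not reproduced here.

```rust
const CONTINUATION_MARKER: u32 = 0xFFFF_FFFF;

/// Splits an encapsulated IPC message into (metadata bytes, body bytes).
/// Returns None if the header is truncated, the marker is wrong, or the
/// declared metadata length does not fit in the input.
fn split_ipc_message(data: &[u8]) -> Option<(&[u8], &[u8])> {
    // The header is 8 bytes: continuation marker + metadata length.
    if data.len() < 8 {
        return None;
    }
    let continuation = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
    if continuation != CONTINUATION_MARKER {
        return None;
    }
    let metadata_len = i32::from_le_bytes([data[4], data[5], data[6], data[7]]);
    if metadata_len < 0 {
        return None;
    }
    // Guard against overflow and against a length that runs past the input.
    let metadata_end = 8usize.checked_add(metadata_len as usize)?;
    if data.len() < metadata_end {
        return None;
    }
    Some((&data[8..metadata_end], &data[metadata_end..]))
}
```

Returning `None` on every malformed input mirrors the early-return style visible in the hunks reviewed further down.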


let (projection_enabled, projected_fields) = if let Some(fields) = &self.projected_fields {
if fields.is_empty() {
(false, vec![])
} else {
Copilot AI Nov 23, 2025

Converting usize to i32 using as i32 silently wraps any value above i32::MAX, which could produce incorrect (even negative) column indices on the wire. This could cause issues when projecting columns with very high indices. Consider either:

  1. Using a bounded type for column indices (e.g., validating they're within i32 range)
  2. Changing the protobuf field type to support larger indices
  3. Adding an explicit check and error if the index exceeds i32::MAX
Suggested change
- } else {
+ } else {
+     // Check for out-of-range indices before converting
+     if let Some(&idx) = fields.iter().find(|&&i| i > i32::MAX as usize) {
+         // Return early with error if any index is too large
+         return HashMap::new(); // Or, if possible, return Err(Error::msg(...))
+     }
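
As an alternative to returning an empty map, option 3 above can be sketched with the standard library's checked conversion. `to_wire_indices` and `IndexTooLarge` are hypothetical names used only for this example; how the error would be surfaced in the actual client is not shown in this conversation.

```rust
use std::convert::TryFrom;

/// Hypothetical error carrying the offending column index.
#[derive(Debug, PartialEq)]
struct IndexTooLarge(usize);

/// Converts column indices to the i32 representation used on the wire,
/// failing explicitly instead of wrapping like `as i32` would.
fn to_wire_indices(fields: &[usize]) -> Result<Vec<i32>, IndexTooLarge> {
    fields
        .iter()
        .map(|&idx| i32::try_from(idx).map_err(|_| IndexTooLarge(idx)))
        .collect()
}
```

Collecting into a `Result` stops at the first index that does not fit, so the caller never sees a partially converted list.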

data: &'a [u8],
) -> Option<(arrow::ipc::RecordBatch<'a>, Buffer, arrow::ipc::MetadataVersion)> {
const CONTINUATION_MARKER: u32 = 0xFFFFFFFF;

Copilot AI Nov 23, 2025

[nitpick] Trailing whitespace detected. Please remove it to maintain code cleanliness.

Suggested change

if continuation != CONTINUATION_MARKER {
return None;
}

Copilot AI Nov 23, 2025

[nitpick] Trailing whitespace detected. Please remove it to maintain code cleanliness.

Suggested change

Comment on lines 197 to 202
let projected_schema = arrow_schema::Schema::new(
projected_fields
.iter()
.map(|&idx| full_arrow_schema.field(idx).clone())
.collect::<Vec<_>>(),
);
Copilot AI Nov 23, 2025

Similar to the bounds checking issue in ReadContext::to_arrow_metadata, there's no validation when creating the projected schema here. If projected_fields contains an invalid index, calling full_arrow_schema.field(idx) will panic. While TableScan::project validates indices, this code path could still be reached with invalid indices if the projection is set up through other means.

Comment on lines 197 to 202
let projected_schema = arrow_schema::Schema::new(
projected_fields
.iter()
.map(|&idx| full_arrow_schema.field(idx).clone())
.collect::<Vec<_>>(),
);
Copilot AI Nov 23, 2025

This code for building a projected schema is duplicated from the logic in ReadContext::to_arrow_metadata (lines 652-658 in arrow.rs). This duplication creates a maintenance burden and increases the risk of inconsistencies. Consider extracting this logic into a shared helper method.

Suggested change
- let projected_schema = arrow_schema::Schema::new(
-     projected_fields
-         .iter()
-         .map(|&idx| full_arrow_schema.field(idx).clone())
-         .collect::<Vec<_>>(),
- );
+ let projected_schema = build_projected_schema(&full_arrow_schema, &projected_fields);
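
One possible shape for such a shared helper, which also addresses the bounds-checking concern raised in the previous comment, is sketched below. `Field`, `Schema`, and `InvalidProjection` are simplified stand-ins defined for this example so that it compiles on its own; the real code operates on `arrow_schema::Schema`, and the helper's actual signature in the PR is not shown here.

```rust
/// Simplified stand-ins for the Arrow schema types (assumptions for this sketch).
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
}

#[derive(Debug, Clone, PartialEq)]
struct Schema {
    fields: Vec<Field>,
}

/// Hypothetical error describing a projection index outside the schema.
#[derive(Debug, PartialEq)]
struct InvalidProjection {
    index: usize,
    field_count: usize,
}

/// Builds the projected schema in the order given by `projected_fields`,
/// returning an error instead of panicking on an out-of-range index.
fn build_projected_schema(
    full: &Schema,
    projected_fields: &[usize],
) -> Result<Schema, InvalidProjection> {
    let fields = projected_fields
        .iter()
        .map(|&idx| {
            full.fields.get(idx).cloned().ok_or(InvalidProjection {
                index: idx,
                field_count: full.fields.len(),
            })
        })
        .collect::<Result<Vec<_>, _>>()?;
    Ok(Schema { fields })
}
```

Keeping the lookup in one function means both call sites get the same validation and the same field order.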

if data.len() < 8 {
return None;
}

Copilot AI Nov 23, 2025

[nitpick] Trailing whitespace detected. Please remove it to maintain code cleanliness.

Suggested change

Comment on lines 190 to 194
let (projection_enabled, _) = if !projected_fields.is_empty() {
(true, projected_fields.clone())
} else {
(false, vec![])
};
Copilot AI Nov 23, 2025

[nitpick] The check for empty projected fields (line 190) and subsequent handling creates unnecessary complexity. If projected_fields is Some(vec![]) (an empty vector), it should probably be treated the same as None - i.e., fetch all fields. The current logic sets projection_enabled = false for empty vectors, but it's unclear why an empty projection list would ever be intentionally created. Consider either preventing empty vectors from being set in the first place (e.g., in the project method) or simplifying this logic.
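
If the "treat empty as None" reading is chosen, the normalization can be done once at the boundary so later code only has to match on `Option`. This is a small sketch with a hypothetical function name, not code from the PR.

```rust
/// Collapses an empty projection list into `None`, meaning "fetch all fields".
fn normalize_projection(projected_fields: Option<Vec<usize>>) -> Option<Vec<usize>> {
    projected_fields.filter(|fields| !fields.is_empty())
}
```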

Contributor

@luoyuxia luoyuxia left a comment

@zhaohaidao Thanks for the PR. Left minor comments. PTAL.
Also, it seems this PR doesn't include reordering the projected fields returned by the server for the final output. See the Java impl https://github.com/apache/fluss/blob/ae84521aaaef5448a0bc5a63fc83e6ca536ca452/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java#L84

Don't forget to create an issue to track it. It's critical when the projected fields are out of order, like [3, 2, 1].

Copilot AI left a comment

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 9 comments.



  pub async fn main() -> Result<()> {
      let mut config = Config::parse();
-     config.bootstrap_server = Some("127.0.0.1:56405".to_string());
+     config.bootstrap_server = Some("127.0.0.1:9123".to_string());
Copilot AI Dec 3, 2025

Hardcoded bootstrap server changed from port 56405 to 9123. This change appears to be unrelated to the column pruning feature and may have been committed accidentally. If this is an intentional configuration change for development/testing, it should be explained in the PR description or reverted to avoid affecting other developers' local environments.

Suggested change
- config.bootstrap_server = Some("127.0.0.1:9123".to_string());
+ config.bootstrap_server = Some("127.0.0.1:56405".to_string());

Comment on lines 697 to 700
return None;
}

// Calculate reordering indexes to transform from sorted order to user-requested order
Copilot AI Dec 3, 2025

The .expect() call could panic if there's a mismatch between projected_fields and projection_in_order. While this should never happen with the current code logic (since both are derived from the same source in create_read_context), this represents a potentially unrecoverable error condition. Consider:

  1. Adding debug assertions to validate invariants in with_projection_pushdown():
     debug_assert_eq!(projected_fields.len(), projection_in_order.len());
     debug_assert!(projected_fields.iter().all(|&f| projection_in_order.contains(&f)));
  2. Or, returning a Result from reordering_indexes() instead of using .expect() to make error handling more explicit and recoverable.
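
The second option can be sketched as follows. The example assumes, based on the code comment in the hunk above, that projected columns arrive in sorted order and must be mapped back to the order the user requested (for example [3, 2, 1]). The signatures, the `MissingField` error, and the `reorder` helper are assumptions made for this sketch; only the names `reordering_indexes`, `projected_fields`, and `projection_in_order` appear in the review.

```rust
/// Hypothetical error: a requested field is absent from the sorted projection.
#[derive(Debug, PartialEq)]
struct MissingField(usize);

/// For each user-requested field, finds its position in the sorted projection.
/// Returns an error instead of panicking when the two lists disagree.
fn reordering_indexes(
    projected_fields: &[usize],
    projection_in_order: &[usize],
) -> Result<Vec<usize>, MissingField> {
    projected_fields
        .iter()
        .map(|&field| {
            projection_in_order
                .iter()
                .position(|&f| f == field)
                .ok_or(MissingField(field))
        })
        .collect()
}

/// Applies the reordering to columns that arrived in sorted order.
/// Every index must be a valid position in `columns`.
fn reorder<T: Clone>(columns: &[T], indexes: &[usize]) -> Vec<T> {
    indexes.iter().map(|&i| columns[i].clone()).collect()
}
```

With a request of [3, 2, 1] and a sorted projection of [1, 2, 3], the computed indexes are [2, 1, 0], so columns received in sorted order come back out in the requested order.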

zhaohaidao and others added 2 commits December 3, 2025 15:21
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@zhaohaidao
Contributor Author

> @zhaohaidao Thanks for the PR. Left minor comments. PTAL. Also, it seems this PR doesn't include reordering the projected fields returned by the server for the final output. See the Java impl https://github.com/apache/fluss/blob/ae84521aaaef5448a0bc5a63fc83e6ca536ca452/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java#L84
>
> Don't forget to create an issue to track it. It's critical when the projected fields are out of order, like [3, 2, 1].

@luoyuxia Thank you for your concise and elegant suggestions. Comments are addressed. PTAL if u have time

Copilot AI left a comment

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 10 comments.



@luoyuxia
Contributor

luoyuxia commented Dec 3, 2025

@zhaohaidao Hi, I appended a minor commit to improve reordering. PTAL.

@zhaohaidao
Contributor Author

> @zhaohaidao Hi, I appended a minor commit to improve reordering. PTAL.

@luoyuxia Thanks, LGTM

@luoyuxia
Contributor

luoyuxia commented Dec 4, 2025

@zhaohaidao Thanks for updating. Left a few comments. Don't forget to rebase onto the main branch to resolve conflicts.

@zhaohaidao
Contributor Author

> @zhaohaidao Thanks for updating. Left a few comments. Don't forget to rebase onto the main branch to resolve conflicts.

Thanks for reminding. The comments are addressed. PTAL if u have time

Contributor

@luoyuxia luoyuxia left a comment

+1

@luoyuxia luoyuxia merged commit e4e017c into apache:main Dec 4, 2025
13 checks passed