feat(cubestore-cli): Support Arrow format for responses #10915
Claude finished @ovr's task in 2m 15s
PR Review: feat(cubestore-cli): Support Arrow format for responses
Summary
Clean, well-structured PR that upgrades the CubeStore CLI's wire protocol from the legacy stringified-rows FlatBuffer format to Apache Arrow IPC. The design is backwards-compatible: the client requests Arrow but gracefully handles Legacy responses from older servers via the
What's good
Issues found (see inline comments)
Verdict
The only substantive concern is the
pub fn get_columns(&self) -> Vec<String> {
    match &self.data {
        ResultData::Legacy { columns, .. } => columns.clone(),
        ResultData::Arrow { schema, .. } => {
            schema.fields().iter().map(|f| f.name().clone()).collect()
        }
    }
}
Nit (performance): get_columns() allocates a new Vec<String> on every call. In exec.rs it's called once, but in format.rs and codec.rs it's called in paths where the result is used immediately and then dropped.
For the Legacy variant this clones all strings unnecessarily — the caller just needs to read names and count them. Consider returning Cow<'_, [String]> or a dedicated iterator/slice wrapper so the Legacy path avoids the clone:
use std::borrow::Cow;

pub fn get_columns(&self) -> Cow<'_, [String]> {
    match &self.data {
        // Legacy already stores the names, so borrow them instead of cloning.
        ResultData::Legacy { columns, .. } => Cow::Borrowed(columns.as_slice()),
        // Arrow names live inside the schema, so an owned Vec is still needed.
        ResultData::Arrow { schema, .. } => {
            Cow::Owned(schema.fields().iter().map(|f| f.name().clone()).collect())
        }
    }
}

Not a blocker, since the column count is small, but worth noting for future optimization if this gets called in a hot path.
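To make the borrow-versus-allocate behaviour concrete, here is a minimal standard-library sketch of the `Cow<'_, [String]>` idea. The `ResultData` and `QueryResult` types below are simplified stand-ins, not the PR's definitions: in particular, the hypothetical `field_names` field replaces the real Arrow schema so the example compiles without the `arrow` crate.

```rust
use std::borrow::Cow;

// Hypothetical stand-ins for the PR's types. The real `Arrow` variant holds
// an Arrow schema and record batches rather than a list of field names.
enum ResultData {
    Legacy { columns: Vec<String> },
    Arrow { field_names: Vec<&'static str> },
}

struct QueryResult {
    data: ResultData,
}

impl QueryResult {
    // Borrow the stored names when they already exist as `String`s and
    // allocate only when they have to be derived from the schema.
    fn get_columns(&self) -> Cow<'_, [String]> {
        match &self.data {
            ResultData::Legacy { columns } => Cow::Borrowed(columns.as_slice()),
            ResultData::Arrow { field_names } => {
                Cow::Owned(field_names.iter().map(|n| n.to_string()).collect())
            }
        }
    }
}
```

Callers that only read names or call `.len()` work unchanged, because `Cow<'_, [String]>` dereferences to `[String]`. A caller that needs ownership can call `.into_owned()`, which clones only in the borrowed case.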
fn batch_formatters<'a>(
    batch: &'a RecordBatch,
    options: &'a FormatOptions,
) -> Vec<ArrayFormatter<'a>> {
    batch
        .columns()
        .iter()
        .map(|col| ArrayFormatter::try_new(col.as_ref(), options))
        .collect::<Result<_, _>>()
        .expect("arrow ArrayFormatter init")
}
Concern: .expect("arrow ArrayFormatter init") will panic if Arrow encounters an unsupported data type. While this is unlikely with standard CubeStore types, a malformed server response could crash the CLI rather than printing an error.
Consider propagating the error or falling back gracefully:
fn batch_formatters<'a>(
    batch: &'a RecordBatch,
    options: &'a FormatOptions,
) -> Option<Vec<ArrayFormatter<'a>>> {
    batch.columns()
        .iter()
        .map(|col| ArrayFormatter::try_new(col.as_ref(), options))
        .collect::<Result<_, _>>()
        .ok()
}

Then skip rendering that batch (or fall back to debug formatting) if None.
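For the "propagate the error" alternative, a rough sketch of the shape follows, again using only the standard library. `Column`, `Formatter`, `FormatError` and `render_first_row` are all hypothetical stand-ins (with `Formatter::try_new` playing the role of `ArrayFormatter::try_new`). The point is only that collecting into `Result<Vec<_>, _>` stops at the first failure and lets the caller report it with `?` instead of panicking.

```rust
use std::fmt;

// Hypothetical stand-in for an Arrow array: one supported and one
// unsupported column kind.
#[derive(Debug)]
enum Column {
    Int(Vec<i64>),
    Unsupported,
}

// Hypothetical error type standing in for Arrow's error.
#[derive(Debug)]
struct FormatError(String);

impl fmt::Display for FormatError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

struct Formatter<'a> {
    values: &'a [i64],
}

impl<'a> Formatter<'a> {
    // Fails for column kinds this sketch does not know how to display.
    fn try_new(col: &'a Column) -> Result<Self, FormatError> {
        match col {
            Column::Int(v) => Ok(Formatter { values: v.as_slice() }),
            Column::Unsupported => Err(FormatError("unsupported data type".into())),
        }
    }

    // Returns an empty string for out-of-range rows.
    fn cell(&self, row: usize) -> String {
        self.values.get(row).map(|v| v.to_string()).unwrap_or_default()
    }
}

// Collecting into `Result<Vec<_>, _>` short-circuits on the first error,
// so the caller receives the failure instead of a panic.
fn batch_formatters(columns: &[Column]) -> Result<Vec<Formatter<'_>>, FormatError> {
    columns.iter().map(|c| Formatter::try_new(c)).collect()
}

// The renderer forwards the error with `?`; the CLI can then print it.
fn render_first_row(columns: &[Column]) -> Result<String, FormatError> {
    let formatters = batch_formatters(columns)?;
    let cells: Vec<String> = formatters.iter().map(|f| f.cell(0)).collect();
    Ok(cells.join(" | "))
}
```

Compared with returning `Option`, this keeps the underlying error message available for the user-facing output, at the cost of changing the renderer's return type.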
fn render_arrow_batches(columns: &[String], batches: &[RecordBatch]) -> String {
    let ncols = columns.len();
    let fmt_options = FormatOptions::default().with_display_error(true);
    let mut widths = header_widths(columns);

    for batch in batches {
        let formatters = batch_formatters(batch, &fmt_options);
        let batch_cols = formatters.len().min(ncols);
        for row_idx in 0..batch.num_rows() {
            for col_idx in 0..batch_cols {
                let cell = arrow_cell(batch, &formatters, col_idx, row_idx);
                observe_width(&mut widths, col_idx, cell_len(cell.as_deref()));
            }
        }
    }

    let mut out = String::new();
    write_header(&mut out, columns, &widths);
    write_separator(&mut out, &widths);
    for batch in batches {
        let formatters = batch_formatters(batch, &fmt_options);
        let batch_cols = formatters.len().min(ncols);
        for row_idx in 0..batch.num_rows() {
            out.push('\n');
            for (i, &width) in widths.iter().enumerate() {
                let cell = if i < batch_cols {
                    arrow_cell(batch, &formatters, i, row_idx)
                } else {
                    None
                };
                write_cell(&mut out, i, cell.as_deref(), width);
            }
        }
    }
    out
Nit: batch_formatters() is called twice per batch — once in the sizing pass and once in the rendering pass. The doc comment acknowledges the trade-off (CPU vs. not materializing all rows into strings), which is fair.
If this ever becomes a bottleneck with large result sets, you could compute formatters once per batch and store (batch, formatters) pairs, but the current approach is clean and the overhead is likely negligible for CLI output.
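For reference, the "(batch, formatters) pairs" variant could look roughly like the sketch below. It is illustrative only: `Batch`, `Formatter`, `num_rows` and `render` are hypothetical stand-ins that use integer columns instead of Arrow arrays, and the table layout is simplified. Cells are still formatted lazily in both passes, so no full table of strings is materialized; only the formatter construction is shared.

```rust
// Hypothetical stand-ins: a `Batch` holds integer columns, and a `Formatter`
// is a per-column object assumed to be comparatively costly to build.
struct Batch {
    columns: Vec<Vec<i64>>,
}

struct Formatter<'a> {
    values: &'a [i64],
}

impl<'a> Formatter<'a> {
    fn cell(&self, row: usize) -> String {
        self.values[row].to_string()
    }
}

fn batch_formatters(batch: &Batch) -> Vec<Formatter<'_>> {
    batch
        .columns
        .iter()
        .map(|c| Formatter { values: c.as_slice() })
        .collect()
}

fn num_rows(batch: &Batch) -> usize {
    batch.columns.first().map_or(0, |c| c.len())
}

fn render(headers: &[&str], batches: &[Batch]) -> String {
    // Build the formatters once per batch and keep them next to the batch.
    let prepared: Vec<(&Batch, Vec<Formatter<'_>>)> =
        batches.iter().map(|b| (b, batch_formatters(b))).collect();

    // Pass 1: measure column widths.
    let mut widths: Vec<usize> = headers.iter().map(|h| h.len()).collect();
    for (batch, formatters) in &prepared {
        for row in 0..num_rows(batch) {
            for (w, f) in widths.iter_mut().zip(formatters) {
                *w = (*w).max(f.cell(row).len());
            }
        }
    }

    // Pass 2: render, reusing the formatters built above.
    let header: Vec<String> = headers
        .iter()
        .zip(&widths)
        .map(|(h, w)| format!("{:<width$}", h, width = *w))
        .collect();
    let mut out = header.join(" | ");
    for (batch, formatters) in &prepared {
        for row in 0..num_rows(batch) {
            out.push('\n');
            let cells: Vec<String> = formatters
                .iter()
                .zip(&widths)
                .map(|(f, w)| format!("{:<width$}", f.cell(row), width = *w))
                .collect();
            out.push_str(&cells.join(" | "));
        }
    }
    out
}
```

The trade-off is one extra `Vec` of formatter lists held for the duration of rendering, in exchange for constructing each formatter exactly once.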
fn decode_arrow_ipc(bytes: &[u8]) -> Result<QueryResult, TransportError> {
    let reader = StreamReader::try_new(Cursor::new(bytes), None)
        .map_err(|e| TransportError::Protocol(format!("arrow IPC open: {e}")))?;

    let schema = reader.schema();
    let batches: Vec<RecordBatch> = reader
        .collect::<Result<_, _>>()
        .map_err(|e| TransportError::Protocol(format!("arrow IPC read batch: {e}")))?;

    Ok(QueryResult {
        data: ResultData::Arrow { schema, batches },
    })
}
Question: decode_arrow_ipc collects all batches into a Vec<RecordBatch> eagerly. For very large result sets this means the entire result is held in memory. This is fine for a CLI tool, but if this transport crate is intended to be reused by other consumers, a streaming approach (yielding batches one at a time) might be worth considering in the future.
Also: is there a size limit consideration? The StreamReader will happily read whatever the server sends. The WS layer already has a 256MiB frame cap (from actor.rs), so that provides an implicit ceiling, which seems sufficient.
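If a streaming variant is ever wanted, the general shape would be an iterator that yields one decoded batch at a time. The snippet above already consumes `reader` through `.collect()`, so the reader is itself an iterator of results. The sketch below shows the pattern with standard-library code only; the framing (a one-byte length prefix per batch) and the names `BatchStream`, `decode_stream` and `total_payload_bytes` are invented for illustration and have nothing to do with the real Arrow IPC format.

```rust
// Hypothetical framing: each batch is a 1-byte length followed by that many
// payload bytes. The real Arrow IPC stream format is different.
#[derive(Debug, PartialEq)]
struct Batch(Vec<u8>);

#[derive(Debug, PartialEq)]
enum DecodeError {
    Truncated,
}

struct BatchStream<'a> {
    rest: &'a [u8],
}

impl<'a> Iterator for BatchStream<'a> {
    type Item = Result<Batch, DecodeError>;

    // Decodes exactly one batch per call; nothing is buffered ahead.
    fn next(&mut self) -> Option<Self::Item> {
        let (&len, tail) = self.rest.split_first()?;
        let len = len as usize;
        if tail.len() < len {
            // Report the error once, then end the stream.
            self.rest = &[];
            return Some(Err(DecodeError::Truncated));
        }
        let (payload, remaining) = tail.split_at(len);
        self.rest = remaining;
        Some(Ok(Batch(payload.to_vec())))
    }
}

fn decode_stream(bytes: &[u8]) -> BatchStream<'_> {
    BatchStream { rest: bytes }
}

// A consumer that handles batches one at a time, so at most one decoded
// batch is alive at any point.
fn total_payload_bytes(bytes: &[u8]) -> Result<usize, DecodeError> {
    let mut total = 0;
    for batch in decode_stream(bytes) {
        total += batch?.0.len();
    }
    Ok(total)
}
```

An eager API can still be layered on top by collecting the iterator into `Result<Vec<_>, _>`, so exposing the streaming form would not prevent the CLI from keeping its current behaviour.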
Codecov Report
✅ All modified and coverable lines are covered by tests.

@@             Coverage Diff              @@
##           master   #10915       +/-   ##
===========================================
- Coverage   83.53%   78.93%    -4.60%
===========================================
  Files         254      470      +216
  Lines       75844    92837    +16993
  Branches        0     3449     +3449
===========================================
+ Hits        63353    73279     +9926
- Misses      12491    19054     +6563
- Partials        0      504      +504
No description provided.