Skip to content

Commit

Permalink
ARROW-8225: [Rust] Rust Arrow IPC reader must respect continuation markers.
Browse files Browse the repository at this point in the history

A continuation marker (the value 0xffffffff) may appear in a message size
block; it is used to align the next block to an 8-byte boundary. When the
reader encounters this marker, it must skip over it and read the actual
message size from the following four bytes.

Closes #6724 from maxburke/rust_continuation

Authored-by: Max Burke <max@urbanlogiq.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
  • Loading branch information
maxburke authored and kou committed Mar 27, 2020
1 parent a35cf92 commit 1de2593
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 7 deletions.
21 changes: 20 additions & 1 deletion rust/arrow/src/ipc/reader.rs
Expand Up @@ -33,6 +33,8 @@ use crate::ipc;
use crate::record_batch::{RecordBatch, RecordBatchReader};
use DataType::*;

const CONTINUATION_MARKER: u32 = 0xffff_ffff;

/// Read a buffer based on offset and length
fn read_buffer(buf: &ipc::Buffer, a_data: &Vec<u8>) -> Buffer {
let start_offset = buf.offset() as usize;
Expand Down Expand Up @@ -730,6 +732,12 @@ impl<R: Read> StreamReader<R> {
let mut meta_buffer = vec![0; meta_len as usize];
reader.read_exact(&mut meta_buffer)?;

// If a continuation marker is encountered, skip over it and read
// the size from the next four bytes.
if u32::from_le_bytes(meta_size) == CONTINUATION_MARKER {
reader.read_exact(&mut meta_size)?;
}

let vecs = &meta_buffer.to_vec();
let message = ipc::get_root_as_message(vecs);
// message header is a Schema, so read it
Expand Down Expand Up @@ -762,7 +770,18 @@ impl<R: Read> StreamReader<R> {
// determine metadata length
let mut meta_size: [u8; 4] = [0; 4];
self.reader.read_exact(&mut meta_size)?;
let meta_len = u32::from_le_bytes(meta_size);
let meta_len = {
let meta_len = u32::from_le_bytes(meta_size);

// If a continuation marker is encountered, skip over it and read
// the size from the next four bytes.
if meta_len == CONTINUATION_MARKER {
self.reader.read_exact(&mut meta_size)?;
u32::from_le_bytes(meta_size)
} else {
meta_len
}
};

if meta_len == 0 {
// the stream has ended, mark the reader as finished
Expand Down
4 changes: 2 additions & 2 deletions rust/datafusion/src/logicalplan.rs
Expand Up @@ -841,8 +841,8 @@ mod tests {
.build()?;

let expected = "Projection: #0, #1 AS total_salary\
\n Aggregate: groupBy=[[#0]], aggr=[[SUM(#1)]]\
\n TableScan: employee.csv projection=Some([3, 4])";
\n Aggregate: groupBy=[[#0]], aggr=[[SUM(#1)]]\
\n TableScan: employee.csv projection=Some([3, 4])";

assert_eq!(expected, format!("{:?}", plan));

Expand Down
9 changes: 5 additions & 4 deletions rust/datafusion/src/sql/planner.rs
Expand Up @@ -272,9 +272,10 @@ impl<S: SchemaProvider> SqlToRel<S> {
Ok(Expr::Literal(ScalarValue::Utf8(s.clone())))
}

ASTNode::SQLAliasedExpr(ref expr, ref alias) => {
Ok(Alias(Arc::new(self.sql_to_rex(&expr, schema)?), alias.to_owned()))
}
ASTNode::SQLAliasedExpr(ref expr, ref alias) => Ok(Alias(
Arc::new(self.sql_to_rex(&expr, schema)?),
alias.to_owned(),
)),

ASTNode::SQLIdentifier(ref id) => {
match schema.fields().iter().position(|c| c.name().eq(id)) {
Expand Down Expand Up @@ -610,7 +611,7 @@ mod tests {
fn select_aliased_scalar_func() {
let sql = "SELECT sqrt(age) AS square_people FROM person";
let expected = "Projection: sqrt(CAST(#3 AS Float64)) AS square_people\
\n TableScan: person projection=None";
\n TableScan: person projection=None";
quick_test(sql, expected);
}

Expand Down

0 comments on commit 1de2593

Please sign in to comment.