Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 94 additions & 3 deletions parquet/src/column/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,9 @@ where
let mut offset = 0;

if max_rep_level > 0 {
if offset > buf.len() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one should never evaluate true?

return Err(general_err!("not enough data to read levels"));
}
let (bytes_read, level_data) = parse_v1_level(
max_rep_level,
num_values,
Expand All @@ -455,6 +458,9 @@ where
}

if max_def_level > 0 {
if offset > buf.len() {
return Err(general_err!("not enough data to read levels"));
}
let (bytes_read, level_data) = parse_v1_level(
max_def_level,
num_values,
Expand All @@ -469,6 +475,9 @@ where
.set_data(def_level_encoding, level_data);
}

if offset > buf.len() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the checks above are moved to after offset is incremented, then I think this check can be eliminated as it will either always be true (offset is still 0), or the error condition was already caught.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at it more, I think this error checking needs to move into parse_v1_levels which can still panic. If that doesn't return an error, then we don't need to check again here.

return Err(general_err!("not enough data to read levels"));
}
self.values_decoder.set_data(
encoding,
buf.slice(offset..),
Expand Down Expand Up @@ -509,27 +518,33 @@ where
self.has_record_delimiter =
self.page_reader.at_record_boundary()?;

if rep_levels_byte_len as usize > buf.len() {
return Err(general_err!("not enough data to read levels"));
}
self.rep_level_decoder.as_mut().unwrap().set_data(
Encoding::RLE,
buf.slice(..rep_levels_byte_len as usize),
);
}

let def_levels_end = rep_levels_byte_len + def_levels_byte_len;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not move this check higher up and check all levels in one shot?

if def_levels_end as usize > buf.len() {
return Err(general_err!("not enough data to read levels"));
}
// DataPage v2 only supports RLE encoding for definition
// levels
if self.descr.max_def_level() > 0 {
self.def_level_decoder.as_mut().unwrap().set_data(
Encoding::RLE,
buf.slice(
rep_levels_byte_len as usize
..(rep_levels_byte_len + def_levels_byte_len) as usize,
rep_levels_byte_len as usize..def_levels_end as usize,
),
);
}

self.values_decoder.set_data(
encoding,
buf.slice((rep_levels_byte_len + def_levels_byte_len) as usize..),
buf.slice(def_levels_end as usize..),
num_values as usize,
Some((num_values - num_nulls) as usize),
)?;
Expand Down Expand Up @@ -1038,6 +1053,82 @@ mod tests {
);
}

#[test]
fn test_data_page_v2_truncated_rep_levels() {
use std::collections::VecDeque;
let desc = Arc::new(ColumnDescriptor::new(
Arc::new(get_test_int32_type()),
1,
1,
ColumnPath::new(Vec::new()),
));

// Create a buffer that is smaller than rep_levels_byte_len
let buf = Bytes::from(vec![0; 5]);

let page = Page::DataPageV2 {
buf,
num_values: 10,
encoding: Encoding::PLAIN,
num_nulls: 0,
num_rows: 10,
def_levels_byte_len: 10,
rep_levels_byte_len: 10, // larger than buffer
is_compressed: false,
statistics: None,
};

let page_reader = InMemoryPageReader::new(VecDeque::from(vec![page]));
let column_reader = get_column_reader(desc.clone(), Box::new(page_reader));
let mut typed_reader = get_typed_column_reader::<Int32Type>(column_reader);
let mut values = vec![];
let mut def_levels = vec![];
let mut rep_levels = vec![];
let err = typed_reader
.read_records(1, Some(&mut def_levels), Some(&mut rep_levels), &mut values)
.unwrap_err();
assert!(
matches!(err, ParquetError::General(msg) if msg == "not enough data to read levels")
);
}

#[test]
fn test_data_page_v2_truncated_def_levels() {
use std::collections::VecDeque;
let desc = Arc::new(ColumnDescriptor::new(
Arc::new(get_test_int32_type()),
1,
1,
ColumnPath::new(Vec::new()),
));
// Test truncated definition levels
let buf = Bytes::from(vec![0; 15]);
let page = Page::DataPageV2 {
buf,
num_values: 10,
encoding: Encoding::PLAIN,
num_nulls: 0,
num_rows: 10,
def_levels_byte_len: 10, // rep(10) + def(10) > 15
rep_levels_byte_len: 10,
is_compressed: false,
statistics: None,
};

let page_reader = InMemoryPageReader::new(VecDeque::from(vec![page]));
let column_reader = get_column_reader(desc, Box::new(page_reader));
let mut typed_reader = get_typed_column_reader::<Int32Type>(column_reader);
let mut values = vec![];
let mut def_levels = vec![];
let mut rep_levels = vec![];
let err = typed_reader
.read_records(1, Some(&mut def_levels), Some(&mut rep_levels), &mut values)
.unwrap_err();
assert!(
matches!(err, ParquetError::General(msg) if msg == "not enough data to read levels")
);
}

// ----------------------------------------------------------------------
// Helper methods to make pages and test
//
Expand Down
Loading