Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix multiple blocking binary reader issues #552

Merged
merged 5 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 15 additions & 4 deletions src/binary/non_blocking/binary_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ impl BinaryBuffer<Vec<u8>> {
// Make sure that there are `length` bytes in the `Vec` beyond `self.end`.
self.reserve_capacity(length);
// Get a mutable slice to the first `length` bytes starting at `self.end`.
let read_buffer = &mut self.data.as_mut_slice()[self.end..length];
let read_buffer = &mut self.data.as_mut_slice()[self.end..(self.end + length)];
// Use that slice as our input buffer to read from the source.
let bytes_read = source.read(read_buffer)?;
// Update `self.end` to reflect that we have more data available to read.
Expand All @@ -507,9 +507,7 @@ impl BinaryBuffer<Vec<u8>> {
// For now, it is unlikely that this would happen often.
let capacity = self.data.len() - self.end;
if capacity < length {
for _ in 0..(length - capacity) {
self.data.push(0);
}
self.data.resize(self.data.len() + length - capacity, 0);
}
}
}
Expand Down Expand Up @@ -810,4 +808,17 @@ mod tests {
.expect_err("This exceeded the configured max Int size.");
Ok(())
}

#[test]
fn validate_read_from_size() -> IonResult<()> {
    // Regression test: `read_from` must honor the number of bytes it is asked to read.
    // The sub-slice of the buffer used as the read target was once computed incorrectly,
    // which could produce failed reads or reads that kept getting smaller.
    const REQUESTED: usize = 11;
    let source: &[u8] = &[0; REQUESTED];

    // Start from a buffer that already holds 10 bytes, then ask for 11 more.
    let mut buffer = BinaryBuffer::new(vec![0; 10]);
    let bytes_read = buffer.read_from(source, REQUESTED)?;

    assert_eq!(bytes_read, REQUESTED);
    Ok(())
}
}
94 changes: 72 additions & 22 deletions src/binary/non_blocking/raw_binary_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::binary::non_blocking::type_descriptor::{Header, TypeDescriptor};
use crate::binary::uint::DecodedUInt;
use crate::binary::var_uint::VarUInt;
use crate::binary::IonTypeCode;
use crate::raw_reader::BufferedRawReader;
use crate::raw_reader::{BufferedRawReader, Expandable};
use crate::result::{
decoding_error, decoding_error_raw, illegal_operation, illegal_operation_raw,
incomplete_data_error,
Expand Down Expand Up @@ -314,7 +314,7 @@ impl Container {
/// Note that if the buffer runs out of data between top level values, this will be interpreted
/// as the end of the stream. Applications can still add more data to the buffer and resume reading.
#[derive(Debug)]
pub struct RawBinaryReader<A: AsRef<[u8]>> {
pub struct RawBinaryReader<A: AsRef<[u8]> + Expandable> {
ion_version: (u8, u8),
state: ReaderState,
buffer: BinaryBuffer<A>,
Expand Down Expand Up @@ -351,15 +351,16 @@ impl From<Vec<u8>> for RawBinaryReader<Vec<u8>> {
}
}

impl<A: AsRef<[u8]>> RawBinaryReader<A> {
impl<A: AsRef<[u8]> + Expandable> RawBinaryReader<A> {
/// Constructs a RawBinaryReader from a value that can be viewed as a byte slice.
pub fn new(source: A) -> RawBinaryReader<A> {
let expandable = source.expandable();
RawBinaryReader {
ion_version: (1, 0),
state: ReaderState::Ready,
buffer: BinaryBuffer::new(source),
parents: Vec::new(), // Does not allocate yet
is_eos: false,
is_eos: !expandable,
}
}

Expand Down Expand Up @@ -704,7 +705,7 @@ impl<A: AsRef<[u8]>> RawBinaryReader<A> {
}
}

impl<A: AsRef<[u8]>> IonReader for RawBinaryReader<A> {
impl<A: AsRef<[u8]> + Expandable> IonReader for RawBinaryReader<A> {
type Item = RawStreamItem;
type Symbol = RawSymbolToken;

Expand Down Expand Up @@ -745,10 +746,18 @@ impl<A: AsRef<[u8]>> IonReader for RawBinaryReader<A> {
return Ok(RawStreamItem::Nothing);
}
} else {
// We're at the top level. If we're out of data (`buffer.is_empty()`) and aren't waiting
// on more data (`Skipping(n)`), we return `Nothing` to indicate that we're at EOF.
// We're at the top level. If we are out of data, then we need to determine if we
// are at the end of the stream or not. If not, then we need to signal an
// incomplete error, otherwise we can return Nothing to indicate that we are done.
if self.buffer.is_empty() && self.state == ReaderState::Ready {
return Ok(RawStreamItem::Nothing);
if self.is_eos {
return Ok(RawStreamItem::Nothing);
} else {
return incomplete_data_error(
"ahead to next item",
self.buffer.total_consumed(),
);
}
}
}
}
Expand All @@ -762,7 +771,7 @@ impl<A: AsRef<[u8]>> IonReader for RawBinaryReader<A> {
// parsing will create a fresh transaction reader starting from the last good state.
let mut tx_reader = self.transaction_reader();

let item_result = tx_reader.read_next_item();
let item_result = tx_reader.read_next_item()?;
let nop_bytes_count = tx_reader.nop_bytes_count as usize;

// If we do not have enough bytes to materialize the next value, return an incomplete
Expand All @@ -782,7 +791,7 @@ impl<A: AsRef<[u8]>> IonReader for RawBinaryReader<A> {
// This guarantees that the first byte in the buffer is the first byte of the current item.
self.buffer.consume(nop_bytes_count);

item_result
Ok(item_result)
}

fn current(&self) -> Self::Item {
Expand Down Expand Up @@ -1374,6 +1383,10 @@ impl<'a, A: AsRef<[u8]>> TxReader<'a, A> {
return decoding_error("found an annotation wrapper with no value");
}

if annotations_length.value() > self.tx_buffer.remaining() {
return incomplete_data_error("an annotation wrapper", self.tx_buffer.total_consumed());
}

// Skip over the annotations sequence itself; the reader will return to it if/when the
// reader asks to iterate over those symbol IDs.
self.tx_buffer.consume(annotations_length.value());
Expand Down Expand Up @@ -1471,7 +1484,7 @@ mod tests {
}
}

fn expect_annotations<A: AsRef<[u8]>, I: IntoRawAnnotations>(
fn expect_annotations<A: AsRef<[u8]> + Expandable, I: IntoRawAnnotations>(
reader: &RawBinaryReader<A>,
annotations: I,
) {
Expand Down Expand Up @@ -1513,7 +1526,7 @@ mod tests {
#[test]
fn read_int_header() -> IonResult<()> {
let data = vec![0x21, 0x03];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::Int);
expect_eof(reader.next());
Ok(())
Expand All @@ -1529,11 +1542,14 @@ mod tests {
// This byte completes the int, but we still don't have another value to move to.
reader.append_bytes(&[0x03])?;
expect_value(reader.next(), IonType::Int);
expect_eof(reader.next());
expect_incomplete(reader.next()); // Incomplete, rather than EOF because we have not marked
// the stream complete.

// Now there's an empty string after the int
reader.append_bytes(&[0x80])?;
reader.stream_complete();
expect_value(reader.next(), IonType::String);
expect_eof(reader.next());

Ok(())
}
Expand All @@ -1545,7 +1561,7 @@ mod tests {
0x21, 0x02, // 2
0x21, 0x03, // 3
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::Int);
assert_eq!(reader.read_int()?, Int::I64(1));
expect_value(reader.next(), IonType::Int);
Expand All @@ -1564,7 +1580,7 @@ mod tests {
0x48, 0x40, 0x92, 0xc0, 0x00, 0x00, 0x00, 0x00, 0x00, // 1.2e3
0x48, 0xc0, 0x20, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, // -8.125e0
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::Float);
assert_eq!(reader.read_f64()?, 5.5f64);
expect_value(reader.next(), IonType::Float);
Expand All @@ -1584,7 +1600,7 @@ mod tests {
0x52, 0x80, 0xe4, // -100.
0x52, 0x80, 0x1c, // 28.
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::Decimal);
assert_eq!(reader.read_decimal()?, Decimal::new(0, 0));
expect_value(reader.next(), IonType::Decimal);
Expand Down Expand Up @@ -1612,7 +1628,7 @@ mod tests {
0xbb, // 2022-06-09T22:59:59.000+00:00
0xbb, 0xc3,
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::Timestamp);
assert_eq!(
reader.read_timestamp()?,
Expand Down Expand Up @@ -1674,7 +1690,7 @@ mod tests {
0x71, 0x02, // $2
0x72, 0x00, 0x03, // inefficiently encoded $3
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::Symbol);
assert_eq!(reader.read_symbol_id()?, 0);
expect_value(reader.next(), IonType::Symbol);
Expand All @@ -1696,7 +1712,7 @@ mod tests {
0x83, 0x62, 0x61, 0x72, // "bar"
0x83, 0x62, 0x61, 0x7a, // "baz"
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::String);
assert_eq!(reader.read_str()?, "");
expect_value(reader.next(), IonType::String);
Expand All @@ -1718,7 +1734,7 @@ mod tests {
0x93, 0x62, 0x61, 0x72, // b"bar"
0x93, 0x62, 0x61, 0x7a, // b"baz"
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::Clob);
assert_eq!(reader.read_clob_bytes()?, b"");
expect_value(reader.next(), IonType::Clob);
Expand All @@ -1740,7 +1756,7 @@ mod tests {
0xA3, 0x62, 0x61, 0x72, // b"bar"
0xA3, 0x62, 0x61, 0x7a, // b"baz"
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::Blob);
assert_eq!(reader.read_blob_bytes()?, b"");
expect_value(reader.next(), IonType::Blob);
Expand All @@ -1761,7 +1777,7 @@ mod tests {
0xE4, 0x81, 0x85, 0x21, 0x02, // $5::2
0xE6, 0x83, 0x86, 0x87, 0x88, 0x21, 0x03, // $6::$7::$8::3
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());

expect_value(reader.next(), IonType::Int);
expect_annotations(&reader, [4]);
Expand Down Expand Up @@ -2046,4 +2062,38 @@ mod tests {
);
Ok(())
}

#[test]
fn test_incomplete_annotation_wrapper() -> IonResult<()> {
    // Guards against consuming past the end of the buffer when the input is cut off in the
    // middle of an annotation wrapper. The reader is expected to report an incomplete error
    // so that the data can be skipped properly.

    let ion_data = &[
        // First top-level value in the stream
        0xDB, // 11-byte struct
        0x8B, // Field ID 11
        0xB6, // 6-byte List
        0x21, 0x01, // Integer 1
        0x21, 0x02, // Integer 2
        0x21, 0x03, // Integer 3
        0x8A, // Field ID 10
        0x21, 0x01, // Integer 1
        // Second top-level value in the stream
        0xE3, // 3-byte annotations envelope
        0x81, // * Annotations themselves take 1 byte (buffer read stops here)
        0x8C, // * Annotation w/SID $12
        0x10, // Boolean false
    ];

    // Only the first 14 bytes are handed to the reader, so the wrapper is truncated.
    let mut reader = RawBinaryReader::new(&ion_data[0..14]);

    // The struct is fully present and must be read successfully.
    let first_item = reader.next()?;
    assert_eq!(RawStreamItem::Value(IonType::Struct), first_item);

    // The truncated annotation wrapper must surface as an `Incomplete` error.
    let second_item = reader.next();
    match second_item {
        Ok(_) => panic!("Successful parse of incomplete data."),
        Err(IonError::Incomplete { .. }) => (),
        Err(_) => panic!("Unexpected error"),
    }

    Ok(())
}
}
37 changes: 36 additions & 1 deletion src/blocking_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl<R: BufferedRawReader, T: ToIonDataSource> BlockingRawReader<R, T> {
loop {
let n = self.reader.read_from(&mut self.source, length)?;
bytes_read += n;
if n == 0 || bytes_read + n >= length {
if n == 0 || bytes_read >= length {
break;
}
}
Expand Down Expand Up @@ -536,4 +536,39 @@ mod tests {
);
Ok(())
}

#[test]
fn test_binary_end_of_stream() -> IonResult<()> {
    // Verifies that the non-blocking binary reader honors the end of stream flag and that the
    // blocking reader makes use of it. Previously the binary reader ignored the flag, so a read
    // that ended exactly on a value boundary left the blocking reader without any signal to
    // provide more data, since it relies on Incomplete errors to do so.

    // {$11: [1, 2, 3], $10: 1}
    let ion_data: &[u8] = &[
        // First top-level value in the stream
        0xDB, // 11-byte struct
        0x8B, // Field ID 11
        0xB6, // 6-byte List
        0x21, 0x01, // Integer 1
        0x21, 0x02, // Integer 2
        0x21, 0x03, // Integer 3
        0x8A, // Field ID 10
        0x21, 0x01, // Integer 1
        // Second top-level value in the stream
        0xE3, // 3-byte annotations envelope
        0x81, // * Annotations themselves take 1 byte
        0x8C, // * Annotation w/SID $12
        0x10, // Boolean false
    ];

    // An initial buffer size of 12 lets the reader see exactly the first value. If EOS were
    // not honored, the second call to `next` would yield no value because the blocking reader
    // would not know to provide more data.
    let initial_buffer_size = 12;
    let mut reader =
        BlockingRawBinaryReader::new_with_size(ion_data.to_vec(), initial_buffer_size)?;

    let first_item = reader.next()?;
    assert_eq!(RawStreamItem::Value(IonType::Struct), first_item);

    let second_item = reader.next()?;
    assert_eq!(RawStreamItem::Value(IonType::Bool), second_item);

    Ok(())
}
}
19 changes: 19 additions & 0 deletions src/raw_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,22 @@ pub trait BufferedRawReader: RawReader + From<Vec<u8>> {
fn stream_complete(&mut self);
fn is_stream_complete(&self) -> bool;
}

/// Identifies whether an input type allows more data to be added to it.
///
/// Readers can use this for input-type oriented behavior, such as initializing the end of
/// stream status to true when it is known that more data can not be added.
pub trait Expandable {
    /// Returns `true` if this input type allows adding more data, `false` otherwise.
    fn expandable(&self) -> bool;
}

/// A reference to anything viewable as a byte slice is reported as not expandable.
impl<T> Expandable for &T
where
    T: AsRef<[u8]> + ?Sized,
{
    fn expandable(&self) -> bool {
        false
    }
}

/// An owned `Vec<u8>` is reported as expandable.
impl Expandable for Vec<u8> {
    fn expandable(&self) -> bool {
        true
    }
}
21 changes: 1 addition & 20 deletions src/text/non_blocking/raw_text_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::element::{Blob, Clob};
use crate::types::Str;
use nom::Err::{Error, Failure, Incomplete};

use crate::raw_reader::{BufferedRawReader, RawStreamItem};
use crate::raw_reader::{BufferedRawReader, Expandable, RawStreamItem};
use crate::raw_symbol_token::RawSymbolToken;
use crate::result::{
decoding_error, illegal_operation, illegal_operation_raw, incomplete_text_error, IonError,
Expand Down Expand Up @@ -101,25 +101,6 @@ impl From<Vec<u8>> for RawTextReader<Vec<u8>> {
}
}

/// Identifies whether an input type allows more data to be added to it.
///
/// Readers can use this for input-type oriented behavior, such as initializing the end of
/// stream status to true when it is known that more data can not be added.
pub trait Expandable {
    /// Returns `true` if this input type allows adding more data, `false` otherwise.
    fn expandable(&self) -> bool;
}

/// A reference to anything viewable as a byte slice is reported as not expandable.
impl<T> Expandable for &T
where
    T: AsRef<[u8]> + ?Sized,
{
    fn expandable(&self) -> bool {
        false
    }
}

/// An owned `Vec<u8>` is reported as expandable.
impl Expandable for Vec<u8> {
    fn expandable(&self) -> bool {
        true
    }
}

impl<A: AsRef<[u8]> + Expandable> RawTextReader<A> {
pub fn new(input: A) -> RawTextReader<A> {
let expandable = input.expandable();
Expand Down