Skip to content

Commit

Permalink
Fix multiple blocking binary reader issues (#552)
Browse files Browse the repository at this point in the history
* Fix multiple blocking binary reader issues

- Fixes an issue where `read_from` would read fewer bytes than it was expected to.
- Added incomplete guard to an annotation consume that could cause an error
  when the buffer did not have enough data.
- Fixed an issue where bytes read into a NBR were counted twice when determining
  if all data had been read. This probably wasn't an issue since `read_from`
  reads all of the data requested.
- Implements stream completion for non-blocking binary reader.
- Moves Expandable to the raw_reader module now that the binary raw reader is using it.

* Add unit tests for previous binary reader fixes
  • Loading branch information
nirosys committed May 25, 2023
1 parent 5e9fc59 commit 7eed0eb
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 47 deletions.
19 changes: 15 additions & 4 deletions src/binary/non_blocking/binary_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ impl BinaryBuffer<Vec<u8>> {
// Make sure that there are `length` bytes in the `Vec` beyond `self.end`.
self.reserve_capacity(length);
// Get a mutable slice to the first `length` bytes starting at `self.end`.
let read_buffer = &mut self.data.as_mut_slice()[self.end..length];
let read_buffer = &mut self.data.as_mut_slice()[self.end..(self.end + length)];
// Use that slice as our input buffer to read from the source.
let bytes_read = source.read(read_buffer)?;
// Update `self.end` to reflect that we have more data available to read.
Expand All @@ -507,9 +507,7 @@ impl BinaryBuffer<Vec<u8>> {
// For now, it is unlikely that this would happen often.
let capacity = self.data.len() - self.end;
if capacity < length {
for _ in 0..(length - capacity) {
self.data.push(0);
}
self.data.resize(self.data.len() + length - capacity, 0);
}
}
}
Expand Down Expand Up @@ -810,4 +808,17 @@ mod tests {
.expect_err("This exceeded the configured max Int size.");
Ok(())
}

#[test]
fn validate_read_from_size() -> IonResult<()> {
    // `read_from` must honor the number of bytes it is asked to read. An earlier bug computed
    // the destination sub-slice of the buffer incorrectly, which could make reads fail or
    // shrink over time. Here the buffer already holds 10 bytes and we request 11 more from a
    // source that has exactly 11 available.
    const REQUESTED: usize = 11;
    let source = [0u8; REQUESTED];
    let mut buffer = BinaryBuffer::new(vec![0; 10]);

    let bytes_read = buffer.read_from(&source[..], REQUESTED)?;
    assert_eq!(bytes_read, REQUESTED);

    Ok(())
}
}
94 changes: 72 additions & 22 deletions src/binary/non_blocking/raw_binary_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::binary::non_blocking::type_descriptor::{Header, TypeDescriptor};
use crate::binary::uint::DecodedUInt;
use crate::binary::var_uint::VarUInt;
use crate::binary::IonTypeCode;
use crate::raw_reader::BufferedRawReader;
use crate::raw_reader::{BufferedRawReader, Expandable};
use crate::result::{
decoding_error, decoding_error_raw, illegal_operation, illegal_operation_raw,
incomplete_data_error,
Expand Down Expand Up @@ -314,7 +314,7 @@ impl Container {
/// Note that if the buffer runs out of data between top level values, this will be interpreted
/// as the end of the stream. Applications can still add more data to the buffer and resume reading.
#[derive(Debug)]
pub struct RawBinaryReader<A: AsRef<[u8]>> {
pub struct RawBinaryReader<A: AsRef<[u8]> + Expandable> {
ion_version: (u8, u8),
state: ReaderState,
buffer: BinaryBuffer<A>,
Expand Down Expand Up @@ -351,15 +351,16 @@ impl From<Vec<u8>> for RawBinaryReader<Vec<u8>> {
}
}

impl<A: AsRef<[u8]>> RawBinaryReader<A> {
impl<A: AsRef<[u8]> + Expandable> RawBinaryReader<A> {
/// Constructs a RawBinaryReader from a value that can be viewed as a byte slice.
pub fn new(source: A) -> RawBinaryReader<A> {
let expandable = source.expandable();
RawBinaryReader {
ion_version: (1, 0),
state: ReaderState::Ready,
buffer: BinaryBuffer::new(source),
parents: Vec::new(), // Does not allocate yet
is_eos: false,
is_eos: !expandable,
}
}

Expand Down Expand Up @@ -704,7 +705,7 @@ impl<A: AsRef<[u8]>> RawBinaryReader<A> {
}
}

impl<A: AsRef<[u8]>> IonReader for RawBinaryReader<A> {
impl<A: AsRef<[u8]> + Expandable> IonReader for RawBinaryReader<A> {
type Item = RawStreamItem;
type Symbol = RawSymbolToken;

Expand Down Expand Up @@ -745,10 +746,18 @@ impl<A: AsRef<[u8]>> IonReader for RawBinaryReader<A> {
return Ok(RawStreamItem::Nothing);
}
} else {
// We're at the top level. If we're out of data (`buffer.is_empty()`) and aren't waiting
// on more data (`Skipping(n)`), we return `Nothing` to indicate that we're at EOF.
// We're at the top level. If we are out of data, then we need to determine if we
// are at the end of the stream or not. If not, then we need to signal an
// incomplete error, otherwise we can return Nothing to indicate that we are done.
if self.buffer.is_empty() && self.state == ReaderState::Ready {
return Ok(RawStreamItem::Nothing);
if self.is_eos {
return Ok(RawStreamItem::Nothing);
} else {
return incomplete_data_error(
"ahead to next item",
self.buffer.total_consumed(),
);
}
}
}
}
Expand All @@ -762,7 +771,7 @@ impl<A: AsRef<[u8]>> IonReader for RawBinaryReader<A> {
// parsing will create a fresh transaction reader starting from the last good state.
let mut tx_reader = self.transaction_reader();

let item_result = tx_reader.read_next_item();
let item_result = tx_reader.read_next_item()?;
let nop_bytes_count = tx_reader.nop_bytes_count as usize;

// If we do not have enough bytes to materialize the next value, return an incomplete
Expand All @@ -782,7 +791,7 @@ impl<A: AsRef<[u8]>> IonReader for RawBinaryReader<A> {
// This guarantees that the first byte in the buffer is the first byte of the current item.
self.buffer.consume(nop_bytes_count);

item_result
Ok(item_result)
}

fn current(&self) -> Self::Item {
Expand Down Expand Up @@ -1374,6 +1383,10 @@ impl<'a, A: AsRef<[u8]>> TxReader<'a, A> {
return decoding_error("found an annotation wrapper with no value");
}

if annotations_length.value() > self.tx_buffer.remaining() {
return incomplete_data_error("an annotation wrapper", self.tx_buffer.total_consumed());
}

// Skip over the annotations sequence itself; the reader will return to it if/when the
// reader asks to iterate over those symbol IDs.
self.tx_buffer.consume(annotations_length.value());
Expand Down Expand Up @@ -1471,7 +1484,7 @@ mod tests {
}
}

fn expect_annotations<A: AsRef<[u8]>, I: IntoRawAnnotations>(
fn expect_annotations<A: AsRef<[u8]> + Expandable, I: IntoRawAnnotations>(
reader: &RawBinaryReader<A>,
annotations: I,
) {
Expand Down Expand Up @@ -1513,7 +1526,7 @@ mod tests {
#[test]
fn read_int_header() -> IonResult<()> {
let data = vec![0x21, 0x03];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::Int);
expect_eof(reader.next());
Ok(())
Expand All @@ -1529,11 +1542,14 @@ mod tests {
// This byte completes the int, but we still don't have another value to move to.
reader.append_bytes(&[0x03])?;
expect_value(reader.next(), IonType::Int);
expect_eof(reader.next());
expect_incomplete(reader.next()); // Incomplete, rather than EOF because we have not marked
// the stream complete.

// Now there's an empty string after the int
reader.append_bytes(&[0x80])?;
reader.stream_complete();
expect_value(reader.next(), IonType::String);
expect_eof(reader.next());

Ok(())
}
Expand All @@ -1545,7 +1561,7 @@ mod tests {
0x21, 0x02, // 2
0x21, 0x03, // 3
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::Int);
assert_eq!(reader.read_int()?, Int::I64(1));
expect_value(reader.next(), IonType::Int);
Expand All @@ -1564,7 +1580,7 @@ mod tests {
0x48, 0x40, 0x92, 0xc0, 0x00, 0x00, 0x00, 0x00, 0x00, // 1.2e3
0x48, 0xc0, 0x20, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, // -8.125e0
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::Float);
assert_eq!(reader.read_f64()?, 5.5f64);
expect_value(reader.next(), IonType::Float);
Expand All @@ -1584,7 +1600,7 @@ mod tests {
0x52, 0x80, 0xe4, // -100.
0x52, 0x80, 0x1c, // 28.
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::Decimal);
assert_eq!(reader.read_decimal()?, Decimal::new(0, 0));
expect_value(reader.next(), IonType::Decimal);
Expand Down Expand Up @@ -1612,7 +1628,7 @@ mod tests {
0xbb, // 2022-06-09T22:59:59.000+00:00
0xbb, 0xc3,
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::Timestamp);
assert_eq!(
reader.read_timestamp()?,
Expand Down Expand Up @@ -1674,7 +1690,7 @@ mod tests {
0x71, 0x02, // $2
0x72, 0x00, 0x03, // inefficiently encoded $3
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::Symbol);
assert_eq!(reader.read_symbol_id()?, 0);
expect_value(reader.next(), IonType::Symbol);
Expand All @@ -1696,7 +1712,7 @@ mod tests {
0x83, 0x62, 0x61, 0x72, // "bar"
0x83, 0x62, 0x61, 0x7a, // "baz"
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::String);
assert_eq!(reader.read_str()?, "");
expect_value(reader.next(), IonType::String);
Expand All @@ -1718,7 +1734,7 @@ mod tests {
0x93, 0x62, 0x61, 0x72, // b"bar"
0x93, 0x62, 0x61, 0x7a, // b"baz"
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::Clob);
assert_eq!(reader.read_clob_bytes()?, b"");
expect_value(reader.next(), IonType::Clob);
Expand All @@ -1740,7 +1756,7 @@ mod tests {
0xA3, 0x62, 0x61, 0x72, // b"bar"
0xA3, 0x62, 0x61, 0x7a, // b"baz"
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::Blob);
assert_eq!(reader.read_blob_bytes()?, b"");
expect_value(reader.next(), IonType::Blob);
Expand All @@ -1761,7 +1777,7 @@ mod tests {
0xE4, 0x81, 0x85, 0x21, 0x02, // $5::2
0xE6, 0x83, 0x86, 0x87, 0x88, 0x21, 0x03, // $6::$7::$8::3
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());

expect_value(reader.next(), IonType::Int);
expect_annotations(&reader, [4]);
Expand Down Expand Up @@ -2046,4 +2062,38 @@ mod tests {
);
Ok(())
}

#[test]
fn test_incomplete_annotation_wrapper() -> IonResult<()> {
    // When the buffer ends part-way through an annotation wrapper, the reader must not try to
    // consume past the end of the buffer. It should report an incomplete error instead, so
    // that the data can be skipped properly.
    let stream: &[u8] = &[
        // First top-level value in the stream
        0xDB, // 11-byte struct
        0x8B, // Field ID 11
        0xB6, // 6-byte List
        0x21, 0x01, // Integer 1
        0x21, 0x02, // Integer 2
        0x21, 0x03, // Integer 3
        0x8A, // Field ID 10
        0x21, 0x01, // Integer 1
        // Second top-level value in the stream
        0xE3, // 3-byte annotations envelope
        0x81, // * Annotations themselves take 1 byte (buffer read stops here)
        0x8C, // * Annotation w/SID $12
        0x10, // Boolean false
    ];

    // Only the first 14 bytes are handed to the reader: the whole struct plus the first two
    // bytes of the annotation wrapper.
    let truncated = &stream[..14];
    let mut reader = RawBinaryReader::new(truncated);

    let first = reader.next()?;
    assert_eq!(RawStreamItem::Value(IonType::Struct), first);

    match reader.next() {
        Ok(_) => panic!("Successful parse of incomplete data."),
        Err(IonError::Incomplete { .. }) => {}
        Err(_) => panic!("Unexpected error"),
    }

    Ok(())
}
}
37 changes: 36 additions & 1 deletion src/blocking_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl<R: BufferedRawReader, T: ToIonDataSource> BlockingRawReader<R, T> {
loop {
let n = self.reader.read_from(&mut self.source, length)?;
bytes_read += n;
if n == 0 || bytes_read + n >= length {
if n == 0 || bytes_read >= length {
break;
}
}
Expand Down Expand Up @@ -536,4 +536,39 @@ mod tests {
);
Ok(())
}

#[test]
fn test_binary_end_of_stream() -> IonResult<()> {
    // Verifies that the non-blocking binary reader honors its end-of-stream flag and that the
    // blocking reader relies on it. Previously the binary reader ignored the flag, so a read
    // that ended exactly on a value boundary left the blocking reader with no reason to supply
    // more data, because it only refills in response to Incomplete errors.

    // {$11: [1, 2, 3], $10: 1}
    let ion_data: &[u8] = &[
        // First top-level value in the stream
        0xDB, // 11-byte struct
        0x8B, // Field ID 11
        0xB6, // 6-byte List
        0x21, 0x01, // Integer 1
        0x21, 0x02, // Integer 2
        0x21, 0x03, // Integer 3
        0x8A, // Field ID 10
        0x21, 0x01, // Integer 1
        // Second top-level value in the stream
        0xE3, // 3-byte annotations envelope
        0x81, // * Annotations themselves take 1 byte
        0x8C, // * Annotation w/SID $12
        0x10, // Boolean false
    ];

    // An initial buffer size of 12 covers exactly the first value. If end-of-stream were not
    // honored, the second `next` call would yield no value, because the blocking reader would
    // not know that it has to provide more data.
    let mut reader = BlockingRawBinaryReader::new_with_size(ion_data.to_owned(), 12)?;
    for expected in [IonType::Struct, IonType::Bool] {
        assert_eq!(RawStreamItem::Value(expected), reader.next()?);
    }

    Ok(())
}
}
19 changes: 19 additions & 0 deletions src/raw_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,22 @@ pub trait BufferedRawReader: RawReader + From<Vec<u8>> {
fn stream_complete(&mut self);
fn is_stream_complete(&self) -> bool;
}

/// Provides a mechanism for identifying input types that allow adding more data.
/// This allows for some input-type oriented behaviors, like initializing the end of stream status
/// to true if we know more data cannot be added.
pub trait Expandable {
/// Returns `true` if more data can be added to this input, and `false` if it cannot.
fn expandable(&self) -> bool;
}

/// Any reference `&T` where `T: AsRef<[u8]>` (sized or unsized) always reports that it is not
/// expandable.
impl<T: AsRef<[u8]> + ?Sized> Expandable for &T {
fn expandable(&self) -> bool {
false
}
}

/// A `Vec<u8>` always reports that it is expandable.
impl Expandable for Vec<u8> {
fn expandable(&self) -> bool {
true
}
}
21 changes: 1 addition & 20 deletions src/text/non_blocking/raw_text_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::element::{Blob, Clob};
use crate::types::Str;
use nom::Err::{Error, Failure, Incomplete};

use crate::raw_reader::{BufferedRawReader, RawStreamItem};
use crate::raw_reader::{BufferedRawReader, Expandable, RawStreamItem};
use crate::raw_symbol_token::RawSymbolToken;
use crate::result::{
decoding_error, illegal_operation, illegal_operation_raw, incomplete_text_error, IonError,
Expand Down Expand Up @@ -101,25 +101,6 @@ impl From<Vec<u8>> for RawTextReader<Vec<u8>> {
}
}

/// Provides a mechanism for identifying input types that allow adding more data.
/// This allows for some input-type oriented behaviors, like initializing the end of stream status
/// to true if we know more data can not be added.
pub trait Expandable {
fn expandable(&self) -> bool;
}

impl<T: AsRef<[u8]> + ?Sized> Expandable for &T {
fn expandable(&self) -> bool {
false
}
}

impl Expandable for Vec<u8> {
fn expandable(&self) -> bool {
true
}
}

impl<A: AsRef<[u8]> + Expandable> RawTextReader<A> {
pub fn new(input: A) -> RawTextReader<A> {
let expandable = input.expandable();
Expand Down

0 comments on commit 7eed0eb

Please sign in to comment.