From cc68cb56012d40cf4f463c469163af69d021069a Mon Sep 17 00:00:00 2001 From: Richard Giliam Date: Tue, 16 May 2023 10:37:29 -0700 Subject: [PATCH 1/4] Fix multiple blocking binary reader issues - Fixes an issue where `read_from` would read fewer bytes than it was expected to. - Addded incomplete guard to an annotation consume that could cause an error when the buffer did not have enough data. - Fixed an issue where bytes read into a NBR were counted twice when determining if all data had been read. This probably wasn't an issue since `read_from` reads all of the data requested. - Implements stream completion for non-blocking binary reader. - Moves Expandable to the raw_reader module now that the binary raw reader is using it. --- src/binary/non_blocking/binary_buffer.rs | 6 +- src/binary/non_blocking/raw_binary_reader.rs | 60 +++++++++++++------- src/blocking_reader.rs | 2 +- src/raw_reader.rs | 19 +++++++ src/text/non_blocking/raw_text_reader.rs | 21 +------ 5 files changed, 61 insertions(+), 47 deletions(-) diff --git a/src/binary/non_blocking/binary_buffer.rs b/src/binary/non_blocking/binary_buffer.rs index e3c10b7d..2cab40fb 100644 --- a/src/binary/non_blocking/binary_buffer.rs +++ b/src/binary/non_blocking/binary_buffer.rs @@ -489,7 +489,7 @@ impl BinaryBuffer> { // Make sure that there are `length` bytes in the `Vec` beyond `self.end`. self.reserve_capacity(length); // Get a mutable slice to the first `length` bytes starting at `self.end`. - let read_buffer = &mut self.data.as_mut_slice()[self.end..length]; + let read_buffer = &mut self.data.as_mut_slice()[self.end..(self.end + length)]; // Use that slice as our input buffer to read from the source. let bytes_read = source.read(read_buffer)?; // Update `self.end` to reflect that we have more data available to read. @@ -507,9 +507,7 @@ impl BinaryBuffer> { // For now, it is unlikely that this would happen often. let capacity = self.data.len() - self.end; if capacity < length { - for _ in 0..(length - capacity) { - self.data.push(0); - } + self.data.resize(self.data.len() + length - capacity, 0); } } } diff --git a/src/binary/non_blocking/raw_binary_reader.rs b/src/binary/non_blocking/raw_binary_reader.rs index d06fae3c..08c2bc50 100644 --- a/src/binary/non_blocking/raw_binary_reader.rs +++ b/src/binary/non_blocking/raw_binary_reader.rs @@ -5,7 +5,7 @@ use crate::binary::non_blocking::type_descriptor::{Header, TypeDescriptor}; use crate::binary::uint::DecodedUInt; use crate::binary::var_uint::VarUInt; use crate::binary::IonTypeCode; -use crate::raw_reader::BufferedRawReader; +use crate::raw_reader::{BufferedRawReader, Expandable}; use crate::result::{ decoding_error, decoding_error_raw, illegal_operation, illegal_operation_raw, incomplete_data_error, @@ -314,7 +314,7 @@ impl Container { /// Note that if the buffer runs out of data between top level values, this will be interpreted /// as the end of the stream. Applications can still add more data to the buffer and resume reading. #[derive(Debug)] -pub struct RawBinaryReader> { +pub struct RawBinaryReader + Expandable> { ion_version: (u8, u8), state: ReaderState, buffer: BinaryBuffer, @@ -351,15 +351,16 @@ impl From> for RawBinaryReader> { } } -impl> RawBinaryReader { +impl + Expandable> RawBinaryReader { /// Constructs a RawBinaryReader from a value that can be viewed as a byte slice. pub fn new(source: A) -> RawBinaryReader { + let expandable = source.expandable(); RawBinaryReader { ion_version: (1, 0), state: ReaderState::Ready, buffer: BinaryBuffer::new(source), parents: Vec::new(), // Does not allocate yet - is_eos: false, + is_eos: !expandable, } } @@ -704,7 +705,7 @@ impl> RawBinaryReader { } } -impl> IonReader for RawBinaryReader { +impl + Expandable> IonReader for RawBinaryReader { type Item = RawStreamItem; type Symbol = RawSymbolToken; @@ -745,10 +746,18 @@ impl> IonReader for RawBinaryReader { return Ok(RawStreamItem::Nothing); } } else { - // We're at the top level. If we're out of data (`buffer.is_empty()`) and aren't waiting - // on more data (`Skipping(n)`), we return `Nothing` to indicate that we're at EOF. + // We're at the top level. If we are out of data, then we need to determine if we + // are at the end of the stream or not. If not, then we need to signal an + // incomplete error, otherwise we can return Nothing to indicate that we are done. if self.buffer.is_empty() && self.state == ReaderState::Ready { - return Ok(RawStreamItem::Nothing); + if self.is_eos { + return Ok(RawStreamItem::Nothing); + } else { + return incomplete_data_error( + "ahead to next item", + self.buffer.total_consumed(), + ); + } } } } @@ -762,7 +771,7 @@ impl> IonReader for RawBinaryReader { // parsing will create a fresh transaction reader starting from the last good state. let mut tx_reader = self.transaction_reader(); - let item_result = tx_reader.read_next_item(); + let item_result = tx_reader.read_next_item()?; let nop_bytes_count = tx_reader.nop_bytes_count as usize; // If we do not have enough bytes to materialize the next value, return an incomplete @@ -782,7 +791,7 @@ impl> IonReader for RawBinaryReader { // This guarantees that the first byte in the buffer is the first byte of the current item. self.buffer.consume(nop_bytes_count); - item_result + Ok(item_result) } fn current(&self) -> Self::Item { @@ -1374,6 +1383,10 @@ impl<'a, A: AsRef<[u8]>> TxReader<'a, A> { return decoding_error("found an annotation wrapper with no value"); } + if annotations_length.value() > self.tx_buffer.remaining() { + return incomplete_data_error("an annotation wrapper", self.tx_buffer.total_consumed()); + } + // Skip over the annotations sequence itself; the reader will return to it if/when the // reader asks to iterate over those symbol IDs. self.tx_buffer.consume(annotations_length.value()); @@ -1471,7 +1484,7 @@ mod tests { } } - fn expect_annotations, I: IntoRawAnnotations>( + fn expect_annotations + Expandable, I: IntoRawAnnotations>( reader: &RawBinaryReader, annotations: I, ) { @@ -1513,7 +1526,7 @@ mod tests { #[test] fn read_int_header() -> IonResult<()> { let data = vec![0x21, 0x03]; - let mut reader = RawBinaryReader::new(data); + let mut reader = RawBinaryReader::new(data.as_slice()); expect_value(reader.next(), IonType::Int); expect_eof(reader.next()); Ok(()) @@ -1529,11 +1542,14 @@ mod tests { // This byte completes the int, but we still don't have another value to move to. reader.append_bytes(&[0x03])?; expect_value(reader.next(), IonType::Int); - expect_eof(reader.next()); + expect_incomplete(reader.next()); // Incomplete, rather than EOF because we have not marked + // the stream complete. // Now there's an empty string after the int reader.append_bytes(&[0x80])?; + reader.stream_complete(); expect_value(reader.next(), IonType::String); + expect_eof(reader.next()); Ok(()) } @@ -1545,7 +1561,7 @@ mod tests { 0x21, 0x02, // 2 0x21, 0x03, // 3 ]; - let mut reader = RawBinaryReader::new(data); + let mut reader = RawBinaryReader::new(data.as_slice()); expect_value(reader.next(), IonType::Int); assert_eq!(reader.read_int()?, Int::I64(1)); expect_value(reader.next(), IonType::Int); @@ -1564,7 +1580,7 @@ mod tests { 0x48, 0x40, 0x92, 0xc0, 0x00, 0x00, 0x00, 0x00, 0x00, // 1.2e3 0x48, 0xc0, 0x20, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, // -8.125e0 ]; - let mut reader = RawBinaryReader::new(data); + let mut reader = RawBinaryReader::new(data.as_slice()); expect_value(reader.next(), IonType::Float); assert_eq!(reader.read_f64()?, 5.5f64); expect_value(reader.next(), IonType::Float); @@ -1584,7 +1600,7 @@ mod tests { 0x52, 0x80, 0xe4, // -100. 0x52, 0x80, 0x1c, // 28. ]; - let mut reader = RawBinaryReader::new(data); + let mut reader = RawBinaryReader::new(data.as_slice()); expect_value(reader.next(), IonType::Decimal); assert_eq!(reader.read_decimal()?, Decimal::new(0, 0)); expect_value(reader.next(), IonType::Decimal); @@ -1612,7 +1628,7 @@ mod tests { 0xbb, // 2022-06-09T22:59:59.000+00:00 0xbb, 0xc3, ]; - let mut reader = RawBinaryReader::new(data); + let mut reader = RawBinaryReader::new(data.as_slice()); expect_value(reader.next(), IonType::Timestamp); assert_eq!( reader.read_timestamp()?, @@ -1674,7 +1690,7 @@ mod tests { 0x71, 0x02, // $2 0x72, 0x00, 0x03, // inefficiently encoded $3 ]; - let mut reader = RawBinaryReader::new(data); + let mut reader = RawBinaryReader::new(data.as_slice()); expect_value(reader.next(), IonType::Symbol); assert_eq!(reader.read_symbol_id()?, 0); expect_value(reader.next(), IonType::Symbol); @@ -1696,7 +1712,7 @@ mod tests { 0x83, 0x62, 0x61, 0x72, // "bar" 0x83, 0x62, 0x61, 0x7a, // "baz" ]; - let mut reader = RawBinaryReader::new(data); + let mut reader = RawBinaryReader::new(data.as_slice()); expect_value(reader.next(), IonType::String); assert_eq!(reader.read_str()?, ""); expect_value(reader.next(), IonType::String); @@ -1718,7 +1734,7 @@ mod tests { 0x93, 0x62, 0x61, 0x72, // b"bar" 0x93, 0x62, 0x61, 0x7a, // b"baz" ]; - let mut reader = RawBinaryReader::new(data); + let mut reader = RawBinaryReader::new(data.as_slice()); expect_value(reader.next(), IonType::Clob); assert_eq!(reader.read_clob_bytes()?, b""); expect_value(reader.next(), IonType::Clob); @@ -1740,7 +1756,7 @@ mod tests { 0xA3, 0x62, 0x61, 0x72, // b"bar" 0xA3, 0x62, 0x61, 0x7a, // b"baz" ]; - let mut reader = RawBinaryReader::new(data); + let mut reader = RawBinaryReader::new(data.as_slice()); expect_value(reader.next(), IonType::Blob); assert_eq!(reader.read_blob_bytes()?, b""); expect_value(reader.next(), IonType::Blob); @@ -1761,7 +1777,7 @@ mod tests { 0xE4, 0x81, 0x85, 0x21, 0x02, // $5::2 0xE6, 0x83, 0x86, 0x87, 0x88, 0x21, 0x03, // $6::$7::$8::3 ]; - let mut reader = RawBinaryReader::new(data); + let mut reader = RawBinaryReader::new(data.as_slice()); expect_value(reader.next(), IonType::Int); expect_annotations(&reader, [4]); diff --git a/src/blocking_reader.rs b/src/blocking_reader.rs index b7903f02..1925e0b9 100644 --- a/src/blocking_reader.rs +++ b/src/blocking_reader.rs @@ -30,7 +30,7 @@ impl BlockingRawReader { loop { let n = self.reader.read_from(&mut self.source, length)?; bytes_read += n; - if n == 0 || bytes_read + n >= length { + if n == 0 || bytes_read >= length { break; } } diff --git a/src/raw_reader.rs b/src/raw_reader.rs index cf5ee613..74fb94e8 100644 --- a/src/raw_reader.rs +++ b/src/raw_reader.rs @@ -174,3 +174,22 @@ pub trait BufferedRawReader: RawReader + From> { fn stream_complete(&mut self); fn is_stream_complete(&self) -> bool; } + +/// Provides a mechanism for identifying input types that allow adding more data. +/// This allows for some input-type oriented behaviors, like initializing the end of stream status +/// to true if we know more data can not be added. +pub trait Expandable { + fn expandable(&self) -> bool; +} + +impl + ?Sized> Expandable for &T { + fn expandable(&self) -> bool { + false + } +} + +impl Expandable for Vec { + fn expandable(&self) -> bool { + true + } +} diff --git a/src/text/non_blocking/raw_text_reader.rs b/src/text/non_blocking/raw_text_reader.rs index 615e1a7f..57ac08e0 100644 --- a/src/text/non_blocking/raw_text_reader.rs +++ b/src/text/non_blocking/raw_text_reader.rs @@ -4,7 +4,7 @@ use crate::element::{Blob, Clob}; use crate::types::Str; use nom::Err::{Error, Failure, Incomplete}; -use crate::raw_reader::{BufferedRawReader, RawStreamItem}; +use crate::raw_reader::{BufferedRawReader, Expandable, RawStreamItem}; use crate::raw_symbol_token::RawSymbolToken; use crate::result::{ decoding_error, illegal_operation, illegal_operation_raw, incomplete_text_error, IonError, @@ -101,25 +101,6 @@ impl From> for RawTextReader> { } } -/// Provides a mechanism for identifying input types that allow adding more data. -/// This allows for some input-type oriented behaviors, like initializing the end of stream status -/// to true if we know more data can not be added. -pub trait Expandable { - fn expandable(&self) -> bool; -} - -impl + ?Sized> Expandable for &T { - fn expandable(&self) -> bool { - false - } -} - -impl Expandable for Vec { - fn expandable(&self) -> bool { - true - } -} - impl + Expandable> RawTextReader { pub fn new(input: A) -> RawTextReader { let expandable = input.expandable(); From c9a94e33b89706ab04999cef864aaaae90ce7de2 Mon Sep 17 00:00:00 2001 From: Richard Giliam Date: Thu, 25 May 2023 02:24:03 -0700 Subject: [PATCH 2/4] Add unit tests for previous binary reader fixes --- src/binary/non_blocking/binary_buffer.rs | 13 ++++++++ src/binary/non_blocking/raw_binary_reader.rs | 34 +++++++++++++++++++ src/blocking_reader.rs | 35 ++++++++++++++++++++ 3 files changed, 82 insertions(+) diff --git a/src/binary/non_blocking/binary_buffer.rs b/src/binary/non_blocking/binary_buffer.rs index 2cab40fb..cf6da61a 100644 --- a/src/binary/non_blocking/binary_buffer.rs +++ b/src/binary/non_blocking/binary_buffer.rs @@ -808,4 +808,17 @@ mod tests { .expect_err("This exceeded the configured max Int size."); Ok(()) } + + #[test] + fn validate_read_from_size() -> IonResult<()> { + // This test validates that the size of data we wish to read, is actually honored by + // read_from. A bug existed where the sub-slice of the buffer was calculated incorrectly, + // leading to the potential for failed reads, or increasingly smaller reads. + let mut buffer = BinaryBuffer::new(vec![0; 10]); + let new_data: &[u8] = &[0; 11]; + let bytes_read = buffer.read_from(new_data, 11)?; + assert_eq!(bytes_read, 11); + + Ok(()) + } } diff --git a/src/binary/non_blocking/raw_binary_reader.rs b/src/binary/non_blocking/raw_binary_reader.rs index 08c2bc50..bde9fd2e 100644 --- a/src/binary/non_blocking/raw_binary_reader.rs +++ b/src/binary/non_blocking/raw_binary_reader.rs @@ -2062,4 +2062,38 @@ mod tests { ); Ok(()) } + + #[test] + fn test_incomplete_annotation_wrapper() -> IonResult<()> { + // This test ensures that if we reach the end of the buffer while processing the annotation + // wrapper, we do not try to consume beyond the buffer's limits. Instead, we should get an + // incomplete error so that the data can be skipped properly. + + let ion_data = &[ + // First top-level value in the stream + 0xDB, // 11-byte struct + 0x8B, // Field ID 11 + 0xB6, // 6-byte List + 0x21, 0x01, // Integer 1 + 0x21, 0x02, // Integer 2 + 0x21, 0x03, // Integer 3 + 0x8A, // Field ID 10 + 0x21, 0x01, // Integer 1 + // Second top-level value in the stream + 0xE3, // 3-byte annotations envelope + 0x81, // * Annotations themselves take 1 byte (buffer read stops here) + 0x8C, // * Annotation w/SID $12 + 0x10, // Boolean false + ]; + + let mut cursor = RawBinaryReader::new(&ion_data[0..14]); + assert_eq!(RawStreamItem::Value(IonType::Struct), cursor.next()?); + match cursor.next() { + Err(IonError::Incomplete { .. }) => (), + Err(_) => assert!(false, "Unexpected error"), + Ok(_) => assert!(false, "Successful parse of incomplete data."), + } + + Ok(()) + } } diff --git a/src/blocking_reader.rs b/src/blocking_reader.rs index 1925e0b9..38b4a3c1 100644 --- a/src/blocking_reader.rs +++ b/src/blocking_reader.rs @@ -536,4 +536,39 @@ mod tests { ); Ok(()) } + + #[test] + fn test_binary_end_of_stream() -> IonResult<()> { + // This test is to ensure that the non-blocking binary reader is honoring the end of stream + // flag, and that the blocking reader is making use of it. A previous bug existed where the + // binary reader was not using the end of stream flag, and ending a read on a data boundary + // would result in the blocking reader not providing any more data, since it relies on + // Incomplete errors to do so. + + // {$11: [1, 2, 3], $10: 1} + let ion_data: &[u8] = &[ + // First top-level value in the stream + 0xDB, // 11-byte struct + 0x8B, // Field ID 11 + 0xB6, // 6-byte List + 0x21, 0x01, // Integer 1 + 0x21, 0x02, // Integer 2 + 0x21, 0x03, // Integer 3 + 0x8A, // Field ID 10 + 0x21, 0x01, // Integer 1 + // Second top-level value in the stream + 0xE3, // 3-byte annotations envelope + 0x81, // * Annotations themselves take 1 byte + 0x8C, // * Annotation w/SID $12 + 0x10, // Boolean false + ]; + // Create a blocking reader with an initial buffer size of 12, so that we can read exactly + // the first value. If EOS is not honored, our second call to `next` should result in no + // value being read, since the blocking reader would not know to provide more data. + let mut cursor = BlockingRawBinaryReader::new_with_size(ion_data.to_owned(), 12)?; + assert_eq!(RawStreamItem::Value(IonType::Struct), cursor.next()?); + assert_eq!(RawStreamItem::Value(IonType::Bool), cursor.next()?); + + Ok(()) + } } From 53b83d129d725b68dc53d26175ae8c489cdb89fe Mon Sep 17 00:00:00 2001 From: Richard Giliam Date: Thu, 25 May 2023 02:43:36 -0700 Subject: [PATCH 3/4] Address clippy checks --- src/binary/non_blocking/raw_binary_reader.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/binary/non_blocking/raw_binary_reader.rs b/src/binary/non_blocking/raw_binary_reader.rs index bde9fd2e..5118e245 100644 --- a/src/binary/non_blocking/raw_binary_reader.rs +++ b/src/binary/non_blocking/raw_binary_reader.rs @@ -2090,8 +2090,8 @@ mod tests { assert_eq!(RawStreamItem::Value(IonType::Struct), cursor.next()?); match cursor.next() { Err(IonError::Incomplete { .. }) => (), - Err(_) => assert!(false, "Unexpected error"), - Ok(_) => assert!(false, "Successful parse of incomplete data."), + Err(_) => panic!("Unexpected error"), + Ok(_) => panic!("Successful parse of incomplete data."), } Ok(()) From f45cc8b3c43820156ed2bb55e0afdf3284fe9852 Mon Sep 17 00:00:00 2001 From: Zack Slayton Date: Thu, 25 May 2023 13:30:24 -0400 Subject: [PATCH 4/4] Fixing typo --- src/binary/non_blocking/binary_buffer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/binary/non_blocking/binary_buffer.rs b/src/binary/non_blocking/binary_buffer.rs index cf6da61a..cede2afd 100644 --- a/src/binary/non_blocking/binary_buffer.rs +++ b/src/binary/non_blocking/binary_buffer.rs @@ -811,7 +811,7 @@ mod tests { #[test] fn validate_read_from_size() -> IonResult<()> { - // This test validates that the size of data we wish to read, is actually honored by + // This test validates that the size of data we wish to read is actually honored by // read_from. A bug existed where the sub-slice of the buffer was calculated incorrectly, // leading to the potential for failed reads, or increasingly smaller reads. let mut buffer = BinaryBuffer::new(vec![0; 10]);