Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix multiple blocking binary reader issues #552

Merged
merged 5 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/binary/non_blocking/binary_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ impl BinaryBuffer<Vec<u8>> {
// Make sure that there are `length` bytes in the `Vec` beyond `self.end`.
self.reserve_capacity(length);
// Get a mutable slice to the first `length` bytes starting at `self.end`.
let read_buffer = &mut self.data.as_mut_slice()[self.end..length];
let read_buffer = &mut self.data.as_mut_slice()[self.end..(self.end + length)];
// Use that slice as our input buffer to read from the source.
let bytes_read = source.read(read_buffer)?;
// Update `self.end` to reflect that we have more data available to read.
Expand All @@ -507,9 +507,7 @@ impl BinaryBuffer<Vec<u8>> {
// For now, it is unlikely that this would happen often.
let capacity = self.data.len() - self.end;
if capacity < length {
for _ in 0..(length - capacity) {
self.data.push(0);
}
self.data.resize(self.data.len() + length - capacity, 0);
}
}
}
Expand Down
60 changes: 38 additions & 22 deletions src/binary/non_blocking/raw_binary_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::binary::non_blocking::type_descriptor::{Header, TypeDescriptor};
use crate::binary::uint::DecodedUInt;
use crate::binary::var_uint::VarUInt;
use crate::binary::IonTypeCode;
use crate::raw_reader::BufferedRawReader;
use crate::raw_reader::{BufferedRawReader, Expandable};
use crate::result::{
decoding_error, decoding_error_raw, illegal_operation, illegal_operation_raw,
incomplete_data_error,
Expand Down Expand Up @@ -314,7 +314,7 @@ impl Container {
/// Note that if the buffer runs out of data between top level values, this will be interpreted
/// as the end of the stream. Applications can still add more data to the buffer and resume reading.
#[derive(Debug)]
pub struct RawBinaryReader<A: AsRef<[u8]>> {
pub struct RawBinaryReader<A: AsRef<[u8]> + Expandable> {
ion_version: (u8, u8),
state: ReaderState,
buffer: BinaryBuffer<A>,
Expand Down Expand Up @@ -351,15 +351,16 @@ impl From<Vec<u8>> for RawBinaryReader<Vec<u8>> {
}
}

impl<A: AsRef<[u8]>> RawBinaryReader<A> {
impl<A: AsRef<[u8]> + Expandable> RawBinaryReader<A> {
/// Constructs a RawBinaryReader from a value that can be viewed as a byte slice.
pub fn new(source: A) -> RawBinaryReader<A> {
let expandable = source.expandable();
RawBinaryReader {
ion_version: (1, 0),
state: ReaderState::Ready,
buffer: BinaryBuffer::new(source),
parents: Vec::new(), // Does not allocate yet
is_eos: false,
is_eos: !expandable,
}
}

Expand Down Expand Up @@ -704,7 +705,7 @@ impl<A: AsRef<[u8]>> RawBinaryReader<A> {
}
}

impl<A: AsRef<[u8]>> IonReader for RawBinaryReader<A> {
impl<A: AsRef<[u8]> + Expandable> IonReader for RawBinaryReader<A> {
type Item = RawStreamItem;
type Symbol = RawSymbolToken;

Expand Down Expand Up @@ -745,10 +746,18 @@ impl<A: AsRef<[u8]>> IonReader for RawBinaryReader<A> {
return Ok(RawStreamItem::Nothing);
}
} else {
// We're at the top level. If we're out of data (`buffer.is_empty()`) and aren't waiting
// on more data (`Skipping(n)`), we return `Nothing` to indicate that we're at EOF.
// We're at the top level. If we are out of data, then we need to determine if we
// are at the end of the stream or not. If not, then we need to signal an
// incomplete error, otherwise we can return Nothing to indicate that we are done.
if self.buffer.is_empty() && self.state == ReaderState::Ready {
return Ok(RawStreamItem::Nothing);
if self.is_eos {
return Ok(RawStreamItem::Nothing);
} else {
return incomplete_data_error(
"ahead to next item",
self.buffer.total_consumed(),
);
}
}
}
}
Expand All @@ -762,7 +771,7 @@ impl<A: AsRef<[u8]>> IonReader for RawBinaryReader<A> {
// parsing will create a fresh transaction reader starting from the last good state.
let mut tx_reader = self.transaction_reader();

let item_result = tx_reader.read_next_item();
let item_result = tx_reader.read_next_item()?;
let nop_bytes_count = tx_reader.nop_bytes_count as usize;

// If we do not have enough bytes to materialize the next value, return an incomplete
Expand All @@ -782,7 +791,7 @@ impl<A: AsRef<[u8]>> IonReader for RawBinaryReader<A> {
// This guarantees that the first byte in the buffer is the first byte of the current item.
self.buffer.consume(nop_bytes_count);

item_result
Ok(item_result)
}

fn current(&self) -> Self::Item {
Expand Down Expand Up @@ -1374,6 +1383,10 @@ impl<'a, A: AsRef<[u8]>> TxReader<'a, A> {
return decoding_error("found an annotation wrapper with no value");
}

if annotations_length.value() > self.tx_buffer.remaining() {
return incomplete_data_error("an annotation wrapper", self.tx_buffer.total_consumed());
}

// Skip over the annotations sequence itself; the reader will return to it if/when the
// reader asks to iterate over those symbol IDs.
self.tx_buffer.consume(annotations_length.value());
Expand Down Expand Up @@ -1471,7 +1484,7 @@ mod tests {
}
}

fn expect_annotations<A: AsRef<[u8]>, I: IntoRawAnnotations>(
fn expect_annotations<A: AsRef<[u8]> + Expandable, I: IntoRawAnnotations>(
reader: &RawBinaryReader<A>,
annotations: I,
) {
Expand Down Expand Up @@ -1513,7 +1526,7 @@ mod tests {
#[test]
fn read_int_header() -> IonResult<()> {
let data = vec![0x21, 0x03];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::Int);
expect_eof(reader.next());
Ok(())
Expand All @@ -1529,11 +1542,14 @@ mod tests {
// This byte completes the int, but we still don't have another value to move to.
reader.append_bytes(&[0x03])?;
expect_value(reader.next(), IonType::Int);
expect_eof(reader.next());
expect_incomplete(reader.next()); // Incomplete, rather than EOF because we have not marked
// the stream complete.

// Now there's an empty string after the int
reader.append_bytes(&[0x80])?;
reader.stream_complete();
expect_value(reader.next(), IonType::String);
expect_eof(reader.next());

Ok(())
}
Expand All @@ -1545,7 +1561,7 @@ mod tests {
0x21, 0x02, // 2
0x21, 0x03, // 3
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::Int);
assert_eq!(reader.read_int()?, Int::I64(1));
expect_value(reader.next(), IonType::Int);
Expand All @@ -1564,7 +1580,7 @@ mod tests {
0x48, 0x40, 0x92, 0xc0, 0x00, 0x00, 0x00, 0x00, 0x00, // 1.2e3
0x48, 0xc0, 0x20, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, // -8.125e0
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::Float);
assert_eq!(reader.read_f64()?, 5.5f64);
expect_value(reader.next(), IonType::Float);
Expand All @@ -1584,7 +1600,7 @@ mod tests {
0x52, 0x80, 0xe4, // -100.
0x52, 0x80, 0x1c, // 28.
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::Decimal);
assert_eq!(reader.read_decimal()?, Decimal::new(0, 0));
expect_value(reader.next(), IonType::Decimal);
Expand Down Expand Up @@ -1612,7 +1628,7 @@ mod tests {
0xbb, // 2022-06-09T22:59:59.000+00:00
0xbb, 0xc3,
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::Timestamp);
assert_eq!(
reader.read_timestamp()?,
Expand Down Expand Up @@ -1674,7 +1690,7 @@ mod tests {
0x71, 0x02, // $2
0x72, 0x00, 0x03, // inefficiently encoded $3
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::Symbol);
assert_eq!(reader.read_symbol_id()?, 0);
expect_value(reader.next(), IonType::Symbol);
Expand All @@ -1696,7 +1712,7 @@ mod tests {
0x83, 0x62, 0x61, 0x72, // "bar"
0x83, 0x62, 0x61, 0x7a, // "baz"
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::String);
assert_eq!(reader.read_str()?, "");
expect_value(reader.next(), IonType::String);
Expand All @@ -1718,7 +1734,7 @@ mod tests {
0x93, 0x62, 0x61, 0x72, // b"bar"
0x93, 0x62, 0x61, 0x7a, // b"baz"
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::Clob);
assert_eq!(reader.read_clob_bytes()?, b"");
expect_value(reader.next(), IonType::Clob);
Expand All @@ -1740,7 +1756,7 @@ mod tests {
0xA3, 0x62, 0x61, 0x72, // b"bar"
0xA3, 0x62, 0x61, 0x7a, // b"baz"
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());
expect_value(reader.next(), IonType::Blob);
assert_eq!(reader.read_blob_bytes()?, b"");
expect_value(reader.next(), IonType::Blob);
Expand All @@ -1761,7 +1777,7 @@ mod tests {
0xE4, 0x81, 0x85, 0x21, 0x02, // $5::2
0xE6, 0x83, 0x86, 0x87, 0x88, 0x21, 0x03, // $6::$7::$8::3
];
let mut reader = RawBinaryReader::new(data);
let mut reader = RawBinaryReader::new(data.as_slice());

expect_value(reader.next(), IonType::Int);
expect_annotations(&reader, [4]);
Expand Down
2 changes: 1 addition & 1 deletion src/blocking_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl<R: BufferedRawReader, T: ToIonDataSource> BlockingRawReader<R, T> {
loop {
let n = self.reader.read_from(&mut self.source, length)?;
bytes_read += n;
if n == 0 || bytes_read + n >= length {
if n == 0 || bytes_read >= length {
break;
}
}
Expand Down
19 changes: 19 additions & 0 deletions src/raw_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,22 @@ pub trait BufferedRawReader: RawReader + From<Vec<u8>> {
fn stream_complete(&mut self);
fn is_stream_complete(&self) -> bool;
}

/// Marks input types by whether additional bytes can be appended to them.
///
/// Readers can consult this to pick input-dependent defaults, such as starting with the
/// end-of-stream status set to true when the input is known to never receive more data.
pub trait Expandable {
    /// Returns `true` if more data can be added to this input, `false` otherwise.
    fn expandable(&self) -> bool;
}

/// An owned byte vector is reported as expandable.
impl Expandable for Vec<u8> {
    fn expandable(&self) -> bool {
        true
    }
}

/// A shared reference to any byte-slice-like value is reported as not expandable.
impl<T> Expandable for &T
where
    T: AsRef<[u8]> + ?Sized,
{
    fn expandable(&self) -> bool {
        false
    }
}
21 changes: 1 addition & 20 deletions src/text/non_blocking/raw_text_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::element::{Blob, Clob};
use crate::types::Str;
use nom::Err::{Error, Failure, Incomplete};

use crate::raw_reader::{BufferedRawReader, RawStreamItem};
use crate::raw_reader::{BufferedRawReader, Expandable, RawStreamItem};
use crate::raw_symbol_token::RawSymbolToken;
use crate::result::{
decoding_error, illegal_operation, illegal_operation_raw, incomplete_text_error, IonError,
Expand Down Expand Up @@ -101,25 +101,6 @@ impl From<Vec<u8>> for RawTextReader<Vec<u8>> {
}
}

/// Marks input types by whether additional bytes can be appended to them.
///
/// Readers can consult this to pick input-dependent defaults, such as starting with the
/// end-of-stream status set to true when the input is known to never receive more data.
pub trait Expandable {
    /// Returns `true` if more data can be added to this input, `false` otherwise.
    fn expandable(&self) -> bool;
}

/// An owned byte vector is reported as expandable.
impl Expandable for Vec<u8> {
    fn expandable(&self) -> bool {
        true
    }
}

/// A shared reference to any byte-slice-like value is reported as not expandable.
impl<T> Expandable for &T
where
    T: AsRef<[u8]> + ?Sized,
{
    fn expandable(&self) -> bool {
        false
    }
}

impl<A: AsRef<[u8]> + Expandable> RawTextReader<A> {
pub fn new(input: A) -> RawTextReader<A> {
let expandable = input.expandable();
Expand Down