From 944511591310f7e273074eaa252f6cd37b5728bb Mon Sep 17 00:00:00 2001 From: Jarred Nicholls Date: Thu, 11 Aug 2022 21:05:14 -0400 Subject: [PATCH] Add a ToIonDataSource implementation for std::io::Cursor and references to byte arrays. (#406) --- src/data_source.rs | 109 +++++++++++++++++---------------------------- 1 file changed, 42 insertions(+), 67 deletions(-) diff --git a/src/data_source.rs b/src/data_source.rs index 9153333a..916ac840 100644 --- a/src/data_source.rs +++ b/src/data_source.rs @@ -23,38 +23,6 @@ use crate::result::{decoding_error, IonError, IonResult}; /// byte array. pub trait IonDataSource: BufRead { /// Ignore the next `number_of_bytes` bytes in the data source. - fn skip_bytes(&mut self, number_of_bytes: usize) -> IonResult<()>; - - /// Returns the next byte in the data source if available. - fn next_byte(&mut self) -> IonResult>; - - /// Calls `byte_processor` on each byte in the data source until it returns false. - /// Returns the number of bytes that were read and processed. - fn read_next_byte_while(&mut self, byte_processor: &mut F) -> IonResult - where - F: FnMut(u8) -> bool; - - /// Calls `slice_processor` on a slice containing the next `length` bytes from the - /// data source. If the required bytes are already in the input buffer, a reference to that - /// slice of the input buffer will be used. If they are not, the required bytes will be read - /// into `fallback_buffer` and that will be used instead. If `fallback_buffer` does not have - /// enough capacity to store the requested data, it will be resized. It will never be shrunk, - /// however--it is the caller's responsibility to manage this memory. - fn read_slice( - &mut self, - length: usize, - fallback_buffer: &mut Vec, - slice_processor: F, - ) -> IonResult - where - F: FnOnce(&[u8]) -> IonResult; -} - -// Allows all implementations of `BufRead` to be used as an IonDataSource, including BufReader -// and io::Cursor. -impl IonDataSource for T { - // Moves the cursor within the input buffer until `number_of_bytes` bytes have been skipped. - // Will read from the underlying data source as needed. fn skip_bytes(&mut self, number_of_bytes: usize) -> IonResult<()> { let mut bytes_skipped = 0; while bytes_skipped < number_of_bytes { @@ -72,9 +40,7 @@ impl IonDataSource for T { Ok(()) } - // Returns the next byte in the input buffer if one is available. Otherwise reads one from the - // underlying data source. - #[inline(always)] + /// Returns the next byte in the data source if available. fn next_byte(&mut self) -> IonResult> { match self.fill_buf()?.first() { Some(&byte) => { @@ -85,23 +51,8 @@ impl IonDataSource for T { } } - // Some data types in the binary Ion spec have a length that must be discovered by reading a - // single byte at a time from the data source. Simply calling - // [io::Read::read](https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read) - // or iterating over the data source's input bytes using - // [io::Read::bytes](https://doc.rust-lang.org/std/io/trait.Read.html#method.bytes) - // requires copying and error handling to be performed for each byte read, causing these methods - // to be prohibitively expensive. - // - // This method directly accesses the data source's input buffer, allowing us to process - // the bytes in-place. No error handling is performed unless the end of the buffer is reached, - // requiring us to perform a single read() call to refill it. - // - // For more information, read the documentation for the BufRead trait's - // [fill_buf](https://doc.rust-lang.org/std/io/trait.BufRead.html#tymethod.fill_buf) - // and - // [consume](https://doc.rust-lang.org/std/io/trait.BufRead.html#tymethod.consume) - // methods. + /// Calls `byte_processor` on each byte in the data source until it returns false. + /// Returns the number of bytes that were read and processed. fn read_next_byte_while(&mut self, byte_processor: &mut F) -> IonResult where F: FnMut(u8) -> bool, @@ -143,23 +94,26 @@ impl IonDataSource for T { } } - // Like `read_next_byte_while`, this method will prefer to process the next `number_of_bytes` - // bytes without copying them out of the input buffer. It can be used to process any Ion value - // of a known size. - fn read_slice( + /// Calls `slice_processor` on a slice containing the next `length` bytes from the + /// data source. If the required bytes are already in the input buffer, a reference to that + /// slice of the input buffer will be used. If they are not, the required bytes will be read + /// into `fallback_buffer` and that will be used instead. If `fallback_buffer` does not have + /// enough capacity to store the requested data, it will be resized. It will never be shrunk, + /// however--it is the caller's responsibility to manage this memory. + fn read_slice( &mut self, - number_of_bytes: usize, + length: usize, fallback_buffer: &mut Vec, slice_processor: F, - ) -> IonResult + ) -> IonResult where - F: FnOnce(&[u8]) -> IonResult, + F: FnOnce(&[u8]) -> IonResult, { // Get a reference to the data source's input buffer, refilling it if it's empty. let buffer = self.fill_buf()?; // If the buffer is still empty, we've run out of data. - if buffer.is_empty() && number_of_bytes > 0 { + if buffer.is_empty() && length > 0 { // TODO: IonResult should have a distinct `IncompleteData` error case // https://github.com/amzn/ion-rust/issues/299 return decoding_error("Unexpected end of stream."); @@ -168,21 +122,21 @@ impl IonDataSource for T { // If the requested value is already in our input buffer, there's no need to copy it out // into a separate buffer. We can return a slice of the input buffer and consume() that // number of bytes. - if buffer.len() >= number_of_bytes { - let result = slice_processor(&buffer[..number_of_bytes]); - self.consume(number_of_bytes); + if buffer.len() >= length { + let result = slice_processor(&buffer[..length]); + self.consume(length); return result; } // Grow the Vec to accommodate the requested data if needed - let buffer: &mut [u8] = if fallback_buffer.len() < number_of_bytes { - fallback_buffer.resize(number_of_bytes, 0); + let buffer: &mut [u8] = if fallback_buffer.len() < length { + fallback_buffer.resize(length, 0); // Get a mutable reference to the underlying byte array fallback_buffer.as_mut() } else { // Otherwise, split the Vec's underlying storage to get a &mut [u8] slice of the // required size - let (required_buffer, _) = fallback_buffer.split_at_mut(number_of_bytes); + let (required_buffer, _) = fallback_buffer.split_at_mut(length); required_buffer }; @@ -200,6 +154,8 @@ impl IonDataSource for T { } } +impl IonDataSource for T where T: BufRead {} + #[cfg(test)] mod tests { use super::IonDataSource; @@ -309,7 +265,7 @@ mod tests { /// allowing users to build a [Reader] from a variety of types that might not define I/O operations /// on their own. pub trait ToIonDataSource { - type DataSource: BufRead; + type DataSource: IonDataSource; fn to_ion_data_source(self) -> Self::DataSource; } @@ -337,6 +293,14 @@ impl<'a> ToIonDataSource for &'a [u8] { } } +impl<'a, const N: usize> ToIonDataSource for &'a [u8; N] { + type DataSource = io::Cursor; + + fn to_ion_data_source(self) -> Self::DataSource { + io::Cursor::new(self) + } +} + impl ToIonDataSource for Vec { type DataSource = io::Cursor; @@ -353,6 +317,17 @@ impl ToIonDataSource for io::Chain { } } +impl ToIonDataSource for io::Cursor +where + T: AsRef<[u8]>, +{ + type DataSource = Self; + + fn to_ion_data_source(self) -> Self::DataSource { + self + } +} + impl ToIonDataSource for BufReader { type DataSource = Self;