Skip to content

Commit

Permalink
Add a ToIonDataSource implementation for std::io::Cursor and referenc…
Browse files Browse the repository at this point in the history
…es to byte arrays. (#406)
  • Loading branch information
jnicholls committed Aug 12, 2022
1 parent cebec11 commit 9445115
Showing 1 changed file with 42 additions and 67 deletions.
109 changes: 42 additions & 67 deletions src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,6 @@ use crate::result::{decoding_error, IonError, IonResult};
/// byte array.
pub trait IonDataSource: BufRead {
/// Ignore the next `number_of_bytes` bytes in the data source.
fn skip_bytes(&mut self, number_of_bytes: usize) -> IonResult<()>;

/// Returns the next byte in the data source if available.
fn next_byte(&mut self) -> IonResult<Option<u8>>;

/// Calls `byte_processor` on each byte in the data source until it returns false.
/// Returns the number of bytes that were read and processed.
fn read_next_byte_while<F>(&mut self, byte_processor: &mut F) -> IonResult<usize>
where
F: FnMut(u8) -> bool;

/// Calls `slice_processor` on a slice containing the next `length` bytes from the
/// data source. If the required bytes are already in the input buffer, a reference to that
/// slice of the input buffer will be used. If they are not, the required bytes will be read
/// into `fallback_buffer` and that will be used instead. If `fallback_buffer` does not have
/// enough capacity to store the requested data, it will be resized. It will never be shrunk,
/// however--it is the caller's responsibility to manage this memory.
fn read_slice<T, F>(
&mut self,
length: usize,
fallback_buffer: &mut Vec<u8>,
slice_processor: F,
) -> IonResult<T>
where
F: FnOnce(&[u8]) -> IonResult<T>;
}

// Allows all implementations of `BufRead` to be used as an IonDataSource, including BufReader
// and io::Cursor.
impl<T: BufRead> IonDataSource for T {
// Moves the cursor within the input buffer until `number_of_bytes` bytes have been skipped.
// Will read from the underlying data source as needed.
fn skip_bytes(&mut self, number_of_bytes: usize) -> IonResult<()> {
let mut bytes_skipped = 0;
while bytes_skipped < number_of_bytes {
Expand All @@ -72,9 +40,7 @@ impl<T: BufRead> IonDataSource for T {
Ok(())
}

// Returns the next byte in the input buffer if one is available. Otherwise reads one from the
// underlying data source.
#[inline(always)]
/// Returns the next byte in the data source if available.
fn next_byte(&mut self) -> IonResult<Option<u8>> {
match self.fill_buf()?.first() {
Some(&byte) => {
Expand All @@ -85,23 +51,8 @@ impl<T: BufRead> IonDataSource for T {
}
}

// Some data types in the binary Ion spec have a length that must be discovered by reading a
// single byte at a time from the data source. Simply calling
// [io::Read::read](https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read)
// or iterating over the data source's input bytes using
// [io::Read::bytes](https://doc.rust-lang.org/std/io/trait.Read.html#method.bytes)
// requires copying and error handling to be performed for each byte read, causing these methods
// to be prohibitively expensive.
//
// This method directly accesses the data source's input buffer, allowing us to process
// the bytes in-place. No error handling is performed unless the end of the buffer is reached,
// requiring us to perform a single read() call to refill it.
//
// For more information, read the documentation for the BufRead trait's
// [fill_buf](https://doc.rust-lang.org/std/io/trait.BufRead.html#tymethod.fill_buf)
// and
// [consume](https://doc.rust-lang.org/std/io/trait.BufRead.html#tymethod.consume)
// methods.
/// Calls `byte_processor` on each byte in the data source until it returns false.
/// Returns the number of bytes that were read and processed.
fn read_next_byte_while<F>(&mut self, byte_processor: &mut F) -> IonResult<usize>
where
F: FnMut(u8) -> bool,
Expand Down Expand Up @@ -143,23 +94,26 @@ impl<T: BufRead> IonDataSource for T {
}
}

// Like `read_next_byte_while`, this method will prefer to process the next `number_of_bytes`
// bytes without copying them out of the input buffer. It can be used to process any Ion value
// of a known size.
fn read_slice<V, F>(
/// Calls `slice_processor` on a slice containing the next `length` bytes from the
/// data source. If the required bytes are already in the input buffer, a reference to that
/// slice of the input buffer will be used. If they are not, the required bytes will be read
/// into `fallback_buffer` and that will be used instead. If `fallback_buffer` does not have
/// enough capacity to store the requested data, it will be resized. It will never be shrunk,
/// however--it is the caller's responsibility to manage this memory.
fn read_slice<T, F>(
&mut self,
number_of_bytes: usize,
length: usize,
fallback_buffer: &mut Vec<u8>,
slice_processor: F,
) -> IonResult<V>
) -> IonResult<T>
where
F: FnOnce(&[u8]) -> IonResult<V>,
F: FnOnce(&[u8]) -> IonResult<T>,
{
// Get a reference to the data source's input buffer, refilling it if it's empty.
let buffer = self.fill_buf()?;

// If the buffer is still empty, we've run out of data.
if buffer.is_empty() && number_of_bytes > 0 {
if buffer.is_empty() && length > 0 {
// TODO: IonResult should have a distinct `IncompleteData` error case
// https://github.com/amzn/ion-rust/issues/299
return decoding_error("Unexpected end of stream.");
Expand All @@ -168,21 +122,21 @@ impl<T: BufRead> IonDataSource for T {
// If the requested value is already in our input buffer, there's no need to copy it out
// into a separate buffer. We can return a slice of the input buffer and consume() that
// number of bytes.
if buffer.len() >= number_of_bytes {
let result = slice_processor(&buffer[..number_of_bytes]);
self.consume(number_of_bytes);
if buffer.len() >= length {
let result = slice_processor(&buffer[..length]);
self.consume(length);
return result;
}

// Grow the Vec to accommodate the requested data if needed
let buffer: &mut [u8] = if fallback_buffer.len() < number_of_bytes {
fallback_buffer.resize(number_of_bytes, 0);
let buffer: &mut [u8] = if fallback_buffer.len() < length {
fallback_buffer.resize(length, 0);
// Get a mutable reference to the underlying byte array
fallback_buffer.as_mut()
} else {
// Otherwise, split the Vec's underlying storage to get a &mut [u8] slice of the
// required size
let (required_buffer, _) = fallback_buffer.split_at_mut(number_of_bytes);
let (required_buffer, _) = fallback_buffer.split_at_mut(length);
required_buffer
};

Expand All @@ -200,6 +154,8 @@ impl<T: BufRead> IonDataSource for T {
}
}

impl<T> IonDataSource for T where T: BufRead {}

#[cfg(test)]
mod tests {
use super::IonDataSource;
Expand Down Expand Up @@ -309,7 +265,7 @@ mod tests {
/// allowing users to build a [Reader] from a variety of types that might not define I/O operations
/// on their own.
pub trait ToIonDataSource {
type DataSource: BufRead;
type DataSource: IonDataSource;
fn to_ion_data_source(self) -> Self::DataSource;
}

Expand Down Expand Up @@ -337,6 +293,14 @@ impl<'a> ToIonDataSource for &'a [u8] {
}
}

impl<'a, const N: usize> ToIonDataSource for &'a [u8; N] {
type DataSource = io::Cursor<Self>;

fn to_ion_data_source(self) -> Self::DataSource {
io::Cursor::new(self)
}
}

impl ToIonDataSource for Vec<u8> {
type DataSource = io::Cursor<Self>;

Expand All @@ -353,6 +317,17 @@ impl<T: BufRead, U: BufRead> ToIonDataSource for io::Chain<T, U> {
}
}

impl<T> ToIonDataSource for io::Cursor<T>
where
T: AsRef<[u8]>,
{
type DataSource = Self;

fn to_ion_data_source(self) -> Self::DataSource {
self
}
}

impl<T: Read> ToIonDataSource for BufReader<T> {
type DataSource = Self;

Expand Down

0 comments on commit 9445115

Please sign in to comment.