Skip to content

Commit

Permalink
add try_read_message() and fix bug in no_std read impl
Browse files Browse the repository at this point in the history
  • Loading branch information
dwrensha committed May 26, 2020
1 parent 986e4f0 commit 18c2e0c
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 18 deletions.
11 changes: 5 additions & 6 deletions capnp/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,12 @@ mod no_std_impls {

impl <'a> Read for &'a [u8] {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
if buf.len() > self.len() {
return Err(Error::failed("buffer is not large enough".to_string()));
}
let (a, b) = self.split_at(buf.len());
buf.copy_from_slice(a);
let amt = core::cmp::min(buf.len(), self.len());
let (a, b) = self.split_at(amt);

buf[..amt].copy_from_slice(a);
*self = b;
Ok(buf.len())
Ok(amt)
}
}

Expand Down
62 changes: 50 additions & 12 deletions capnp/src/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
//! [standard stream framing](https://capnproto.org/encoding.html#serialization-over-a-stream),
//! where each message is preceded by a segment table indicating the size of its segments.

use alloc::string::ToString;
use alloc::vec::Vec;
use core::convert::TryInto;
use crate::io::{Read, Write};
Expand Down Expand Up @@ -67,7 +68,10 @@ pub fn read_message_from_flat_slice<'a>(slice: &mut &'a [u8],
let all_bytes = *slice;
let mut bytes = *slice;
let orig_bytes_len = bytes.len();
let segment_lengths_builder = read_segment_table(&mut bytes, options)?;
let segment_lengths_builder = match read_segment_table(&mut bytes, options)? {
Some(b) => b,
None => return Err(Error::failed("empty slice".to_string())),
};
let segment_table_bytes_len = orig_bytes_len - bytes.len();
assert_eq!(segment_table_bytes_len % BYTES_PER_WORD, 0);
let num_words = segment_lengths_builder.total_words();
Expand Down Expand Up @@ -180,24 +184,53 @@ impl SegmentLengthsBuilder {
/// For optimal performance, `read` should be a buffered reader type.
pub fn read_message<R>(mut read: R, options: message::ReaderOptions) -> Result<message::Reader<OwnedSegments>>
where R: Read {
let owned_segments_builder = read_segment_table(&mut read, options)?;
let owned_segments_builder = match read_segment_table(&mut read, options)? {
Some(b) => b,
None => return Err(Error::failed("Premature end of file".to_string())),
};
read_segments(&mut read, owned_segments_builder.into_owned_segments(), options)
}

/// Like read_message(), but returns None instead of an error if there are zero bytes left in `read`.
/// This is useful for reading a message stream of unknown length -- you call this function
/// until it returns None.
pub fn try_read_message<R>(mut read: R, options: message::ReaderOptions) -> Result<Option<message::Reader<OwnedSegments>>>
where R: Read {
let owned_segments_builder = match read_segment_table(&mut read, options)? {
Some(b) => b,
None => return Ok(None),
};
Ok(Some(read_segments(&mut read, owned_segments_builder.into_owned_segments(), options)?))
}

/// Reads a segment table from `read` and returns the total number of words across all
/// segments, as well as the segment offsets.
///
/// The segment table format for streams is defined in the Cap'n Proto
/// [encoding spec](https://capnproto.org/encoding.html)
fn read_segment_table<R>(read: &mut R,
options: message::ReaderOptions)
-> Result<SegmentLengthsBuilder>
-> Result<Option<SegmentLengthsBuilder>>
where R: Read
{
let mut buf: [u8; 8] = [0; 8];

// read the first Word, which contains segment_count and the 1st segment length
read.read_exact(&mut buf)?;
let mut read_count = 0;
while read_count < 8 {
let n = read.read(&mut buf)?;
if n == 0 {
if read_count == 0 {
// clean EOF on message boundary
return Ok(None)
} else {
// EOF in the middle of segment table
return Err(Error::failed("Premature end of file".to_string()))
}
}
read_count += n;
}

let segment_count = u32::from_le_bytes(buf[0..4].try_into().unwrap()).wrapping_add(1) as usize;

if segment_count >= 512 {
Expand Down Expand Up @@ -236,7 +269,7 @@ fn read_segment_table<R>(read: &mut R,
receiving end, see capnp::message::ReaderOptions.", segment_lengths_builder.total_words())))
}

Ok(segment_lengths_builder)
Ok(Some(segment_lengths_builder))
}

/// Reads segments from `read`.
Expand Down Expand Up @@ -384,7 +417,7 @@ pub mod test {

use crate::message;
use crate::message::ReaderSegments;
use super::{read_message, read_message_from_flat_slice, flatten_segments,
use super::{read_message, try_read_message, read_message_from_flat_slice, flatten_segments,
read_segment_table, write_segment_table, write_segments};

/// Writes segments as if they were a Capnproto message.
Expand All @@ -396,6 +429,12 @@ pub mod test {
write_segments(write, borrowed_segments).unwrap();
}

#[test]
fn try_read_empty() {
let mut buf: &[u8] = &[];
assert!(try_read_message(&mut buf, message::ReaderOptions::new()).unwrap().is_none());
}

#[test]
fn test_read_segment_table() {

Expand All @@ -405,7 +444,7 @@ pub mod test {
0,0,0,0] // 0 length
.iter().cloned());
let segment_lengths_builder = read_segment_table(&mut &buf[..],
message::ReaderOptions::new()).unwrap();
message::ReaderOptions::new()).unwrap().unwrap();
assert_eq!(0, segment_lengths_builder.total_words());
assert_eq!(vec![(0,0)], segment_lengths_builder.to_segment_indices());
buf.clear();
Expand All @@ -414,7 +453,7 @@ pub mod test {
1,0,0,0] // 1 length
.iter().cloned());
let segment_lengths_builder = read_segment_table(&mut &buf[..],
message::ReaderOptions::new()).unwrap();
message::ReaderOptions::new()).unwrap().unwrap();
assert_eq!(1, segment_lengths_builder.total_words());
assert_eq!(vec![(0,1)], segment_lengths_builder.to_segment_indices());
buf.clear();
Expand All @@ -425,7 +464,7 @@ pub mod test {
0,0,0,0] // padding
.iter().cloned());
let segment_lengths_builder = read_segment_table(&mut &buf[..],
message::ReaderOptions::new()).unwrap();
message::ReaderOptions::new()).unwrap().unwrap();
assert_eq!(2, segment_lengths_builder.total_words());
assert_eq!(vec![(0,1), (1, 2)], segment_lengths_builder.to_segment_indices());
buf.clear();
Expand All @@ -436,7 +475,7 @@ pub mod test {
0,1,0,0] // 256 length
.iter().cloned());
let segment_lengths_builder = read_segment_table(&mut &buf[..],
message::ReaderOptions::new()).unwrap();
message::ReaderOptions::new()).unwrap().unwrap();
assert_eq!(258, segment_lengths_builder.total_words());
assert_eq!(vec![(0,1), (1, 2), (2, 258)], segment_lengths_builder.to_segment_indices());
buf.clear();
Expand All @@ -449,15 +488,14 @@ pub mod test {
0,0,0,0] // padding
.iter().cloned());
let segment_lengths_builder = read_segment_table(&mut &buf[..],
message::ReaderOptions::new()).unwrap();
message::ReaderOptions::new()).unwrap().unwrap();
assert_eq!(200, segment_lengths_builder.total_words());
assert_eq!(vec![(0,77), (77, 100), (100, 101), (101, 200)], segment_lengths_builder.to_segment_indices());
buf.clear();
}

#[test]
fn test_read_invalid_segment_table() {

let mut buf = vec![];

buf.extend([0,2,0,0].iter().cloned()); // 513 segments
Expand Down
10 changes: 10 additions & 0 deletions capnp/src/serialize_packed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,16 @@ pub fn read_message<R>(read: R,
serialize::read_message(packed_read, options)
}

/// Like read_message(), but returns None instead of an error if there are zero bytes left in `read`.
pub fn try_read_message<R>(read: R,
options: message::ReaderOptions)
-> Result<Option<crate::message::Reader<serialize::OwnedSegments>>>
where R: BufRead
{
let packed_read = PackedRead { inner: read };
serialize::try_read_message(packed_read, options)
}

struct PackedWrite<W> where W: Write {
inner: W,
}
Expand Down

0 comments on commit 18c2e0c

Please sign in to comment.