Skip to content

Commit

Permalink
fix: Allow more flexible EventStream buffers
Browse files Browse the repository at this point in the history
This commit changes the `EventStream` type to hopefully avoid some of
the previously observed difficulties (viz. #120) around the lifetime of
the user-provided buffer.

Previously, calls to `EventStream::new` were required to provide an
`&'a mut [u8]`, where the `EventStream` is generic over the lifetime
`'a`. This is an issue in many cases, where the `EventStream` must
live for the `'static` lifetime, as it requires a `'static mut` buffer,
which is difficult to create safely.

This branch changes the `EventStream` type to be generic over a type
`T` which implements `AsRef<[u8]>` and `AsMut<[u8]>`. Since an
`&mut [u8]` implements these traits, means that the `EventStream` may
now be constructed with an owned buffer _or_ a borrowed buffer. Thus,
users can pass in a `&mut [u8]` to a short-lived `EventStream`, while a
`'static` one can be constructed by passing in a `[u8; N]`, a
`Vec<u8>`, _or_ a `&'static mut [u8]`.

Additionally, this means that buffer types from other crates (such as
`bytes`) could theoretically be used, without requiring `inotify` to
depend on those crates. I'm not sure if this would actually happen that
often in practice, but it's a nice side benefit.

Fixes #120.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw committed Jan 25, 2019
1 parent be829e4 commit ea3e7a3
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 21 deletions.
6 changes: 1 addition & 5 deletions examples/issue-120-workaround.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ use tempdir::TempDir;
use tokio::prelude::*;

fn main() -> Result<(), io::Error> {
/// XXX: this is a workaround to set the inotify's buffer to be
/// used, matching with tokio's runtime API
/// https://github.com/inotify-rs/inotify/issues/120
static mut BUFFER: [u8; 32] = [0; 32];

let mut inotify = Inotify::init()?;

Expand All @@ -26,7 +22,7 @@ fn main() -> Result<(), io::Error> {
});

let future = inotify
.event_stream(unsafe { &mut BUFFER })
.event_stream([0; 32])
.map_err(|e| println!("inotify error: {:?}", e))
.for_each(move |event| {
println!("event: {:?}", event);
Expand Down
15 changes: 8 additions & 7 deletions src/inotify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,10 @@ impl Inotify {
///
/// [`Inotify::event_stream_with_handle`]: struct.Inotify.html#method.event_stream_with_handle
#[cfg(feature = "stream")]
pub fn event_stream<'buffer>(&mut self, buffer: &'buffer mut [u8])
-> EventStream<'buffer>
pub fn event_stream<T>(&mut self, buffer: T)
-> EventStream<T>
where
T: AsMut<[u8]> + AsRef<[u8]>,
{
EventStream::new(self.fd.clone(), buffer)
}
Expand All @@ -417,11 +419,10 @@ impl Inotify {
///
/// [`Inotify::event_stream`]: struct.Inotify.html#method.event_stream
#[cfg(feature = "stream")]
pub fn event_stream_with_handle<'buffer>(&mut self,
handle: &Handle,
buffer: &'buffer mut [u8],
)
-> io::Result<EventStream<'buffer>>
pub fn event_stream_with_handle<T>(&mut self, handle: &Handle, buffer: T)
-> io::Result<EventStream<T>>
where
T: AsMut<[u8]> + AsRef<[u8]>,
{
EventStream::new_with_handle(self.fd.clone(), handle, buffer)
}
Expand Down
22 changes: 14 additions & 8 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,19 @@ use util::read_into_buffer;
/// Allows for streaming events returned by [`Inotify::event_stream`].
///
/// [`Inotify::event_stream`]: struct.Inotify.html#method.event_stream
pub struct EventStream<'buffer> {
pub struct EventStream<T> {
fd: PollEvented<EventedFdGuard>,
buffer: &'buffer mut [u8],
buffer: T,
buffer_pos: usize,
unused_bytes: usize,
}

impl<'buffer> EventStream<'buffer> {
impl<T> EventStream<T>
where
T: AsMut<[u8]> + AsRef<[u8]>,
{
/// Returns a new `EventStream` associated with the default reactor.
pub(crate) fn new(fd: Arc<FdGuard>, buffer: &'buffer mut [u8]) -> Self {
pub(crate) fn new(fd: Arc<FdGuard>, buffer: T) -> Self {
EventStream {
fd: PollEvented::new(EventedFdGuard(fd)),
buffer: buffer,
Expand All @@ -60,7 +63,7 @@ impl<'buffer> EventStream<'buffer> {
pub(crate) fn new_with_handle(
fd : Arc<FdGuard>,
handle: &Handle,
buffer: &'buffer mut [u8],
buffer: T,
)
-> io::Result<Self>
{
Expand All @@ -73,7 +76,10 @@ impl<'buffer> EventStream<'buffer> {
}
}

impl<'buffer> Stream for EventStream<'buffer> {
impl<T> Stream for EventStream<T>
where
T: AsMut<[u8]> + AsRef<[u8]>,
{
type Item = EventOwned;
type Error = io::Error;

Expand All @@ -82,7 +88,7 @@ impl<'buffer> Stream for EventStream<'buffer> {
if self.unused_bytes == 0 {
// Nothing usable in buffer. Need to reset and fill buffer.
self.buffer_pos = 0;
self.unused_bytes = try_ready!(self.fd.poll_read(&mut self.buffer));
self.unused_bytes = try_ready!(self.fd.poll_read(&mut self.buffer.as_mut()));
}

if self.unused_bytes == 0 {
Expand All @@ -96,7 +102,7 @@ impl<'buffer> Stream for EventStream<'buffer> {
// least one event in there and can call `from_buffer` to take it out.
let (bytes_consumed, event) = Event::from_buffer(
Arc::downgrade(self.fd.get_ref()),
&self.buffer[self.buffer_pos..],
&self.buffer.as_ref()[self.buffer_pos..],
);
self.buffer_pos += bytes_consumed;
self.unused_bytes -= bytes_consumed;
Expand Down
2 changes: 1 addition & 1 deletion tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fn it_should_watch_a_file_async() {

use futures::Stream;
let events = inotify
.event_stream(&mut buffer)
.event_stream(&mut buffer[..])
.take(1)
.wait()
.collect::<Vec<_>>();
Expand Down

0 comments on commit ea3e7a3

Please sign in to comment.