Skip to content

Commit

Permalink
Coalesce readable & writable into ready event
Browse files Browse the repository at this point in the history
Having separate readable & writable events makes cleanly closing the socket
and freeing up resources a bit tricky. Instead, a single `ready` event is
used. The guarantees are that the event will be invoked only a single time per
socket per event loop tick.

Fixes #184
  • Loading branch information
carllerche committed Jul 7, 2015
1 parent f4fcc75 commit 2d80519
Show file tree
Hide file tree
Showing 15 changed files with 229 additions and 373 deletions.
186 changes: 5 additions & 181 deletions src/event.rs
Expand Up @@ -287,142 +287,12 @@ impl fmt::Debug for Interest {
}
}

#[derive(Copy, PartialEq, Eq, Clone, PartialOrd, Ord)]
pub struct ReadHint(usize);

impl ReadHint {
#[inline]
pub fn none() -> ReadHint {
ReadHint(0)
}

#[inline]
pub fn all() -> ReadHint {
ReadHint::data() | ReadHint::hup() | ReadHint::error()
}

#[inline]
pub fn data() -> ReadHint {
ReadHint(0x001)
}

#[inline]
pub fn hup() -> ReadHint {
ReadHint(0x002)
}

#[inline]
pub fn error() -> ReadHint {
ReadHint(0x004)
}

#[inline]
pub fn is_data(&self) -> bool {
self.contains(ReadHint::data())
}

#[inline]
pub fn is_hup(&self) -> bool {
self.contains(ReadHint::hup())
}

#[inline]
pub fn is_error(&self) -> bool {
self.contains(ReadHint::error())
}

#[inline]
pub fn insert(&mut self, other: ReadHint) {
self.0 |= other.0;
}

#[inline]
pub fn remove(&mut self, other: ReadHint) {
self.0 &= !other.0;
}

#[inline]
pub fn contains(&self, other: ReadHint) -> bool {
(*self & other) == other
}

#[inline]
pub fn bits(&self) -> usize {
self.0
}
}

impl ops::BitOr for ReadHint {
type Output = ReadHint;

#[inline]
fn bitor(self, other: ReadHint) -> ReadHint {
ReadHint(self.bits() | other.bits())
}
}

impl ops::BitXor for ReadHint {
type Output = ReadHint;

#[inline]
fn bitxor(self, other: ReadHint) -> ReadHint {
ReadHint(self.bits() ^ other.bits())
}
}

impl ops::BitAnd for ReadHint {
type Output = ReadHint;

#[inline]
fn bitand(self, other: ReadHint) -> ReadHint {
ReadHint(self.bits() & other.bits())
}
}

impl ops::Sub for ReadHint {
type Output = ReadHint;

#[inline]
fn sub(self, other: ReadHint) -> ReadHint {
ReadHint(self.bits() & !other.bits())
}
}

impl ops::Not for ReadHint {
type Output = ReadHint;

#[inline]
fn not(self) -> ReadHint {
ReadHint(!self.bits() & ReadHint::all().bits())
}
}

impl fmt::Debug for ReadHint {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let mut one = false;
let flags = [
(ReadHint::data(), "DataHint"),
(ReadHint::hup(), "HupHint"),
(ReadHint::error(), "ErrorHint")];

for &(flag, msg) in flags.iter() {
if self.contains(flag) {
if one { try!(write!(fmt, " | ")) }
try!(write!(fmt, "{}", msg));

one = true
}
}

Ok(())
}
}


// Keep this struct internal to mio
#[derive(Copy, Clone, Debug)]
pub struct IoEvent {
kind: Interest,
token: Token
pub kind: Interest,
pub token: Token
}

/// IoEvent represents the raw event that the OS-specific selector
Expand All @@ -433,56 +303,10 @@ pub struct IoEvent {
/// Selector when they have events to report.
impl IoEvent {
/// Create a new IoEvent.
pub fn new(kind: Interest, token: usize) -> IoEvent {
pub fn new(kind: Interest, token: Token) -> IoEvent {
IoEvent {
kind: kind,
token: Token(token)
}
}

pub fn token(&self) -> Token {
self.token
}

/// Return an optional hint for a readable handle. Currently,
/// this method supports the HupHint, which indicates that the
/// kernel reported that the remote side hung up. This allows a
/// consumer to avoid reading in order to discover the hangup.
pub fn read_hint(&self) -> ReadHint {
let mut hint = ReadHint::none();

// The backend doesn't support hinting
if !self.kind.is_hinted() {
return hint;
token: token,
}

if self.kind.is_hup() {
hint = hint | ReadHint::hup();
}

if self.kind.is_readable() {
hint = hint | ReadHint::data();
}

if self.kind.is_error() {
hint = hint | ReadHint::error();
}

hint
}

/// This event indicated that the handle is now readable
pub fn is_readable(&self) -> bool {
self.kind.is_readable() || self.kind.is_hup()
}

/// This event indicated that the handle is now writable
pub fn is_writable(&self) -> bool {
self.kind.is_writable()
}

/// This event indicated that the handle had an error
pub fn is_error(&self) -> bool {
self.kind.is_error()
}
}
33 changes: 13 additions & 20 deletions src/event_loop.rs
Expand Up @@ -263,7 +263,6 @@ impl<H: Handler> EventLoop<H> {
self.io_process(handler, events);
self.notify(handler, messages);
self.timer_process(handler);

Ok(())
}

Expand Down Expand Up @@ -295,25 +294,17 @@ impl<H: Handler> EventLoop<H> {

trace!("event={:?}", evt);

match evt.token() {
match evt.token {
NOTIFY => self.notify.cleanup(),
_ => self.io_event(handler, evt)
_ => self.io_event(handler, evt)
}

i += 1;
}
}

fn io_event(&mut self, handler: &mut H, evt: IoEvent) {
let tok = evt.token();

if evt.is_readable() | evt.is_error() {
handler.readable(self, tok, evt.read_hint());
}

if evt.is_writable() {
handler.writable(self, tok);
}
handler.ready(self, evt.token, evt.kind);
}

fn notify(&mut self, handler: &mut H, mut cnt: usize) {
Expand Down Expand Up @@ -382,7 +373,7 @@ mod tests {
use std::sync::atomic::AtomicIsize;
use std::sync::atomic::Ordering::SeqCst;
use super::EventLoop;
use {buf, unix, Buf, Handler, Token, TryRead, TryWrite, ReadHint};
use {buf, unix, Buf, Handler, Token, TryRead, TryWrite, Interest};

#[test]
pub fn test_event_loop_size() {
Expand All @@ -408,14 +399,16 @@ mod tests {
type Timeout = usize;
type Message = ();

fn readable(&mut self, _event_loop: &mut EventLoop<Funtimes>, token: Token, _: ReadHint) {
(*self.rcount).fetch_add(1, SeqCst);
assert_eq!(token, Token(10));
}
fn ready(&mut self, _event_loop: &mut EventLoop<Funtimes>, token: Token, events: Interest) {
if events.is_readable() {
(*self.rcount).fetch_add(1, SeqCst);
assert_eq!(token, Token(10));
}

fn writable(&mut self, _event_loop: &mut EventLoop<Funtimes>, token: Token) {
(*self.wcount).fetch_add(1, SeqCst);
assert_eq!(token, Token(10));
if events.is_writable() {
(*self.wcount).fetch_add(1, SeqCst);
assert_eq!(token, Token(10));
}
}
}

Expand Down
20 changes: 15 additions & 5 deletions src/handler.rs
@@ -1,22 +1,32 @@
use {EventLoop, ReadHint, Token};
use {EventLoop, Interest, Token};

#[allow(unused_variables)]
pub trait Handler {
type Timeout;
type Message: Send;

fn readable(&mut self, event_loop: &mut EventLoop<Self>, token: Token, hint: ReadHint) {
}

fn writable(&mut self, event_loop: &mut EventLoop<Self>, token: Token) {
/// Invoked when the socket represented by `token` is ready to be operated
/// on. `events` indicates the specific operations that are
/// ready to be performed.
///
/// For example, when a TCP socket is ready to be read from, `events` will
/// have `readable` set. When the socket is ready to be written to,
/// `events` will have `writable` set.
///
/// This function will only be invoked a single time per socket per event
/// loop tick.
fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token, events: Interest) {
}

/// Invoked when a message has been received via the event loop's channel.
fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: Self::Message) {
}

/// Invoked when a timeout has completed.
fn timeout(&mut self, event_loop: &mut EventLoop<Self>, timeout: Self::Timeout) {
}

/// Invoked when `EventLoop` has been interrupted by a signal interrupt.
fn interrupted(&mut self, event_loop: &mut EventLoop<Self>) {
}
}
3 changes: 1 addition & 2 deletions src/lib.rs
Expand Up @@ -51,7 +51,7 @@
//! type Timeout = ();
//! type Message = ();
//!
//! fn readable(&mut self, event_loop: &mut EventLoop<MyHandler>, token: Token, _: ReadHint) {
//! fn ready(&mut self, event_loop: &mut EventLoop<MyHandler>, token: Token, _: Interest) {
//! match token {
//! SERVER => {
//! let MyHandler(ref mut server) = *self;
Expand Down Expand Up @@ -109,7 +109,6 @@ pub use buf::{
pub use event::{
PollOpt,
Interest,
ReadHint,
};
pub use event_loop::{
EventLoop,
Expand Down
27 changes: 4 additions & 23 deletions src/poll.rs
Expand Up @@ -2,16 +2,18 @@ use {sys, Evented, Token};
use event::{Interest, IoEvent, PollOpt};
use std::{fmt, io};

pub use sys::{Events};

pub struct Poll {
selector: sys::Selector,
events: sys::Events
events: sys::Events,
}

impl Poll {
pub fn new() -> io::Result<Poll> {
Ok(Poll {
selector: try!(sys::Selector::new()),
events: sys::Events::new()
events: sys::Events::new(),
})
}

Expand Down Expand Up @@ -56,10 +58,6 @@ impl Poll {
pub fn event(&self, idx: usize) -> IoEvent {
self.events.get(idx)
}

pub fn iter(&self) -> EventsIterator {
EventsIterator { events: &self.events, index: 0 }
}
}

impl fmt::Debug for Poll {
Expand All @@ -68,20 +66,3 @@ impl fmt::Debug for Poll {
}
}

pub struct EventsIterator<'a> {
events: &'a sys::Events,
index: usize
}

impl<'a> Iterator for EventsIterator<'a> {
type Item = IoEvent;

fn next(&mut self) -> Option<IoEvent> {
if self.index == self.events.len() {
None
} else {
self.index += 1;
Some(self.events.get(self.index - 1))
}
}
}

0 comments on commit 2d80519

Please sign in to comment.