diff --git a/examples/dealer_router.rs b/examples/dealer_router.rs
new file mode 100644
index 0000000..7584e68
--- /dev/null
+++ b/examples/dealer_router.rs
@@ -0,0 +1,266 @@
+/*
+ * This file is part of Tokio ZMQ.
+ *
+ * Copyright © 2017 Riley Trautman
+ *
+ * Tokio ZMQ is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Tokio ZMQ is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Tokio ZMQ. If not, see .
+ */
+
+#![feature(try_from)]
+
+extern crate futures;
+extern crate tokio_core;
+extern crate zmq;
+extern crate tokio_zmq;
+extern crate log;
+extern crate env_logger;
+
+use std::rc::Rc;
+use std::convert::TryInto;
+use std::collections::VecDeque;
+use std::thread;
+use std::env;
+
+use futures::stream::iter_ok;
+use futures::{Future, Stream};
+use tokio_core::reactor::Core;
+use tokio_zmq::prelude::*;
+use tokio_zmq::{Dealer, Rep, Req, Router, Pub, Sub};
+use tokio_zmq::{Socket, Error};
+
+pub struct Stop;
+
+impl ControlHandler for Stop {
+ fn should_stop(&self, _: VecDeque) -> bool {
+ println!("Got stop signal");
+ true
+ }
+}
+
+fn client() {
+ let mut core = Core::new().unwrap();
+ let ctx = Rc::new(zmq::Context::new());
+ let req: Req = Socket::new(Rc::clone(&ctx), core.handle())
+ .connect("tcp://localhost:5559")
+ .try_into()
+ .unwrap();
+
+ let zpub: Pub = Socket::new(Rc::clone(&ctx), core.handle())
+ .bind("tcp://*:5561")
+ .try_into()
+ .unwrap();
+
+ let runner = iter_ok(0..10)
+ .and_then(|request_nbr| {
+ let msg = zmq::Message::from_slice(b"Hewwo?").unwrap();
+
+ let mut multipart = VecDeque::new();
+ multipart.push_back(msg);
+
+ println!("Sending 'Hewwo?' for {}", request_nbr);
+
+ let response = req.recv();
+ let request = req.send(multipart);
+
+ request.and_then(move |_| {
+ response.map(move |multipart| (request_nbr, multipart))
+ })
+ })
+ .for_each(|(request_nbr, multipart)| {
+ for msg in multipart {
+ if let Some(msg) = msg.as_str() {
+ println!("Received reply {} {}", request_nbr, msg);
+ }
+ }
+
+ Ok(())
+ })
+ .and_then(|_| {
+ let msg = zmq::Message::from_slice(b"").unwrap();
+
+ let mut multipart = VecDeque::new();
+ multipart.push_back(msg);
+
+ zpub.send(multipart)
+ });
+
+ let res = core.run(runner);
+ if let Err(e) = res {
+ println!("client bailed: {:?}", e);
+ }
+}
+
+fn worker() {
+ let mut core = Core::new().unwrap();
+ let ctx = Rc::new(zmq::Context::new());
+
+ let rep: Rep = Socket::new(Rc::clone(&ctx), core.handle())
+ .connect("tcp://localhost:5560")
+ .try_into()
+ .unwrap();
+
+ let sub: Sub = Socket::new(Rc::clone(&ctx), core.handle())
+ .connect("tcp://localhost:5561")
+ .filter(b"")
+ .try_into()
+ .unwrap();
+
+ let rep = rep.controlled(sub);
+
+ let runner = rep.stream(Stop)
+ .map(|multipart| {
+ for msg in multipart {
+ if let Some(msg) = msg.as_str() {
+ println!("Received request: {}", msg);
+ }
+ }
+
+ let msg = zmq::Message::from_slice(b"Mr Obama???").unwrap();
+ let mut multipart = VecDeque::new();
+ multipart.push_back(msg);
+
+ multipart
+ })
+ .forward(rep.sink::());
+
+ let res = core.run(runner);
+
+ if let Err(e) = res {
+ println!("worker bailed: {:?}", e);
+ }
+}
+
+fn broker() {
+ let mut core = Core::new().unwrap();
+ let ctx = Rc::new(zmq::Context::new());
+
+ let router: Router = Socket::new(Rc::clone(&ctx), core.handle())
+ .bind("tcp://*:5559")
+ .try_into()
+ .unwrap();
+ let sub: Sub = Socket::new(Rc::clone(&ctx), core.handle())
+ .connect("tcp://localhost:5561")
+ .filter(b"")
+ .try_into()
+ .unwrap();
+ let router = router.controlled(sub);
+
+ let dealer: Dealer = Socket::new(Rc::clone(&ctx), core.handle())
+ .bind("tcp://*:5560")
+ .try_into()
+ .unwrap();
+ let sub: Sub = Socket::new(Rc::clone(&ctx), core.handle())
+ .connect("tcp://localhost:5561")
+ .filter(b"")
+ .try_into()
+ .unwrap();
+ let dealer = dealer.controlled(sub);
+
+ let d2r = dealer
+ .stream(Stop)
+ .map(|multipart| {
+ for msg in &multipart {
+ if let Some(msg) = msg.as_str() {
+ println!("Relaying message '{}' to router", msg);
+ } else {
+ println!("Relaying unknown message to router");
+ }
+ }
+ multipart
+ })
+ .forward(router.sink::());
+ let r2d = router
+ .stream(Stop)
+ .map(|multipart| {
+ for msg in &multipart {
+ if let Some(msg) = msg.as_str() {
+ println!("Relaying message '{}' to dealer", msg);
+ } else {
+ println!("Relaying unknown message to dealer");
+ }
+ }
+ multipart
+ })
+ .forward(dealer.sink::());
+
+ core.handle().spawn(d2r.map(|_| ()).map_err(|e| {
+ println!("d2r bailed: {:?}", e)
+ }));
+ let res = core.run(r2d);
+
+ if let Err(e) = res {
+ println!("broker bailed: {:?}", e);
+ }
+}
+
+#[derive(Debug, PartialEq)]
+enum Selection {
+ All,
+ Broker,
+ Worker,
+ Client,
+}
+
+impl Selection {
+ fn broker(&self) -> bool {
+ *self == Selection::All || *self == Selection::Broker
+ }
+
+ fn worker(&self) -> bool {
+ *self == Selection::All || *self == Selection::Worker
+ }
+
+ fn client(&self) -> bool {
+ *self == Selection::All || *self == Selection::Client
+ }
+}
+
+fn main() {
+ env_logger::init().unwrap();
+
+ let selection = env::var("SELECTION").unwrap_or("all".into());
+
+ let selection = match selection.as_ref() {
+ "broker" => Selection::Broker,
+ "worker" => Selection::Worker,
+ "client" => Selection::Client,
+ _ => Selection::All,
+ };
+
+ println!("SELECTION: {:?}", selection);
+
+ let mut broker_thread = None;
+ let mut worker_thread = None;
+ let mut client_thread = None;
+
+ if selection.broker() {
+ broker_thread = Some(thread::spawn(broker));
+ }
+ if selection.worker() {
+ worker_thread = Some(thread::spawn(worker));
+ }
+ if selection.client() {
+ client_thread = Some(thread::spawn(client));
+ }
+
+ if let Some(broker_thread) = broker_thread {
+ broker_thread.join().unwrap();
+ }
+ if let Some(worker_thread) = worker_thread {
+ worker_thread.join().unwrap();
+ }
+ if let Some(client_thread) = client_thread {
+ client_thread.join().unwrap();
+ }
+}
diff --git a/examples/req.rs b/examples/req.rs
index c9030d6..18d7b7a 100644
--- a/examples/req.rs
+++ b/examples/req.rs
@@ -47,7 +47,7 @@ fn main() {
.try_into()
.unwrap();
- let runner = iter_ok(0..10)
+ let runner = iter_ok(0..10000)
.and_then(|i| {
let mut multipart = VecDeque::new();
let msg1 = zmq::Message::from_slice(format!("Hewwo? {}", i).as_bytes()).unwrap();
diff --git a/src/async/future.rs b/src/async/future.rs
index 8b695fe..c310707 100644
--- a/src/async/future.rs
+++ b/src/async/future.rs
@@ -26,7 +26,7 @@ use std::rc::Rc;
use zmq;
use tokio_core::reactor::PollEvented;
use tokio_file_unix::File;
-use futures::{Async, Future, Poll};
+use futures::{Async, AsyncSink, Future, Poll};
use futures::task;
use error::Error;
@@ -99,52 +99,66 @@ impl MultipartRequest {
fn send(&mut self) -> Poll<(), Error> {
loop {
+ debug!("MultipartRequest: loop");
let mut multipart = match self.multipart.take() {
Some(multipart) => multipart,
- None => return Ok(Async::Ready(())),
+ None => {
+ debug!("MultipartRequest: breaking loop, no multipart");
+ break;
+ }
};
let msg = match multipart.pop_front() {
Some(msg) => msg,
None => {
self.multipart = None;
- return Ok(Async::Ready(()));
+ debug!("MultipartRequest: breaking loop, empty multipart");
+ task::current().notify();
+ break;
}
};
let place = if multipart.is_empty() {
+ debug!("MultipartRequest: Last message");
MsgPlace::Last
} else {
+ debug!("MultipartRequest: Nth message");
MsgPlace::Nth
};
match self.send_msg(msg, place)? {
- Async::Ready(()) => (),
- Async::NotReady => {
- // In the future, push_front the failed message
- ()
- }
- }
+ AsyncSink::Ready => {
+ debug!("MultipartRequest: Sent!");
- if multipart.is_empty() {
- return Ok(Async::Ready(()));
+ if multipart.is_empty() {
+ debug!("MultipartRequest: breaking loop");
+ break;
+ }
+ }
+ AsyncSink::NotReady(msg) => {
+ multipart.push_front(msg);
+ }
}
self.multipart = Some(multipart);
}
+
+ Ok(Async::Ready(()))
}
- fn send_msg(&mut self, msg: zmq::Message, place: MsgPlace) -> Poll<(), Error> {
+ fn send_msg(
+ &mut self,
+ msg: zmq::Message,
+ place: MsgPlace,
+ ) -> Result, Error> {
+ debug!("MultipartRequest: send_msg");
let events = self.sock.get_events()? as i16;
if events & zmq::POLLOUT == 0 {
- if events & zmq::POLLIN != 0 {
- self.file.need_read();
- } else {
- self.file.need_write();
- }
+ debug!("MultipartRequest: need_write()");
+ self.file.need_write();
- return Ok(Async::NotReady);
+ return Ok(AsyncSink::NotReady(msg));
}
let flags = zmq::DONTWAIT |
@@ -155,8 +169,12 @@ impl MultipartRequest {
};
match self.sock.send_msg(msg, flags) {
- Ok(_) => Ok(Async::Ready(())),
- Err(e @ zmq::Error::EAGAIN) => Err(e.into()),
+ Ok(_) => Ok(AsyncSink::Ready),
+ Err(e @ zmq::Error::EAGAIN) => {
+ // return message in future
+ debug!("MultipartRequest: EAGAIN");
+ Err(e.into())
+ }
Err(e) => Err(e.into()),
}
}
@@ -167,13 +185,14 @@ impl MultipartRequest {
let events = self.sock.get_events()? as i16;
if events & zmq::POLLOUT != 0 {
// manually schedule a wakeup and procede
- debug!("Write ready, but file doesn't think so");
+ debug!("MultipartRequest: Write ready, but file doesn't think so");
task::current().notify();
} else {
return Ok(false);
}
if events & zmq::POLLIN != 0 {
+ debug!("MultipartRequest: need_read()");
self.file.need_read();
}
}
@@ -187,6 +206,7 @@ impl Future for MultipartRequest {
type Error = Error;
fn poll(&mut self) -> Poll {
+ debug!("MultipartRequest: in poll");
if self.check_write()? {
self.send()
} else {
@@ -253,44 +273,61 @@ impl MultipartResponse {
}
fn recv(&mut self) -> Poll {
+ debug!("MultipartResponse: recv");
let events = self.sock.get_events()? as i16;
if events & zmq::POLLIN == 0 {
- if events & zmq::POLLOUT != 0 {
- self.file.need_write();
- } else {
- self.file.need_read();
- }
+ debug!("MultipartResponse: need_read()");
+ self.file.need_read();
+ debug!("MultipartResponse: leaving recv");
return Ok(Async::NotReady);
}
- match self.recv_msg()? {
- Async::Ready(msg) => {
- let mut multipart = self.multipart.take().unwrap_or(VecDeque::new());
+ let mut first = true;
- let more = msg.get_more();
+ loop {
+ debug!("MultipartResponse: loop");
+ match self.recv_msg()? {
+ Async::Ready(msg) => {
+ first = false;
+ let mut multipart = self.multipart.take().unwrap_or(VecDeque::new());
- multipart.push_back(msg);
+ let more = msg.get_more();
- if !more {
- return Ok(Async::Ready(multipart));
- }
+ multipart.push_back(msg);
- task::current().notify();
- self.multipart = Some(multipart);
- Ok(Async::NotReady)
+ if !more {
+ debug!("MultipartResponse: Done receiving, returning multipart");
+ return Ok(Async::Ready(multipart));
+ }
+
+ debug!("MultipartResponse: Waiting on more");
+ self.multipart = Some(multipart);
+ }
+ Async::NotReady => {
+ if first {
+ debug!("MultipartResponse: leaving recv, not ready");
+ return Ok(Async::NotReady);
+ }
+ }
}
- Async::NotReady => Ok(Async::NotReady),
}
}
fn recv_msg(&mut self) -> Poll {
+ debug!("MultipartResponse: recv_msg");
let mut msg = zmq::Message::new()?;
match self.sock.recv(&mut msg, zmq::DONTWAIT) {
- Ok(_) => Ok(Async::Ready(msg)),
- Err(zmq::Error::EAGAIN) => Ok(Async::NotReady),
+ Ok(_) => {
+ debug!("MultipartResponse: received: {:?}", msg.as_str());
+ Ok(Async::Ready(msg))
+ }
+ Err(zmq::Error::EAGAIN) => {
+ debug!("MultipartResponse: EAGAIN");
+ Ok(Async::NotReady)
+ }
Err(e) => Err(e.into()),
}
}
@@ -300,7 +337,7 @@ impl MultipartResponse {
let events = self.sock.get_events()? as i16;
if events & zmq::POLLIN != 0 {
// manually schedule a wakeup and procede
- debug!("Read ready, but file doesn't think so");
+ debug!("MultipartResponse: Read ready, but file doesn't think so");
task::current().notify();
} else {
return Ok(false);
@@ -320,6 +357,7 @@ impl Future for MultipartResponse {
type Error = Error;
fn poll(&mut self) -> Poll {
+ debug!("MultipartResponse: In poll");
if self.check_read()? {
self.recv()
} else {
diff --git a/src/async/sink.rs b/src/async/sink.rs
index fc96cd0..c2b795b 100644
--- a/src/async/sink.rs
+++ b/src/async/sink.rs
@@ -26,10 +26,10 @@ use std::rc::Rc;
use zmq;
use tokio_core::reactor::PollEvented;
use tokio_file_unix::File;
-use futures::{Async, AsyncSink, Poll, Sink, StartSend};
-use futures::task;
+use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend};
-use super::{MsgPlace, Multipart};
+use super::Multipart;
+use async::future::MultipartRequest;
use error::Error;
use file::ZmqFile;
@@ -80,10 +80,10 @@ pub struct MultipartSink
where
E: From,
{
+ request: Option,
sock: Rc,
// Handles notifications to/from the event loop
file: Rc>>,
- multipart: Option,
phantom: PhantomData,
}
@@ -93,144 +93,28 @@ where
{
pub fn new(sock: Rc, file: Rc>>) -> Self {
MultipartSink {
+ request: None,
sock: sock,
file: file,
- multipart: None,
phantom: PhantomData,
}
}
- fn send_msg(
- &mut self,
- msg: zmq::Message,
- place: MsgPlace,
- ) -> Result, E> {
- debug!("MultipartSink: In send: {:?}", msg.as_str());
- // Get the events currently waiting on the socket
- let events = self.sock.get_events().map_err(Error::from)? as i16;
- debug!("MultipartSink: Got events");
-
- // Double check that the socket is ready for writing
- if events & zmq::POLLOUT == 0 {
- if events & zmq::POLLIN != 0 {
- self.file.need_read();
- debug!(
- "MultipartSink: Events does not indicate POLLOUT: e{} != p{}",
- events,
- zmq::POLLOUT
- );
- return Ok(AsyncSink::NotReady(msg));
- } else {
- self.file.need_write();
- debug!(
- "MultipartSink: Events does not indicate POLLOUT: e{} != p{}",
- events,
- zmq::POLLOUT
- );
- return Ok(AsyncSink::NotReady(msg));
- }
- }
-
- let flags = zmq::DONTWAIT |
- if place == MsgPlace::Last {
- debug!("MultipartSink: flags: DONTWAIT | 0");
- 0
- } else {
- debug!("MultipartSink: flags: DONTWAIT | SNDMORE");
- zmq::SNDMORE
- };
-
- match self.sock.send_msg(msg, flags) {
- Ok(_) => {
- debug!("MultipartSink: Sent!");
- Ok(AsyncSink::Ready)
- }
- Err(zmq::Error::EAGAIN) => {
- /* I'd like to have the following code in place, but the Rust zmq high-level
- * bindings don't return the message you just tried to send if it fails to send.
- * For now, we bubble the error.
- *
- * // If EAGAIN, the socket is busy. This shouldn't happend because we have already
- * // witnessed that the socket is ready for writing.
- * Ok(AsyncSink::NotReady(msg))
- */
- debug!("MultipartSink: EAGAIN");
- let e = zmq::Error::EAGAIN;
- let e: Error = e.into();
- Err(e.into())
- }
- Err(e) => {
- debug!("MultipartSink: Error: {}", e);
- let e: Error = e.into();
- Err(e.into())
- }
- }
- }
-
- fn flush(&mut self) -> Result, E> {
- debug!("MultipartSink: In flush");
- loop {
- if let Some(mut multipart) = self.multipart.take() {
- if let Some(curr_msg) = multipart.pop_front() {
- debug!("MultipartSink: Sending current message");
-
- let msg_place = if multipart.is_empty() {
- MsgPlace::Last
- } else {
- MsgPlace::Nth
- };
-
- match self.send_msg(curr_msg, msg_place)? {
- AsyncSink::Ready => {
- debug!("MultipartSink: self.send success!");
-
- if !multipart.is_empty() {
- self.multipart = Some(multipart);
- continue;
- // return Ok(Async::NotReady);
- }
- break;
- }
- AsyncSink::NotReady(curr_msg) => {
- debug!("MultipartSink: Failed send");
- // If we couldn't send the current message, put it back and return NotReady
- multipart.push_front(curr_msg);
- self.multipart = Some(multipart);
- continue;
- // return Ok(Async::NotReady);
- }
- }
- } else {
- self.multipart = None;
- break;
- }
- } else {
- break;
+ fn poll_request(&mut self, mut request: MultipartRequest) -> Poll<(), E> {
+ match request.poll()? {
+ Async::Ready(()) => Ok(Async::Ready(())),
+ Async::NotReady => {
+ self.request = Some(request);
+ Ok(Async::NotReady)
}
}
-
- debug!("MultipartSink: Ready!");
- Ok(Async::Ready(()))
}
- fn check_write(&mut self) -> Result {
- if let Async::NotReady = self.file.poll_write() {
- // Get the events currently waiting on the socket
- let events = self.sock.get_events().map_err(Error::from)? as i16;
- if events & zmq::POLLOUT != 0 {
- // manually schedule a wakeup and procede
- debug!("Write ready, but file doesn't think so");
- task::current().notify();
- } else {
- return Ok(false);
- }
-
- if events & zmq::POLLIN != 0 {
- self.file.need_read();
- }
- }
+ fn make_request(&mut self, multipart: Multipart) {
+ let sock = Rc::clone(&self.sock);
+ let file = Rc::clone(&self.file);
- Ok(true)
+ self.request = Some(MultipartRequest::new(sock, file, multipart));
}
}
@@ -246,39 +130,30 @@ where
multipart: Self::SinkItem,
) -> StartSend {
debug!("MultipartSink: start_send");
- if self.check_write()? {
- if self.multipart.is_none() {
- debug!("MultipartSink: Set multipart, ready!");
- self.multipart = Some(multipart);
- self.file.need_write();
-
- Ok(AsyncSink::Ready)
- } else {
- debug!("MultipartSink: not ready");
- match self.flush()? {
- Async::Ready(()) => {
- debug!("MultipartSink: Flushed, ready now");
- self.multipart = Some(multipart);
+ if let Some(request) = self.request.take() {
+ match self.poll_request(request)? {
+ Async::Ready(()) => {
+ self.make_request(multipart);
+ self.file.need_write();
Ok(AsyncSink::Ready)
}
- Async::NotReady => {
- debug!("MultipartSink: still not ready");
- Ok(AsyncSink::NotReady(multipart))
- }
- }
+ Async::NotReady => Ok(AsyncSink::NotReady(multipart)),
}
} else {
- Ok(AsyncSink::NotReady(multipart))
+ self.make_request(multipart);
+ self.file.need_write();
+
+ Ok(AsyncSink::Ready)
}
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
debug!("MultipartSink: poll_complete");
- if self.check_write()? {
- self.flush()
+ if let Some(request) = self.request.take() {
+ self.poll_request(request)
} else {
- Ok(Async::NotReady)
+ Ok(Async::Ready(()))
}
}
}
diff --git a/src/async/stream.rs b/src/async/stream.rs
index 2117a6d..c754879 100644
--- a/src/async/stream.rs
+++ b/src/async/stream.rs
@@ -101,6 +101,7 @@ impl Stream for MultipartStream {
type Error = Error;
fn poll(&mut self) -> Poll