Version 0.3.0! Already!
This release sees the introduction of the TimeoutStream<S: Stream<Error = Error>> struct. This type is particularly useful since you can wrap a MultipartStream, a ControlledStream, or an EndingStream in a TimeoutStream and receive an Either<A: S::Item, B: Timeout> when the stream is ready. This means that you either get a Timeout notification, or you get the value the stream was intending to produce.
tokio_zmq::TimeoutStream differs from tokio_timer::TimeoutStream in that tokio_timer's stream will error on timeout, while tokio_zmq's stream will produce an Either.
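The difference between the two styles can be sketched with only the standard library. This is an analogy built on a blocking std::sync::mpsc channel, not tokio_zmq's or tokio_timer's implementation; the Either and Timeout types and the recv_or_error and recv_or_timeout functions below are hypothetical stand-ins defined just for this illustration.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical stand-in for futures::future::Either.
#[derive(Debug, PartialEq)]
enum Either<A, B> {
    A(A),
    B(B),
}

/// Hypothetical marker value produced when the wait elapses.
#[derive(Debug, PartialEq)]
struct Timeout;

/// Error style: a timeout surfaces on the error path,
/// so a caller using `?` or `unwrap` stops consuming.
fn recv_or_error(
    rx: &mpsc::Receiver<String>,
    wait: Duration,
) -> Result<String, RecvTimeoutError> {
    rx.recv_timeout(wait)
}

/// Either style: a timeout is an ordinary item, so the consumer
/// can log it and keep polling. `None` means the sender is gone.
fn recv_or_timeout(
    rx: &mpsc::Receiver<String>,
    wait: Duration,
) -> Option<Either<String, Timeout>> {
    match rx.recv_timeout(wait) {
        Ok(msg) => Some(Either::A(msg)),
        Err(RecvTimeoutError::Timeout) => Some(Either::B(Timeout)),
        Err(RecvTimeoutError::Disconnected) => None,
    }
}
```

With the Either style, a loop over recv_or_timeout keeps running after a quiet period and only ends when the sender disconnects. With the error style, the caller has to inspect the error to tell a timeout from a real failure.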
Example with TimeoutStream
#![feature(try_from)]

extern crate futures;
extern crate tokio_core;
extern crate tokio_zmq;
extern crate zmq;

use std::convert::TryInto;
use std::rc::Rc;
use std::time::Duration;

use futures::future::Either;
use futures::Stream;
use tokio_core::reactor::Core;
use tokio_zmq::prelude::*;
use tokio_zmq::{Socket, Sub};

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let ctx = Rc::new(zmq::Context::new());

    // An empty filter subscribes to every message on the endpoint.
    let sub: Sub = Socket::builder(ctx, &handle)
        .connect("tcp://localhost:5556")
        .filter(b"")
        .try_into()
        .unwrap();

    let consumer = sub
        .stream()
        .timeout(Duration::from_secs(30))
        // Pass real messages through; log timeouts and drop them.
        .filter_map(|either| match either {
            Either::A(multipart) => Some(multipart),
            Either::B(_) => {
                println!("Operation timed out");
                None
            }
        })
        .for_each(|multipart| {
            for msg in multipart {
                if let Some(msg) = msg.as_str() {
                    println!("Received: {}", msg);
                }
            }
            Ok(())
        });

    core.run(consumer).unwrap();
}
API Changes
- Introduce streams with timeouts.
- Rename Socket::create to Socket::builder to reflect that it returns a SocketBuilder.
- Use builder notation for MultipartStreams for control, ending, and timers. Remove the separate Controlled variants from src/socket/types.rs.
- Remove AsControlledSocket, ControlledStreamSocket, ControlledSinkSocket, and IntoControlledSocket.
- Remove Controlled macros.
- Remove Option<EndHandler> from MultipartStream.
- Remove the DefaultHandler concept, since EndHandlers are only in the type if they're in use.
- Remove stream_with_end(end_handler: EndHandler) in favor of stream().with_end(end_handler: EndHandler).
- Remove ControlledSocket socket type in favor of socket.stream().controlled(control: impl StreamSocket, handler: impl ControlHandler).
- Update all examples to work with changes.
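The idea behind the builder notation, where a handler appears in the type only once it is attached, can be sketched with plain iterators. This is not the tokio_zmq source: PlainStream, EndingStream, StopOn, and this EndHandler trait (including its should_stop signature) are hypothetical stand-ins, and std's Iterator is used in place of a futures Stream to keep the sketch self-contained.

```rust
/// Hypothetical stand-in for an end handler: decides when to stop.
trait EndHandler {
    fn should_stop(&mut self, item: &str) -> bool;
}

/// Hypothetical plain stream: no handler field, no Option<EndHandler>.
struct PlainStream<I> {
    inner: I,
}

/// Hypothetical wrapper produced by `with_end`; only this type
/// carries a handler.
struct EndingStream<I, H> {
    inner: I,
    handler: H,
    done: bool,
}

impl<I: Iterator<Item = String>> PlainStream<I> {
    fn new(inner: I) -> Self {
        PlainStream { inner }
    }

    /// Builder step: consumes the plain stream and returns a new type.
    fn with_end<H: EndHandler>(self, handler: H) -> EndingStream<I, H> {
        EndingStream { inner: self.inner, handler, done: false }
    }
}

impl<I: Iterator<Item = String>> Iterator for PlainStream<I> {
    type Item = String;
    fn next(&mut self) -> Option<String> {
        self.inner.next()
    }
}

impl<I: Iterator<Item = String>, H: EndHandler> Iterator for EndingStream<I, H> {
    type Item = String;
    fn next(&mut self) -> Option<String> {
        if self.done {
            return None;
        }
        let item = self.inner.next()?;
        if self.handler.should_stop(&item) {
            // The handler asked to end; yield nothing further.
            self.done = true;
            return None;
        }
        Some(item)
    }
}

/// Example handler: stop when a sentinel message arrives.
struct StopOn(&'static str);

impl EndHandler for StopOn {
    fn should_stop(&mut self, item: &str) -> bool {
        item == self.0
    }
}
```

A caller who never calls with_end works with PlainStream alone and pays for no handler. That is the motivation the list above gives for dropping Option<EndHandler> and DefaultHandler.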