TMQ - Rust ZeroMQ bindings for Tokio

This crate bridges Tokio and ZeroMQ to allow for ZeroMQ in the async world.

This crate is currently a work in progress.

Currently Implemented Sockets

  • Request
  • Response
  • Subscribe
  • Publish

Examples

There are two sets of examples, publish/subscribe and request/response.

Bring up two terminals and run either:

cargo run --example publish
# In another terminal
cargo run --example subscribe

Or:

cargo run --example request
# In another terminal
cargo run --example response

Usage

The API is designed to be simple but opinionated. See the examples for working code; in general, you need to import tokio and tmq::*.

Request

A request socket is a Stream that takes an input stream of Messages (supplied via the with function), sends them to a response socket, and returns the replies as a stream.

let request = request(&Context::new())
    .connect("tcp://127.0.0.1:7899")
    .expect("Couldn't connect")
    .with(stream::iter_ok(
        vec!["Message1", "Message2", "Message3"]
            .into_iter()
            .map(Message::from),
    ))
    .for_each(|val| {
        info!("Response: {}", val.as_str().unwrap_or(""));
        Ok(())
    }).map_err(|err| {
        error!("Error with request: {}", err);
    });

tokio::run(request);

Response

A response socket is a Future that receives messages and sends back a reply to each one, as determined by the Responder implementation:

let responder = respond(&Context::new())
    .bind("tcp://127.0.0.1:7899")
    .expect("Couldn't bind address")
    .with(|msg: Message| {
        info!("Request: {}", msg.as_str().unwrap_or(""));
        Ok(msg)
    }).map_err(|err| {
        error!("Error from server: {}", err);
    });

tokio::run(responder);
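
The request and response sockets above work in lockstep: each request is answered by exactly one reply before the next request goes out. As a rough illustration of that flow, here is a sketch that uses only standard-library channels and a thread in place of ZeroMQ sockets. The function `request_all` and the "echo: " reply format are assumptions made for this example and are not part of tmq.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Sends each request to an echoing "responder" thread and collects the
/// replies in order. Hypothetical model of the request/response flow;
/// plain channels stand in for the ZeroMQ sockets.
fn request_all(requests: Vec<String>) -> Vec<String> {
    let (req_tx, req_rx) = channel::<String>();
    let (rep_tx, rep_rx) = channel::<String>();

    // Responder side: answer every request until the requester hangs up.
    let responder = thread::spawn(move || {
        for msg in req_rx {
            if rep_tx.send(format!("echo: {}", msg)).is_err() {
                break;
            }
        }
    });

    let mut replies = Vec::new();
    for req in requests {
        // Lockstep: one send, then wait for exactly one reply.
        req_tx.send(req).expect("responder stopped");
        replies.push(rep_rx.recv().expect("no reply received"));
    }

    // Dropping the sender closes the channel and ends the responder loop.
    drop(req_tx);
    responder.join().expect("responder panicked");
    replies
}
```

Calling `request_all` with "Message1" and "Message2" returns "echo: Message1" and "echo: Message2", in that order.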

Responder trait

The with function accepts either a closure, as above, or any type that implements the Responder trait:

// You can use a struct to respond by implementing the `Responder` trait
pub struct EchoResponder {}

impl Responder for EchoResponder {
    type Output = FutureResult<zmq::Message, Error>;

    fn respond(&mut self, msg: zmq::Message) -> Self::Output {
        Ok(msg).into()
    }
}

// Or you can use a free-floating function
fn echo(msg: zmq::Message) -> impl Future<Item = zmq::Message, Error = Error> {
    ok(msg)
}
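
One common way for a method like with to accept structs, closures, and free functions alike is a blanket implementation of the trait for function types. The sketch below shows that pattern in a simplified, synchronous form using only the standard library. `SyncResponder`, `Echo`, `shout`, and `serve` are hypothetical names, messages are plain `String`s, and tmq's actual `Responder` returns a future rather than a `Result`, so treat this as an illustration of the idea and not as the crate's implementation.

```rust
/// Simplified stand-in for a responder trait (hypothetical; tmq's real
/// `Responder` has a future as its associated `Output` type).
trait SyncResponder {
    fn respond(&mut self, msg: String) -> Result<String, String>;
}

/// Blanket impl: any `FnMut(String) -> Result<String, String>` is a
/// responder, so closures and free functions can be passed directly.
impl<F> SyncResponder for F
where
    F: FnMut(String) -> Result<String, String>,
{
    fn respond(&mut self, msg: String) -> Result<String, String> {
        (*self)(msg)
    }
}

/// A struct-based responder that echoes every request.
struct Echo;

impl SyncResponder for Echo {
    fn respond(&mut self, msg: String) -> Result<String, String> {
        Ok(msg)
    }
}

/// A free function with the right signature is a responder as well.
fn shout(msg: String) -> Result<String, String> {
    Ok(msg.to_uppercase())
}

/// Feeds each request to the responder and collects the replies,
/// stopping at the first error.
fn serve<R: SyncResponder>(
    mut responder: R,
    requests: Vec<String>,
) -> Result<Vec<String>, String> {
    requests
        .into_iter()
        .map(|req| responder.respond(req))
        .collect()
}
```

With these definitions, `serve(Echo, requests)`, `serve(shout, requests)`, and `serve(|msg: String| -> Result<String, String> { Ok(msg) }, requests)` all type-check against the same `serve` function.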

Publish

A publish socket is a Sink that takes values and sends them to every connected subscribe socket (note: ZeroMQ will drop messages if no one is connected).

let mut i = 0;

let broadcast = Interval::new_interval(Duration::from_millis(1000))
    .map(move |_| {
        i += 1;
        let message = format!("Broadcast #{}", i);
        Message::from(&message)
    });

let publisher = publish(&Context::new())
    .bind("tcp://127.0.0.1:7899")
    .expect("Couldn't bind")
    .finish()
    .send_all(broadcast)
    .map(|_| ())
    .map_err(|e| {
        error!("Error publishing: {}", e);
    });

tokio::run(publisher);
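
The note about dropped messages is easy to overlook: a publisher does not hold messages for subscribers that connect later. The following sketch models that behaviour in memory with `std::sync::mpsc`. It is not ZeroMQ or tmq code, and the `Publisher` type with its `subscribe` and `publish` methods is a hypothetical stand-in chosen for illustration.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// In-memory model of a publish socket (hypothetical; not tmq's type).
struct Publisher {
    subscribers: Vec<Sender<String>>,
}

impl Publisher {
    fn new() -> Self {
        Publisher {
            subscribers: Vec::new(),
        }
    }

    /// Registers a subscriber and returns the receiving end of its channel.
    fn subscribe(&mut self) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Sends `msg` to every current subscriber and returns how many
    /// received it. With no subscribers the message is simply dropped.
    fn publish(&mut self, msg: &str) -> usize {
        // Forget subscribers whose receiving end has gone away.
        self.subscribers
            .retain(|tx| tx.send(msg.to_string()).is_ok());
        self.subscribers.len()
    }
}
```

In this model a message published before any call to `subscribe` reaches zero receivers and is never seen by a subscriber that registers afterwards.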

Subscribe

A subscribe socket is a Stream that reads values from a publish socket. Specify the filter prefix with the subscribe method; pass "" to receive all messages.

let request = subscribe(&Context::new())
    .connect("tcp://127.0.0.1:7899")
    .expect("Couldn't connect")
    .subscribe("")
    .for_each(|val| {
        info!("Subscribe: {}", val.as_str().unwrap_or(""));
        Ok(())
    }).map_err(|e| {
        error!("Error Subscribing: {}", e);
    });
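
The filter given to subscribe is compared against the leading bytes of each incoming message, which is how ZeroMQ subscriptions behave in general. The sketch below models that rule with the standard library only. `matches_filter` and `deliverable` are hypothetical helpers written for this illustration and are not part of tmq or zmq.

```rust
/// Returns true if a message would pass a ZeroMQ-style subscription filter.
/// Hypothetical helper: the filter is matched against the first bytes of
/// the message, so an empty filter matches every message.
fn matches_filter(filter: &[u8], message: &[u8]) -> bool {
    message.starts_with(filter)
}

/// Keeps only the messages that a subscriber using `filter` would receive.
fn deliverable<'a>(filter: &str, messages: &[&'a str]) -> Vec<&'a str> {
    messages
        .iter()
        .copied()
        .filter(|m| matches_filter(filter.as_bytes(), m.as_bytes()))
        .collect()
}
```

For example, `deliverable("weather.", &["weather.rain", "sports.goal"])` keeps only "weather.rain", while an empty filter keeps both messages. Matching is case-sensitive because it operates on raw bytes.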