EAGAIN / socket::send #240

Closed
fabienjuif opened this issue Feb 16, 2019 · 11 comments

@fabienjuif

fabienjuif commented Feb 16, 2019

Hi 👋!

As you can see here, I tried to write a custom broker:

The JS version hangs for some reason, so I wanted to try Rust, because I don't know whether the problem is the JS binding or poor code design on my side.

But I rely on socket::send returning an error if the message can't be sent (it lets me consider a worker dead and try to send the task to another one).
At this point, socket::send doesn't seem to return an error if the message can't be delivered (I tried with and without the zmq::DONTWAIT flag).

My socket is in ROUTER mode (SocketType::ROUTER).
With this code, if I kill the "worker" process, there is no error:

socket.send(&worker_name, zmq::SNDMORE & zmq::DONTWAIT).unwrap();
socket.send("", zmq::SNDMORE & zmq::DONTWAIT).unwrap(); // TODO: this could be removed!
socket.send(&task.payload, zmq::DONTWAIT).unwrap();

Same for this code:

socket.send(&worker_name, zmq::SNDMORE).unwrap();
socket.send("", zmq::SNDMORE).unwrap(); // TODO: this could be removed!
socket.send(&task.payload, 0).unwrap();

Is there something I don't understand, or is this a known gap in rust-zmq?

Versions:

  • zeromq: 4.3.1
  • rust-zmq: 0.9

Thank you!

@kysely

kysely commented Feb 25, 2019

Hey @fabienjuif, I believe you have to use bitwise OR (|) to combine the flags, not AND (&). It works perfectly for me.
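
To illustrate the point about | versus &, here is a minimal sketch. It assumes the flag values 1 and 2, which mirror ZMQ_DONTWAIT and ZMQ_SNDMORE in libzmq's zmq.h; they are redefined locally so the snippet builds without the zmq crate, and has_flag is a hypothetical helper, not part of rust-zmq.

```rust
// Values mirror ZMQ_DONTWAIT and ZMQ_SNDMORE from libzmq's zmq.h.
// They are redefined here (an assumption for illustration) so that the
// snippet compiles with the standard library only.
const DONTWAIT: i32 = 1;
const SNDMORE: i32 = 2;

/// Hypothetical helper: true when every bit of `flag` is set in `flags`.
fn has_flag(flags: i32, flag: i32) -> bool {
    flags & flag == flag
}

fn main() {
    // The two flags occupy different bits, so AND leaves nothing set.
    let with_and = SNDMORE & DONTWAIT;
    // OR keeps both bits.
    let with_or = SNDMORE | DONTWAIT;

    println!("SNDMORE & DONTWAIT = {}", with_and);
    println!("SNDMORE | DONTWAIT = {}", with_or);
    println!("AND keeps SNDMORE?  {}", has_flag(with_and, SNDMORE));
    println!("OR keeps SNDMORE?   {}", has_flag(with_or, SNDMORE));
    println!("OR keeps DONTWAIT?  {}", has_flag(with_or, DONTWAIT));
}
```

If those values hold, combining the flags with & passes 0 to send, so neither SNDMORE nor DONTWAIT is actually requested and each call goes out as its own single-part message.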

@fabienjuif
Author

Hey @kysely !

Thank you for your help. You are right about |, I am dumb ^^'
But I did change that, and the result is the same:

socket.send(&worker_name, zmq::SNDMORE | zmq::DONTWAIT).expect("send should work 1");
socket.send("", zmq::SNDMORE | zmq::DONTWAIT).expect("send should work 2"); // TODO: this could be removed!
socket.send(&task.payload, zmq::DONTWAIT).expect("send should work 3");

Sadly, I never hit my expects 😢

https://github.com/fabienjuif/zeromq-rs-tryout/blob/master/src/main.rs#L149

@fabienjuif
Author

fabienjuif commented Feb 26, 2019

I made a short video to show the issue I have: https://i.imgur.com/63ncAbx.mp4

  1. I run the "broker" on the far left
  2. I run a worker
  3. I disconnect the worker
    • You can see that the "broker" sees a worker and doesn't disconnect it right away
    • This is expected: the worker should be disconnected when we try to send a message and get an error
  4. I run the client
  5. We see that the "broker" sends the messages to the worker (which is not there anymore) without printing "send should work"
    • It should print "send should work"

@kysely

kysely commented Feb 26, 2019

Hmm, I overlooked this at first, but the DONTWAIT flag has no effect on a ROUTER socket in its default configuration.

When ROUTER can't deliver the message (either unknown identity or full send buffer), it simply drops the message. It never blocks, so DONTWAIT changes nothing.

You can check against the official zmq_socket docs, “action in mute state” section.

@kysely

kysely commented Feb 26, 2019

I also made a toy example just to make sure.

I intentionally set the high-water mark for the send buffer to 1 message so that we hit the mute state when trying to send 2 messages.

You'll see that ROUTER finishes successfully because, when it cannot buffer the first message for sending (due to the unknown identity), it simply drops the message.

On the other hand, DEALER successfully saves the first message into the send buffer, but it cannot do the same with the second message (due to the high-water mark). Because we use the DONTWAIT flag, it returns an error as expected (without DONTWAIT, it would simply block at the send call).

use zmq;

fn router_example(ctx: &zmq::Context) {
    let sock = ctx.socket(zmq::ROUTER).unwrap();
    sock.bind("tcp://*:5555").unwrap();

    // set send buffer to 1 message so that we enter “mute state”
    // when trying to send out more than 1 msg
    sock.set_sndhwm(1).unwrap();

    println!("\nRunning ROUTER example");
    for i in 0..2 {
        println!("Sending out msg #{}...", i);
        sock.send("IDENTITY", zmq::DONTWAIT | zmq::SNDMORE).unwrap();
        sock.send("HEADER", zmq::DONTWAIT | zmq::SNDMORE).unwrap();
        sock.send("PAYLOAD", zmq::DONTWAIT).unwrap();
        println!("Message #{} sent", i);
    }
}

fn dealer_example(ctx: &zmq::Context) {
    let sock = ctx.socket(zmq::DEALER).unwrap();
    sock.connect("tcp://localhost:5555").unwrap();

    // set send buffer to 1 message so that we enter “mute state”
    // when trying to send out more than 1 msg
    sock.set_sndhwm(1).unwrap();

    println!("\nRunning DEALER example");
    for i in 0..2 {
        println!("Sending out msg #{}...", i);
        sock.send("HEADER", zmq::DONTWAIT | zmq::SNDMORE).unwrap();
        sock.send("PAYLOAD", zmq::DONTWAIT).unwrap();
        println!("Message #{} buffered", i);
    }
}

fn main() {
    let ctx = zmq::Context::new();
    router_example(&ctx);
    dealer_example(&ctx);
}

@fabienjuif
Author

Hm ok, surprisingly it works the way I described with the JS binding 🤔: if send doesn't find the worker, it throws an error.

I will re-read your response and read the docs you pointed me to.

Did you get the idea of what I am trying to achieve?
If so, do you think this is dumb, or should I use a different socket type?

Thank you again!

@kysely

kysely commented Feb 26, 2019

I quickly looked at your JS load balancer and there is one important difference.

You enabled the ZMQ_ROUTER_MANDATORY option for your ROUTER socket (only in the JS version, not in Rust). As the socket docs I linked above explain, that changes what happens to undeliverable messages:

When a ZMQ_ROUTER socket enters the mute state due to having reached the high water mark for all peers, then any messages sent to the socket shall be dropped until the mute state ends. Likewise, any messages routed to a peer for which the individual high water mark has been reached shall also be dropped, unless ZMQ_ROUTER_MANDATORY socket option is set.

By default, ROUTER drops the message if it cannot route it to the addressed peer or if the send buffer is full. With ZMQ_ROUTER_MANDATORY, however, the address must be routable; otherwise send returns an error.

You can do the same in Rust via:

socket.set_router_mandatory(true).unwrap();
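
To make the difference concrete for the broker use case, here is a toy model using only the standard library. It is a sketch, not libzmq or rust-zmq: ToyRouter, SendError, and dispatch are hypothetical names, and the model covers only the unknown-identity case (no buffers or high-water marks).

```rust
use std::collections::HashSet;

/// Hypothetical error type standing in for the "Host unreachable" error.
#[derive(Debug, PartialEq)]
enum SendError {
    HostUnreachable,
}

/// Toy stand-in for a ROUTER socket (an assumption for illustration):
/// it only knows which identities are currently connected.
struct ToyRouter {
    connected: HashSet<String>,
    mandatory: bool,
    /// Identities that actually received a message, kept for inspection.
    delivered: Vec<String>,
}

impl ToyRouter {
    fn new(mandatory: bool, peers: &[&str]) -> Self {
        ToyRouter {
            connected: peers.iter().map(|p| p.to_string()).collect(),
            mandatory,
            delivered: Vec::new(),
        }
    }

    /// Default mode: an unroutable message is dropped and Ok is returned.
    /// Mandatory mode: an unroutable message yields an error instead.
    fn send(&mut self, identity: &str) -> Result<(), SendError> {
        if self.connected.contains(identity) {
            self.delivered.push(identity.to_string());
            Ok(())
        } else if self.mandatory {
            Err(SendError::HostUnreachable)
        } else {
            Ok(()) // silently dropped; the caller cannot tell
        }
    }
}

/// Tries workers in order, forgetting each one whose send fails, and
/// returns the first worker whose send reported success.
fn dispatch(router: &mut ToyRouter, workers: &mut Vec<String>) -> Option<String> {
    while let Some(worker) = workers.first().cloned() {
        match router.send(&worker) {
            Ok(()) => return Some(worker),
            Err(SendError::HostUnreachable) => {
                workers.remove(0);
            }
        }
    }
    None
}

fn main() {
    // "w1" was killed; only "w2" is still connected.
    let mut workers = vec!["w1".to_string(), "w2".to_string()];

    let mut lenient = ToyRouter::new(false, &["w2"]);
    let picked = dispatch(&mut lenient, &mut workers.clone());
    println!("default:   picked {:?}, delivered {:?}", picked, lenient.delivered);

    let mut strict = ToyRouter::new(true, &["w2"]);
    let picked = dispatch(&mut strict, &mut workers);
    println!("mandatory: picked {:?}, delivered {:?}", picked, strict.delivered);
}
```

In the default mode the broker believes the dead worker took the task even though nothing was delivered; in the mandatory mode the error lets it drop that worker and move on to the next one.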

@fabienjuif
Author

Yo, very nice, I forgot about it. Thank you!!

@kysely

kysely commented Feb 26, 2019

I think we've proven this is not a rust-zmq problem and the issue can be closed.

@fabienjuif
Author

Sure, I wanted to test it first.
I'll do it right now and close the issue if it's OK 👌

@fabienjuif
Author

thread 'main' panicked at 'send should work 1: Host unreachable', src/libcore/result.rs:1009:5

Nice !

Thank you @kysely 😄
