
Heartbeats between master and worker processes. #18

Open
amitmurthy opened this issue Jul 26, 2013 · 25 comments
@amitmurthy
Contributor

Heartbeats between master and workers have been mentioned before. Seeing JuliaLang/IJulia.jl#8 triggered it again.

Suggesting a design :

  • The requirement is to provide an alternate communication channel to a Julia worker process that can presently be used for two things: a) heartbeats and b) querying the state of the process (including the progress of long-running computations)
  • This has to run in a different thread, since a long-running compute-bound task will not yield frequently enough
  • The handler also needs to be in C, since core Julia is not yet thread-safe
  • Currently the accept_handler in multi.jl waits for connection requests from peer processes.
  • Upon a new connection it will now read a short 4-byte header field that specifies what type of connection it is.
  • If the client has sent "WRKR", it will call create_message_handler_loop as it currently does
  • If the client has sent "ADMN", it will disengage the socket from libuv, start a new OS thread and execute a C "admin" function in the new thread with this socket. Only one admin thread can be running at any time.
  • The "admin" function will currently just respond to "ping" messages with some state information about the worker process. Maybe also provide a means for https://github.com/timholy/ProgressMeter.jl to publish compute progress information which is returned with every ping. Will use a pipe to push information between the Julia thread and the admin thread.
  • If a ping message has not been received for a specified timeout (say 2 minutes) or this socket connection breaks, the worker exits if configured to do so.
  • The Julia master process also starts a new thread (C code) that continually sends these heartbeat messages to all connected workers and exposes the returned state information to the Julia thread. A show_workers() can pretty-print this information.

The whole heartbeat mechanism can be switched on via a command-line argument to the julia executable (default is off).

It seems workable enough and we don't need the admin thread to listen on a different port. Do let me know what you guys think.
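The dispatch on the 4-byte connection header can be sketched as follows. This is a Python illustration only (the real code would live in multi.jl and C); `recv_exact`, `dispatch_connection`, and the returned labels are hypothetical names:

```python
import socket

# Illustrative magic headers from the proposal above.
WORKER_MAGIC = b"WRKR"
ADMIN_MAGIC = b"ADMN"

def recv_exact(conn, n):
    """Read exactly n bytes from a blocking socket."""
    buf = b""
    while len(buf) < n:
        chunk = conn.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("peer closed during header read")
        buf += chunk
    return buf

def dispatch_connection(conn):
    # Read the 4-byte connection-type header and route accordingly.
    header = recv_exact(conn, 4)
    if header == WORKER_MAGIC:
        return "message_handler"   # the create_message_handler_loop path
    elif header == ADMIN_MAGIC:
        return "admin_thread"      # hand the socket off to the C admin thread
    else:
        conn.close()
        raise ValueError(f"unknown connection type: {header!r}")
```

Since both connection types arrive on the same listening port, existing workers would simply send "WRKR" first, keeping the current wire protocol otherwise unchanged.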

@ViralBShah
Member

@JeffBezanson This seems like a post 0.2 thing. Thoughts?

Let's keep this PR around.

@Keno
Member

Keno commented Jul 31, 2013

This is definitely not in scope for 0.2

@amitmurthy
Contributor Author

This is not yet a PR. Will convert it into a PR if folks are OK with the general design.

@malmaud

malmaud commented Oct 26, 2015

Hey @amitmurthy , I'm thinking of trying to make a PR for heartbeat functionality based on the new message-passing idiom. Would that be stepping on your toes? Do you have any additional thoughts about heartbeat support since this issue was created?

@amitmurthy
Contributor Author

Would that be stepping on your toes?

Perish the thought. No issues whatsoever.

I would initially focus just on the distributed "progress meter".

Removing failed workers from the cluster currently happens when the remote socket connection is closed. This works pretty well in most cases - the exception being when different transports are used (which is not very widespread).

Restarting workers is not straightforward since we have to recreate worker global state if any.

Reconnecting in the event of broken connections (due to serialization/deserialization errors) can and should be implemented. This would require

  • Having a one-up message number that is acknowledged by the receiver.
  • The sender keeping a reference to all sent messages until an ack is received. While messages with errors are notified as such, messages sent after the message-in-error can then be retried on a new connection. This is independent of heartbeats, though.
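The ack-and-retry bookkeeping described above could look roughly like this Python sketch (`RetryBuffer` and its method names are invented for illustration, not part of any actual implementation):

```python
from collections import OrderedDict

class RetryBuffer:
    """Keep sent messages keyed by a one-up sequence number until the
    receiver acknowledges them; on a broken connection, unacknowledged
    messages can be resent over a new connection."""

    def __init__(self):
        self.next_seq = 1
        self.pending = OrderedDict()  # seq -> message, in send order

    def record_send(self, msg):
        seq = self.next_seq
        self.next_seq += 1
        self.pending[seq] = msg
        return seq

    def ack(self, seq):
        # An ack of seq implies all earlier messages arrived intact.
        for s in list(self.pending):
            if s <= seq:
                del self.pending[s]

    def unacked(self):
        # Messages to replay after reconnecting.
        return list(self.pending.values())
```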

Note : I am currently traveling and will not have reliable internet over the next few days - my responses may be delayed.

@malmaud

malmaud commented Oct 26, 2015

OK, that makes sense.

But wouldn't it be simpler to just not close the connection in the first place in the event of a serialization error? If a worker can't deserialize a message from pid1 (ie, deserialize(r_stream) throws), why not send a message to that effect back to pid1 instead of exiting?

I guess I'm not really understanding why exceptions caused by evaluation of the thunk the worker has been told to evaluate should be treated so differently than exceptions caused by deserializing pid1's message.

(Understand you're traveling and am not expecting an immediate response)

@malmaud

malmaud commented Oct 26, 2015

Oh, is it because the stream is then in a corrupted state, so serializing/deserializing messages over it is no longer possible?

In that case, one unifying solution might be to have a second stream to each worker, an out-of-band "status" stream. It could both be used for heartbeats and for indicating that the main message stream is no longer usable because of a serialization error and has to be reset on both sides.

@amitmurthy
Contributor Author

  • The out-of-band communication needs to be in a separate thread
  • reset is not simple. Consider this
    • Separate tasks on pid1 write msg1, msg2 and msg3 to pid2 at the same time.
    • On the sender side, all 3 messages have been written to the socket and the write calls have returned
    • On the receiver side, while deserializing msg1, an exception is raised.
    • Since we don't have a message length header field, we cannot discard the bytes of msg1 and read msg2 and msg3.
    • The reason we don't have a length header is because of long messages, which are written to the socket directly. Otherwise, calculating the message size in bytes would require writing to an intermediate buffer.
    • We could implement a serialized_sizeof(object) which would calculate the serialized size of any object. That would help in prefixing a length-of-message and can help in recovering from deserialization errors.
    • Another option is to have ack messages and keep sent objects around till they are acked.
    • Any other schemes?
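The serialized_sizeof option amounts to ordinary length-prefix framing. A Python sketch, assuming the serialized size is known up front (the struct-packed 4-byte prefix is an illustrative choice, not a proposed wire format):

```python
import io
import struct

def frame(payload: bytes) -> bytes:
    # Prefix the serialized bytes with a 4-byte big-endian length;
    # this is what a serialized_sizeof(object) would make possible
    # without buffering the whole message first.
    return struct.pack(">I", len(payload)) + payload

def read_frame(stream) -> bytes:
    # Read the length prefix, then exactly that many payload bytes.
    (n,) = struct.unpack(">I", stream.read(4))
    return stream.read(n)
```

With such a prefix, a receiver that hits a deserialization error mid-message can discard exactly the remaining bytes of the current frame and resume, in sync, at the next length prefix.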

@malmaud

malmaud commented Oct 27, 2015

Ya, I see that out-of-band would have to be on a separate thread. IJulia has example multithreaded heartbeat code (https://github.com/JuliaLang/IJulia.jl/blob/3a1ad9eac8c259c79a6a1c09342eac262b16f34b/src/heartbeat.jl). It might require a new C function somewhere in src. And at least in my testing, using a Julia function that ccalls sleep and then writes to a socket in a loop works fine, although maybe there's something unsafe about that which hasn't shown up yet.

I see the complexity of dealing with a deserialization error. If we're throwing out possible schemes, we can take inspiration from http multipart encoding -

  1. to put a message on the wire, first write a random boundary token,
  2. then write the serialized message,
  3. then write the boundary token again.

The receiver first reads the token, then deserializes the message. If it encounters an error during deserialization, it just reads the bytes from the stream until it finds the boundary token so it can start processing the next message.
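The boundary-token scheme above can be sketched in Python (`write_message` and `resync` are illustrative names; a real implementation would operate on the socket stream rather than a byte buffer):

```python
import os

TOKEN_LEN = 10  # e.g. ten random bytes per message

def write_message(buf: bytearray, payload: bytes) -> bytes:
    # 1. write a random boundary token, 2. write the serialized
    # message, 3. write the token again. Return the token so the
    # receiver-side logic below can be exercised.
    token = os.urandom(TOKEN_LEN)
    buf += token + payload + token
    return token

def resync(data: bytes, token: bytes, start: int) -> int:
    # After a deserialization error, scan forward for the closing
    # token; the next message begins immediately after it.
    idx = data.find(token, start)
    if idx < 0:
        raise EOFError("closing boundary token not found")
    return idx + len(token)
```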

@ssfrr

ssfrr commented Oct 27, 2015

If you're interested in a way to frame messages in a stream I've always been a fan of COBS, though my applications have usually been wire protocols on embedded systems where the messages are pretty short (~100 bytes or less). It's nice because you're guaranteed the frame delimiter won't show up in the data, so if you get into a confused state you know that the next delimiter you see is a new frame.

In the past I've used protocols where the framing byte just gets escaped when it appears in the stream, but that can cause big overheads if the frame byte shows up a lot in the stream.

Receiving COBS can be done on-the-fly without keeping a buffer. Encoding could have a stream interface but would require a buffer of 254 bytes.

I'm slammed for the next couple days but after that if you're interested I could write a COBS stream wrapper type that acts like a stream with an additional finish method that would delimit the frames. Sounds like fun.
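For reference, COBS itself is compact enough to sketch in a few lines of Python (this illustrates the encoding, not the stream-wrapper interface proposed above):

```python
def cobs_encode(data: bytes) -> bytes:
    # Each group starts with a code byte: the count of following non-zero
    # bytes plus one. A code of 0xFF means 254 bytes with no implied zero.
    out, block = bytearray(), bytearray()
    for b in data:
        if b == 0:
            out.append(len(block) + 1)
            out += block
            block.clear()
        else:
            block.append(b)
            if len(block) == 254:
                out.append(0xFF)
                out += block
                block.clear()
    out.append(len(block) + 1)
    out += block
    return bytes(out)  # guaranteed to contain no 0x00 bytes

def cobs_decode(data: bytes) -> bytes:
    out, i = bytearray(), 0
    while i < len(data):
        code = data[i]
        out += data[i + 1:i + code]
        i += code
        if code < 0xFF and i < len(data):
            out.append(0)  # the code byte stood in for a zero
    return bytes(out)
```

Because the encoded output never contains 0x00, a literal 0x00 delimiter between frames is unambiguous: after any error, scanning to the next 0x00 resynchronizes the stream with certainty rather than with high probability.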

@malmaud

malmaud commented Oct 27, 2015

That looks cool, but why not just use a random 10 bytes as the frame delimiter? That's never going to match a message by chance.


@malmaud

malmaud commented Oct 27, 2015

Proposed implementation here for reference:
https://github.com/JuliaLang/julia/blob/jmm/boundary_message/base/multi.jl#L228


@malmaud

malmaud commented Oct 28, 2015

It also doesn't seem like it would work here because it needs to know the number of frame bytes in the message before encoding the message, but we don't have the length of the message since it's being written directly to the socket.


@ssfrr

ssfrr commented Oct 28, 2015

I'll preface this by saying that I'm not entirely convinced either that this is a better way to go, and it's possible that my ignorance of the larger context is getting in the way. Here are a couple of things that concern me with the random delimiter scheme though:

  1. though the chance of a collision is almost infinitesimally small, it's nonzero and I imagine lots and lots of these messages getting sent. I'm not sure what the target is here for failure probability, but this is probably just me being pedantic.
  2. if the process writing the frame gets borked in a way that prevents it from writing the closing delimiter, the stream never recovers. This might not be a failure mode that we need to worry about though.

Re: needing to know the number of frame bytes: that's why you'd need the 254-byte buffer on the writer side. You only need to look ahead for a 0x00 up to 254 bytes, because if it's longer than that you just put 0xff as the frame byte and send the next 254 bytes as-is.

Anyways, mostly I just wanted to throw it out there as an alternative, not advocate to change course, so if these points aren't convincing I think the random delimiter is also a good way to go.

@ssfrr

ssfrr commented Oct 28, 2015

reading my comment, "code byte" is probably a better term than "frame byte". The frame delimiter is a literal 0x00, and the code byte can happen many times within the frame, each time indicating the number of nonzero bytes following.

@malmaud

malmaud commented Oct 28, 2015

Oh ya, I see. Do you mind if I implement it before you? Now I'm excited about it.

@ssfrr

ssfrr commented Oct 28, 2015

I wouldn't be able to get to it until the beginning of next week, so go for it!

@StefanKarpinski
Sponsor Member

That's a clever way to do this.

@malmaud

malmaud commented Oct 28, 2015

Alright, JuliaLang/julia#13795 can now recover gracefully from deserialize failures by keeping a log of ACKed messages and using frame delimiters to reset the message stream to a working state.

@amitmurthy
Contributor Author

Why do we need both? Won't just using frame delimiters be enough? As for the possibility of an (extremely unlikely) collision, note that it exists only in the failure case, i.e., upon a deserialization error.

Chances of errors during serialization have reduced after JuliaLang/julia@77b2527

@malmaud

malmaud commented Oct 29, 2015

Well, you want a way for the worker to signal to the client that a deserialization error happened, and to associate that signal with a specific message. The worker doesn't know the response_oid of the client's Call msg because that was part of the garbled message. So messages need to have a unique ID and senders need to remember the association between message IDs and messages at least until a successful Ack comes back.

Plus I'm hoping it might lay some groundwork for other applications that take advantage of a message log, such as debugging of the communication between workers and maybe eventually replaying messages to get workers into a known state.


@amitmurthy
Contributor Author

It may be better to split the message into a "header" and "body" and deserialize in two steps. First we deserialize the header (which should never fail - if it does fail, treat it as a fatal error) and then the function/args. Errors while deserializing function and args are treated the same as errors while executing f(args), i.e., send back an appropriate RemoteException.
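The two-step scheme could be sketched like this in Python, with pickle standing in for Julia's serializer (`handle_message` and the header shape are illustrative, not the actual multi.jl design):

```python
import io
import pickle

def handle_message(stream, execute):
    # Step 1: the small, fixed-shape header. This should never fail;
    # if it does, treat it as a fatal error for the connection.
    header = pickle.load(stream)
    # Step 2: the function and args. A failure here is reported back
    # to the caller just like an error raised while executing f(args).
    try:
        func, args = pickle.load(stream)
        return header, execute(func, args)
    except Exception as exc:
        return header, ("RemoteException", exc)
```

The key property is that the header survives a body failure, so the worker still knows which remote ref to route the error to.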

@malmaud

malmaud commented Oct 29, 2015

Makes sense. If the header is going to be message-agnostic, there'd need to be a bit of refactoring of the message types so that response_oid is part of the header instead of the message bodies, and just won't have a meaning for message types that don't currently have a response_oid.

Otherwise each message type could have its own corresponding header type, but that seems a bit cumbersome.


@amitmurthy
Contributor Author

Yes. The header needs to contain just the response_oid which identifies the RemoteRef that stores the result and notify_oid that identifies the ref that the caller task (on the sender side) is waiting on. They may be nothing in some cases where a matching response is not expected.

@malmaud

malmaud commented Oct 29, 2015

Alright, JuliaLang/julia#13795 implements the header mechanism.

@vtjnash vtjnash transferred this issue from JuliaLang/julia Feb 10, 2024