Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make workers resilient to deserialize failures #13795

Merged
merged 4 commits into from
Jun 30, 2016
Merged

Conversation

malmaud
Copy link
Contributor

@malmaud malmaud commented Oct 27, 2015

  • Add a boundary marker to the wire format of transmitting remote messages so errors in deserialization can be recovered from.
  • Store deserialization error in proper refs to the client sees the error at the expected time.

@@ -786,7 +805,7 @@ end

function wait_ref(rid, callee, args...)
v = fetch_ref(rid, args...)
if isa(v, RemoteException)
if isa(v, Remot1eException)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deliberate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you know the answer is to that

@malmaud malmaud force-pushed the jmm/boundary_message branch 3 times, most recently from e6e88a5 to 57dd49c Compare October 29, 2015 18:44
@malmaud malmaud changed the title WIP: Make workers resilient to deserialize failures Make workers resilient to deserialize failures Oct 29, 2015
@kshyatt kshyatt added the domain:parallelism Parallel or distributed computation label Oct 29, 2015
@amitmurthy
Copy link
Contributor

Looks good!

@amitmurthy
Copy link
Contributor

It is to be noted that this adds a minimum of an extra 20 bytes for every message. While it is not much generally, it will worsen the already poor scalability of our existing DGC mechanism. That issue should be tackled by having write-once Futures (in most cases where RemoteRefs are currently used), and a complete removal of RemoteRefs from DArrays.

@malmaud
Copy link
Contributor Author

malmaud commented Oct 30, 2015

If the 20 bytes really is a concern, then @ssfrr proposal to use https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing could be used.

But I do totally agree that if we're at the point where 20 bytes/message is a concern, it's a symbol of the fact that too many small messages are being sent and that can be more productively addressed through architectural changes than by byte-shaving.

@amitmurthy
Copy link
Contributor

I prefer this scheme to COBS which will have quite a computation overhead for long messages, no?

@malmaud
Copy link
Contributor Author

malmaud commented Oct 30, 2015

Looking at Wikipedia's C reference implementation, It will be O(n) in the length of the message with a constant pretty close to 1.

@amitmurthy
Copy link
Contributor

How would COBS work for direct read/write to large array locations without any intermediate buffer?

@malmaud
Copy link
Contributor Author

malmaud commented Oct 30, 2015

COBS required a fixed 256-byte buffer independent of the size of the stream it's encoding.

So we could create a new IO type that has as members an internal buffer of Vector{UInt}(256) and a reference to the underlying (TCP) stream. The calls to serialize and deserialize in multi.jl would be passed an instance of this type instead of the TCP socket directly.

But COBS introduces one byte of overhead for every 256 bytes in the original message.So by the time the message is (20*256)=5120 bytes, the overhead is greater than the random-10-byte boundary marker scheme.

@amitmurthy
Copy link
Contributor

What I meant was the computational overhead will include the cost of a memcpy for each block of 256 bytes. In case of large arrays, this would be quite significant.

@malmaud
Copy link
Contributor Author

malmaud commented Oct 30, 2015

Ya, that's true.

@malmaud
Copy link
Contributor Author

malmaud commented Oct 30, 2015

Anyways, I'm definitely not advocating for COBS, just pointing it out. I'd vote for merging this as-is and then tacking the general latency and performance problems with routine @spawn and @parallel usage as a separate matter.

@StefanKarpinski
Copy link
Sponsor Member

I don't like the idea that there's even a remote chance of accidental incorrect decoding. That just seems like asking for trouble. But I also don't want to pick a scheme that precludes zero-copy I/O for arrays.

@malmaud
Copy link
Contributor Author

malmaud commented Oct 31, 2015

I think the risk is being overstated - if deserialization occurs correctly, then it doesn't matter if the serialized data has the boundary bytes in it. That's the key thing to remember.

The only risk is if deserialization throws an error, and then in the middle of the deserialization stream past where the error was thrown the 10 boundary bytes occur (there's a 1e-25 probability of any such 8-byte sequence accidentally matching the boundary bytes). And even in that vanishingly small situation that requires the confluence of deserialization failing and a ~1e-25 event occurring, the worker would just immediately throw a fatal error as it tries to read the next message in the stream.

Cf the widely used HTTP multipart encoding, which uses this same scheme except will fail if the boundary bytes occur anywhere in the payload, regardless of there's any kind of error or not.

@StefanKarpinski
Copy link
Sponsor Member

That's a good argument.

@malmaud
Copy link
Contributor Author

malmaud commented Nov 2, 2015

What's the current thinking about this? Anything more I can do here?

@jakebolewski
Copy link
Member

Can you add more detailed comments as to what is going on (for instance the byte representation of a message stream)?

@amitmurthy
Copy link
Contributor

LGTM. It will be good to test the impact of this change with timing, say a 10^5 remotecall_fetch that just echoes its argument. Just to rule out any perf regression.

@malmaud
Copy link
Contributor Author

malmaud commented Nov 5, 2015

Testing locally with

addprocs(1)
@everywhere echo(x) = x
function test()
  for n=1:10^5
    remotecall_fetch(echo, 2, :test)
  end
end

@time test()
@time test()

gives 19.044689 seconds (10.24 M allocations: 570.170 MB, 0.36% gc time) on current julia master, and 24.348992 seconds (13.84 M allocations: 815.837 MB, 0.35% gc time) with the PR.

So the regression is actually more serious than I thought.

@malmaud
Copy link
Contributor Author

malmaud commented Nov 5, 2015

Maybe instead of allocating and randomly generating the 10-byte boundary marker each time a message sent, a single pre-generated boundary could be used for the entire Julia session.

@StefanKarpinski
Copy link
Sponsor Member

Why not just pick a fixed 10-byte boundary? Then it's deterministic, which I like better. I guess that makes it vulnerable to intentional attacks though.

@malmaud
Copy link
Contributor Author

malmaud commented Nov 6, 2015

Ya, that's what I meant by 'pre-generated boundary'.

Another other obvious optimization is to hoist the allocation of the buffer that stores the boundary bytes outside the loop in the worker message-processing loop.

@amitmurthy
Copy link
Contributor

I don't see how it increases vulnerability since we are not depending on the boundary for message processing. We are dependent on it only for recovery from a deserialization error.

We could have a random 10-byte boundary that is generated and sent once for every connection as part of the absolute first message sent from either side.This is used for the duration of the connection and we have a pair of them for every connection. For connections from master-worker, JoinPGRPMsg and JoinCompleteMsg are the two initial handshake messages.
For worker-worker connections we have IdentifySocketMsg sent from the initiating side and we will need to define one more for the response (which will only have the 10-byte random string).

So, for a connection between workers say, 4 and 5, we have a 10-byte boundary defined for messages sent from 4 to 5 and a different boundary for messages sent from 5 to 4.

@malmaud
Copy link
Contributor Author

malmaud commented Nov 7, 2015

What's gained from that vs a fixed Julia-wise 10 bytes?
On Fri, Nov 6, 2015 at 7:55 PM Amit Murthy notifications@github.com wrote:

I don't see how it increases vulnerability since we are not depending on
the boundary for message processing. We are dependent on it only for
recovery from a deserialization error.

We could have a random 10-byte boundary that is generated and sent once
for every connection as part of the absolute first message sent from either
side.This is used for the duration of the connection and we have a pair of
them for every connection. For connections from master-worker, JoinPGRPMsg
and JoinCompleteMsg are the two initial handshake messages.
For worker-worker connections we have IdentifySocketMsg sent from the
initiating side and we will need to define one more for the response (which
will only have the 10-byte random string).

So, for a connection between workers say, 4 and 5, we have a 10-byte
boundary defined for messages sent from 4 to 5 and a different boundary
for messages sent from 5 to 4.


Reply to this email directly or view it on GitHub
#13795 (comment).

@malmaud
Copy link
Contributor Author

malmaud commented Jun 23, 2016

Hmm, this whole PR kinda slipped my mind. I could probably look into it today.

@malmaud
Copy link
Contributor Author

malmaud commented Jun 25, 2016

@amitmurthy I'm thinking of working on this during the hackathon today. Not sure it's worth the effort to formally rebase given the numbers of conflicts I'm getting, but I can reimplement the logic on top of current master without too much trouble.

@amitmurthy
Copy link
Contributor

As discussed, have rebased onto master. There is still a slowdown that needs to be tackled.

master:
julia> @time test()
  9.576020 seconds (10.41 M allocations: 360.453 MB, 0.42% gc time)

julia> @time test()
  9.567583 seconds (10.38 M allocations: 358.963 MB, 0.38% gc time)


Branch:
julia> @time test()
 12.360206 seconds (13.08 M allocations: 430.679 MB, 0.37% gc time)

julia> @time test()
 12.479311 seconds (13.08 M allocations: 430.680 MB, 0.38% gc time)

@malmaud
Copy link
Contributor Author

malmaud commented Jun 25, 2016

Great! Thinks to try:

  • Make MsgHeader a stack-allocatable immutable
  • Time the serialization of MsgHeader objects. If too high, maybe just sent four raw ints over the wire.

@malmaud
Copy link
Contributor Author

malmaud commented Jun 25, 2016

Is there a reason to not make RRID an immutable with concretely-typed fields? Then it could be stack-allocatable, in turn causing MsgHeader to be stack-allocatable.

@amitmurthy
Copy link
Contributor

Not really. Making stuff immutable didn't help. Will try writing the ints directly next.

@malmaud
Copy link
Contributor Author

malmaud commented Jun 25, 2016

Hmm, I pushed my own version of immutability but I'm not sure exactly what timing tests you're running.

@amitmurthy
Copy link
Contributor

Locally testing. Running the same test as #13795 (comment)

@amitmurthy
Copy link
Contributor

Serializing 4 ints and not the type returns

julia> @time test()
  8.709813 seconds (10.31 M allocations: 363.496 MB, 0.49% gc time)

julia> @time test()
  8.572864 seconds (10.28 M allocations: 362.015 MB, 0.45% gc time)

Better than master. What timings are you seeing with just making the header stack-allocatable?

@malmaud
Copy link
Contributor Author

malmaud commented Jun 25, 2016

No improvement. I didn't look at the codegen though to see if its actually stack allocating though. It's also possible that even if the MsgHeader itself is stack-allocating, serializing it is still allocating wastefully.

@malmaud
Copy link
Contributor Author

malmaud commented Jun 25, 2016

So seems the right thing to do is to just sent the raw ints? Probably still a good coding practice to change as many of the types in multi.jl to immutable as possible, but that can be a separate PR.

@amitmurthy
Copy link
Contributor

Just pushed header-as-ints update. But this leads me to wonder what the actual overhead now is of each message itself wrapped in a type.

@amitmurthy
Copy link
Contributor

Bypassing the serializer for AbstractMessage container improves the timings a bit more.

julia> @time test()
  7.924848 seconds (8.19 M allocations: 347.378 MB, 0.53% gc time)

julia> @time test()
  7.787827 seconds (8.19 M allocations: 347.401 MB, 0.50% gc time)

Will cleanup the code and push

@@ -28,13 +28,8 @@ let REF_ID::Int = 1
end

immutable RRID
<<<<<<< Updated upstream
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

be sure to squash this

@amitmurthy
Copy link
Contributor

@malmaud , have pushed an update. I have optimized it a bit more and am seeing decent numbers now.

julia> @time test()
  7.412087 seconds (7.69 M allocations: 336.697 MB, 0.52% gc time)

julia> @time test()
  7.625430 seconds (7.69 M allocations: 336.720 MB, 0.50% gc time)

Please review and merge.

On a different note, optimizing generic serialization of compound types will result in quite a large speedup of inter-process communication. But that is probably 0.6 work.

end
end
end
@eval begin
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

combine the two eval blocks?

@malmaud
Copy link
Contributor Author

malmaud commented Jun 30, 2016

LGTM! Let's merge once CI passes unless anyone has any further objections. Thanks @amitmurthy for all the help in seeing this through.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
domain:parallelism Parallel or distributed computation
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

7 participants