Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Message atomicity bug in load-balancer fixed

If the peer getting the message have disconnected in the middle
of multiplart message, the remaining part of the message went
to a different peer. This patch fixes the issue.

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
  • Loading branch information...
commit 1619b3d84a04fe1886347fd83280a6070c9603f4 1 parent 92c7c18
@sustrik sustrik authored
Showing with 27 additions and 1 deletion.
  1. +24 −1 src/lb.cpp
  2. +3 −0  src/lb.hpp
View
25 src/lb.cpp
@@ -29,6 +29,7 @@ zmq::lb_t::lb_t (own_t *sink_) :
active (0),
current (0),
more (false),
+ dropping (false),
sink (sink_),
terminating (false)
{
@@ -65,9 +66,16 @@ void zmq::lb_t::terminate ()
void zmq::lb_t::terminated (writer_t *pipe_)
{
+ pipes_t::size_type index = pipes.index (pipe_);
+
+ // If we are in the middle of multipart message and current pipe
+ // have disconnected, we have to drop the remainder of the message.
+ if (index == current && more)
+ dropping = true;
+
// Remove the pipe from the list; adjust number of active pipes
// accordingly.
- if (pipes.index (pipe_) < active) {
+ if (index < active) {
active--;
if (current == active)
current = 0;
@@ -87,6 +95,21 @@ void zmq::lb_t::activated (writer_t *pipe_)
int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
{
+ // Drop the message if required. If we are at the end of the message
+ // switch back to non-dropping mode.
+ if (dropping) {
+
+ more = msg_->flags & ZMQ_MSG_MORE;
+ if (!more)
+ dropping = false;
+
+ int rc = zmq_msg_close (msg_);
+ errno_assert (rc == 0);
+ rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return 0;
+ }
+
while (active > 0) {
if (pipes [current]->write (msg_)) {
more = msg_->flags & ZMQ_MSG_MORE;
View
3  src/lb.hpp
@@ -61,6 +61,9 @@ namespace zmq
// True if last we are in the middle of a multipart message.
bool more;
+ // True if we are dropping current message.
+ bool dropping;
+
// Object to send events to.
class own_t *sink;
Please sign in to comment.
Something went wrong with that request. Please try again.