Skip to content

Commit

Permalink
Fix race condition of Tunnel.
Browse files Browse the repository at this point in the history
High watermark callback should only stop reading peer connection if its
output buffer is not empty.

Originally, the high watermark callback will stop reading the peer
connection and register a write complete callback unconditionally.

Consiter two events happening in the same EventLoop iteration:
  Connection C is readable
  Connection S is writable

In C.onMessage(), it calls S.send(), then decides to queue a high
watermark callback because S.outputBuffer is full.  S.HWM will be
called after event handling.

In S.handleWrite(), all data are sent, output buffer becomes empty,
but it doesn't queue write complete callback of S because it's not
registered yet.

In doPendingFunctors(), S.HWM is called, stop reading C, and register a
write complete callback for S to re-enable reading C.  This will never
happen because S has nothing to send, it is already write complete.

A false fix:
  Register write complete callback at beginning, never unregister it.
This of course has some performance penalty, but appears to work
because in the case of two events above, S.handleWrite() will queue its
write complete callback after S.HWM.  In doPendingFunctors(), first call
S.HWM to stop reading C, then call S.WC to re-enable reading C.

This false fix has a subtle dependency on the execution sequence of
muduo EventLoop::doPendingFunctors(), and if EventLoop replace the queue
with a stack, the program breaks mystically.
  • Loading branch information
chenshuo committed Dec 23, 2016
1 parent 7267ff1 commit 0502020
Showing 1 changed file with 24 additions and 10 deletions.
34 changes: 24 additions & 10 deletions examples/socks4a/tunnel.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ class Tunnel : public boost::enable_shared_from_this<Tunnel>,
client_.setMessageCallback(
boost::bind(&Tunnel::onClientMessage, shared_from_this(), _1, _2, _3));
serverConn_->setHighWaterMarkCallback(
boost::bind(&Tunnel::onHighWaterMarkWeak, boost::weak_ptr<Tunnel>(shared_from_this()), kServer, _1, _2),
boost::bind(&Tunnel::onHighWaterMarkWeak,
boost::weak_ptr<Tunnel>(shared_from_this()), kServer, _1, _2),
1024*1024);
}

Expand All @@ -61,6 +62,7 @@ class Tunnel : public boost::enable_shared_from_this<Tunnel>,
serverConn_->setContext(boost::any());
serverConn_->shutdown();
}
clientConn_.reset();
}

void onClientConnection(const muduo::net::TcpConnectionPtr& conn)
Expand All @@ -70,10 +72,12 @@ class Tunnel : public boost::enable_shared_from_this<Tunnel>,
{
conn->setTcpNoDelay(true);
conn->setHighWaterMarkCallback(
boost::bind(&Tunnel::onHighWaterMarkWeak, boost::weak_ptr<Tunnel>(shared_from_this()), kClient, _1, _2),
boost::bind(&Tunnel::onHighWaterMarkWeak,
boost::weak_ptr<Tunnel>(shared_from_this()), kClient, _1, _2),
1024*1024);
serverConn_->setContext(conn);
serverConn_->startRead();
clientConn_ = conn;
if (serverConn_->inputBuffer()->readableBytes() > 0)
{
conn->send(serverConn_->inputBuffer());
Expand Down Expand Up @@ -113,17 +117,26 @@ class Tunnel : public boost::enable_shared_from_this<Tunnel>,
LOG_INFO << (which == kServer ? "server" : "client")
<< " onHighWaterMark " << conn->name()
<< " bytes " << bytesToSent;

if (which == kServer)
{
client_.connection()->stopRead();
serverConn_->setWriteCompleteCallback(
boost::bind(&Tunnel::onWriteCompleteWeak, boost::weak_ptr<Tunnel>(shared_from_this()), kServer, _1));
if (serverConn_->outputBuffer()->readableBytes() > 0)
{
clientConn_->stopRead();
serverConn_->setWriteCompleteCallback(
boost::bind(&Tunnel::onWriteCompleteWeak,
boost::weak_ptr<Tunnel>(shared_from_this()), kServer, _1));
}
}
else
{
serverConn_->stopRead();
client_.connection()->setWriteCompleteCallback(
boost::bind(&Tunnel::onWriteCompleteWeak, boost::weak_ptr<Tunnel>(shared_from_this()), kClient, _1));
if (clientConn_->outputBuffer()->readableBytes() > 0)
{
serverConn_->stopRead();
clientConn_->setWriteCompleteCallback(
boost::bind(&Tunnel::onWriteCompleteWeak,
boost::weak_ptr<Tunnel>(shared_from_this()), kClient, _1));
}
}
}

Expand All @@ -145,13 +158,13 @@ class Tunnel : public boost::enable_shared_from_this<Tunnel>,
<< " onWriteComplete " << conn->name();
if (which == kServer)
{
client_.connection()->startRead();
clientConn_->startRead();
serverConn_->setWriteCompleteCallback(muduo::net::WriteCompleteCallback());
}
else
{
serverConn_->startRead();
client_.connection()->setWriteCompleteCallback(muduo::net::WriteCompleteCallback());
clientConn_->setWriteCompleteCallback(muduo::net::WriteCompleteCallback());
}
}

Expand All @@ -169,6 +182,7 @@ class Tunnel : public boost::enable_shared_from_this<Tunnel>,
private:
muduo::net::TcpClient client_;
muduo::net::TcpConnectionPtr serverConn_;
muduo::net::TcpConnectionPtr clientConn_;
};
typedef boost::shared_ptr<Tunnel> TunnelPtr;

Expand Down

0 comments on commit 0502020

Please sign in to comment.