diff --git a/src/overlay/Peer.h b/src/overlay/Peer.h index b28885144f..7c537a78ad 100644 --- a/src/overlay/Peer.h +++ b/src/overlay/Peer.h @@ -253,7 +253,8 @@ class Peer : public std::enable_shared_from_this<Peer>, virtual void connectHandler(asio::error_code const& ec); virtual void - writeHandler(asio::error_code const& error, size_t bytes_transferred) + writeHandler(asio::error_code const& error, size_t bytes_transferred, + size_t messages_transferred) { } @@ -263,7 +264,8 @@ class Peer : public std::enable_shared_from_this<Peer>, } virtual void - readBodyHandler(asio::error_code const& error, size_t bytes_transferred) + readBodyHandler(asio::error_code const& error, size_t bytes_transferred, + size_t expected_length) { } diff --git a/src/overlay/PeerDoor.cpp b/src/overlay/PeerDoor.cpp index 22b37213ef..95c1eac24b 100644 --- a/src/overlay/PeerDoor.cpp +++ b/src/overlay/PeerDoor.cpp @@ -57,8 +57,8 @@ PeerDoor::acceptNextPeer() } CLOG(DEBUG, "Overlay") << "PeerDoor acceptNextPeer()"; - auto sock = make_shared<TCPPeer::SocketType>( - mApp.getClock().getIOContext(), TCPPeer::BUFSZ, TCPPeer::BUFSZ); + auto sock = make_shared<TCPPeer::SocketType>(mApp.getClock().getIOContext(), + TCPPeer::BUFSZ); mAcceptor.async_accept(sock->next_layer(), [this, sock](asio::error_code const& ec) { if (ec) diff --git a/src/overlay/TCPPeer.cpp b/src/overlay/TCPPeer.cpp index 2a71d731e4..d3ffdb9c9f 100644 --- a/src/overlay/TCPPeer.cpp +++ b/src/overlay/TCPPeer.cpp @@ -45,8 +45,7 @@ TCPPeer::initiate(Application& app, PeerBareAddress const& address) CLOG(DEBUG, "Overlay") << "TCPPeer:initiate" << " to " << address.toString(); assertThreadIsMain(); - auto socket = - make_shared<SocketType>(app.getClock().getIOContext(), BUFSZ, BUFSZ); + auto socket = make_shared<SocketType>(app.getClock().getIOContext(), BUFSZ); auto result = make_shared<TCPPeer>(app, WE_CALLED_REMOTE, socket); result->mAddress = address; result->startIdleTimer(); @@ -147,21 +146,20 @@ TCPPeer::sendMessage(xdr::msg_ptr&& xdrBytes) CLOG(TRACE, "Overlay") << "TCPPeer:sendMessage to " << toString(); 
assertThreadIsMain(); - // places the buffer to write into the write queue TimestampedMessage msg; msg.mEnqueuedTime = mApp.getClock().now(); msg.mMessage = std::move(xdrBytes); - auto tsm = std::make_shared<TimestampedMessage>(std::move(msg)); + mWriteQueue.emplace_back(std::move(msg)); - auto self = static_pointer_cast<TCPPeer>(shared_from_this()); - - self->mWriteQueue.emplace(tsm); - - if (!self->mWriting) + if (!mWriting) { - self->mWriting = true; - // kick off the async write chain if we're the first one - self->messageSender(); + mWriting = true; + // Post a write to the next crank. We do this asynchronously so + // we have a (brief but important) chance to enqueue a bunch of messages + // before we issue the write. + auto self = static_pointer_cast<TCPPeer>(shared_from_this()); + self->getApp().postOnMainThread([self]() { self->messageSender(); }, + "TCPPeer: messageSender"); } } @@ -234,56 +232,78 @@ TCPPeer::messageSender() { assertThreadIsMain(); - auto self = static_pointer_cast<TCPPeer>(shared_from_this()); - - // if nothing to do, flush and return + // if nothing to do, mark progress and return. 
if (mWriteQueue.empty()) { - mLastEmpty = mApp.getClock().now(); - mSocket->async_flush([self](asio::error_code const& ec, std::size_t) { - self->writeHandler(ec, 0); - if (!ec) - { - if (!self->mWriteQueue.empty()) - { - self->messageSender(); - } - else - { - self->mWriting = false; - // there is nothing to send and delayed shutdown was - // requested - time to perform it - if (self->mDelayedShutdown) - { - self->shutdown(); - } - } - } - }); + mWriting = false; + // there is nothing to send and delayed shutdown was + // requested - time to perform it + if (mDelayedShutdown) + { + shutdown(); + } return; } - // peek the buffer from the queue - // do not remove it yet as we need the buffer for the duration of the - // write operation - auto tsm = mWriteQueue.front(); - tsm->mIssuedTime = mApp.getClock().now(); - - asio::async_write( - *(mSocket.get()), - asio::buffer(tsm->mMessage->raw_data(), tsm->mMessage->raw_size()), - [self, tsm](asio::error_code const& ec, std::size_t length) { - self->writeHandler(ec, length); - tsm->mCompletedTime = self->mApp.getClock().now(); - tsm->recordWriteTiming(self->getOverlayMetrics()); - self->mWriteQueue.pop(); // done with front element - - // continue processing the queue/flush - if (!ec) - { - self->messageSender(); - } - }); + // Take a snapshot of the contents of mWriteQueue into mWriteBuffers, in + // terms of asio::const_buffers pointing into the elements of mWriteQueue, + // and then issue a single multi-buffer ("scatter-gather") async_write that + // covers the whole snapshot. We'll get called back when the batch is + // completed, at which point we'll clear mWriteBuffers and remove the entire + // snapshot worth of corresponding messages from mWriteQueue (though it may + // have grown a bit in the meantime -- we remove only a prefix). 
+ assert(mWriteBuffers.empty()); + auto now = mApp.getClock().now(); + size_t expected_length = 0; + for (auto& tsm : mWriteQueue) + { + tsm.mIssuedTime = now; + size_t sz = tsm.mMessage->raw_size(); + mWriteBuffers.emplace_back(tsm.mMessage->raw_data(), sz); + expected_length += sz; + } + + getOverlayMetrics().mAsyncWrite.Mark(); + auto self = static_pointer_cast<TCPPeer>(shared_from_this()); + asio::async_write(*(mSocket.get()), mWriteBuffers, + [self, expected_length](asio::error_code const& ec, + std::size_t length) { + if (expected_length != length) + { + self->drop("error during async_write", + Peer::DropDirection::WE_DROPPED_REMOTE, + Peer::DropMode::IGNORE_WRITE_QUEUE); + return; + } + self->writeHandler(ec, length, + self->mWriteBuffers.size()); + + // Walk through a _prefix_ of the write queue + // _corresponding_ to the write buffers we just sent. + // While walking, record the sent-time in metrics, but + // also advance iterator 'i' so we wind up with an + // iterator range to erase from the front of the write + // queue. + auto now = self->mApp.getClock().now(); + auto i = self->mWriteQueue.begin(); + while (!self->mWriteBuffers.empty()) + { + i->mCompletedTime = now; + i->recordWriteTiming(self->getOverlayMetrics()); + ++i; + self->mWriteBuffers.pop_back(); + } + + // Erase the messages from the write queue that we + // just forgot about the buffers for. 
+ self->mWriteQueue.erase(self->mWriteQueue.begin(), i); + + // continue processing the queue + if (!ec) + { + self->messageSender(); + } + }); } void @@ -299,7 +319,8 @@ TCPPeer::TimestampedMessage::recordWriteTiming(OverlayMetrics& metrics) void TCPPeer::writeHandler(asio::error_code const& error, - std::size_t bytes_transferred) + std::size_t bytes_transferred, + size_t messages_transferred) { assertThreadIsMain(); mLastWrite = mApp.getClock().now(); @@ -329,14 +350,66 @@ TCPPeer::writeHandler(asio::error_code const& error, else if (bytes_transferred != 0) { LoadManager::PeerContext loadCtx(mApp, mPeerID); - getOverlayMetrics().mMessageWrite.Mark(); + getOverlayMetrics().mMessageWrite.Mark(messages_transferred); getOverlayMetrics().mByteWrite.Mark(bytes_transferred); - ++mPeerMetrics.mMessageWrite; + mPeerMetrics.mMessageWrite += messages_transferred; mPeerMetrics.mByteWrite += bytes_transferred; } } +void +TCPPeer::noteErrorReadHeader(size_t nbytes, asio::error_code const& ec) +{ + receivedBytes(nbytes, false); + getOverlayMetrics().mErrorRead.Mark(); + std::string msg("error reading message header: "); + msg.append(ec.message()); + drop(msg, Peer::DropDirection::WE_DROPPED_REMOTE, + Peer::DropMode::IGNORE_WRITE_QUEUE); +} + +void +TCPPeer::noteShortReadHeader(size_t nbytes) +{ + receivedBytes(nbytes, false); + getOverlayMetrics().mErrorRead.Mark(); + drop("short read of message header", Peer::DropDirection::WE_DROPPED_REMOTE, + Peer::DropMode::IGNORE_WRITE_QUEUE); +} + +void +TCPPeer::noteFullyReadHeader() +{ + receivedBytes(HDRSZ, false); +} + +void +TCPPeer::noteErrorReadBody(size_t nbytes, asio::error_code const& ec) +{ + receivedBytes(nbytes, false); + getOverlayMetrics().mErrorRead.Mark(); + std::string msg("error reading message body: "); + msg.append(ec.message()); + drop(msg, Peer::DropDirection::WE_DROPPED_REMOTE, + Peer::DropMode::IGNORE_WRITE_QUEUE); +} + +void +TCPPeer::noteShortReadBody(size_t nbytes) +{ + receivedBytes(nbytes, false); + 
getOverlayMetrics().mErrorRead.Mark(); + drop("short read of message body", Peer::DropDirection::WE_DROPPED_REMOTE, + Peer::DropMode::IGNORE_WRITE_QUEUE); +} + +void +TCPPeer::noteFullyReadBody(size_t nbytes) +{ + receivedBytes(nbytes, true); +} + void TCPPeer::startRead() { @@ -346,58 +419,64 @@ TCPPeer::startRead() return; } - const size_t HDRSZ = 4; - auto self = static_pointer_cast<TCPPeer>(shared_from_this()); - - assert(self->mIncomingHeader.size() == 0); + mIncomingHeader.clear(); if (Logging::logTrace("Overlay")) - CLOG(TRACE, "Overlay") << "TCPPeer::startRead to " << self->toString(); + CLOG(TRACE, "Overlay") << "TCPPeer::startRead to " << toString(); - self->mIncomingHeader.resize(HDRSZ); + mIncomingHeader.resize(HDRSZ); - // We read large-ish (1MB) buffers of data from TCP which might have quite a - // few messages in them. We want to digest as many of these _synchronously_ - // as we can before we issue an async_read against ASIO. + // We read large-ish (256KB) buffers of data from TCP which might have quite + // a few messages in them. We want to digest as many of these + // _synchronously_ as we can before we issue an async_read against ASIO. YieldTimer yt(mApp.getClock()); while (mSocket->in_avail() >= HDRSZ && yt.shouldKeepGoing()) { - size_t n = mSocket->read_some(asio::buffer(mIncomingHeader)); + asio::error_code ec_hdr, ec_body; + size_t n = mSocket->read_some(asio::buffer(mIncomingHeader), ec_hdr); + if (ec_hdr) + { + noteErrorReadHeader(n, ec_hdr); + return; + } if (n != HDRSZ) { - drop("error during header read_some", - Peer::DropDirection::WE_DROPPED_REMOTE, - Peer::DropMode::IGNORE_WRITE_QUEUE); + noteShortReadHeader(n); return; } - size_t length = static_cast<size_t>(getIncomingMsgLength()); - if (length != 0) + size_t length = getIncomingMsgLength(); + if (mSocket->in_avail() >= length) { - if (mSocket->in_avail() >= length) + // We can finish reading a full message here synchronously, + // which means we will count the received header bytes here. 
+ noteFullyReadHeader(); + if (length != 0) { - // We can finish reading a full message here synchronously. mIncomingBody.resize(length); - n = mSocket->read_some(asio::buffer(mIncomingBody)); + n = mSocket->read_some(asio::buffer(mIncomingBody), ec_body); + if (ec_body) + { + noteErrorReadBody(n, ec_body); + return; + } if (n != length) { - drop("error during body read_some", - Peer::DropDirection::WE_DROPPED_REMOTE, - Peer::DropMode::IGNORE_WRITE_QUEUE); + noteShortReadBody(n); return; } - receivedBytes(length, true); + noteFullyReadBody(length); recvMessage(); } - else - { - // We read a header synchronously, but don't have enough data in - // the buffered_stream to read the body synchronously. Pretend - // we just finished reading the header asynchronously, and punt - // to readHeaderHandler to let it re-read the header and issue - // an async read for the body. - readHeaderHandler(asio::error_code(), HDRSZ); - return; - } + } + else + { + // We read a header synchronously, but don't have enough data in the + // buffered_stream to read the body synchronously. Pretend we just + // finished reading the header asynchronously, and punt to + // readHeaderHandler to let it re-read the header and issue an async + // read for the body. + readHeaderHandler(asio::error_code(), HDRSZ); + return; } } @@ -405,8 +484,8 @@ TCPPeer::startRead() // header (message length), issue an async_read and hope that the buffering // pulls in much more than just the 4 bytes we ask for here. 
getOverlayMetrics().mAsyncRead.Mark(); - asio::async_read(*(self->mSocket.get()), - asio::buffer(self->mIncomingHeader), + auto self = static_pointer_cast<TCPPeer>(shared_from_this()); + asio::async_read(*(mSocket.get()), asio::buffer(mIncomingHeader), [self](asio::error_code ec, std::size_t length) { if (Logging::logTrace("Overlay")) CLOG(TRACE, "Overlay") @@ -416,10 +495,10 @@ TCPPeer::startRead() }); } -int +size_t TCPPeer::getIncomingMsgLength() { - int length = mIncomingHeader[0]; + size_t length = static_cast<size_t>(mIncomingHeader[0]); length &= 0x7f; // clear the XDR 'continuation' bit length <<= 8; length |= mIncomingHeader[1]; @@ -458,39 +537,36 @@ TCPPeer::readHeaderHandler(asio::error_code const& error, // << " to " << mRemoteListeningPort // << (error ? "error " : "") << " bytes:" << bytes_transferred; - if (!error) + if (error) { - receivedBytes(bytes_transferred, false); - int length = getIncomingMsgLength(); - if (length != 0) - { - mIncomingBody.resize(length); - auto self = static_pointer_cast<TCPPeer>(shared_from_this()); - asio::async_read(*mSocket.get(), asio::buffer(mIncomingBody), - [self](asio::error_code ec, std::size_t length) { - self->readBodyHandler(ec, length); - }); - } + noteErrorReadHeader(bytes_transferred, error); + } + else if (bytes_transferred != HDRSZ) + { + noteShortReadHeader(bytes_transferred); } else { - if (isConnected()) + noteFullyReadHeader(); + size_t expected_length = getIncomingMsgLength(); + if (expected_length != 0) { - // Only emit a warning if we have an error while connected; - // errors during shutdown or connection are common/expected. 
- getOverlayMetrics().mErrorRead.Mark(); - CLOG(DEBUG, "Overlay") - << "readHeaderHandler error: " << error.message() << ": " - << toString(); + mIncomingBody.resize(expected_length); + auto self = static_pointer_cast<TCPPeer>(shared_from_this()); + asio::async_read(*mSocket.get(), asio::buffer(mIncomingBody), + [self, expected_length](asio::error_code ec, + std::size_t length) { + self->readBodyHandler(ec, length, + expected_length); + }); } - drop("error during read", Peer::DropDirection::WE_DROPPED_REMOTE, - Peer::DropMode::IGNORE_WRITE_QUEUE); } } void TCPPeer::readBodyHandler(asio::error_code const& error, - std::size_t bytes_transferred) + std::size_t bytes_transferred, + std::size_t expected_length) { assertThreadIsMain(); // LOG(DEBUG) << "TCPPeer::readBodyHandler " @@ -498,26 +574,20 @@ TCPPeer::readBodyHandler(asio::error_code const& error, // << " to " << mRemoteListeningPort // << (error ? "error " : "") << " bytes:" << bytes_transferred; - if (!error) + if (error) { - receivedBytes(bytes_transferred, true); - recvMessage(); - mIncomingHeader.clear(); - startRead(); + noteErrorReadBody(bytes_transferred, error); + } + else if (bytes_transferred != expected_length) + { + noteShortReadBody(bytes_transferred); } else { - if (isConnected()) - { - // Only emit a warning if we have an error while connected; - // errors during shutdown or connection are common/expected. 
- getOverlayMetrics().mErrorRead.Mark(); - CLOG(ERROR, "Overlay") - << "readBodyHandler error: " << error.message() << " :" - << toString(); - } - drop("error during read", Peer::DropDirection::WE_DROPPED_REMOTE, - Peer::DropMode::IGNORE_WRITE_QUEUE); + noteFullyReadBody(bytes_transferred); + recvMessage(); + mIncomingHeader.clear(); + startRead(); } } diff --git a/src/overlay/TCPPeer.h b/src/overlay/TCPPeer.h index 583d796784..a532378155 100644 --- a/src/overlay/TCPPeer.h +++ b/src/overlay/TCPPeer.h @@ -6,7 +6,7 @@ #include "overlay/Peer.h" #include "util/Timer.h" -#include <queue> +#include <deque> namespace medida { @@ -23,7 +23,7 @@ static auto const MAX_MESSAGE_SIZE = 0x1000000; class TCPPeer : public Peer { public: - typedef asio::buffered_stream<asio::ip::tcp::socket> SocketType; + typedef asio::buffered_read_stream<asio::ip::tcp::socket> SocketType; static constexpr size_t BUFSZ = 0x40000; // 256KB private: @@ -40,7 +40,8 @@ class TCPPeer : public Peer std::vector<uint8_t> mIncomingHeader; std::vector<uint8_t> mIncomingBody; - std::queue<std::shared_ptr<TimestampedMessage>> mWriteQueue; + std::vector<asio::const_buffer> mWriteBuffers; + std::deque<TimestampedMessage> mWriteQueue; bool mWriting{false}; bool mDelayedShutdown{false}; bool mShutdownScheduled{false}; @@ -50,16 +51,26 @@ class TCPPeer : public Peer void messageSender(); - int getIncomingMsgLength(); + size_t getIncomingMsgLength(); virtual void connected() override; void startRead(); + static constexpr size_t HDRSZ = 4; + void noteErrorReadHeader(size_t nbytes, asio::error_code const& ec); + void noteShortReadHeader(size_t nbytes); + void noteFullyReadHeader(); + void noteErrorReadBody(size_t nbytes, asio::error_code const& ec); + void noteShortReadBody(size_t nbytes); + void noteFullyReadBody(size_t nbytes); + void writeHandler(asio::error_code const& error, - std::size_t bytes_transferred) override; + std::size_t bytes_transferred, + std::size_t messages_transferred) override; void readHeaderHandler(asio::error_code const& error, std::size_t bytes_transferred) override; void readBodyHandler(asio::error_code const& error, - std::size_t 
bytes_transferred) override; + std::size_t bytes_transferred, + std::size_t expected_length) override; void shutdown(); public: