From cc54d02fe826b19116727368672034104b24f5ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaakko=20Kera=CC=88nen?= Date: Sat, 14 Oct 2017 16:33:33 +0300 Subject: [PATCH] libcore|Socket: Use a background thread to compress large messages --- doomsday/sdk/libcore/include/de/net/socket.h | 4 ++ doomsday/sdk/libcore/src/data/huffman.cpp | 12 +++--- doomsday/sdk/libcore/src/net/socket.cpp | 45 +++++++++++++++++--- 3 files changed, 49 insertions(+), 12 deletions(-) diff --git a/doomsday/sdk/libcore/include/de/net/socket.h b/doomsday/sdk/libcore/include/de/net/socket.h index 9447f99eb4..922779d813 100644 --- a/doomsday/sdk/libcore/include/de/net/socket.h +++ b/doomsday/sdk/libcore/include/de/net/socket.h @@ -42,6 +42,10 @@ class Message; * * ListenSocket constructs Socket instances for incoming connections. * + * Note that Socket instances must always be used in the same thread as ListenSocket. + * Socket uses a background thread for compressing large messages before sending. This + * means they may be sent out-of-order with regards to all other messages. + * * @ingroup net */ class DENG2_PUBLIC Socket : public QObject, public Transmitter diff --git a/doomsday/sdk/libcore/src/data/huffman.cpp b/doomsday/sdk/libcore/src/data/huffman.cpp index f18529149b..aa44b0a0a2 100644 --- a/doomsday/sdk/libcore/src/data/huffman.cpp +++ b/doomsday/sdk/libcore/src/data/huffman.cpp @@ -19,7 +19,7 @@ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser * General Public License for more details. You should have received a copy of * the GNU Lesser General Public License along with this program; if not, see: - * http://www.gnu.org/licenses + * http://www.gnu.org/licenses */ #include "de/data/huffman.h" @@ -330,7 +330,7 @@ struct Huffman * Checks if the encoding/decoding buffer can hold the given number of * bytes. If not, reallocates the buffer. */ - void Huff_ResizeBuffer(HuffBuffer *buffer, dsize neededSize) + static void Huff_ResizeBuffer(HuffBuffer *buffer, dsize neededSize) { while (neededSize > buffer->size) { @@ -339,7 +339,7 @@ struct Huffman else buffer->size *= 2; } - buffer->data = (dbyte *) realloc(buffer->data, buffer->size); + buffer->data = reinterpret_cast(realloc(buffer->data, buffer->size)); } /** @@ -364,7 +364,7 @@ struct Huffman zapPtr(buffer); } - dbyte *encode(dbyte const *data, dsize size, dsize *encodedSize) + dbyte *encode(dbyte const *data, dsize size, dsize *encodedSize) const { HuffBuffer huffEnc; dsize i; @@ -429,7 +429,7 @@ struct Huffman return huffEnc.data; } - dbyte *decode(dbyte const *data, dsize size, dsize *decodedSize) + dbyte *decode(dbyte const *data, dsize size, dsize *decodedSize) const { HuffBuffer huffDec; HuffNode *node; @@ -439,7 +439,7 @@ struct Huffman dbyte bit = 3, lastByteBits; if (!data || size == 0) return nullptr; - + zap(huffDec); Huff_ResizeBuffer(&huffDec, 256); diff --git a/doomsday/sdk/libcore/src/net/socket.cpp b/doomsday/sdk/libcore/src/net/socket.cpp index 977ac123bc..f27bd4ed17 100644 --- a/doomsday/sdk/libcore/src/net/socket.cpp +++ b/doomsday/sdk/libcore/src/net/socket.cpp @@ -67,6 +67,7 @@ */ #include "de/Socket" +#include "de/Async" #include "de/Message" #include "de/Writer" #include "de/Reader" @@ -81,6 +82,7 @@ static duint const MAX_CHANNELS = 2; static int const MAX_SIZE_SMALL = 127; // bytes static int const MAX_SIZE_MEDIUM = 4095; // bytes +static int const MAX_SIZE_BIG = 10*MAX_SIZE_MEDIUM; static int const MAX_SIZE_LARGE = DENG2_SOCKET_MAX_PAYLOAD_SIZE; /// Threshold for input data size: messages smaller than this are first compressed @@ -227,11 +229,9 @@ DENG2_PIMPL_NOREF(Socket) foreach (Message *msg, receivedMessages) delete msg; } - void serializeAndSendMessage(IByteArray const &packet) + void serializeMessage(MessageHeader &header, Block &payload) { - Block payload(packet); Block huffData; - MessageHeader header; // Let's find the appropriate compression method of the payload. First see // if the encoded contents are under 128 bytes as Huffman codes. @@ -255,7 +255,7 @@ DENG2_PIMPL_NOREF(Socket) if (!header.size) // Try deflate. { - int const level = (payload.size() < 10*MAX_SIZE_MEDIUM? 1 /*fast*/ : 9 /*best*/); + int const level = (payload.size() < MAX_SIZE_BIG? 1 /*fast*/ : 9 /*best*/); Block const deflated = payload.compressed(level); if (!deflated.size()) @@ -284,6 +284,12 @@ DENG2_PIMPL_NOREF(Socket) payload = deflated; } } + } + + void sendMessage(MessageHeader const &header, + Block const &payload) + { + DENG2_ASSERT(QThread::currentThread() == socket->thread()); // Write the message header. Block dest; @@ -291,13 +297,41 @@ DENG2_PIMPL_NOREF(Socket) socket->write(dest); // Update totals (for statistics). - dsize total = dest.size() + payload.size(); + dsize const total = dest.size() + payload.size(); bytesToBeWritten += total; totalBytesWritten += total; socket->write(payload); } + void serializeAndSendMessage(IByteArray const &packet) + { + Block payload = packet; + + if (packet.size() >= MAX_SIZE_BIG) + { + async([this, payload] () + { + // Prepare for sending in a background thread, since it may take a moment. + MessageHeader header; + Block msgData = payload; + serializeMessage(header, msgData); + return std::make_pair(header, msgData); + }, + [this] (std::pair msg) + { + // Write to socket in main thread. + sendMessage(msg.first, msg.second); + }); + } + else + { + MessageHeader header; + serializeMessage(header, payload); + sendMessage(header, payload); + } + } + /** * Checks the incoming bytes and sees if any messages can be formed. */ @@ -312,7 +346,6 @@ DENG2_PIMPL_NOREF(Socket) // A message must be at least two bytes long (header + payload). return; } - try { Reader reader(receivedBytes);