Skip to content

Commit

Permalink
libcore|Socket: Use a background thread to compress large messages
Browse files Browse the repository at this point in the history
  • Loading branch information
skyjake committed Oct 14, 2017
1 parent e823c82 commit cc54d02
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 12 deletions.
4 changes: 4 additions & 0 deletions doomsday/sdk/libcore/include/de/net/socket.h
Expand Up @@ -42,6 +42,10 @@ class Message;
*
* ListenSocket constructs Socket instances for incoming connections.
*
* Note that Socket instances must always be used in the same thread as ListenSocket.
* Socket uses a background thread for compressing large messages before sending. This
* means they may be sent out-of-order with regards to all other messages.
*
* @ingroup net
*/
class DENG2_PUBLIC Socket : public QObject, public Transmitter
Expand Down
12 changes: 6 additions & 6 deletions doomsday/sdk/libcore/src/data/huffman.cpp
Expand Up @@ -19,7 +19,7 @@
* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
* General Public License for more details. You should have received a copy of
* the GNU Lesser General Public License along with this program; if not, see:
* http://www.gnu.org/licenses</small>
* http://www.gnu.org/licenses</small>
*/

#include "de/data/huffman.h"
Expand Down Expand Up @@ -330,7 +330,7 @@ struct Huffman
* Checks if the encoding/decoding buffer can hold the given number of
* bytes. If not, reallocates the buffer.
*/
void Huff_ResizeBuffer(HuffBuffer *buffer, dsize neededSize)
static void Huff_ResizeBuffer(HuffBuffer *buffer, dsize neededSize)
{
while (neededSize > buffer->size)
{
Expand All @@ -339,7 +339,7 @@ struct Huffman
else
buffer->size *= 2;
}
buffer->data = (dbyte *) realloc(buffer->data, buffer->size);
buffer->data = reinterpret_cast<dbyte *>(realloc(buffer->data, buffer->size));
}

/**
Expand All @@ -364,7 +364,7 @@ struct Huffman
zapPtr(buffer);
}

dbyte *encode(dbyte const *data, dsize size, dsize *encodedSize)
dbyte *encode(dbyte const *data, dsize size, dsize *encodedSize) const
{
HuffBuffer huffEnc;
dsize i;
Expand Down Expand Up @@ -429,7 +429,7 @@ struct Huffman
return huffEnc.data;
}

dbyte *decode(dbyte const *data, dsize size, dsize *decodedSize)
dbyte *decode(dbyte const *data, dsize size, dsize *decodedSize) const
{
HuffBuffer huffDec;
HuffNode *node;
Expand All @@ -439,7 +439,7 @@ struct Huffman
dbyte bit = 3, lastByteBits;

if (!data || size == 0) return nullptr;

zap(huffDec);
Huff_ResizeBuffer(&huffDec, 256);

Expand Down
45 changes: 39 additions & 6 deletions doomsday/sdk/libcore/src/net/socket.cpp
Expand Up @@ -67,6 +67,7 @@
*/

#include "de/Socket"
#include "de/Async"
#include "de/Message"
#include "de/Writer"
#include "de/Reader"
Expand All @@ -81,6 +82,7 @@ static duint const MAX_CHANNELS = 2;

static int const MAX_SIZE_SMALL = 127; // bytes
static int const MAX_SIZE_MEDIUM = 4095; // bytes
static int const MAX_SIZE_BIG = 10*MAX_SIZE_MEDIUM;
static int const MAX_SIZE_LARGE = DENG2_SOCKET_MAX_PAYLOAD_SIZE;

/// Threshold for input data size: messages smaller than this are first compressed
Expand Down Expand Up @@ -227,11 +229,9 @@ DENG2_PIMPL_NOREF(Socket)
foreach (Message *msg, receivedMessages) delete msg;
}

void serializeAndSendMessage(IByteArray const &packet)
void serializeMessage(MessageHeader &header, Block &payload)
{
Block payload(packet);
Block huffData;
MessageHeader header;

// Let's find the appropriate compression method of the payload. First see
// if the encoded contents are under 128 bytes as Huffman codes.
Expand All @@ -255,7 +255,7 @@ DENG2_PIMPL_NOREF(Socket)

if (!header.size) // Try deflate.
{
int const level = (payload.size() < 10*MAX_SIZE_MEDIUM? 1 /*fast*/ : 9 /*best*/);
int const level = (payload.size() < MAX_SIZE_BIG? 1 /*fast*/ : 9 /*best*/);
Block const deflated = payload.compressed(level);

if (!deflated.size())
Expand Down Expand Up @@ -284,20 +284,54 @@ DENG2_PIMPL_NOREF(Socket)
payload = deflated;
}
}
}

void sendMessage(MessageHeader const &header,
Block const &payload)
{
DENG2_ASSERT(QThread::currentThread() == socket->thread());

// Write the message header.
Block dest;
Writer(dest) << header;
socket->write(dest);

// Update totals (for statistics).
dsize total = dest.size() + payload.size();
dsize const total = dest.size() + payload.size();
bytesToBeWritten += total;
totalBytesWritten += total;

socket->write(payload);
}

void serializeAndSendMessage(IByteArray const &packet)
{
Block payload = packet;

if (packet.size() >= MAX_SIZE_BIG)
{
async([this, payload] ()
{
// Prepare for sending in a background thread, since it may take a moment.
MessageHeader header;
Block msgData = payload;
serializeMessage(header, msgData);
return std::make_pair(header, msgData);
},
[this] (std::pair<MessageHeader, Block> msg)
{
// Write to socket in main thread.
sendMessage(msg.first, msg.second);
});
}
else
{
MessageHeader header;
serializeMessage(header, payload);
sendMessage(header, payload);
}
}

/**
* Checks the incoming bytes and sees if any messages can be formed.
*/
Expand All @@ -312,7 +346,6 @@ DENG2_PIMPL_NOREF(Socket)
// A message must be at least two bytes long (header + payload).
return;
}

try
{
Reader reader(receivedBytes);
Expand Down

0 comments on commit cc54d02

Please sign in to comment.