Skip to content

Commit

Permalink
Refactor: Moved network protocol implementation to de::Socket
Browse files Browse the repository at this point in the history
The revised network protocol that supports deflated message payloads
was moved to libdeng2's de::Socket class.

Also cleaned up the implementation of de::Socket and removed the
now-unnecessary internal Doorman class (running under QApplication).
  • Loading branch information
skyjake committed Mar 25, 2012
1 parent cc8e7cf commit 3167356
Show file tree
Hide file tree
Showing 8 changed files with 377 additions and 268 deletions.
2 changes: 2 additions & 0 deletions doomsday/engine/portable/include/protocol.h
Expand Up @@ -26,9 +26,11 @@
/// Largest message sendable using the protocol.
#define PROTOCOL_MAX_DATAGRAM_SIZE (1 << 22) // 4 MB

/*
void Protocol_Init(void);
void Protocol_Shutdown(void);
*/

/**
* Send the data buffer over a TCP connection.
Expand Down
130 changes: 13 additions & 117 deletions doomsday/engine/portable/src/protocol.c
Expand Up @@ -17,55 +17,7 @@
* 02110-1301 USA</small>
*/

/**
* @page sysNetwork Low-Level Networking
*
* On server-side connected clients can be either in "unjoined" mode or
* "joined" mode. The former is for querying information about the server's
* status, while the latter one is for clients participating in the on-going
* game.
*
* Unjoined TCP sockets are periodically polled for activity
* (N_ListenUnjoinedNodes()). Joined TCP sockets are handled in a separate
* receiver thread (N_JoinedListenerThread()).
*
* @section netProtocol Network Protocol
*
* In joined mode, the network protocol works as follows. All messages are sent
* over a TCP socket. Every message consists of a header and the message payload.
* The content of these depends on the compressed message size.
*
* @par 1&ndash;127 bytes
* Very small messages, such as the position updates that a client streams
* to the server, are encoded with Huffman codes (see huffman.h). If
* the Huffman coded payload happens to exceed 127 bytes, the message is
* switched to the medium format (see below). Message structure:
* - 1 byte: payload size
* - @em n bytes: payload contents (Huffman)
*
* @par 128&ndash;4095 bytes
* Medium-sized messages are compressed either using a fast zlib deflate level,
* or Huffman codes if it yields better compression.
* If the deflated message size exceeds 4095 bytes, the message is switched to
* the large format (see below). Message structure:
* - 1 byte: 0x80 | (payload size & 0x7f)
* - 1 byte: (payload size >> 7) | (0x40 for deflated, otherwise Huffman)
* - @em n bytes: payload contents (as produced by ZipFile_CompressAtLevel()).
*
* @par >= 4096 bytes (up to 4MB)
* Large messages are compressed using the best zlib deflate level.
* Message structure:
* - 1 byte: 0x80 | (payload size & 0x7f)
* - 1 byte: 0x80 | (payload size >> 7) & 0x7f
* - 1 byte: payload size >> 14
* - @em n bytes: payload contents (as produced by ZipFile_CompressAtLevel()).
*
* Messages larger than or equal to 2^22 bytes (about 4MB) must be broken into
* smaller pieces before sending.
*
* @see Protocol_Send()
* @see Protocol_Receive()
*/


#include "de_base.h"
#include "de_console.h"
Expand All @@ -77,23 +29,7 @@
#include "huffman.h"
#include "zipfile.h"

#define MAX_SIZE_SMALL 127 // bytes
#define MAX_SIZE_MEDIUM 4095 // bytes
#define MAX_SIZE_LARGE PROTOCOL_MAX_DATAGRAM_SIZE

/// Threshold for input data size: messages smaller than this are first compressed
/// with Doomsday's Huffman codes. If the result is smaller than the deflated data,
/// the Huffman coded payload is used (unless it doesn't fit in a medium-sized packet).
#define MAX_HUFFMAN_INPUT_SIZE 4096 // bytes

#define DEFAULT_TRANSMISSION_SIZE 4096

#define TRMF_CONTINUE 0x80
#define TRMF_DEFLATED 0x40
#define TRMF_SIZE_MASK 0x7f
#define TRMF_SIZE_MASK_MEDIUM 0x3f
#define TRMF_SIZE_SHIFT 7

#if 0
static byte* transmissionBuffer;
static size_t transmissionBufferSize;

Expand All @@ -110,6 +46,7 @@ void Protocol_Shutdown(void)
transmissionBuffer = NULL;
transmissionBufferSize = 0;
}
#endif

#if 0
static boolean getBytesBlocking(TCPsocket sock, byte* buffer, size_t size)
Expand Down Expand Up @@ -155,10 +92,6 @@ boolean Protocol_Receive(nodeid_t from)
msg->size = size;
msg->handle = packet; // needs to be freed

/*#ifdef _DEBUG
VERBOSE2(Con_Message("Protocol_Receive: Posting message, from=%i, size=%i\n", from, size));
#endif*/

// The message queue will handle the message from now on.
N_PostMessage(msg);
}
Expand Down Expand Up @@ -264,59 +197,22 @@ static void checkTransmissionBufferSize(size_t atLeastBytes)
}
}

/**
* Copies the message payload @a data to the transmission buffer.
*
* @param data Data payload being send.
* @param size Size of the data payload.
* @param needInflate @c true, if the payload is deflated.
*/
static size_t prepareTransmission(void* data, size_t size, boolean needInflate)
{
Writer* msg = 0;
size_t msgSize = 0;

// The header is at most 3 bytes.
checkTransmissionBufferSize(size + 3);

// Compose the header and payload into the transmission buffer.
msg = Writer_NewWithBuffer(transmissionBuffer, transmissionBufferSize);

if(size <= MAX_SIZE_SMALL && !needInflate)
{
Writer_WriteByte(msg, size);
}
else if(size <= MAX_SIZE_MEDIUM)
{
Writer_WriteByte(msg, TRMF_CONTINUE | (size & TRMF_SIZE_MASK));
Writer_WriteByte(msg, (needInflate? TRMF_DEFLATED : 0) | (size >> TRMF_SIZE_SHIFT));
}
else if(size <= MAX_SIZE_LARGE)
{
assert(needInflate);
Writer_WriteByte(msg, TRMF_CONTINUE | (size & TRMF_SIZE_MASK));
Writer_WriteByte(msg, TRMF_CONTINUE | ((size >> TRMF_SIZE_SHIFT) & TRMF_SIZE_MASK));
Writer_WriteByte(msg, size >> (2*TRMF_SIZE_SHIFT));
}
else
{
// Not supported.
assert(false);
}

// The payload.
Writer_Write(msg, data, size);

msgSize = Writer_Size(msg);
Writer_Delete(msg);
return msgSize;
}

void Protocol_Send(void *data, size_t size, nodeid_t destination)
{
if(size == 0 || !N_GetNodeSocket(destination) || !N_HasNodeJoined(destination))
return;

if(size > DDMAXINT)
{
Con_Error("Protocol_Send: Trying to send an oversized data buffer.\n"
" Attempted size is %u bytes.\n", (unsigned long) size);
}

#ifdef _DEBUG
Monitor_Add(data, size);
#endif

LegacyNetwork_Send(N_GetNodeSocket(destination), data, size);

#if 0
Expand Down
4 changes: 2 additions & 2 deletions doomsday/libdeng2/include/de/legacy/legacynetwork.h
Expand Up @@ -30,8 +30,8 @@ class Block;
class IByteArray;

/**
* Network communications for the legacy engine implementation.
* Implements simple socket networking for streaming blocks of data.
* Network communications for the legacy engine implementation. Implements
* simple socket networking for streaming blocks of data.
*
* There is a C wrapper for LegacyNetwork, @see c_wrapper.h
*/
Expand Down
3 changes: 1 addition & 2 deletions doomsday/libdeng2/include/de/net/listensocket.h
Expand Up @@ -23,7 +23,6 @@
#include "../libdeng2.h"

#include <QObject>
#include <QTcpServer>
#include <QList>
#include <QThread>
#include <QDebug>
Expand Down Expand Up @@ -68,7 +67,7 @@ class DENG2_PUBLIC ListenSocket : public QObject
void incomingConnection();

protected slots:
void acceptNewConnection(int handle);
void acceptNewConnection();

private:
struct Instance;
Expand Down
30 changes: 13 additions & 17 deletions doomsday/libdeng2/include/de/net/socket.h
Expand Up @@ -60,8 +60,8 @@ class DENG2_PUBLIC Socket : public QObject, public Transmitter
/// The TCP/IP connection was disconnected. @ingroup errors
DENG2_SUB_ERROR(BrokenError, DisconnectedError)

/// Incoming packet has an unknown block protocol. @ingroup errors
DENG2_SUB_ERROR(BrokenError, UnknownProtocolError)
/// Encountered a problem related to the messaging protocol. @ingroup errors
DENG2_SUB_ERROR(BrokenError, ProtocolError)

/// There is no peer connected. @ingroup errors
DENG2_SUB_ERROR(BrokenError, PeerError)
Expand All @@ -74,7 +74,17 @@ class DENG2_PUBLIC Socket : public QObject, public Transmitter
Q_DECLARE_FLAGS(HeaderFlags, HeaderFlag)

public:
Socket(const Address& address);
/**
* Opens a socket to @a address and waits (blocks) until the connection has
* been formed. The socket is ready to be used after the constructor
* returns. If the connection can be formed within the specified @a timeOut
* threshold, an exception is thrown.
*
* @param address Address to connect to.
* @param timeOut Maximum time to wait for connection.
*/
Socket(const Address& address, const Time::Delta& timeOut = 5);

virtual ~Socket();

/**
Expand Down Expand Up @@ -171,16 +181,6 @@ private slots:
void bytesWereWritten(qint64 bytes);

protected:
/// Values for the transmission block header.
struct Header {
duint version;
bool huffman;
duint channel;
duint size;

Header();
};

/// Create a Socket object for a previously opened socket.
Socket(QTcpSocket* existingSocket);

Expand All @@ -195,10 +195,6 @@ private slots:

void send(const IByteArray& packet, duint channel);

duint32 packHeader(const Header& header);

void unpackHeader(duint32 headerBytes, Header& header);

private:
struct Instance;
Instance* d;
Expand Down
3 changes: 1 addition & 2 deletions doomsday/libdeng2/network.pri
Expand Up @@ -13,8 +13,7 @@ HEADERS += \
include/de/net/transmitter.h \

# Private headers.
HEADERS += \
src/net/doorman.h
HEADERS +=

SOURCES += \
src/net/address.cpp \
Expand Down
27 changes: 17 additions & 10 deletions doomsday/libdeng2/src/net/listensocket.cpp
Expand Up @@ -20,13 +20,14 @@
#include "de/ListenSocket"
#include "de/Address"
#include "de/Socket"
#include "doorman.h"

#include <QCoreApplication>
#include <QTcpServer>
#include <QThread>

using namespace de;

/*
void internal::Doorman::run()
{
_socket = new internal::TcpServer;
Expand All @@ -42,21 +43,24 @@ void internal::Doorman::run()
delete _socket;
}
*/

struct ListenSocket::Instance
{
#if 0
/// Pointer to the internal socket data.
internal::Doorman* doorman;
#endif

QTcpServer* socket;
duint16 port;

/// Incoming connections.
QList<int> incoming;

Instance() : doorman(0), port(0) {}
Instance() : socket(0), port(0) {}
~Instance() {
doorman->stopAndWait();
delete doorman;
delete socket;
}
};

Expand All @@ -65,33 +69,33 @@ ListenSocket::ListenSocket(duint16 port)
LOG_AS("ListenSocket");

d = new Instance;

/*
d->doorman = new internal::Doorman(port);
connect(d->doorman, SIGNAL(incomingSocketDesc(int)), this, SLOT(acceptNewConnection(int)));
d->doorman->start();
*/

/* if(!d->socket->listen(QHostAddress::Any, d->port))
if(!d->socket->listen(QHostAddress::Any, d->port))
{
/// @throw OpenError Opening the socket failed.
throw OpenError("ListenSocket", "Port " + QString::number(d->port) + ": " +
d->socket->errorString());
}
*/

//connect(d->socket, SIGNAL(newConnection()), this, SLOT(acceptNewConnection()));
connect(d->socket, SIGNAL(newConnection()), this, SLOT(acceptNewConnection()));
}

ListenSocket::~ListenSocket()
{
delete d;
}

void ListenSocket::acceptNewConnection(int handle)
void ListenSocket::acceptNewConnection()
{
LOG_AS("ListenSocket::acceptNewConnection");

d->incoming.append(handle);
d->incoming.append(d->socket->nextPendingConnection());

emit incomingConnection();
}
Expand All @@ -102,9 +106,12 @@ Socket* ListenSocket::accept()
{
return 0;
}
/*
QTcpSocket* s = new QTcpSocket;
s->setSocketDescriptor(d->incoming.takeFirst());
*/

QTcpSocket* s = d->incoming.takeFirst();
LOG_MSG("Accepted new connection from %s.") << s->peerAddress().toString();

// We can use this constructor because we are Socket's friend.
Expand Down

0 comments on commit 3167356

Please sign in to comment.