diff --git a/doomsday/engine/portable/include/protocol.h b/doomsday/engine/portable/include/protocol.h index 341d9aa856..ae651633b7 100644 --- a/doomsday/engine/portable/include/protocol.h +++ b/doomsday/engine/portable/include/protocol.h @@ -26,9 +26,11 @@ /// Largest message sendable using the protocol. #define PROTOCOL_MAX_DATAGRAM_SIZE (1 << 22) // 4 MB +/* void Protocol_Init(void); void Protocol_Shutdown(void); +*/ /** * Send the data buffer over a TCP connection. diff --git a/doomsday/engine/portable/src/protocol.c b/doomsday/engine/portable/src/protocol.c index 9ecd7a2551..77f9732d48 100644 --- a/doomsday/engine/portable/src/protocol.c +++ b/doomsday/engine/portable/src/protocol.c @@ -17,55 +17,7 @@ * 02110-1301 USA */ -/** - * @page sysNetwork Low-Level Networking - * - * On server-side connected clients can be either in "unjoined" mode or - * "joined" mode. The former is for querying information about the server's - * status, while the latter one is for clients participating in the on-going - * game. - * - * Unjoined TCP sockets are periodically polled for activity - * (N_ListenUnjoinedNodes()). Joined TCP sockets are handled in a separate - * receiver thread (N_JoinedListenerThread()). - * - * @section netProtocol Network Protocol - * - * In joined mode, the network protocol works as follows. All messages are sent - * over a TCP socket. Every message consists of a header and the message payload. - * The content of these depends on the compressed message size. - * - * @par 1–127 bytes - * Very small messages, such as the position updates that a client streams - * to the server, are encoded with Huffman codes (see huffman.h). If - * the Huffman coded payload happens to exceed 127 bytes, the message is - * switched to the medium format (see below). Message structure: - * - 1 byte: payload size - * - @em n bytes: payload contents (Huffman) - * - * @par 128–4095 bytes - * Medium-sized messages are compressed either using a fast zlib deflate level, - * or Huffman codes if it yields better compression. - * If the deflated message size exceeds 4095 bytes, the message is switched to - * the large format (see below). Message structure: - * - 1 byte: 0x80 | (payload size & 0x7f) - * - 1 byte: (payload size >> 7) | (0x40 for deflated, otherwise Huffman) - * - @em n bytes: payload contents (as produced by ZipFile_CompressAtLevel()). - * - * @par >= 4096 bytes (up to 4MB) - * Large messages are compressed using the best zlib deflate level. - * Message structure: - * - 1 byte: 0x80 | (payload size & 0x7f) - * - 1 byte: 0x80 | (payload size >> 7) & 0x7f - * - 1 byte: payload size >> 14 - * - @em n bytes: payload contents (as produced by ZipFile_CompressAtLevel()). - * - * Messages larger than or equal to 2^22 bytes (about 4MB) must be broken into - * smaller pieces before sending. - * - * @see Protocol_Send() - * @see Protocol_Receive() - */ + #include "de_base.h" #include "de_console.h" @@ -77,23 +29,7 @@ #include "huffman.h" #include "zipfile.h" -#define MAX_SIZE_SMALL 127 // bytes -#define MAX_SIZE_MEDIUM 4095 // bytes -#define MAX_SIZE_LARGE PROTOCOL_MAX_DATAGRAM_SIZE - -/// Threshold for input data size: messages smaller than this are first compressed -/// with Doomsday's Huffman codes. If the result is smaller than the deflated data, -/// the Huffman coded payload is used (unless it doesn't fit in a medium-sized packet). -#define MAX_HUFFMAN_INPUT_SIZE 4096 // bytes - -#define DEFAULT_TRANSMISSION_SIZE 4096 - -#define TRMF_CONTINUE 0x80 -#define TRMF_DEFLATED 0x40 -#define TRMF_SIZE_MASK 0x7f -#define TRMF_SIZE_MASK_MEDIUM 0x3f -#define TRMF_SIZE_SHIFT 7 - +#if 0 static byte* transmissionBuffer; static size_t transmissionBufferSize; @@ -110,6 +46,7 @@ void Protocol_Shutdown(void) transmissionBuffer = NULL; transmissionBufferSize = 0; } +#endif #if 0 static boolean getBytesBlocking(TCPsocket sock, byte* buffer, size_t size) @@ -155,10 +92,6 @@ boolean Protocol_Receive(nodeid_t from) msg->size = size; msg->handle = packet; // needs to be freed -/*#ifdef _DEBUG - VERBOSE2(Con_Message("Protocol_Receive: Posting message, from=%i, size=%i\n", from, size)); -#endif*/ - // The message queue will handle the message from now on. N_PostMessage(msg); } @@ -264,59 +197,22 @@ static void checkTransmissionBufferSize(size_t atLeastBytes) } } -/** - * Copies the message payload @a data to the transmission buffer. - * - * @param data Data payload being send. - * @param size Size of the data payload. - * @param needInflate @c true, if the payload is deflated. - */ -static size_t prepareTransmission(void* data, size_t size, boolean needInflate) -{ - Writer* msg = 0; - size_t msgSize = 0; - - // The header is at most 3 bytes. - checkTransmissionBufferSize(size + 3); - - // Compose the header and payload into the transmission buffer. - msg = Writer_NewWithBuffer(transmissionBuffer, transmissionBufferSize); - - if(size <= MAX_SIZE_SMALL && !needInflate) - { - Writer_WriteByte(msg, size); - } - else if(size <= MAX_SIZE_MEDIUM) - { - Writer_WriteByte(msg, TRMF_CONTINUE | (size & TRMF_SIZE_MASK)); - Writer_WriteByte(msg, (needInflate? TRMF_DEFLATED : 0) | (size >> TRMF_SIZE_SHIFT)); - } - else if(size <= MAX_SIZE_LARGE) - { - assert(needInflate); - Writer_WriteByte(msg, TRMF_CONTINUE | (size & TRMF_SIZE_MASK)); - Writer_WriteByte(msg, TRMF_CONTINUE | ((size >> TRMF_SIZE_SHIFT) & TRMF_SIZE_MASK)); - Writer_WriteByte(msg, size >> (2*TRMF_SIZE_SHIFT)); - } - else - { - // Not supported. - assert(false); - } - - // The payload. - Writer_Write(msg, data, size); - - msgSize = Writer_Size(msg); - Writer_Delete(msg); - return msgSize; -} void Protocol_Send(void *data, size_t size, nodeid_t destination) { if(size == 0 || !N_GetNodeSocket(destination) || !N_HasNodeJoined(destination)) return; + if(size > DDMAXINT) + { + Con_Error("Protocol_Send: Trying to send an oversized data buffer.\n" + " Attempted size is %u bytes.\n", (unsigned long) size); + } + +#ifdef _DEBUG + Monitor_Add(data, size); +#endif + LegacyNetwork_Send(N_GetNodeSocket(destination), data, size); #if 0 diff --git a/doomsday/libdeng2/include/de/legacy/legacynetwork.h b/doomsday/libdeng2/include/de/legacy/legacynetwork.h index 6567862945..5f7815ee08 100644 --- a/doomsday/libdeng2/include/de/legacy/legacynetwork.h +++ b/doomsday/libdeng2/include/de/legacy/legacynetwork.h @@ -30,8 +30,8 @@ class Block; class IByteArray; /** - * Network communications for the legacy engine implementation. - * Implements simple socket networking for streaming blocks of data. + * Network communications for the legacy engine implementation. Implements + * simple socket networking for streaming blocks of data. * * There is a C wrapper for LegacyNetwork, @see c_wrapper.h */ diff --git a/doomsday/libdeng2/include/de/net/listensocket.h b/doomsday/libdeng2/include/de/net/listensocket.h index 0b3b03eaa4..e8d9749d4f 100644 --- a/doomsday/libdeng2/include/de/net/listensocket.h +++ b/doomsday/libdeng2/include/de/net/listensocket.h @@ -23,7 +23,6 @@ #include "../libdeng2.h" #include -#include #include #include #include @@ -68,7 +67,7 @@ class DENG2_PUBLIC ListenSocket : public QObject void incomingConnection(); protected slots: - void acceptNewConnection(int handle); + void acceptNewConnection(); private: struct Instance; diff --git a/doomsday/libdeng2/include/de/net/socket.h b/doomsday/libdeng2/include/de/net/socket.h index 06d98f587b..6186f380c2 100644 --- a/doomsday/libdeng2/include/de/net/socket.h +++ b/doomsday/libdeng2/include/de/net/socket.h @@ -60,8 +60,8 @@ class DENG2_PUBLIC Socket : public QObject, public Transmitter /// The TCP/IP connection was disconnected. @ingroup errors DENG2_SUB_ERROR(BrokenError, DisconnectedError) - /// Incoming packet has an unknown block protocol. @ingroup errors - DENG2_SUB_ERROR(BrokenError, UnknownProtocolError) + /// Encountered a problem related to the messaging protocol. @ingroup errors + DENG2_SUB_ERROR(BrokenError, ProtocolError) /// There is no peer connected. @ingroup errors DENG2_SUB_ERROR(BrokenError, PeerError) @@ -74,7 +74,17 @@ class DENG2_PUBLIC Socket : public QObject, public Transmitter Q_DECLARE_FLAGS(HeaderFlags, HeaderFlag) public: - Socket(const Address& address); + /** + * Opens a socket to @a address and waits (blocks) until the connection has + * been formed. The socket is ready to be used after the constructor + * returns. If the connection can be formed within the specified @a timeOut + * threshold, an exception is thrown. + * + * @param address Address to connect to. + * @param timeOut Maximum time to wait for connection. + */ + Socket(const Address& address, const Time::Delta& timeOut = 5); + virtual ~Socket(); /** @@ -171,16 +181,6 @@ private slots: void bytesWereWritten(qint64 bytes); protected: - /// Values for the transmission block header. - struct Header { - duint version; - bool huffman; - duint channel; - duint size; - - Header(); - }; - /// Create a Socket object for a previously opened socket. Socket(QTcpSocket* existingSocket); @@ -195,10 +195,6 @@ private slots: void send(const IByteArray& packet, duint channel); - duint32 packHeader(const Header& header); - - void unpackHeader(duint32 headerBytes, Header& header); - private: struct Instance; Instance* d; diff --git a/doomsday/libdeng2/network.pri b/doomsday/libdeng2/network.pri index e699d20045..b987baba73 100644 --- a/doomsday/libdeng2/network.pri +++ b/doomsday/libdeng2/network.pri @@ -13,8 +13,7 @@ HEADERS += \ include/de/net/transmitter.h \ # Private headers. -HEADERS += \ - src/net/doorman.h +HEADERS += SOURCES += \ src/net/address.cpp \ diff --git a/doomsday/libdeng2/src/net/listensocket.cpp b/doomsday/libdeng2/src/net/listensocket.cpp index 0cc6f4b02b..03ebd0e42d 100644 --- a/doomsday/libdeng2/src/net/listensocket.cpp +++ b/doomsday/libdeng2/src/net/listensocket.cpp @@ -20,13 +20,14 @@ #include "de/ListenSocket" #include "de/Address" #include "de/Socket" -#include "doorman.h" #include +#include #include using namespace de; +/* void internal::Doorman::run() { _socket = new internal::TcpServer; @@ -42,21 +43,24 @@ void internal::Doorman::run() delete _socket; } +*/ struct ListenSocket::Instance { +#if 0 /// Pointer to the internal socket data. internal::Doorman* doorman; +#endif + QTcpServer* socket; duint16 port; /// Incoming connections. QList incoming; - Instance() : doorman(0), port(0) {} + Instance() : socket(0), port(0) {} ~Instance() { - doorman->stopAndWait(); - delete doorman; + delete socket; } }; @@ -65,21 +69,21 @@ ListenSocket::ListenSocket(duint16 port) LOG_AS("ListenSocket"); d = new Instance; - +/* d->doorman = new internal::Doorman(port); connect(d->doorman, SIGNAL(incomingSocketDesc(int)), this, SLOT(acceptNewConnection(int))); d->doorman->start(); +*/ -/* if(!d->socket->listen(QHostAddress::Any, d->port)) + if(!d->socket->listen(QHostAddress::Any, d->port)) { /// @throw OpenError Opening the socket failed. throw OpenError("ListenSocket", "Port " + QString::number(d->port) + ": " + d->socket->errorString()); } -*/ - //connect(d->socket, SIGNAL(newConnection()), this, SLOT(acceptNewConnection())); + connect(d->socket, SIGNAL(newConnection()), this, SLOT(acceptNewConnection())); } ListenSocket::~ListenSocket() @@ -87,11 +91,11 @@ ListenSocket::~ListenSocket() delete d; } -void ListenSocket::acceptNewConnection(int handle) +void ListenSocket::acceptNewConnection() { LOG_AS("ListenSocket::acceptNewConnection"); - d->incoming.append(handle); + d->incoming.append(d->socket->nextPendingConnection()); emit incomingConnection(); } @@ -102,9 +106,12 @@ Socket* ListenSocket::accept() { return 0; } + /* QTcpSocket* s = new QTcpSocket; s->setSocketDescriptor(d->incoming.takeFirst()); + */ + QTcpSocket* s = d->incoming.takeFirst(); LOG_MSG("Accepted new connection from %s.") << s->peerAddress().toString(); // We can use this constructor because we are Socket's friend. diff --git a/doomsday/libdeng2/src/net/socket.cpp b/doomsday/libdeng2/src/net/socket.cpp index 95677c6a33..c5dfffad05 100644 --- a/doomsday/libdeng2/src/net/socket.cpp +++ b/doomsday/libdeng2/src/net/socket.cpp @@ -1,7 +1,7 @@ /* * The Doomsday Engine Project -- libdeng2 * - * Copyright (c) 2004-2011 Jaakko Keränen + * Copyright (c) 2004-2012 Jaakko Keränen * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -17,22 +17,169 @@ * along with this program; if not, see . */ +/** + * @page sysNetwork Low-Level Networking + * + * On server-side connected clients can be either in "unjoined" mode or + * "joined" mode. The former is for querying information about the server's + * status, while the latter one is for clients participating in the on-going + * game. + * + * In unjoined mode, messages are not compressed. + * @todo Document uncompressed message format. + * + * @section netProtocol Network Protocol + * + * In joined mode, the network protocol works as follows. All messages are sent + * over a TCP socket. Every message consists of a header and the message payload. + * The content of these depends on the compressed message size. + * + * @par 1–127 bytes + * Very small messages, such as the position updates that a client streams + * to the server, are encoded with Huffman codes (see huffman.h). If + * the Huffman coded payload happens to exceed 127 bytes, the message is + * switched to the medium format (see below). Message structure: + * - 1 byte: payload size + * - @em n bytes: payload contents (Huffman) + * + * @par 128–4095 bytes + * Medium-sized messages are compressed either using a fast zlib deflate level, + * or Huffman codes if it yields better compression. + * If the deflated message size exceeds 4095 bytes, the message is switched to + * the large format (see below). Message structure: + * - 1 byte: 0x80 | (payload size & 0x7f) + * - 1 byte: (payload size >> 7) | (0x40 for deflated, otherwise Huffman) + * - @em n bytes: payload contents (as produced by ZipFile_CompressAtLevel()). + * + * @par >= 4096 bytes (up to 4MB) + * Large messages are compressed using the best zlib deflate level. + * Message structure: + * - 1 byte: 0x80 | (payload size & 0x7f) + * - 1 byte: 0x80 | (payload size >> 7) & 0x7f + * - 1 byte: payload size >> 14 + * - @em n bytes: payload contents (as produced by ZipFile_CompressAtLevel()). + * + * Messages larger than or equal to 2^22 bytes (about 4MB) must be broken into + * smaller pieces before sending. + * + * @see Protocol_Send() + * @see Protocol_Receive() + */ + #include "de/Socket" #include "de/Message" #include "de/Writer" #include "de/Reader" +#include "de/ByteRefArray" +#include "de/data/huffman.h" using namespace de; /// Version of the block transfer protocol. static const duint PROTOCOL_VERSION = 0; -/// Size of the message header in bytes. -static const duint HEADER_SIZE = 4; - /// Maximum number of channels. static const duint MAX_CHANNELS = 2; +#define MAX_SIZE_SMALL 127 // bytes +#define MAX_SIZE_MEDIUM 4095 // bytes +#define MAX_SIZE_LARGE PROTOCOL_MAX_DATAGRAM_SIZE + +/// Threshold for input data size: messages smaller than this are first compressed +/// with Doomsday's Huffman codes. If the result is smaller than the deflated data, +/// the Huffman coded payload is used (unless it doesn't fit in a medium-sized packet). +#define MAX_HUFFMAN_INPUT_SIZE 4096 // bytes + +#define DEFAULT_TRANSMISSION_SIZE 4096 + +#define TRMF_CONTINUE 0x80 +#define TRMF_DEFLATED 0x40 +#define TRMF_SIZE_MASK 0x7f +#define TRMF_SIZE_MASK_MEDIUM 0x3f +#define TRMF_SIZE_SHIFT 7 + +/** + * Network message header. + */ +struct Header : public ISerializable +{ + duint size; + bool isHuffmanCoded; + bool isDeflated; + duint channel; /// @todo include in the written header + + Header() : size(0), isHuffmanCoded(false), isDeflated(false), channel(0) + {} + + void operator >> (Writer& writer) const + { + if(size <= MAX_SIZE_SMALL && !isDeflated) + { + writer << dbyte(size); + } + else if(size <= MAX_SIZE_MEDIUM) + { + writer << dbyte(TRMF_CONTINUE | (size & TRMF_SIZE_MASK)); + writer << dbyte((isDeflated? TRMF_DEFLATED : 0) | (size >> TRMF_SIZE_SHIFT)); + } + else if(size <= MAX_SIZE_LARGE) + { + DENG2_ASSERT(isDeflated); + + writer << dbyte(TRMF_CONTINUE | (size & TRMF_SIZE_MASK)); + writer << dbyte(TRMF_CONTINUE | ((size >> TRMF_SIZE_SHIFT) & TRMF_SIZE_MASK)); + writer << dbyte(size >> (2*TRMF_SIZE_SHIFT)); + } + else + { + // Not supported. + DENG2_ASSERT(false); + } + } + + /** + * Throws an exception if the header is malformed/incomplete. + */ + void operator << (Reader& reader) + { + // Start reading the header. + dbyte b; + reader >> b; + + size = b & TRMF_SIZE_MASK; + + isDeflated = false; + isHuffmanCoded = true; + + if(b & TRMF_CONTINUE) // More follows... + { + reader >> b; + + if(b & TRMF_CONTINUE) // Yet more to come... + { + // Large header. + isDeflated = true; + isHuffmanCoded = false; + size |= ((b & TRMF_SIZE_MASK) << TRMF_SIZE_SHIFT); + + reader >> b; + + size |= (b << (2*TRMF_SIZE_SHIFT)); + } + else + { + // Medium header. + if(b & TRMF_DEFLATED) + { + isDeflated = true; + isHuffmanCoded = false; + } + size |= ((b & TRMF_SIZE_MASK_MEDIUM) << TRMF_SIZE_SHIFT); + } + } + } +}; + struct Socket::Instance { enum ReceptionState { @@ -40,15 +187,13 @@ struct Socket::Instance ReceivingPayload }; ReceptionState receptionState; - - /// Apply Huffman encoding to payload data. - bool useHuffman; + Block receivedBytes; + Header incomingHeader; /// Number of the active channel. + /// @todo Channel is not used at the moment. duint activeChannel; - Header incomingHeader; - /// Pointer to the internal socket data. QTcpSocket* socket; @@ -58,22 +203,176 @@ struct Socket::Instance /// Number of bytes waiting to be written to the socket. dint64 bytesToBeWritten; + /// Number of bytes written to the socket so far. + dint64 totalBytesWritten; + Instance() : receptionState(ReceivingHeader), - useHuffman(false), activeChannel(0), socket(0), - bytesToBeWritten(0) {} + bytesToBeWritten(0), + totalBytesWritten(0) {} - ~Instance() { + ~Instance() + { // Delete received messages left in the buffer. foreach(Message* msg, receivedMessages) delete msg; } -}; -Socket::Header::Header() : version(PROTOCOL_VERSION), huffman(false), channel(0), size(0) {} + void serializeAndSendMessage(const IByteArray& packet) + { + Header header; + Block payload(packet); + + dsize huffSize = 0; + dbyte* huffData = 0; + + // Let's find the appropriate compression method of the payload. First see + // if the encoded contents are under 128 bytes as Huffman codes. + if(payload.size() <= MAX_HUFFMAN_INPUT_SIZE) // Potentially short enough. + { + huffData = Huffman_Encode(payload.data(), payload.size(), &huffSize); + if(huffSize <= MAX_SIZE_SMALL) + { + // We'll use this. + header.isHuffmanCoded = true; + header.size = huffSize; + payload.copyFrom(ByteRefArray(huffData, huffSize)); + } + // Even if that didn't seem suitable, we'll keep it to compare against + // the deflated payload. + } + + /// @todo Messages broadcasted to multiple recipients are separately + /// compressed for each TCP send -- should do only one compression per + /// message. + + if(!header.size) // Try deflate. + { + const int level = (size < 2*MAX_SIZE_MEDIUM? 6 /*default*/ : 9 /*best*/); + QByteArray deflated = qCompressAtLevel(payload, level); -Socket::Socket(const Address& address) + if(!deflated.size()) + { + free(huffData); + throw ProtocolError("Socket::send:", "Failed to deflate message payload"); + } + if(deflated.size() > MAX_SIZE_LARGE) + { + free(huffData); + throw ProtocolError("Socket::send", + QString("Compressed payload is too large (%1 bytes)").arg(deflated.size())); + } + + // Choose the smallest compression. + if(huffData && huffSize <= deflated.size() && huffSize <= MAX_SIZE_MEDIUM) + { + // Huffman yielded smaller payload. + header.isHuffmanCoded = true; + header.size = huffSize; + payload.copyFrom(ByteRefArray(huffData, huffSize)); + } + else + { + // Use the deflated payload. + header.isDeflated = true; + header.size = deflated.size(); + payload = deflated; + } + } + + free(huffData); + huffData = 0; + + // Write the message header. + Block dest; + Writer(dest) << header; + socket->write(dest); + + // Update totals (for statistics). + dsize total = dest.size() + payload.size(); + bytesToBeWritten += total; + totalBytesWritten += total; + + socket->write(payload); + } + + /** + * Checks the incoming bytes and sees if any messages can be formed. + */ + void deserializeMessages() + { + forever + { + if(receptionState == ReceivingHeader) + { + if(receivedBytes.size() < 2) + { + // A message must be at least two bytes long (header + payload). + return; + } + + try + { + Reader reader(receivedBytes); + reader >> incomingHeader; + receptionState = ReceivingPayload; + } + catch(const de::Error&) + { + // It seems we don't have a full header yet. + return; + } + } + + if(receptionState == ReceivingPayload) + { + if(receivedBytes.size() >= incomingHeader.size) + { + // Extract the payload from the incoming buffer. + Block payload = receivedBytes.left(incomingHeader.size); + receivedBytes.remove(0, incomingHeader.size); + + // We have the full payload, but it still may need to uncompressed. + if(incomingHeader.isHuffmanCoded) + { + dsize huffSize = 0; + dbyte* decoded = Huffman_Decode(payload.data(), payload.size(), &huffSize); + if(!decoded) + { + throw ProtocolError("Socket::Instance::deserializeMessages", + "Huffman decoding failed"); + } + payload.copyFrom(ByteRefArray(decoded, huffSize)); + free(decoded); + } + else if(incomingHeader.isDeflated) + { + payload = qUncompress(payload); + if(!payload.size()) + { + throw ProtocolError("Socket::Instance::deserializeMessages", "Deflate failed"); + } + } + + receivedMessages << new Message(Address(socket->peerAddress(), socket->peerPort()), + incomingHeader.channel, payload); + + // We can proceed to the next message. + receptionState = ReceivingHeader; + incomingHeader = Header(); + } + else + { + // Let's wait until more is available. + return; + } + } + } + } +}; + +Socket::Socket(const Address& address, const Time::Delta& timeOut) { LOG_AS("Socket"); @@ -83,7 +382,7 @@ Socket::Socket(const Address& address) // Now that the signals have been set... d->socket->connectToHost(address.host(), address.port()); - if(!d->socket->waitForConnected(5000)) + if(!d->socket->waitForConnected(timeOut.asMilliSeconds())) { QString msg = d->socket->errorString(); delete d->socket; @@ -96,6 +395,7 @@ Socket::Socket(const Address& address) } LOG_MSG("Connection opened to %s.") << address.asText(); + DENG2_ASSERT(d->socket->isOpen() && d->socket->isWritable() && d->socket->state() == QAbstractSocket::ConnectedState); } @@ -117,7 +417,7 @@ Socket::~Socket() void Socket::initialize() { // Options. - d->socket->setSocketOption(QTcpSocket::LowDelayOption, 1); + d->socket->setSocketOption(QTcpSocket::LowDelayOption, 1); // prefer short buffering connect(d->socket, SIGNAL(bytesWritten(qint64)), this, SLOT(bytesWereWritten(qint64))); connect(d->socket, SIGNAL(disconnected()), this, SLOT(socketDisconnected())); @@ -127,9 +427,12 @@ void Socket::initialize() void Socket::close() { + // All pending data will be written to the socket before closing. d->socket->disconnectFromHost(); + if(d->socket->state() != QAbstractSocket::UnconnectedState) { + // Make sure the socket is disconnected before the return. d->socket->waitForDisconnected(); } @@ -158,118 +461,27 @@ Socket& Socket::operator << (const IByteArray& packet) return *this; } -duint32 Socket::packHeader(const Header& header) -{ - /** - * Composese the 4-byte header to the beginning of the buffer. - * - 3 bits for flags. - * - 2 bits for the protocol version number. - * - 16+11 bits for the packet length (max: 134 MB). - */ - - duint32 flags = - (header.huffman? Huffman : 0) | - (header.channel == 1? Channel1 : 0); - - duint32 bits = ( (header.size & 0x7ffffff) | - ((header.version & 3) << 27) | - (flags << 29) ); - - return bits; -} - -void Socket::unpackHeader(duint32 headerBytes, Header& header) -{ - duint flags = (headerBytes >> 29) & 7; - - header.version = (headerBytes >> 27) & 3; - header.huffman = ((flags & Huffman) != 0); - header.channel = (flags & Channel1? 1 : 0); - header.size = headerBytes & 0x7ffffff; -} - void Socket::send(const IByteArray& packet, duint channel) { if(!d->socket) { /// @throw DisconnectedError Sending is not possible because the socket has been closed. - throw DisconnectedError("Socket::operator << ", "Socket is unavailable"); + throw DisconnectedError("Socket::send", "Socket is unavailable"); } - // Keep track of where we are with the traffic. - // A header is also included. - d->bytesToBeWritten += packet.size() + 4; - - // Write the packet header: packet length, version, possible flags. - Header header; - header.channel = channel; - header.size = packet.size(); - - //qDebug() << "Socket:" << packet.size() + 4 << "bytes were prepared for" << d->socket->peerAddress().toString(); - - Block dest; - Writer(dest) << packHeader(header); - d->socket->write(dest); - - // Write the data itself. - d->socket->write(Block(packet)); - - // Wait until the data is sent. - /// @todo Blocking is not needed with the real event loop. - d->socket->waitForBytesWritten(-1); + d->serializeAndSendMessage(packet); } void Socket::readIncomingBytes() { - forever + int available = d->socket->bytesAvailable(); + if(available > 0) { - if(d->receptionState == Instance::ReceivingHeader) - { - if(d->socket->bytesAvailable() >= HEADER_SIZE) - { - // Get the header. - duint32 bits = 0; - Reader(Block(d->socket->read(HEADER_SIZE))) >> bits; - unpackHeader(bits, d->incomingHeader); - - // Check for valid protocols. - if(d->incomingHeader.version > PROTOCOL_VERSION) - { - /// @throw UnknownProtocolError The received data block's protocol version number - /// was not recognized. This is probably because the remote end is not a libdeng2 - /// application. - throw UnknownProtocolError("Socket::readIncomingBytes", "Incoming packet has unknown protocol"); - } - - d->receptionState = Instance::ReceivingPayload; - } - else - { - // Let's wait until more is available. - break; - } - } - - if(d->receptionState == Instance::ReceivingPayload) - { - if(d->socket->bytesAvailable() >= d->incomingHeader.size) - { - d->receivedMessages.append(new Message(Address(d->socket->peerAddress(), - d->socket->peerPort()), - d->incomingHeader.channel, - Block(d->socket->read(d->incomingHeader.size)))); - - // Get the next message. - d->receptionState = Instance::ReceivingHeader; - } - else - { - // Let's wait until more is available. - break; - } - } + d->receivedBytes += d->socket->read(d->socket->bytesAvailable()); } + d->deserializeMessages(); + // Notification about available messages. if(!d->receivedMessages.isEmpty()) { @@ -335,8 +547,6 @@ void Socket::socketError(QAbstractSocket::SocketError socketError) bool Socket::hasIncoming() const { - d->socket->waitForReadyRead(1); - return !d->receivedMessages.empty(); } @@ -347,7 +557,7 @@ dsize Socket::bytesBuffered() const void Socket::bytesWereWritten(qint64 bytes) { - //qDebug() << "Socket:" << bytes << "were written to" << d->socket->peerAddress().toString(); + qDebug() << "Socket:" << bytes << "were written to" << d->socket->peerAddress().toString(); d->bytesToBeWritten -= bytes; DENG2_ASSERT(d->bytesToBeWritten >= 0);