Skip to content
Browse files

Implement light acknowledgement.

Implement the acknowledgement acknowledgement in response to an
acknowledgement. This works for a single packet. The `client.js` program
correctly connects to the C++ implementation of the integer server,
sends and integer, and acknowledges the server's acknowledgement of
packet receipt.

Added an optional five second sleep to the integer server's response
thread between the socket read and the socket close to give the socket a
chance to preform the acknowledgement exchange before hanging up.
Without it, when the C++ implementation of the client is used, a
shutdown is sent to the client before it has a chance to send an
acknowledgement. The option is tiggered by a command line argument.

Closes #26.
  • Loading branch information...
1 parent 3f65f66 commit 54e3896822aab290aa4570d6b986322b05d7ee3c @bigeasy committed Nov 19, 2012
Showing with 139 additions and 16 deletions.
  1. +3 −6 .gitignore
  2. +7 −0 common.js
  3. +64 −5 index.js
  4. +58 −0 sdk/connect/client.cpp
  5. +7 −5 sdk/integer/server.cpp
View
9 .gitignore
@@ -1,7 +1,4 @@
node_modules/*
-sdk/client
-sdk/client.o
-sdk/connect
-sdk/connect.o
-sdk/server
-sdk/server.o
+sdk/*/*.o
+sdk/*/client
+sdk/*/server
View
7 common.js
@@ -30,6 +30,13 @@ var packets = exports.packets =
, acknowledgement: '\
b32 => sequence \
'
+, statistics: '\
+ b32 => rtt \
+ , b32 => rttVariance \
+ , b32 => availableBufferSize \
+ , b32 => receivingRate \
+ , b32 => linkCapacity \
+ '
}
var parser = exports.parser = packet.createParser();
View
69 index.js
@@ -205,9 +205,11 @@ function Socket (options) {
this._socketId = nextSocketId();
this._messageNumber = 1;
+ this._flowWindowSize = 0;
this._ccc = options.ccc || new NativeControlAlgorithm;
this._packet = new Buffer(1500);
this._pending = [];
+ this._sent = [];
}
util.inherits(Socket, Stream);
@@ -250,6 +252,21 @@ EndPoint.prototype.shakeHands = function (socket) {
, address: parseDotDecimal(socket._peer.address)
});
}
+EndPoint.prototype.control = function (socket, pattern, message, callback) {
+ var serializer = common.serializer, dgram = this.dgram, packet = new Buffer(64), peer = socket._peer;
+
+ message.control = 1;
+ message.destination = peer.socketId;
+ // TODO: Implement timestamp.
+ message.timestamp = 0;
+
+ // Format a shutdown packet, simply a header packet of type shutdown.
+ serializer.reset();
+ serializer.serialize(pattern, message);
+ serializer.write(packet);
+
+ dgram.send(packet, 0, serializer.length, peer.port, peer.address, callback);
+}
EndPoint.prototype.shutdown = function (socket, send) {
// Remove the socket from the stash.
delete this.sockets[socket._socketId];
@@ -320,7 +337,10 @@ EndPoint.prototype.send = function (packetType, object, peer) {
}
EndPoint.prototype.receive = function (msg, rinfo) {
var endPoint = this, parser = common.parser, handler;
+ parser.reset();
parser.extract('header', function (header) {
+ header.rinfo = rinfo;
+ header.length = msg.length;
if (header.control) {
if (header.destination) {
// TODO: Socket not found...
@@ -339,7 +359,7 @@ EndPoint.prototype.receive = function (msg, rinfo) {
default:
var name = CONTROL_TYPES[header.type];
console.log(header);
- parser.extract(name, endPoint[name].bind(endPoint, socket, header))
+ parser.extract(name, endPoint[name].bind(endPoint, parser, socket, header))
}
// Hmm... Do you explicitly enable rendezvous?
} else if (header.type == 0 && endPoint.server) {
@@ -350,7 +370,7 @@ EndPoint.prototype.receive = function (msg, rinfo) {
});
parser.parse(msg);
}
-EndPoint.prototype.handshake = function (socket, header, handshake) {
+EndPoint.prototype.handshake = function (parser, socket, header, handshake) {
switch (socket._status) {
case 'syn':
// Only respond to an initial handshake.
@@ -388,9 +408,46 @@ EndPoint.prototype.handshake = function (socket, header, handshake) {
break;
}
}
-EndPoint.prototype.acknowledgement = function (socket, header, ack, length) {
- say(ack);
+
+// Binary search, implemented, as always, by taking a [peek at
+// Sedgewick](http://algs4.cs.princeton.edu/11model/BinarySearch.java.html).
+function binarySearch (comparator, array, key) {
+ var low = 0, high = array.length - 1, partition, compare;
+ while (low <= high) {
+ partition = Math.floor(low + (high - low) / 2);
+ compare = comparator(key, array[partition]);
+ if (compare < 0) high = partition - 1;
+ else if (compare > 0) low = partition + 1;
+ else return partition;
+ }
+ return low;
+}
+
+// Compare two objects by their sequence property.
+function bySequence (left, right) { return left.sequence - right.sequence }
+
+EndPoint.prototype.acknowledgement = function (parser, socket, header, ack) {
+ // All parsing in one fell swoop so we don't do something that causes a next
+ // tick which might cause the parser to be reused.
+ if (header.length == 40) {
+ parser.extract('statistics', this.fullAcknowledgement.bind(this, socket, header, ack));
+ } else {
+ this.lightAcknowledgement(socket, header, ack);
+ }
};
+
+// Remove the sent packets that have been received.
+EndPoint.prototype.fullAcknowledgement = function (socket, header, ack, stats) {
+ this.lightAcknowledgement(socket, header, ack);
+ say(socket._flowWindowSize, socket._sent.length, header, ack, stats);
+}
+
+EndPoint.prototype.lightAcknowledgement = function (socket, header, ack) {
+ var endPoint = this, sent = socket._sent, index = binarySearch(bySequence, sent, ack);
+ socket._flowWindowSize -= sent.splice(0, index).length;
+ endPoint.control(socket, 'header', { type: 0x6, additional: header.additional });
+}
+
EndPoint.prototype.connect = function (rinfo, header, handshake) {
var endPoint = this, server = endPoint.server, timestamp = Math.floor(Date.now() / 6e4);
@@ -455,11 +512,13 @@ EndPoint.prototype.transmit = function (socket) {
if (message) {
serializer.reset();
- say(message);
serializer.serialize('header', extend({ control: 0, timestamp: 0 }, message));
serializer.write(message.buffer);
dgram.send(message.buffer, 0, message.buffer.length, peer.port, peer.address);
+
+ socket._flowWindowSize++;
+ socket._sent.push(message);
}
// TODO: Something like this, but after actually calculating the time of the
View
58 sdk/connect/client.cpp
@@ -0,0 +1,58 @@
+#ifndef WIN32
+ #include <arpa/inet.h>
+ #include <netdb.h>
+#else
+ #include <winsock2.h>
+ #include <ws2tcpip.h>
+#endif
+#include <fstream>
+#include <iostream>
+#include <cstdlib>
+#include <cstring>
+#include <udt.h>
+
+using namespace std;
+
+int main(int argc, char* argv[])
+{
+ if ((argc != 3) || (0 == atoi(argv[2])))
+ {
+ cout << "usage: client server_ip server_port" << endl;
+ return -1;
+ }
+
+ // use this function to initialize the UDT library
+ UDT::startup();
+
+ struct addrinfo hints, *peer;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_STREAM;
+
+
+ UDTSOCKET fhandle = UDT::socket(hints.ai_family, hints.ai_socktype, hints.ai_protocol);
+
+ if (0 != getaddrinfo(argv[1], argv[2], &hints, &peer))
+ {
+ cout << "incorrect server/peer address. " << argv[1] << ":" << argv[2] << endl;
+ return -1;
+ }
+
+ // connect to the server, implict bind
+ if (UDT::ERROR == UDT::connect(fhandle, peer->ai_addr, peer->ai_addrlen))
+ {
+ cout << "connect: " << UDT::getlasterror().getErrorMessage() << endl;
+ return -1;
+ }
+
+ freeaddrinfo(peer);
+
+
+ UDT::close(fhandle);
+
+ // use this function to release the UDT library
+ UDT::cleanup();
+
+ return 0;
+}
View
12 sdk/integer/server.cpp
@@ -19,14 +19,16 @@ void* server(void*);
DWORD WINAPI server(LPVOID);
#endif
+static int sleepy = 0;
+
int main(int argc, char* argv[])
{
- //usage: sendfile [server_port]
- if ((2 < argc) || ((2 == argc) && (0 == atoi(argv[1]))))
+ sleepy = argc > 1;
+/* if ((2 < argc) || ((2 == argc) && (0 == atoi(argv[1]))))
{
cout << "usage: sendfile [server_port]" << endl;
return 0;
- }
+ }*/
// use this function to initialize the UDT library
UDT::startup();
@@ -40,8 +42,6 @@ int main(int argc, char* argv[])
hints.ai_socktype = SOCK_STREAM;
string service("9000");
- if (2 == argc)
- service = argv[1];
if (0 != getaddrinfo(NULL, service.c_str(), &hints, &res))
{
@@ -125,6 +125,8 @@ DWORD WINAPI server(LPVOID usocket)
cout << "number: " << size << endl;
+ if (sleepy) sleep(5);
+
UDT::close(fhandle);
#ifndef WIN32

0 comments on commit 54e3896

Please sign in to comment.
Something went wrong with that request. Please try again.