Permalink
Browse files

Send single data packet.

Created a minimal send only client socket that can send a single packet
to a server. Created an integer server to test the client socket. The
integer server accepts a single integer, prints it to standard out, adn
closes the connection.

Gathered the C++ example programs into separate directories so I could
reuse the names `client` and `server` for each pair, and the directory
name to describe the program's operation.

Implemented a binary heap. Has strange premonition of admonishments over
the use of socket object properties in the heap implementations. It may
offend the sensibilities of certain delicate souls because it uses a
property in the object that it stores to store a bit of its
implementation data — the array index.

It creates an arbitrary property name to store the index and deletes the
property when the object is removed. It knows little about the stored
object, just that it is an object and not a primitive, and it has some
sense of what sort of property name will not collide with the properties
already defined in the object.

The name binary heap uses a comparator function, as you might expect of
a generalized data structure. The difference being that the heap uses a
stored object property to store its data, establishing the requirement
that the stored type must be of type object, but it does not know
anything more about the object. It defers to a comparator to define the
ordering.

There is a lot more to do to implement the senders algorithm, but that
would be a huge commits that would bury the incremental progress being
made. Now that I have a better understanding of how the protocol works,
I can better define the steps to implement it.

Closes #9.
  • Loading branch information...
1 parent 24087a2 commit 2739d7413de6e668654327a8c88ea15f19eb2a17 @bigeasy committed Nov 18, 2012
Showing with 387 additions and 33 deletions.
  1. +4 −2 NOTES
  2. +3 −1 client.js
  3. +4 −1 common.js
  4. +220 −20 index.js
  5. +1 −1 package.json
  6. +0 −2 proxy.js
  7. +10 −6 sdk/Makefile
  8. 0 sdk/{ → file}/client.cpp
  9. 0 sdk/{ → file}/server.cpp
  10. +10 −0 sdk/{connect.cpp → integer/client.cpp}
  11. +135 −0 sdk/integer/server.cpp
View
6 NOTES
@@ -1,5 +1,7 @@
Things that I'm running on occasion.
(cd sdk && make -e os=OSX sdk=~/src/udt4/src)
-DYLD_LIBRARY_PATH=~/src/udt4/src sdk/server
-DYLD_LIBRARY_PATH=~/src/udt4/src sdk/client 127.0.0.1 9293 index.js
+DYLD_LIBRARY_PATH=~/src/udt4/src sdk/file/server
+DYLD_LIBRARY_PATH=~/src/udt4/src sdk/file/client 127.0.0.1 9293 index.js
+
+(node --debug-brk client.js &); node-inspector
View
@@ -5,5 +5,7 @@ var socket = new udt.Socket;
socket.connect(9293);
socket.on('connect', function () {
console.log('connected');
- socket.destroy();
+ var buffer = new Buffer(4);
+ buffer.writeInt32LE(5, 0);
+ socket.write(buffer);
});
View
@@ -18,7 +18,7 @@ var packets = exports.packets =
, handshake: '\
-b32 => version \
, -b32 => socketType \
- , -b32 => sequence \
+ , b32 => sequence \
, -b32 => maxPacketSize \
, -b32 => windowSize \
, -b32 => connectionType \
@@ -27,6 +27,9 @@ var packets = exports.packets =
, b32 => address \
, x96{0} \
'
+, acknowledgement: '\
+ b32 => sequence \
+ '
}
var parser = exports.parser = packet.createParser();
View
240 index.js
@@ -3,13 +3,13 @@ var dgram = require('dgram')
, packet = require('packet')
, common = require('./common')
, crypto = require('crypto')
- , Heap = require('./heap').Heap
, dns = require('dns')
, __slice = [].slice;
-const CONTROL_TYPES = 'handshake'.split(/\s+/);
+const CONTROL_TYPES = 'handshake keep-alive acknowledgement'.split(/\s+/);
const MAX_MSG_NO = 0x1FFFFFFF;
const MAX_SEQ_NO = Math.pow(2, 31) - 1;
+const IMMEDIATE = [ 0, 0 ];
var socketId = crypto.randomBytes(4).readUInt32BE(0);
@@ -26,11 +26,103 @@ var net = require('net');
// The start of time used in our high resolution timings.
var epoch = process.hrtime();
-// Comparison operator for high-resolution time for use with heap.
-function before (a, b) {
- if (a.when[0] < b.when[0]) return true;
- if (a.when[0] > b.when[0]) return false;
- return a.when[1] < b.when[1];
+var heapIndexKeyCounter = 0;
+
+function Heap (before) {
+ this.array = [];
+ this.indexKey = '_Heap_index_key_' + (heapIndexKeyCounter++);
+ this.before = before;
+}
+
+Heap.prototype.__defineGetter__("length", function () {
+ return this.array.length;
+});
+
+Heap.prototype.peek = function () {
+ return this.array[0];
+}
+
+Heap.prototype.bubbleUp = function (index) {
+ var before = this.before, array = this.array, indexKey = this.indexKey, node = array[index];
+ while (index > 0) {
+ var parent = index - 1 >> 1;
+ if (before(node, array[parent])) {
+ array[index] = array[parent];
+ array[parent] = node;
+ array[index][indexKey] = index;
+ array[parent][indexKey] = parent;
+ index = parent;
+ } else {
+ break;
+ }
+ }
+}
+
+Heap.prototype.sinkDown = function (index) {
+ var array = this.array, indexKey = this.indexKey,
+ length = array.length, node = array[index],
+ left, right, child;
+ for (left = index * 2 + 1; left < length; l = index * 2 + 1) {
+ child = left;
+ right = left + 1;
+ if (right < length && before(array[right], array[left])) {
+ child = right;
+ }
+ if (before(array[child][indexKey], node[indexKey])) {
+ array[index] = array[child];
+ array[child] = node;
+ array[index][indexKey] = index;
+ array[child][indexKey] = child;
+ index = child;
+ } else {
+ break;
+ }
+ }
+}
+
+Heap.prototype.remove = function (node) {
+ var array = this.array, indexKey = this.indexKey, last = array.pop(), index = node[indexKey];
+ if (index != array.length) {
+ array[index] = last;
+ if (less(end, node)) {
+ this.bubbleUp(index);
+ } else {
+ this.sinkDown(index);
+ }
+ }
+ delete node[indexKey];
+}
+
+Heap.prototype.push = function (node, value) {
+ var array = this.array, indexKey = this.indexKey, index = array.length;
+ if (node[indexKey] != null) {
+ this.remove(node);
+ this.push(node);
+ } else {
+ array.push(node);
+ node[indexKey] = index;
+ this.bubbleUp(index);
+ }
+}
+
+Heap.prototype.pop = function (node) {
+ var array = this.array, indexKey = this.indexKey, result = array[0], last = array.pop();
+ if (array.length) {
+ array[0] = last;
+ last[indexKey] = 0;
+ this.sinkDown(0);
+ }
+ delete result[indexKey];
+ return result;
+}
+
+// Comparison operator generator for high-resolution time for use with heap.
+function sooner (property) {
+ return function (a, b) {
+ if (a[property][0] < b[property][0]) return true;
+ if (a[property][0] > b[property][0]) return false;
+ return a[property][1] < b[property][1];
+ }
}
function die () {
@@ -66,28 +158,42 @@ function NativeControlAlgorithm () {
util.inherits(NativeControlAlgorithm, events.EventEmitter);
var sendQueue = new (function () {
- var queue = new Heap(before), sending = false;
+ var before = sooner('_sendTime'), queue = new Heap(before), sending = false;
function enqueue (socket, packet, when) {
queue.add({ socket: socket, packet: packet, when: when });
if (!sending) poll();
}
+ function schedule (socket, timestamp) {
+ // This gave me a funny feeling, one of violating encapsulation by using a
+ // property in the socket object from the send queue, except that am I
+ // supposed to do? This is what I would have called violating encapsulation
+ // in my Java days, it triggers the creation of a dozen new types to
+ // preserve encapsulation. I've yet to completely de-program myself of this
+ // sort of rote programming. The send queue is within the same capsule as
+ // the socket. They are interdependent. They existing for each other. The
+ // socket object's underscorred proroperties are part of its implementation,
+ // in fact, the socket is not the implementation, the whole API is.
+ socket._sendTime = timestamp;
+ queue.push(socket);
+ if (!sending) poll();
+ }
function poll () {
sending = true;
- if (queue.empty()) {
+ if (! queue.length) {
sending = false;
} else {
send();
}
}
function send () {
- var now = process.hrtime();
- if (before(queue.buf[0], { when: now })) {
- var entry = queue.pop(), socket = entry.socket, endPoint = socket._endPoint;
- endPoint.send(socket, packet);
- process.nextTick(poll);
+ var socket;
+ if (before(queue.peek(), { _sendTime: process.hrtime() })) {
+ socket = queue.pop();
+ socket._endPoint.transmit(socket);
}
+ process.nextTick(poll);
}
- extend(this, { enqueue: enqueue });
+ extend(this, { schedule: schedule });
})();
function Socket (options) {
@@ -98,12 +204,10 @@ function Socket (options) {
Stream.call(this);
this._socketId = nextSocketId();
+ this._messageNumber = 1;
this._ccc = options.ccc || new NativeControlAlgorithm;
- this._serializer = common.serializer.createSerializer();
- this._parser = common.parser.createParser();
this._packet = new Buffer(1500);
- this._sendQueue = new Heap;
- this._receiveQueue = new Heap;
+ this._pending = [];
}
util.inherits(Socket, Stream);
@@ -234,11 +338,12 @@ EndPoint.prototype.receive = function (msg, rinfo) {
break;
default:
var name = CONTROL_TYPES[header.type];
+ console.log(header);
parser.extract(name, endPoint[name].bind(endPoint, socket, header))
}
// Hmm... Do you explicitly enable rendezvous?
} else if (header.type == 0 && endPoint.server) {
- parser.extract('handshake', endPoint.connect.bind(endPoint, rinfo, header))
+ parser.extract('handshake', endPoint.connect.bind(endPoint, rinfo, header, msg.length))
}
} else {
}
@@ -276,12 +381,16 @@ EndPoint.prototype.handshake = function (socket, header, handshake) {
clearInterval(socket._handshakeInterval);
socket._status = 'connected';
+ socket._handshake = handshake;
socket._peer.socketId = handshake.socketId;
socket.emit('connect');
break;
}
}
+EndPoint.prototype.acknowledgement = function (socket, header, ack, length) {
+ say(ack);
+};
EndPoint.prototype.connect = function (rinfo, header, handshake) {
var endPoint = this, server = endPoint.server, timestamp = Math.floor(Date.now() / 6e4);
@@ -323,6 +432,42 @@ EndPoint.prototype.connect = function (rinfo, header, handshake) {
return false;
}
}
+EndPoint.prototype.transmit = function (socket) {
+ var serializer = common.serializer, dgram = this.dgram, pending = socket._pending, peer = socket._peer;
+
+ // If we have data packets to retransmit, they go first, otherwise send a new
+ // data packet.
+ if (false) {
+
+ } else {
+ if (pending.length && !pending[0].length) {
+ pending.shift();
+ }
+
+ if (pending.length) {
+ // TODO: Is pop faster?
+ message = pending[0].shift();
+
+ // TODO: Wrap sequence number. See issue #24.
+ message.sequence = socket._sequence++;
+ }
+ }
+
+ if (message) {
+ serializer.reset();
+ say(message);
+ serializer.serialize('header', extend({ control: 0, timestamp: 0 }, message));
+ serializer.write(message.buffer);
+
+ dgram.send(message.buffer, 0, message.buffer.length, peer.port, peer.address);
+ }
+
+ // TODO: Something like this, but after actually calculating the time of the
+ // next packet using the congestion control algorithm.
+ if (pending.length > 1 || pending[0].length) {
+ sendQueue.schedule(socket, 0);
+ }
+}
// Reference counted cache of UDP datagram sockets.
var endPoints = {};
@@ -458,6 +603,61 @@ Socket.prototype.connect = function (options) {
socket._endPoint.shakeHands(socket);
}
}
+// There is no way to send the UDP packets without copying the user buffer into
+// new buffers. The UDP packets need a header before a chunk of the user data,
+// so we need to write the header, which means we need a buffer we can alter. We
+// cannot borrow the user's buffer.
+//
+// According to documentation, write returns false if the buffer cannot be
+// written to kernel, if it is queued in user memory, so we can hold onto it for
+// a while if we like. We pushback when the UDT send buffer, as defined by the
+// count of packets, is full.
+//
+// All this copying and allocation is disheartening. This is a place that needs
+// the attention of some benchmarks. If you can think of a way to avoid the
+// copying, please let me know. Nothing's occurring to me.
+
+
+// Total size of UDT data packet overhead, UDP header plus UDT data header.
+const UDP_HEADER_SIZE = 28;
+const UDT_DATA_HEADER_SIZE = 16;
+
+//
+Socket.prototype.write = function (buffer) {
+ var socket = this,
+ handshake = socket._handshake,
+ size = handshake.maxPacketSize - (UDT_DATA_HEADER_SIZE + UDT_DATA_HEADER_SIZE),
+ packet, count, i, length, message = [];
+
+ count = Math.floor(buffer.length / size);
+ if (buffer.length % size) count++;
+
+ for (i = 0; i < count; i++) {
+ packet = {
+ control: 0,
+ position: 0,
+ inOrder: 1,
+ number: socket._messageNumber,
+ destination: handshake.socketId,
+ buffer: new Buffer(UDT_DATA_HEADER_SIZE + Math.min(buffer.length - i * size, size))
+ };
+ // TODO: Does `Buffer.copy` choose the lesser of source length and
+ // destination length?
+ buffer.copy(packet.buffer, UDT_DATA_HEADER_SIZE, i * size);
+ if (i == 0) packet.position |= 0x2;
+ if (i == count - 1) packet.position |= 0x1;
+ message.push(packet);
+ }
+
+ socket._messageNumber++;
+ if (socket._messageNumber > MAX_MSG_NO) socket._messageNumber = 1;
+
+ socket._pending.push(message);
+
+ sendQueue.schedule(socket, [ 0, 0 ]);
+
+ return true;
+}
Socket.prototype.destroy = function () {
this._endPoint.shutdown(this);
}
View
@@ -17,6 +17,6 @@
{ "proof": "0.0.15"
}
, "dependencies":
- { "packet": "0.0.4"
+ { "packet": "git://github.com/bigeasy/packet.git#e04287354e20dceb1dd0436eb858732363936367"
}
}
View
@@ -39,7 +39,6 @@ var epoch = process.hrtime();
function log (participant, buffer, callback) {
parser.reset();
parser.extract('header', function (header) {
- console.log(process.hrtime(epoch));
epoch = process.hrtime();
if (header.control) {
console.log(participant + ': Control ' + types[header.type], header);
@@ -58,7 +57,6 @@ function log (participant, buffer, callback) {
console.log(participant + ': Data');
console.log(extend(header, { parser: parser.length, buffer: buffer.length }));
if (participant == 'Client' && buffer.length - parser.length == 4) {
- console.log("CLIENT %d", buffer.readInt32LE(parser.length));
callback();
} else {
// console.log(toArray(buffer));
Oops, something went wrong.

0 comments on commit 2739d74

Please sign in to comment.