Skip to content
Browse files

Implemented minimal connect-only client socket.

Created an `sdk` directory to contain minimalist network programs
written in C++ using the C++ implementation of UDT. I've created a
minimal client and server program to upload a file from the client to
the server. The upload sends only the file, and the server writes the
file to the filesystem using a generated file name. These programs
illustrate client connections and listening. The sender's algorithm is
exercised in the client. The receiver's algorithm is exercised in the
server.

This allows me to build out connecting and sending by implementing the
client in JavaScript, before building out listening and receiving by
implementing the server in JavaScript.

Implemented client connections, sending handshakes repeatedly until
timeout. Implemented shutdown using a shutdown packet.

Updated Packet to Packet 0.0.4 to employ the new and reduced API.

Closes #13.
  • Loading branch information...
1 parent 3bf7ee3 commit ccc292e151686f96513afac8c9b4855ec188d294 @bigeasy committed Oct 30, 2012
Showing with 1,036 additions and 189 deletions.
  1. +4 −0 .gitignore
  2. +5 −0 NOTES
  3. +9 −0 client.js
  4. +41 −0 common.js
  5. +58 −0 heap.js
  6. +346 −157 index.js
  7. +1 −1 package.json
  8. +0 −30 packets.js
  9. +4 −1 proxy.js
  10. +121 −0 recvfile.js
  11. +57 −0 sdk/Makefile
  12. +109 −0 sdk/client.cpp
  13. +162 −0 sdk/server.cpp
  14. +119 −0 sendfile.js
View
4 .gitignore
@@ -1 +1,5 @@
node_modules/*
+sdk/client
+sdk/client.o
+sdk/server
+sdk/server.o
View
5 NOTES
@@ -0,0 +1,5 @@
+Things that I'm running on occasion.
+
+(cd sdk && make -e os=OSX sdk=~/src/udt4/src)
+DYLD_LIBRARY_PATH=~/src/udt4/src sdk/server
+DYLD_LIBRARY_PATH=~/src/udt4/src sdk/client 127.0.0.1 9293 index.js
View
9 client.js
@@ -0,0 +1,9 @@
+var udt = require('./index');
+
+var socket = new udt.Socket;
+
+socket.connect(9293);
+socket.on('connect', function () {
+ console.log('connected');
+ socket.destroy();
+});
View
41 common.js
@@ -0,0 +1,41 @@
+var packet = require('packet');
+
+var packets = exports.packets =
+{ header: '\
+ b8( \
+ &0x80/1: \
+ b16{b1 => control, b15 => type}, x16{0} \
+ , -b32 => additional \
+ , b32 => timestamp \
+ , b32 => destination \
+ | \
+ b32{b1 => control, b31 => sequence} \
+ , b32{b2 => position, b1 => inOrder, b29 => number} \
+ , b32 => timestamp \
+ , b32 => destination \
+ ) \
+ '
+, handshake: '\
+ -b32 => version \
+ , -b32 => socketType \
+ , -b32 => sequence \
+ , -b32 => maxPacketSize \
+ , -b32 => windowSize \
+ , -b32 => connectionType \
+ , b32 => socketId \
+ , b32 => synCookie \
+ , b32 => address \
+ , x96{0} \
+ '
+}
+
+var parser = exports.parser = packet.createParser();
+var serializer = exports.serializer = packet.createSerializer();
+
+// TODO: Ugly. Parser cannot parse an empty packet.
+serializer.packet('header', packets.header);
+for (var name in packets) {
+ parser.packet(name, packets[name]);
+ if (name == 'header') continue;
+ serializer.packet(name, packets.header + ',' + packets[name]);
+}
View
58 heap.js
@@ -0,0 +1,58 @@
+// Taken from: https://gist.github.com/3088684
+
+// Heap Queue for JavaScript: smallest as head
+var Heap = function (less) {
+ less = less || function (a, b) {return a < b};
+ return Object.create(Heap.prototype, {
+ buf: {value: []},
+ less: {value: less},
+ });
+};
+Heap.prototype.push = function (elem) {
+ this.buf.push(elem);
+ var index = this.buf.length - 1;
+ while (index > 0) {
+ var parent = (index - 1) >> 1;
+ if (this.less(this.buf[index], this.buf[parent])) {
+ this.buf[index] = this.buf[parent];
+ this.buf[parent] = elem;
+ index = parent;
+ } else break;
+ }
+};
+Heap.prototype.pop = function () {
+ var ret = this.buf[0];
+ var elem = this.buf.pop();
+ if (this.buf.length === 0) return ret;
+ this.buf[0] = elem;
+ var index = 0;
+ for (var l = index * 2 + 1; l < this.buf.length; l = index * 2 + 1) {
+ var child = l;
+ var r = l + 1;
+ if (r < this.buf.length && this.less(this.buf[r], this.buf[l])) {
+ child = r;
+ }
+ if (this.less(this.buf[child], elem)) {
+ this.buf[index] = this.buf[child];
+ this.buf[child] = elem;
+ index = child;
+ } else break;
+ }
+ return ret;
+};
+Heap.prototype.empty = function () {
+ return this.buf.length === 0;
+}
+
+//example
+/*
+var data = [5,3,2,10,1,55,10,0,4];
+var h = Heap();
+data.forEach(h.push.bind(h));
+var sorted = [];
+while (!h.empty()) sorted.push(h.pop());
+console.log(data);
+console.log(sorted);
+*/
+
+exports.Heap = Heap;
View
503 index.js
@@ -1,204 +1,393 @@
var dgram = require('dgram')
, socket = dgram.createSocket('udp4')
, packet = require('packet')
+ , common = require('./common')
+ , crypto = require('crypto')
+ , Heap = require('./heap').Heap
, __slice = [].slice;
- function die () {
- console.log.apply(console, __slice.call(arguments, 0));
- return process.exit(1);
- }
+const CONTROL_TYPES = 'handshake'.split(/\s+/);
+const MAX_MSG_NO = 0x1FFFFFFF;
+const MAX_SEQ_NO = Math.pow(2, 31) - 1;
- function say () { return console.log.apply(console, __slice.call(arguments, 0)) }
+var Stream = require('stream');
+var util = require('util');
+var events = require('events');
+var net = require('net');
- function extend (to, from) {
- for (var key in from) to[key] = from[key];
- return to;
- }
+// The start of time used in our high resolution timings.
+var epoch = process.hrtime();
+
+// Comparison operator for high-resolution time for use with heap.
+function before (a, b) {
+ if (a.when[0] < b.when[0]) return true;
+ if (a.when[0] > b.when[0]) return false;
+ return a.when[1] < b.when[1];
+}
-var role = process.argv[2] ? 'client' : 'server';
-socket.bind(role == 'client' ? 9001 : 9000);
+function die () {
+ console.log.apply(console, __slice.call(arguments, 0));
+ return process.exit(1);
+}
-var packets = {};
+function say () { return console.log.apply(console, __slice.call(arguments, 0)) }
-function formatQuad (address) {
- var quad = [];
- for (i = 3; i >= 0; i--) {
- quad[i] = Math.floor(address / ( Math.pow(256, i) ))
- address = address % Math.pow(256, i)
- }
- return quad.join('.');
+function extend (to, from) {
+ for (var key in from) to[key] = from[key];
+ return to;
}
-function parseQuad (quad) {
- var address = 0;
+function parseDotDecimal (quad) {
quad = quad.split('.');
- for (i = 3; i >= 0; i--) {
+ for (var i = 3, address = 0; i >= 0; i--) {
address = address + quad[i] * Math.pow(256, i);
}
- return 0;
-}
-
-// No good place to put this yet, but it is nice that we get a full alternation
-// pattern one way, but we can't get it going back. We could also extract the
-// switch condition, then also skip it. `b8(&8x80 this| that) => switch`.
-// Otherwise, we need to dip into that first bit unpacking, pack it, then use
-// what we need to use for the test.
-//
-// Not sure how names were supposed work with alternation, actually.
-//
-// In the mean time, we can use Parser to break apart the header, but we can be
-// specific about how it is ressembled. Or, we can just dip into each alternate
-// and get the first option and test that using read/write. Wow. Someone was
-// thinking about this.
-var packets =
-{ header: '\
- b8( \
- &0x80/1: \
- b16{b1 => control, b15 => type}, x16{0} \
- , -b32 => additional \
- , -b32 => timestamp \
- , -b32 => destination \
- | \
- b32{b1 => control, b31 => sequence} \
- , b32{b2 => position, b1 => inOrder, b29 => number} \
- , -b32 => timestamp \
- , -b32 => destination \
- ) \
- '
-, handshake: '\
- -b32 => version \
- , -b32 => socketType \
- , -b32 => sequence \
- , -b32 => maxPacketSize \
- , -b32 => windowSize \
- , -b32 => connectionType \
- , -b32 => socketId \
- , -b32 => synCookie \
- , b32 => address \
- , x96{0} \
- '
-}
-
-var parser = new packet.Parser(), serializer = new packet.Serializer();
-
-for (var name in packets) {
- parser.packet(name, packets[name]);
- serializer.packet(name, packets.header + ',' + packets[name]);
-}
-
-var got;
-socket.on('message', function (buffer, remote) {
- got = buffer;
- parser.extract('header', recvHeader);
- parser.parse(buffer);
-});
-
-if (process.argv[2]) clientStart();
-function clientStart () {
- var handshake =
+ return address;
+}
+
+// Native control algorithm is an event emitter with certain properties. Ought
+// to be simple enough for the user to implement a native control algorithm as
+// an event emitter.
+function NativeControlAlgorithm () {
+ this.roundTripTime = 0;
+ this.maximumSegmentSize = 0;
+ this.estimatedBandwidth = 0;
+ this.latestPacketSequenceNo = 0;
+ this.windowSize = 16;
+}
+util.inherits(NativeControlAlgorithm, events.EventEmitter);
+
+var sendQueue = new (function () {
+ var queue = new Heap(before), sending = false;
+ function enqueue (socket, packet, when) {
+ queue.add({ socket: socket, packet: packet, when: when });
+ if (!sending) poll();
+ }
+ function poll () {
+ sending = true;
+ if (queue.empty()) {
+ sending = false;
+ } else {
+ send();
+ }
+ }
+ function send () {
+ var now = process.hrtime();
+ if (before(queue.buf[0], { when: now })) {
+ var entry = queue.pop(), socket = entry.socket, endPoint = socket._endPoint;
+ endPoint.send(socket, packet);
+ process.nextTick(poll);
+ }
+ }
+ extend(this, { enqueue: enqueue });
+})();
+
+function Socket (options) {
+ if (!(this instanceof Socket)) return new Socket();
+
+ if (options === void(0)) options = {};
+
+ Stream.call(this);
+
+ this._ccc = options.ccc || new NativeControlAlgorithm;
+ this._serializer = common.serializer.createSerializer();
+ this._parser = common.parser.createParser();
+ this._packet = new Buffer(1500);
+ this._sendQueue = new Heap;
+ this._receiveQueue = new Heap;
+}
+util.inherits(Socket, Stream);
+
+exports.Socket = Socket;
+
+function exceptional () { return new Error(); }
+
+// Wrapper around an underlying UDP datagram socket.
+function EndPoint (options) {
+ this.references = 1;
+ this.dgram = dgram.createSocket('udp4');
+ this.dgram.on('message', EndPoint.prototype.receive.bind(this));
+ this.dgram.bind(options.localPort, options.localAddress);
+ var address = this.dgram.address();
+ this.localPort = address.port;
+ this.localAddress = options.localAddress;
+ this.packet = new Buffer(2048);
+ this.sockets = {};
+}
+EndPoint.prototype.shakeHands = function (socket) {
+ // Stash the socket so we can track it by the socket identifier.
+ this.sockets[socket._socketId] = socket;
+
+ // Start of client handshake.
+ socket._status = "syn";
+
+ // Send a handshake. Use hard-coded defaults for packet and window size.
+ this.sendHandshake(socket,
{ control: 1
, type: 0
, additional: 0
, timestamp: 0
, destination: 0
, version: 4
, socketType: 1
- , sequence: 1582673141
+ , sequence: socket._sequence
, maxPacketSize: 1500
, windowSize: 8192
, connectionType: 1
- , socketId: 13
+ , socketId: socket._socketId
, synCookie: 0
- , address: 16777343
- };
- serializer.buffer('handshake', handshake, function (buffer) {
- socket.send(buffer, 0, buffer.length, 9000, '127.0.0.1');
+ , address: parseDotDecimal(socket._peer.host)
});
}
+EndPoint.prototype.shutdown = function (socket) {
+ // Remove the socket from the stash.
+ delete this.sockets[socket._socketId];
+
+ // Zero the status.
+ delete socket._status;
+
+ var serializer = common.serializer,
+ packet = this.packet, dgram = this.dgram,
+ count = 0, peer = socket._peer;
+
+ // Format a shutdown packet, simply a header packet of type shutdown.
+ serializer.reset();
+ serializer.serialize('header',
+ { control: 1
+ , type: 0x5
+ , additional: 0
+ , timestamp: 0
+ , destination: peer.socketId
+ });
+ serializer.write(packet);
+ var callback = function () {};
-function recvHeader (header) {
- if (header.control) {
- switch (header.type) {
- case 0x0:
- parser.extract('handshake', dialog[role](header));
- break;
- default:
- say(header);
+ // Dispose of the end point and UDP socket if it is no longer referenced.
+ if (--this.references == 0) {
+ var address = this.dgram.address();
+ delete endPoints[this.localPort][this.localAddress];
+ if (Object.keys(endPoints[this.localPort]).length == 0) {
+ delete endPoints[this.localPort];
}
+ callback = function () { dgram.close() };
}
+
+ dgram.send(packet, 0, serializer.length, peer.port, peer.host, callback);
}
+// Send the handshake four times a second until we get a response, or until four
+// seconds is up.
+EndPoint.prototype.sendHandshake = function (socket, handshake) {
+ var serializer = common.serializer,
+ packet = this.packet, dgram = this.dgram,
+ count = 0, peer = socket._peer;
+ socket._handshakeInterval = setInterval(function () {
+ if (++count == 12) {
+ clearInterval(socket._handshakeInterval);
+ socket.emit('error', new Error('connection timeout'));
+ } else {
+ serializer.reset();
+ serializer.serialize('handshake', handshake);
+ serializer.write(packet);
-var dialog =
-{ client: function (header) {
- return function (handshake) {
- if (count) return;
- count++;
- handshake = extend(header, handshake);
- say(extend({}, handshake));
- handshake.destination = 0;
- handshake.socketId = 13;
- handshake.connectionType = -1;
- serializer.buffer('handshake', handshake, function (buffer) {
- // say(toArray(buffer));
- // say(toArray(got));
- say(buffer.length);
- socket.send(buffer, 0, buffer.length, 9000, '127.0.0.1', function (e) {
- console.log('sendt');
- });
- socket.on('error', function (e) { throw e });
- });
- }
+ dgram.send(packet, 0, serializer.length, peer.port, peer.host);
}
-, server: function (header) {
- return function (handshake) {
- handshake = extend(header, handshake);
- say(extend({}, handshake));
- if (count == 1) {
- socket.close();
- } else {
- // say(toArray(buffer));
- // say(toArray(got));
- count++;
- var response =
- { control: 1
- , type: 0
- , additional: 0
- , timestamp: 0
- , destination: handshake.destination
- , version: 4
- , socketType: 1
- , sequence: 3
- , maxPacketSize: 1500
- , windowSize: 8192
- , connectionType: -1
- , socketId: handshake.destination
- , synCookie: 9
- , address: 16777343
- };
- say(extend({}, response));
- serializer.buffer('handshake', response, function (buffer) {
- socket.send(buffer, 0, buffer.length, 9000, '127.0.0.1');
- });
+ }, 250);
+}
+EndPoint.prototype.receive = function (msg, rinfo) {
+ var endPoint = this, parser = common.parser, handler;
+ parser.extract('header', function (header) {
+ if (header.control) {
+ // TODO: Socket not found...
+ var socket = endPoint.sockets[header.destination];
+ switch (header.type) {
+ case 0x1:
+ // Keep-alive.
+ break;
+ case 0x5:
+ // Shutdown.
+ break;
+ case 0x6:
+ // Notifications from Bill the Cat. (Ack-ack.)
+ break;
+ default:
+ var name = CONTROL_TYPES[header.type];
+ parser.extract(name, endPoint[name].bind(endPoint, socket, header))
}
+ } else {
+ }
+ });
+ parser.parse(msg);
+}
+EndPoint.prototype.handshake = function (socket, header, handshake) {
+ switch (socket._status) {
+ case "syn":
+ // Only respond to an initial handshake.
+ if (handshake.connectionType != 1) break;
+
+ clearInterval(socket._handshakeInterval);
+
+ socket._status = "syn-ack";
+
+ // Unify the packet object for serialization.
+ handshake = extend(handshake, header);
+
+ // Set the destination to nothing.
+ handshake.destination = 0;
+
+ // Select the lesser of the negotiated values.
+ // TODO: Constants are a bad thing...
+ handshake.maxPacketSize = Math.min(handshake.maxPacketSize, 1500);
+ handshake.windowSize = Math.min(handshake.windowSize, 8192);
+ handshake.connectionType = -1;
+
+ this.sendHandshake(socket, handshake);
+ break;
+ case "syn-ack":
+ // Only respond to an follow-up handshake.
+ if (handshake.connectionType != -1) break;
+
+ clearInterval(socket._handshakeInterval);
+
+ socket._status = "connected";
+ socket._peer.socketId = handshake.socketId;
+
+ socket.emit('connect');
+ break;
+ }
+}
+
+// Reference counted cache of UDP datagram sockets.
+var endPoints = {};
+
+// Create a new UDP datagram socket from the user specified port and address.
+
+//
+function createEndPoint (options) {
+ var endPoint = new EndPoint(options), address = endPoint.dgram.address();
+ if (!endPoints[endPoint.localPort]) endPoints[endPoint.localPort] = {};
+ return endPoints[endPoint.localPort][endPoint.localAddress] = endPoint;
+}
+
+// Look up an UDP datagram socket in the cache of bound UDP datagram sockets by
+// the user specified port and address.
+
+//
+function lookupEndPoint (options) {
+ // No interfaces bound by the desired port. Note that this would also work for
+ // zero, which indicates an ephemeral binding, but we check for that case
+ // explicitly before calling this function.
+ if (!endPoints[options.localPort]) return null;
+
+ // Read datagram socket from cache.
+ var endPoint = endPoints[options.localPort][options.localAddress];
+
+ // If no datagram exists, ensure that we'll be able to create one. This only
+ // inspects ports that have been bound by UDT, not by other protocols, so
+ // there is still an opportuntity for error when the UDP bind is invoked.
+ if (!endPoint) {
+ if (endPoints[options.localPort][0]) {
+ throw new Error('Already bound to all interfaces.');
+ }
+ if (options.localAddress == 0) {
+ throw new Error('Cannot bind to all interfaces because some interfaces are already bound.');
}
}
+
+ // Return cached datagram socket or nothing.
+ return endPoint;
}
-var count = 0;
+function validator (ee) {
+ return function (forward) { return check(ee, forward) }
+}
-function toArray (buffer) {
- return buffer.toString('hex').replace(/(..)/g, ':$1').replace(/(.{12})/g, '\n$1').replace(/\n:/g, '\n');
+function check (ee, forward) {
+ return function (error) {
+ if (error) {
+ process.nextTick(function () {
+ ee.emit('error', error);
+ ee._destroy();
+ });
+ } else {
+ try {
+ forward.apply(null, __slice.call(arguments, 1));
+ } catch (error) {
+ ee.emit('error', error);
+ }
+ }
+ }
}
-function sendHeader (sendHandshake) {
- return function () {
- say(toArray(arguments[0]))
- say(toArray(got));
- die('here');
+Socket.prototype.connect = function (options) {
+ // Convert legacy 'net' module parameters to an options object.
+ if (typeof options != 'object') {
+ var args = net._normalizeConnectArgs(arguments);
+ return Socket.prototype.connect.apply(this, args);
}
+
+ var socket = this;
+
+ if (socket._dgram) throw new Error('Already connected');
+
+ socket._options = options = extend({}, options);
+
+ if (options.path) throw new Error('UNIX domain sockets are not supported.');
+ if (!options.port) throw new Error('Remote port is required.');
+
+ // Assign reasonable defaults for unspecified connection properties.
+ if (!options.host) options.host = '127.0.0.1';
+ if (!options.localAddress) options.localAddress = 0;
+ if (!options.localPort) options.localPort = 0;
+
+ // Convert local address to a 32 bit integer.
+ if (typeof options.localAddress == 'string') {
+ options.localAddress = parseDotDecimal(options.localAddress);
+ }
+
+ // Use an existing datagram socket if one exists.
+ if (options.localPort == 0) {
+ socket._endPoint = createEndPoint(options);
+ } else if (!(socket._endPoint = lookupEndPoint(options))) {
+ socket._endPoint = createEndPoint(options);
+ }
+
+ socket._connecting = true;
+
+ var valid = validator(socket);
+
+ require('dns').lookup(options.host, valid(resolved));
+
+ // Record the DNS resolved IP address.
+ function resolved (ip, addressType) {
+ // Possible cancelation during DNS lookup.
+ if (!socket._connecting) return;
+
+ socket._peer = { host: ip || '127.0.0.1', port: options.port };
+
+ // Generate random bytes used to set randomized socket properties.
+ // `crypto.randomBytes` calls OpenSSL `RAND_bytes` to generate the bytes.
+ //
+ // * [RAND_bytes](http://www.openssl.org/docs/crypto/RAND_bytes.html).
+ // * [node_crypto.cc](https://github.com/joyent/node/blob/v0.8/src/node_crypto.cc#L4517)
+ crypto.randomBytes(8, valid(randomzied));
+ }
+
+ // Initialize the randomized socket properies.
+ function randomzied (buffer) {
+ // Randomly generated randomness.
+ socket._sequence = buffer.readUInt32BE(0) % MAX_SEQ_NO;
+ socket._socketId = buffer.readUInt32BE(4);
+
+ // The end point sends a packet on our behalf.
+ socket._endPoint.shakeHands(socket);
+ }
+}
+Socket.prototype.destroy = function () {
+ this._endPoint.shutdown(this);
}
+Socket.prototype._destroy = Socket.prototype.destroy;
-function sendHandshake (handshake) {
-
+function toArray (buffer) {
+ return buffer.toString('hex').replace(/(..)/g, ':$1').replace(/(.{12})/g, '\n$1').replace(/\n:/g, '\n');
}
View
2 package.json
@@ -17,6 +17,6 @@
{ "proof": "0.0.15"
}
, "dependencies":
- { "packet": "git://github.com/bigeasy/packet.git#6881dcfed57184a9744d1000cd3ca47f5cf127b5"
+ { "packet": "0.0.4"
}
}
View
30 packets.js
@@ -1,30 +0,0 @@
-var packets =
-{ header: '\
- b8( \
- &0x80/1: \
- b16{b1 => control, b15 => type}, x16{0} \
- , -b32 => additional \
- , -b32 => timestamp \
- , -b32 => destination \
- | \
- b32{b1 => control, b31 => sequence} \
- , b32{b2 => position, b1 => inOrder, b29 => number} \
- , -b32 => timestamp \
- , -b32 => destination \
- ) \
- '
-, handshake: '\
- -b32 => version \
- , -b32 => socketType \
- , -b32 => sequence \
- , -b32 => maxPacketSize \
- , -b32 => windowSize \
- , -b32 => connectionType \
- , -b32 => socketId \
- , b32 => synCookie \
- , b32 => address \
- , x96{0} \
- '
-}
-
-module.exports = packets;
View
5 proxy.js
@@ -22,7 +22,7 @@ var types = "Handshake Keep-alive Acknowledgement Negative-acknowledgement \
var parser = packet.createParser();
-packets = require('./packets');
+packets = require('./common').packets;
parser.packet('header', packets.header);
parser.packet('handshake', packets.handshake);
@@ -35,9 +35,12 @@ client.on('message', function (buffer, $info) {
log('Client', buffer, function () { server.send(buffer, 0, buffer.length, 9000, '127.0.0.1'); });
});
+var epoch = process.hrtime();
function log (participant, buffer, callback) {
parser.reset();
parser.extract('header', function (header) {
+ console.log(process.hrtime(epoch));
+ epoch = process.hrtime();
if (header.control) {
console.log(participant + ': Control ' + types[header.type], header);
switch (header.type) {
View
121 recvfile.js
@@ -0,0 +1,121 @@
+var dgram = require('dgram')
+ , socket = dgram.createSocket('udp4')
+ , packet = require('packet')
+ , packets = require('./packets')
+ , __slice = [].slice;
+
+const MAX_MSG_NO = 0x1FFFFFFF;
+
+var Socket = require('./index').Socket;
+
+function die () {
+ console.log.apply(console, __slice.call(arguments, 0));
+ return process.exit(1);
+}
+
+function say () { return console.log.apply(console, __slice.call(arguments, 0)) }
+
+function extend (to, from) {
+ for (var key in from) to[key] = from[key];
+ return to;
+}
+
+function formatQuad (address) {
+ var quad = [];
+ for (i = 3; i >= 0; i--) {
+ quad[i] = Math.floor(address / ( Math.pow(256, i) ))
+ address = address % Math.pow(256, i)
+ }
+ return quad.join('.');
+}
+
+function parseQuad (quad) {
+ var address = 0;
+ quad = quad.split('.');
+ for (i = 3; i >= 0; i--) {
+ address = address + quad[i] * Math.pow(256, i);
+ }
+ return 0;
+}
+
+var socket = new Socket;
+
+socket.connect(9293, function () {
+});
+
+var parser = new packet.createParser(), serializer = packet.createSerializer();
+
+for (var name in packets) {
+ parser.packet(name, packets[name]);
+ if (name == 'header') continue;
+ serializer.packet(name, packets.header + ',' + packets[name]);
+}
+
+var got;
+socket.on('message', function (buffer, remote) {
+ got = buffer;
+ parser.extract('header', receive);
+ parser.parse(buffer);
+});
+
+var buffer = new Buffer(8192);
+
+if (process.argv[2]) clientStart();
+
+function clientStart () {
+ var handshake =
+ { control: 1
+ , type: 0
+ , additional: 0
+ , timestamp: 0
+ , destination: 0
+ , version: 4
+ , socketType: 1
+ , sequence: 1582673141
+ , maxPacketSize: 1500
+ , windowSize: 8192
+ , connectionType: 1
+ , socketId: 13
+ , synCookie: 0
+ , address: 16777343
+ };
+ serializer.serialize('handshake', handshake);
+ serializer.write(buffer);
+ socket.send(buffer, 0, serializer.length, 9293, '127.0.0.1');
+}
+
+var controlTypes = 'handshake'.split(/\s+/);
+
+function receive (header) {
+ if (header.control) {
+ if (!controlTypes[header.type]) throw new Error("bogotronic");
+ parser.extract(controlTypes[header.type], receptionist[role][header.type](header));
+ }
+}
+
+var receptionist = { client: [] };
+
+receptionist.client.push(function (header) { return function (handshake) {
+ switch (handshake.connectionType) {
+ case -1:
+ die('here', header, handshake);
+ break;
+ case 1:
+ handshake = extend(handshake, header);
+ handshake.connectionType = -1;
+ handshake.destination = 0;
+
+ serializer.reset();
+ serializer.serialize('handshake', handshake);
+ serializer.write(buffer);
+
+ socket.send(buffer, 0, serializer.length, 9293, '127.0.0.1');
+ break;
+ default:
+ throw new Error("badtastic");
+ }
+}});
+
+function toArray (buffer) {
+ return buffer.toString('hex').replace(/(..)/g, ':$1').replace(/(.{12})/g, '\n$1').replace(/\n:/g, '\n');
+}
View
57 sdk/Makefile
@@ -0,0 +1,57 @@
+C++ = g++
+
+ifndef os
+ os = LINUX
+endif
+
+ifndef arch
+ arch = IA32
+endif
+
+CCFLAGS = -Wall -D$(os) -I$(sdk) -finline-functions -O3
+
+ifeq ($(arch), IA32)
+ CCFLAGS += -DIA32 #-mcpu=pentiumpro -march=pentiumpro -mmmx -msse
+endif
+
+ifeq ($(arch), POWERPC)
+ CCFLAGS += -mcpu=powerpc
+endif
+
+ifeq ($(arch), IA64)
+ CCFLAGS += -DIA64
+endif
+
+ifeq ($(arch), SPARC)
+ CCFLAGS += -DSPARC
+endif
+
+LDFLAGS = -L$(sdk) -ludt -lstdc++ -lpthread -lm
+
+ifeq ($(os), UNIX)
+ LDFLAGS += -lsocket
+endif
+
+ifeq ($(os), SUNOS)
+ LDFLAGS += -lrt -lsocket
+endif
+
+DIR = $(shell pwd)
+
+APP = server client
+
+all: $(APP)
+
+%.o: %.cpp
+ $(C++) $(CCFLAGS) $< -c
+
+server: server.o
+ $(C++) $^ -o $@ $(LDFLAGS)
+client: client.o
+ $(C++) $^ -o $@ $(LDFLAGS)
+
+clean:
+ rm -f *.o $(APP)
+
+install:
+ export PATH=$(DIR):$$PATH
View
109 sdk/client.cpp
@@ -0,0 +1,109 @@
+#ifndef WIN32
+ #include <arpa/inet.h>
+ #include <netdb.h>
+#else
+ #include <winsock2.h>
+ #include <ws2tcpip.h>
+#endif
+#include <fstream>
+#include <iostream>
+#include <cstdlib>
+#include <cstring>
+#include <udt.h>
+
+#define BIND_EXCEPTION 0
+
+using namespace std;
+
+int main(int argc, char* argv[])
+{
+ if ((argc != 4) || (0 == atoi(argv[2])))
+ {
+ cout << "usage: client server_ip server_port local_filename" << endl;
+ return -1;
+ }
+
+ // use this function to initialize the UDT library
+ UDT::startup();
+
+ struct addrinfo hints, *peer;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_STREAM;
+
+
+ UDTSOCKET fhandle1 = UDT::socket(hints.ai_family, hints.ai_socktype, hints.ai_protocol);
+ if (BIND_EXCEPTION) {
+ struct sockaddr_in ip4addr;
+ ip4addr.sin_family = AF_INET;
+ ip4addr.sin_port = htons(9557);
+ inet_pton(AF_INET, "192.168.56.1", &ip4addr.sin_addr);
+ UDT::bind(fhandle1, (struct sockaddr*)&ip4addr, sizeof(ip4addr));
+ }
+
+ UDTSOCKET fhandle = UDT::socket(hints.ai_family, hints.ai_socktype, hints.ai_protocol);
+
+ if (BIND_EXCEPTION) {
+ struct sockaddr_in ip4addr;
+ ip4addr.sin_family = AF_INET;
+ ip4addr.sin_port = htons(9557);
+ inet_pton(AF_INET, "127.0.0.1", &ip4addr.sin_addr);
+ UDT::bind(fhandle, (struct sockaddr*)&ip4addr, sizeof(ip4addr));
+ }
+
+ if (0 != getaddrinfo(argv[1], argv[2], &hints, &peer))
+ {
+ cout << "incorrect server/peer address. " << argv[1] << ":" << argv[2] << endl;
+ return -1;
+ }
+
+ // connect to the server, implict bind
+ if (UDT::ERROR == UDT::connect(fhandle, peer->ai_addr, peer->ai_addrlen))
+ {
+ cout << "connect: " << UDT::getlasterror().getErrorMessage() << endl;
+ return -1;
+ }
+
+ freeaddrinfo(peer);
+
+ // open the file
+ fstream ifs(argv[3], ios::in | ios::binary);
+
+ ifs.seekg(0, ios::end);
+ int64_t size = ifs.tellg();
+ ifs.seekg(0, ios::beg);
+
+ cout << "file size: " << size << endl;
+
+ // send file size information
+ if (UDT::ERROR == UDT::send(fhandle, (char*)&size, sizeof(int64_t), 0))
+ {
+ cout << "send: " << UDT::getlasterror().getErrorMessage() << endl;
+ return -1;
+ }
+
+ UDT::TRACEINFO trace;
+ UDT::perfmon(fhandle, &trace);
+
+ // send the file
+ int64_t offset = 0;
+ if (UDT::ERROR == UDT::sendfile(fhandle, ifs, offset, size))
+ {
+ cout << "sendfile: " << UDT::getlasterror().getErrorMessage() << endl;
+ return -1;
+ }
+
+ UDT::perfmon(fhandle, &trace);
+ cout << "speed = " << trace.mbpsSendRate << "Mbits/sec" << endl;
+
+ UDT::close(fhandle);
+ UDT::close(fhandle1);
+
+ ifs.close();
+
+ // use this function to release the UDT library
+ UDT::cleanup();
+
+ return 0;
+}
View
162 sdk/server.cpp
@@ -0,0 +1,162 @@
+#ifndef WIN32
+ #include <cstdlib>
+ #include <netdb.h>
+#else
+ #include <winsock2.h>
+ #include <ws2tcpip.h>
+#endif
+#include <fstream>
+#include <iostream>
+#include <sstream>
+#include <cstring>
+#include <udt.h>
+
+using namespace std;
+
+#ifndef WIN32
+void* server(void*);
+#else
+DWORD WINAPI server(LPVOID);
+#endif
+
+int main(int argc, char* argv[])
+{
+ //usage: sendfile [server_port]
+ if ((2 < argc) || ((2 == argc) && (0 == atoi(argv[1]))))
+ {
+ cout << "usage: sendfile [server_port]" << endl;
+ return 0;
+ }
+
+ // use this function to initialize the UDT library
+ UDT::startup();
+
+ addrinfo hints;
+ addrinfo* res;
+
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_STREAM;
+
+ string service("9000");
+ if (2 == argc)
+ service = argv[1];
+
+ if (0 != getaddrinfo(NULL, service.c_str(), &hints, &res))
+ {
+ cout << "illegal port number or port is busy.\n" << endl;
+ return 0;
+ }
+
+ UDTSOCKET serv = UDT::socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+
+ // Windows UDP issue
+ // For better performance, modify HKLM\System\CurrentControlSet\Services\Afd\Parameters\FastSendDatagramThreshold
+#ifdef WIN32
+ int mss = 1052;
+ UDT::setsockopt(serv, 0, UDT_MSS, &mss, sizeof(int));
+#endif
+
+ if (UDT::ERROR == UDT::bind(serv, res->ai_addr, res->ai_addrlen))
+ {
+ cout << "bind: " << UDT::getlasterror().getErrorMessage() << endl;
+ return 0;
+ }
+
+ freeaddrinfo(res);
+
+ cout << "server is ready at port: " << service << endl;
+
+ UDT::listen(serv, 10);
+
+ sockaddr_storage clientaddr;
+ int addrlen = sizeof(clientaddr);
+
+ UDTSOCKET fhandle;
+
+ while (true)
+ {
+ if (UDT::INVALID_SOCK == (fhandle = UDT::accept(serv, (sockaddr*)&clientaddr, &addrlen)))
+ {
+ cout << "accept: " << UDT::getlasterror().getErrorMessage() << endl;
+ return 0;
+ }
+
+ char clienthost[NI_MAXHOST];
+ char clientservice[NI_MAXSERV];
+ getnameinfo((sockaddr *)&clientaddr, addrlen, clienthost, sizeof(clienthost), clientservice, sizeof(clientservice), NI_NUMERICHOST|NI_NUMERICSERV);
+ cout << "new connection: " << clienthost << ":" << clientservice << endl;
+
+ #ifndef WIN32
+ pthread_t filethread;
+ pthread_create(&filethread, NULL, server, new UDTSOCKET(fhandle));
+ pthread_detach(filethread);
+ #else
+ CreateThread(NULL, 0, server, new UDTSOCKET(fhandle), 0, NULL);
+ #endif
+ }
+
+ UDT::close(serv);
+
+ // use this function to release the UDT library
+ UDT::cleanup();
+
+ return 0;
+}
+
+#ifndef WIN32
+void* server(void* usocket)
+#else
+DWORD WINAPI server(LPVOID usocket)
+#endif
+{
+ UDTSOCKET fhandle = *(UDTSOCKET*)usocket;
+ delete (UDTSOCKET*)usocket;
+
+ // aquiring file name information from client
+ int64_t size;
+
+ cout << "here" << endl;
+
+ if (UDT::ERROR == UDT::recv(fhandle, (char*)&size, sizeof(int64_t), 0))
+ {
+ cout << "write: " << UDT::getlasterror().getErrorMessage() << endl;
+ return 0;
+ }
+
+ cout << "there" << endl;
+
+ UDT::TRACEINFO trace;
+ UDT::perfmon(fhandle, &trace);
+
+ std::stringstream filename;
+
+ filename << "upload." << time(0) << ".out";
+
+ cout << "filename:" << filename.str() << endl;
+
+ // receive the file
+ fstream ofs(filename.str().c_str(), ios::out | ios::binary | ios::trunc);
+ int64_t recvsize;
+ int64_t offset = 0;
+
+ if (UDT::ERROR == (recvsize = UDT::recvfile(fhandle, ofs, offset, size)))
+ {
+ cout << "recvfile: " << UDT::getlasterror().getErrorMessage() << endl;
+ return 0;
+ }
+
+ UDT::perfmon(fhandle, &trace);
+ cout << "speed = " << trace.mbpsSendRate << "Mbits/sec" << endl;
+
+ UDT::close(fhandle);
+
+ ofs.close();
+
+ #ifndef WIN32
+ return NULL;
+ #else
+ return 0;
+ #endif
+}
View
119 sendfile.js
@@ -0,0 +1,119 @@
+var dgram = require('dgram')
+ , socket = dgram.createSocket('udp4')
+ , packet = require('packet')
+ , packets = require('./packets')
+ , __slice = [].slice;
+
+const MAX_MSG_NO = 0x1FFFFFFF;
+
+var Socket = require('./index').Socket;
+
+function die () {
+ console.log.apply(console, __slice.call(arguments, 0));
+ return process.exit(1);
+}
+
+function say () { return console.log.apply(console, __slice.call(arguments, 0)) }
+
+function extend (to, from) {
+ for (var key in from) to[key] = from[key];
+ return to;
+}
+
+var role = process.argv[2] ? 'client' : 'server';
+socket.bind(role == 'client' ? 9001 : 9000);
+
+function formatQuad (address) {
+ var quad = [];
+ for (i = 3; i >= 0; i--) {
+ quad[i] = Math.floor(address / ( Math.pow(256, i) ))
+ address = address % Math.pow(256, i)
+ }
+ return quad.join('.');
+}
+
+function parseQuad (quad) {
+ var address = 0;
+ quad = quad.split('.');
+ for (i = 3; i >= 0; i--) {
+ address = address + quad[i] * Math.pow(256, i);
+ }
+ return 0;
+}
+
+var parser = new packet.createParser(), serializer = packet.createSerializer();
+
+for (var name in packets) {
+ parser.packet(name, packets[name]);
+ if (name == 'header') continue;
+ serializer.packet(name, packets.header + ',' + packets[name]);
+}
+
+var got;
+socket.on('message', function (buffer, remote) {
+ got = buffer;
+ parser.extract('header', receive);
+ parser.parse(buffer);
+});
+
+var buffer = new Buffer(8192);
+
+if (process.argv[2]) clientStart();
+
+function clientStart () {
+ var handshake =
+ { control: 1
+ , type: 0
+ , additional: 0
+ , timestamp: 0
+ , destination: 0
+ , version: 4
+ , socketType: 1
+ , sequence: 1582673141
+ , maxPacketSize: 1500
+ , windowSize: 8192
+ , connectionType: 1
+ , socketId: 13
+ , synCookie: 0
+ , address: 16777343
+ };
+ serializer.serialize('handshake', handshake);
+ serializer.write(buffer);
+ socket.send(buffer, 0, serializer.length, 9293, '127.0.0.1');
+}
+
+var controlTypes = 'handshake'.split(/\s+/);
+
+function receive (header) {
+ if (header.control) {
+ if (!controlTypes[header.type]) throw new Error("bogotronic");
+ parser.extract(controlTypes[header.type], receptionist[role][header.type](header));
+ }
+}
+
+var receptionist = { client: [] };
+
+receptionist.client.push(function (header) { return function (handshake) {
+ switch (handshake.connectionType) {
+ case -1:
+ die('here', header, handshake);
+ break;
+ case 1:
+ handshake = extend(handshake, header);
+ handshake.connectionType = -1;
+ handshake.destination = 0;
+
+ serializer.reset();
+ serializer.serialize('handshake', handshake);
+ serializer.write(buffer);
+
+ socket.send(buffer, 0, serializer.length, 9293, '127.0.0.1');
+ break;
+ default:
+ throw new Error("badtastic");
+ }
+}});
+
+function toArray (buffer) {
+ return buffer.toString('hex').replace(/(..)/g, ':$1').replace(/(.{12})/g, '\n$1').replace(/\n:/g, '\n');
+}

0 comments on commit ccc292e

Please sign in to comment.
Something went wrong with that request. Please try again.