From 02da5ed4a1f63bb0990b8e7b1fd0793cd045cbb0 Mon Sep 17 00:00:00 2001 From: Paul Querna Date: Sat, 12 Jun 2010 02:40:24 -0700 Subject: [PATCH] Implement datagram sockets - Adds new dgram module, for all data-gram type transports - Supports both UDP client and servers - Supports Unix Daemon sockets in DGRAM mode too (think syslog) - Uses a shared Buffer and slices that as needed to be reasonably performant. - One supplied test program so far, test-dgram-pingpong - Passes test cases on osx 10.6 and ubuntu 9.10u --- lib/dgram.js | 184 +++++++++++++++++++++++++++++ src/node.cc | 1 + src/node_net.cc | 157 ++++++++++++++++++++++++ test/simple/test-dgram-pingpong.js | 83 +++++++++++++ 4 files changed, 425 insertions(+) create mode 100644 lib/dgram.js create mode 100644 test/simple/test-dgram-pingpong.js diff --git a/lib/dgram.js b/lib/dgram.js new file mode 100644 index 00000000000..0a047672243 --- /dev/null +++ b/lib/dgram.js @@ -0,0 +1,184 @@ +var sys = require("sys"); +var fs = require("fs"); +var events = require("events"); +var dns = require('dns'); + +var Buffer = require('buffer').Buffer; +var IOWatcher = process.IOWatcher; +var binding = process.binding('net'); +var socket = binding.socket; +var bind = binding.bind; +var recvfrom = binding.recvfrom; +var sendto = binding.sendto; +var close = binding.close; +var ENOENT = binding.ENOENT; + +function isPort (x) { return parseInt(x) >= 0; } +var pool = null; + +function getPool() { + /* TODO: this effectively limits you to 8kb maximum packet sizes */ + var minPoolAvail = 1024 * 8; + + var poolSize = 1024 * 64; + + if (pool === null || (pool.used + minPoolAvail > pool.length)) { + pool = new Buffer(poolSize); + pool.used = 0; + } + + return pool; +} + +function Socket (listener) { + events.EventEmitter.call(this); + var self = this; + + if (listener) { + self.addListener('message', listener); + } + + self.watcher = new IOWatcher(); + self.watcher.host = self; + self.watcher.callback = function () { + while (self.fd) { + var p = getPool(); + var rinfo = recvfrom(self.fd, p, p.used, p.length - p.used, 0); + + if (!rinfo) return; + + self.emit('message', p.slice(p.used, p.used + rinfo.size), rinfo); + + p.used += rinfo.size; + } + }; +} + +sys.inherits(Socket, events.EventEmitter); +exports.Socket = Socket; + +exports.createSocket = function (listener) { + return new Socket(listener); +}; + +Socket.prototype.bind = function () { + var self = this; + if (self.fd) throw new Error('Server already opened'); + + if (!isPort(arguments[0])) { + /* TODO: unix path dgram */ + self.fd = socket('unix_dgram'); + self.type = 'unix_dgram'; + var path = arguments[0]; + self.path = path; + // unlink sockfile if it exists + fs.stat(path, function (err, r) { + if (err) { + if (err.errno == ENOENT) { + bind(self.fd, path); + process.nextTick(function() { + self._startWatcher(); + }); + } else { + throw r; + } + } else { + if (!r.isFile()) { + throw new Error("Non-file exists at " + path); + } else { + fs.unlink(path, function (err) { + if (err) { + throw err; + } else { + bind(self.fd, path); + process.nextTick(function() { + self._startWatcher(); + }); + } + }); + } + } + }); + } else if (!arguments[1]) { + // Don't bind(). OS will assign a port with INADDR_ANY. + // The port can be found with server.address() + self.type = 'udp4'; + self.fd = socket(self.type); + bind(self.fd, arguments[0]); + process.nextTick(function() { + self._startWatcher(); + }); + } else { + // the first argument is the port, the second an IP + var port = arguments[0]; + dns.lookup(arguments[1], function (err, ip, addressType) { + if (err) { + self.emit('error', err); + } else { + self.type = addressType == 4 ? 'udp4' : 'udp6'; + self.fd = socket(self.type); + bind(self.fd, port, ip); + process.nextTick(function() { + self._startWatcher(); + }); + } + }); + } +}; + +Socket.prototype._startWatcher = function () { + this.watcher.set(this.fd, true, false); + this.watcher.start(); + this.emit("listening"); +}; + +Socket.prototype.address = function () { + return getsockname(this.fd); +}; + +Socket.prototype.send = function(port, addr, buffer, offset, length) { + var self = this; + + if (!isPort(arguments[0])) { + if (!self.fd) { + self.type = 'unix_dgram'; + self.fd = socket(self.type); + } + sendto(self.fd, buffer, offset, length, 0, port, addr); + } + else { + dns.lookup(arguments[1], function (err, ip, addressType) { + if (err) { + self.emit('error', err); + } else { + if (!self.fd) { + self.type = addressType == 4 ? 'udp4' : 'udp6'; + self.fd = socket(self.type); + process.nextTick(function() { + self._startWatcher(); + }); + } + sendto(self.fd, buffer, offset, length, 0, port, ip); + } + }); + } +}; + +Socket.prototype.close = function () { + var self = this; + + if (!self.fd) throw new Error('Not running'); + + self.watcher.stop(); + + close(self.fd); + self.fd = null; + + if (self.type === "unix_dgram") { + fs.unlink(self.path, function () { + self.emit("close"); + }); + } else { + self.emit("close"); + } +}; diff --git a/src/node.cc b/src/node.cc index d06925c664a..a83cd7b9114 100644 --- a/src/node.cc +++ b/src/node.cc @@ -1845,6 +1845,7 @@ static Handle Binding(const Arguments& args) { exports->Set(String::New("assert"), String::New(native_assert)); exports->Set(String::New("buffer"), String::New(native_buffer)); exports->Set(String::New("child_process"),String::New(native_child_process)); + exports->Set(String::New("dgram"), String::New(native_dgram)); exports->Set(String::New("dns"), String::New(native_dns)); exports->Set(String::New("events"), String::New(native_events)); exports->Set(String::New("file"), String::New(native_file)); diff --git a/src/node_net.cc b/src/node_net.cc index e414c876bb1..d7b75c83394 100644 --- a/src/node_net.cc +++ b/src/node_net.cc @@ -42,6 +42,7 @@ static Persistent errno_symbol; static Persistent syscall_symbol; static Persistent fd_symbol; +static Persistent size_symbol; static Persistent address_symbol; static Persistent port_symbol; static Persistent type_symbol; @@ -147,7 +148,16 @@ static Handle Socket(const Arguments& args) { } else if (0 == strcasecmp(*t, "UNIX")) { domain = PF_UNIX; type = SOCK_STREAM; + } else if (0 == strcasecmp(*t, "UNIX_DGRAM")) { + domain = PF_UNIX; + type = SOCK_DGRAM; } else if (0 == strcasecmp(*t, "UDP")) { + domain = PF_INET; + type = SOCK_DGRAM; + } else if (0 == strcasecmp(*t, "UDP4")) { + domain = PF_INET; + type = SOCK_DGRAM; + } else if (0 == strcasecmp(*t, "UDP6")) { domain = PF_INET6; type = SOCK_DGRAM; } else { @@ -520,6 +530,63 @@ static Handle Read(const Arguments& args) { return scope.Close(Integer::New(bytes_read)); } +// var info = t.recvfrom(fd, buffer, offset, length, flags); +// info.size // bytes read +// info.port // from port +// info.address // from address +// returns null on EAGAIN or EINTR, raises an exception on all other errors +// returns object otherwise +static Handle RecvFrom(const Arguments& args) { + HandleScope scope; + + if (args.Length() < 5) { + return ThrowException(Exception::TypeError( + String::New("Takes 5 parameters"))); + } + + FD_ARG(args[0]) + + if (!Buffer::HasInstance(args[1])) { + return ThrowException(Exception::TypeError( + String::New("Second argument should be a buffer"))); + } + + Buffer * buffer = ObjectWrap::Unwrap(args[1]->ToObject()); + + size_t off = args[2]->Int32Value(); + if (off >= buffer->length()) { + return ThrowException(Exception::Error( + String::New("Offset is out of bounds"))); + } + + size_t len = args[3]->Int32Value(); + if (off + len > buffer->length()) { + return ThrowException(Exception::Error( + String::New("Length is extends beyond buffer"))); + } + + int flags = args[4]->Int32Value(); + + struct sockaddr_storage address_storage; + socklen_t addrlen = sizeof(struct sockaddr_storage); + + ssize_t bytes_read = recvfrom(fd, (char*)buffer->data() + off, len, flags, + (struct sockaddr*) &address_storage, &addrlen); + + if (bytes_read < 0) { + if (errno == EAGAIN || errno == EINTR) return Null(); + return ThrowException(ErrnoException(errno, "read")); + } + + Local info = Object::New(); + + info->Set(size_symbol, Integer::New(bytes_read)); + + ADDRESS_TO_JS(info, address_storage); + + return scope.Close(info); +} + // bytesRead = t.recvMsg(fd, buffer, offset, length) // if (recvMsg.fd) { @@ -780,6 +847,93 @@ static Handle SendMsg(const Arguments& args) { return scope.Close(Integer::New(written)); } +// var bytes = sendto(fd, buf, off, len, flags, destination port, desitnation address); +// +// Write a buffer with optional offset and length to the given file +// descriptor. Note that we refuse to send 0 bytes. +// +// The 'fd' parameter is a numerical file descriptor, or the undefined value +// to send none. +// +// The 'flags' parameter is a number representing a bitmask of MSG_* values. +// This is passed directly to sendmsg(). +// +// The destination port can either be an int port, or a path. +// +// Returns null on EAGAIN or EINTR, raises an exception on all other errors +static Handle SendTo(const Arguments& args) { + HandleScope scope; + + if (args.Length() < 5) { + return ThrowException(Exception::TypeError( + String::New("Takes 5 or 6 parameters"))); + } + + // The first argument should be a file descriptor + FD_ARG(args[0]) + + // Grab the actul data to be written + if (!Buffer::HasInstance(args[1])) { + return ThrowException(Exception::TypeError( + String::New("Expected either a string or a buffer"))); + } + + Buffer *buf = ObjectWrap::Unwrap(args[1]->ToObject()); + + size_t offset = 0; + if (args.Length() >= 3 && !args[2]->IsUndefined()) { + if (!args[2]->IsUint32()) { + return ThrowException(Exception::TypeError( + String::New("Expected unsigned integer for offset"))); + } + + offset = args[2]->Uint32Value(); + if (offset >= buf->length()) { + return ThrowException(Exception::Error( + String::New("Offset into buffer too large"))); + } + } + + size_t length = buf->length() - offset; + if (args.Length() >= 4 && !args[3]->IsUndefined()) { + if (!args[3]->IsUint32()) { + return ThrowException(Exception::TypeError( + String::New("Expected unsigned integer for length"))); + } + + length = args[3]->Uint32Value(); + if (offset + length > buf->length()) { + return ThrowException(Exception::Error( + String::New("offset + length beyond buffer length"))); + } + } + + int flags = 0; + if (args.Length() >= 5 && !args[4]->IsUndefined()) { + if (!args[4]->IsUint32()) { + return ThrowException(Exception::TypeError( + String::New("Expected unsigned integer for a flags argument"))); + } + + flags = args[4]->Uint32Value(); + } + + Handle error = ParseAddressArgs(args[5], args[6], false); + if (!error.IsEmpty()) return ThrowException(error); + + ssize_t written = sendto(fd, buf->data() + offset, length, flags, addr, addrlen); + + if (written < 0) { + if (errno == EAGAIN || errno == EINTR) return Null(); + return ThrowException(ErrnoException(errno, "sendmsg")); + } + + /* Note that the FD isn't explicitly closed here, this + * happens in the JS */ + + return scope.Close(Integer::New(written)); +} + // Probably only works for Linux TCP sockets? // Returns the amount of data on the read queue. @@ -891,6 +1045,8 @@ void InitNet(Handle target) { NODE_SET_METHOD(target, "read", Read); NODE_SET_METHOD(target, "sendMsg", SendMsg); + NODE_SET_METHOD(target, "recvfrom", RecvFrom); + NODE_SET_METHOD(target, "sendto", SendTo); recv_msg_template = Persistent::New(FunctionTemplate::New(RecvMsg)); @@ -927,6 +1083,7 @@ void InitNet(Handle target) { errno_symbol = NODE_PSYMBOL("errno"); syscall_symbol = NODE_PSYMBOL("syscall"); fd_symbol = NODE_PSYMBOL("fd"); + size_symbol = NODE_PSYMBOL("size"); address_symbol = NODE_PSYMBOL("address"); port_symbol = NODE_PSYMBOL("port"); } diff --git a/test/simple/test-dgram-pingpong.js b/test/simple/test-dgram-pingpong.js new file mode 100644 index 00000000000..61a736ae123 --- /dev/null +++ b/test/simple/test-dgram-pingpong.js @@ -0,0 +1,83 @@ +require("../common"); +var Buffer = require('buffer').Buffer; +var dgram = require("dgram"); + +var tests_run = 0; + +function pingPongTest (port, host) { + var N = 500; + var count = 0; + var sent_final_ping = false; + + var server = dgram.createSocket(function (msg, rinfo) { + puts("connection: " + rinfo.address + ":"+ rinfo.port); + + puts("server got: " + msg); + + if (/PING/.exec(msg)) { + var buf = new Buffer(4); + buf.write('PONG'); + server.send(rinfo.port, rinfo.address, buf, 0, buf.length); + } + + }); + + server.addListener("error", function (e) { + throw e; + }); + + server.bind(port, host); + + server.addListener("listening", function () { + puts("server listening on " + port + " " + host); + + var buf = new Buffer(4); + buf.write('PING'); + + var client = dgram.createSocket(); + + client.addListener("message", function (msg, rinfo) { + puts("client got: " + msg); + assert.equal("PONG", msg.toString('ascii')); + + count += 1; + + if (count < N) { + client.send(port, host, buf, 0, buf.length); + } else { + sent_final_ping = true; + client.send(port, host, buf, 0, buf.length); + process.nextTick(function() { + client.close(); + }); + } + }); + + client.addListener("close", function () { + puts('client.close'); + assert.equal(N, count); + tests_run += 1; + server.close(); + }); + + client.addListener("error", function (e) { + throw e; + }); + + client.send(port, host, buf, 0, buf.length); + count += 1; + }); + +} + +/* All are run at once, so run on different ports */ +pingPongTest(20989, "localhost"); +pingPongTest(20990, "localhost"); +pingPongTest(20988); +pingPongTest(20997, "::1"); +//pingPongTest("/tmp/pingpong.sock"); + +process.addListener("exit", function () { + assert.equal(4, tests_run); + puts('done'); +});