Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

Commit

Permalink
Implement datagram sockets
Browse files Browse the repository at this point in the history
- Adds new dgram module, for all data-gram type transports
- Supports both UDP client and servers
- Supports Unix Daemon sockets in DGRAM mode too (think syslog)
- Uses a shared Buffer and slices that as needed to be reasonably
  performant.
- One supplied test program so far, test-dgram-pingpong
- Passes test cases on osx 10.6 and ubuntu 9.10u
  • Loading branch information
pquerna authored and ry committed Jun 12, 2010
1 parent e65e603 commit 02da5ed
Show file tree
Hide file tree
Showing 4 changed files with 425 additions and 0 deletions.
184 changes: 184 additions & 0 deletions lib/dgram.js
@@ -0,0 +1,184 @@
var sys = require("sys");
var fs = require("fs");
var events = require("events");
var dns = require('dns');

var Buffer = require('buffer').Buffer;
var IOWatcher = process.IOWatcher;
var binding = process.binding('net');
var socket = binding.socket;
var bind = binding.bind;
var recvfrom = binding.recvfrom;
var sendto = binding.sendto;
var close = binding.close;
var ENOENT = binding.ENOENT;

function isPort (x) { return parseInt(x) >= 0; }
var pool = null;

function getPool() {
/* TODO: this effectively limits you to 8kb maximum packet sizes */
var minPoolAvail = 1024 * 8;

var poolSize = 1024 * 64;

if (pool === null || (pool.used + minPoolAvail > pool.length)) {
pool = new Buffer(poolSize);
pool.used = 0;
}

return pool;
}

function Socket (listener) {
events.EventEmitter.call(this);
var self = this;

if (listener) {
self.addListener('message', listener);
}

self.watcher = new IOWatcher();
self.watcher.host = self;
self.watcher.callback = function () {
while (self.fd) {
var p = getPool();
var rinfo = recvfrom(self.fd, p, p.used, p.length - p.used, 0);

if (!rinfo) return;

self.emit('message', p.slice(p.used, p.used + rinfo.size), rinfo);

p.used += rinfo.size;
}
};
}

sys.inherits(Socket, events.EventEmitter);
exports.Socket = Socket;

exports.createSocket = function (listener) {
return new Socket(listener);
};

Socket.prototype.bind = function () {
var self = this;
if (self.fd) throw new Error('Server already opened');

if (!isPort(arguments[0])) {
/* TODO: unix path dgram */
self.fd = socket('unix_dgram');
self.type = 'unix_dgram';
var path = arguments[0];
self.path = path;
// unlink sockfile if it exists
fs.stat(path, function (err, r) {
if (err) {
if (err.errno == ENOENT) {
bind(self.fd, path);
process.nextTick(function() {
self._startWatcher();
});
} else {
throw r;
}
} else {
if (!r.isFile()) {
throw new Error("Non-file exists at " + path);
} else {
fs.unlink(path, function (err) {
if (err) {
throw err;
} else {
bind(self.fd, path);
process.nextTick(function() {
self._startWatcher();
});
}
});
}
}
});
} else if (!arguments[1]) {
// Don't bind(). OS will assign a port with INADDR_ANY.
// The port can be found with server.address()
self.type = 'udp4';
self.fd = socket(self.type);
bind(self.fd, arguments[0]);
process.nextTick(function() {
self._startWatcher();
});
} else {
// the first argument is the port, the second an IP
var port = arguments[0];
dns.lookup(arguments[1], function (err, ip, addressType) {
if (err) {
self.emit('error', err);
} else {
self.type = addressType == 4 ? 'udp4' : 'udp6';
self.fd = socket(self.type);
bind(self.fd, port, ip);
process.nextTick(function() {
self._startWatcher();
});
}
});
}
};

Socket.prototype._startWatcher = function () {
this.watcher.set(this.fd, true, false);
this.watcher.start();
this.emit("listening");
};

Socket.prototype.address = function () {
return getsockname(this.fd);
};

Socket.prototype.send = function(port, addr, buffer, offset, length) {
var self = this;

if (!isPort(arguments[0])) {
if (!self.fd) {
self.type = 'unix_dgram';
self.fd = socket(self.type);
}
sendto(self.fd, buffer, offset, length, 0, port, addr);
}
else {
dns.lookup(arguments[1], function (err, ip, addressType) {
if (err) {
self.emit('error', err);
} else {
if (!self.fd) {
self.type = addressType == 4 ? 'udp4' : 'udp6';
self.fd = socket(self.type);
process.nextTick(function() {
self._startWatcher();
});
}
sendto(self.fd, buffer, offset, length, 0, port, ip);
}
});
}
};

Socket.prototype.close = function () {
var self = this;

if (!self.fd) throw new Error('Not running');

self.watcher.stop();

close(self.fd);
self.fd = null;

if (self.type === "unix_dgram") {
fs.unlink(self.path, function () {
self.emit("close");
});
} else {
self.emit("close");
}
};
1 change: 1 addition & 0 deletions src/node.cc
Expand Up @@ -1845,6 +1845,7 @@ static Handle<Value> Binding(const Arguments& args) {
exports->Set(String::New("assert"), String::New(native_assert));
exports->Set(String::New("buffer"), String::New(native_buffer));
exports->Set(String::New("child_process"),String::New(native_child_process));
exports->Set(String::New("dgram"), String::New(native_dgram));
exports->Set(String::New("dns"), String::New(native_dns));
exports->Set(String::New("events"), String::New(native_events));
exports->Set(String::New("file"), String::New(native_file));
Expand Down
157 changes: 157 additions & 0 deletions src/node_net.cc
Expand Up @@ -42,6 +42,7 @@ static Persistent<String> errno_symbol;
static Persistent<String> syscall_symbol;

static Persistent<String> fd_symbol;
static Persistent<String> size_symbol;
static Persistent<String> address_symbol;
static Persistent<String> port_symbol;
static Persistent<String> type_symbol;
Expand Down Expand Up @@ -147,7 +148,16 @@ static Handle<Value> Socket(const Arguments& args) {
} else if (0 == strcasecmp(*t, "UNIX")) {
domain = PF_UNIX;
type = SOCK_STREAM;
} else if (0 == strcasecmp(*t, "UNIX_DGRAM")) {
domain = PF_UNIX;
type = SOCK_DGRAM;
} else if (0 == strcasecmp(*t, "UDP")) {
domain = PF_INET;
type = SOCK_DGRAM;
} else if (0 == strcasecmp(*t, "UDP4")) {
domain = PF_INET;
type = SOCK_DGRAM;
} else if (0 == strcasecmp(*t, "UDP6")) {
domain = PF_INET6;
type = SOCK_DGRAM;
} else {
Expand Down Expand Up @@ -520,6 +530,63 @@ static Handle<Value> Read(const Arguments& args) {
return scope.Close(Integer::New(bytes_read));
}

// var info = t.recvfrom(fd, buffer, offset, length, flags);
// info.size // bytes read
// info.port // from port
// info.address // from address
// returns null on EAGAIN or EINTR, raises an exception on all other errors
// returns object otherwise
static Handle<Value> RecvFrom(const Arguments& args) {
HandleScope scope;

if (args.Length() < 5) {
return ThrowException(Exception::TypeError(
String::New("Takes 5 parameters")));
}

FD_ARG(args[0])

if (!Buffer::HasInstance(args[1])) {
return ThrowException(Exception::TypeError(
String::New("Second argument should be a buffer")));
}

Buffer * buffer = ObjectWrap::Unwrap<Buffer>(args[1]->ToObject());

size_t off = args[2]->Int32Value();
if (off >= buffer->length()) {
return ThrowException(Exception::Error(
String::New("Offset is out of bounds")));
}

size_t len = args[3]->Int32Value();
if (off + len > buffer->length()) {
return ThrowException(Exception::Error(
String::New("Length is extends beyond buffer")));
}

int flags = args[4]->Int32Value();

struct sockaddr_storage address_storage;
socklen_t addrlen = sizeof(struct sockaddr_storage);

ssize_t bytes_read = recvfrom(fd, (char*)buffer->data() + off, len, flags,
(struct sockaddr*) &address_storage, &addrlen);

if (bytes_read < 0) {
if (errno == EAGAIN || errno == EINTR) return Null();
return ThrowException(ErrnoException(errno, "read"));
}

Local<Object> info = Object::New();

info->Set(size_symbol, Integer::New(bytes_read));

ADDRESS_TO_JS(info, address_storage);

return scope.Close(info);
}


// bytesRead = t.recvMsg(fd, buffer, offset, length)
// if (recvMsg.fd) {
Expand Down Expand Up @@ -780,6 +847,93 @@ static Handle<Value> SendMsg(const Arguments& args) {
return scope.Close(Integer::New(written));
}

// var bytes = sendto(fd, buf, off, len, flags, destination port, desitnation address);
//
// Write a buffer with optional offset and length to the given file
// descriptor. Note that we refuse to send 0 bytes.
//
// The 'fd' parameter is a numerical file descriptor, or the undefined value
// to send none.
//
// The 'flags' parameter is a number representing a bitmask of MSG_* values.
// This is passed directly to sendmsg().
//
// The destination port can either be an int port, or a path.
//
// Returns null on EAGAIN or EINTR, raises an exception on all other errors
static Handle<Value> SendTo(const Arguments& args) {
HandleScope scope;

if (args.Length() < 5) {
return ThrowException(Exception::TypeError(
String::New("Takes 5 or 6 parameters")));
}

// The first argument should be a file descriptor
FD_ARG(args[0])

// Grab the actul data to be written
if (!Buffer::HasInstance(args[1])) {
return ThrowException(Exception::TypeError(
String::New("Expected either a string or a buffer")));
}

Buffer *buf = ObjectWrap::Unwrap<Buffer>(args[1]->ToObject());

size_t offset = 0;
if (args.Length() >= 3 && !args[2]->IsUndefined()) {
if (!args[2]->IsUint32()) {
return ThrowException(Exception::TypeError(
String::New("Expected unsigned integer for offset")));
}

offset = args[2]->Uint32Value();
if (offset >= buf->length()) {
return ThrowException(Exception::Error(
String::New("Offset into buffer too large")));
}
}

size_t length = buf->length() - offset;
if (args.Length() >= 4 && !args[3]->IsUndefined()) {
if (!args[3]->IsUint32()) {
return ThrowException(Exception::TypeError(
String::New("Expected unsigned integer for length")));
}

length = args[3]->Uint32Value();
if (offset + length > buf->length()) {
return ThrowException(Exception::Error(
String::New("offset + length beyond buffer length")));
}
}

int flags = 0;
if (args.Length() >= 5 && !args[4]->IsUndefined()) {
if (!args[4]->IsUint32()) {
return ThrowException(Exception::TypeError(
String::New("Expected unsigned integer for a flags argument")));
}

flags = args[4]->Uint32Value();
}

Handle<Value> error = ParseAddressArgs(args[5], args[6], false);
if (!error.IsEmpty()) return ThrowException(error);

ssize_t written = sendto(fd, buf->data() + offset, length, flags, addr, addrlen);

if (written < 0) {
if (errno == EAGAIN || errno == EINTR) return Null();
return ThrowException(ErrnoException(errno, "sendmsg"));
}

/* Note that the FD isn't explicitly closed here, this
* happens in the JS */

return scope.Close(Integer::New(written));
}


// Probably only works for Linux TCP sockets?
// Returns the amount of data on the read queue.
Expand Down Expand Up @@ -891,6 +1045,8 @@ void InitNet(Handle<Object> target) {
NODE_SET_METHOD(target, "read", Read);

NODE_SET_METHOD(target, "sendMsg", SendMsg);
NODE_SET_METHOD(target, "recvfrom", RecvFrom);
NODE_SET_METHOD(target, "sendto", SendTo);

recv_msg_template =
Persistent<FunctionTemplate>::New(FunctionTemplate::New(RecvMsg));
Expand Down Expand Up @@ -927,6 +1083,7 @@ void InitNet(Handle<Object> target) {
errno_symbol = NODE_PSYMBOL("errno");
syscall_symbol = NODE_PSYMBOL("syscall");
fd_symbol = NODE_PSYMBOL("fd");
size_symbol = NODE_PSYMBOL("size");
address_symbol = NODE_PSYMBOL("address");
port_symbol = NODE_PSYMBOL("port");
}
Expand Down

0 comments on commit 02da5ed

Please sign in to comment.