Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

tcp: write and read

  • Loading branch information...
commit 20d208fe605df40f3332d7c2a1f27b259d54640d 1 parent 8bdd9c6
@indutny authored
2  deps/candor
@@ -1 +1 @@
-Subproject commit e245d592fe52cd3a1372f031f5410452a2c1050c
+Subproject commit 144fb753e56ec493a387cdbccbda867ff8afdf26
View
5 lib/events.can
@@ -1,11 +1,10 @@
// EventEmitter
exports = {}
-EventEmitter = exports.EventEmitter = {
- _listeners: {}
-}
+EventEmitter = exports.EventEmitter = {}
EventEmitter.init = (self) {
+ self._listeners = {}
}
EventEmitter.on = (self, event, listener) {
View
40 lib/tcp.can
@@ -1,19 +1,17 @@
// TCP Module
tcp = global._bindings.tcp
+buffer = global._bindings.buffer
EventEmitter = global._natives.events.EventEmitter
exports = {}
-Socket = exports.Socket = clone EventEmitter
-Socket.init = (self, handle) {
- EventEmitter.init(self)
- self._handle = hande
-}
-
Server = exports.Server = clone EventEmitter
+
Server.init = (self, listener) {
+ EventEmitter.init(self)
self._handle = tcp.new()
+ self._isServer = true
if (listener) self:on('connection', listener)
@@ -22,7 +20,9 @@ Server.init = (self, listener) {
Server.listen = (self, port, host) {
tcp.bind(self._handle, port, host || '0.0.0.0')
- tcp.listen(self._handle, 511, (handle) {
+ tcp.listen(self._handle, 511, (err, handle) {
+ if (err) return self:emit('error', err)
+
socket = clone Socket
self:emit('connection', socket:init(handle))
})
@@ -30,6 +30,11 @@ Server.listen = (self, port, host) {
return self
}
+exports.createServer = (listener) {
+ s = clone Server
+ return s:init(listener)
+}
+
Server.close = (self, cb) {
if (cb) self:on('close', cb)
tcp.close(self._handle, () {
@@ -37,9 +42,24 @@ Server.close = (self, cb) {
})
}
-exports.createServer = (listener) {
- s = clone Server
- return s:init(listener)
+exports.Socket = Socket = clone EventEmitter
+
+Socket.init = (self, handle) {
+ EventEmitter.init(self)
+ self._handle = handle
+ self._isServer = false
+
+ tcp.readStart(handle, (size, data) {
+ if (size > 0) self:emit('data', data)
+ })
+
+ return self
}
+Socket.write = (self, data, cb) {
+ return tcp.write(self._handle, buffer.new(data), cb)
+}
+
+Socket.close = Server.close
+
return exports
View
6 src/bindings/buffer.cc
@@ -20,6 +20,12 @@ Value* Buffer::New(uint32_t argc, Value** argv) {
if (argv[0]->Is<Number>()) {
b = new Buffer(argv[0]->As<Number>()->IntegralValue());
+ } else if (argv[0]->Is<String>()) {
+ String* str = argv[0]->As<String>();
+ b = new Buffer(str->Length());
+ memcpy(b->data(), str->Value(), b->size());
+ } else if (Buffer::HasInstance(argv[0])) {
+ return argv[0];
} else {
return Nil::New();
}
View
14 src/bindings/buffer.h
@@ -10,11 +10,22 @@ namespace can {
class Buffer : public candor::CWrapper {
public:
Buffer(ssize_t size) : candor::CWrapper(&magic), size_(size > 0 ? size : 0) {
+ if (size_ == 0) {
+ data_ = NULL;
+ allocated_ = false;
+ }
data_ = new char[size_];
+ allocated_ = true;
+ }
+
+ Buffer(char* data, ssize_t size) : candor::CWrapper(&magic),
+ allocated_(true),
+ data_(data),
+ size_(size) {
}
~Buffer() {
- delete[] data_;
+ if (allocated_) delete[] data_;
data_ = NULL;
}
@@ -35,6 +46,7 @@ class Buffer : public candor::CWrapper {
static candor::Value* Slice(uint32_t argc, candor::Value** argv);
static candor::Value* Concat(uint32_t argc, candor::Value** argv);
+ bool allocated_;
char* data_;
ssize_t size_;
};
View
105 src/bindings/tcp.cc
@@ -6,6 +6,7 @@ namespace can {
using namespace candor;
const int TCP::magic = 0;
+const int TCPWrite::magic = 0;
bool TCP::HasInstance(Value* value) {
@@ -91,9 +92,104 @@ void TCP::OnClose(uv_handle_t* handle) {
if (!s->close_cb_.IsEmpty()) {
s->close_cb_->Call(0, NULL);
- s->close_cb_.Unwrap();
- s->Unref();
}
+ s->Unref();
+ s->connection_cb_.Unwrap();
+ s->close_cb_.Unwrap();
+ s->read_cb_.Unwrap();
+}
+
+
+uv_buf_t TCP::AllocCallback(uv_handle_t* handle, size_t size) {
+ // XXX Use slab allocator
+ uv_buf_t res;
+ res.base = new char[size];
+ res.len = size;
+
+ return res;
+}
+
+
+void TCP::ReadCallback(uv_stream_t* stream, ssize_t nread, uv_buf_t buf) {
+ TCP* t = reinterpret_cast<TCP*>(stream->data);
+
+ Buffer* b = new Buffer(buf.base, nread);
+
+ Value* argv[2] = { Number::NewIntegral(nread), b->Wrap() };
+ if (!t->read_cb_.IsEmpty()) t->read_cb_->Call(2, argv);
+}
+
+
+Value* TCP::ReadStart(uint32_t argc, Value** argv) {
+ if (argc < 2 || !TCP::HasInstance(argv[0]) || !argv[1]->Is<Function>()) {
+ return Nil::New();
+ }
+
+ TCP* t = CWrapper::Unwrap<TCP>(argv[0]);
+ Function* cb = argv[1]->As<Function>();
+
+ t->read_cb_.Wrap(cb);
+ int r = uv_read_start(reinterpret_cast<uv_stream_t*>(&t->handle_),
+ AllocCallback,
+ ReadCallback);
+
+ return Number::NewIntegral(r);
+}
+
+
+TCPWrite::TCPWrite(TCP* tcp, Buffer* b, Function* cb) : CWrapper(&magic),
+ tcp_(tcp),
+ b_(b),
+ cb_(cb) {
+ Ref();
+ b->Ref();
+
+ buf_.base = b->data();
+ buf_.len = b->size();
+
+ req_.data = this;
+ tcp_->Ref();
+ uv_write(&req_,
+ reinterpret_cast<uv_stream_t*>(&tcp_->handle_),
+ &buf_,
+ 1,
+ OnWrite);
+}
+
+
+TCPWrite::~TCPWrite() {
+ b_->Unref();
+ tcp_->Unref();
+}
+
+
+void TCPWrite::OnWrite(uv_write_t* req, int status) {
+ Value* argv[1] = { Nil::New() };
+ if (status) {
+ argv[0] = Number::NewIntegral(status);
+ }
+
+ TCPWrite* w = reinterpret_cast<TCPWrite*>(req->data);
+ w->cb_->Call(2, argv);
+ w->Unref();
+}
+
+
+Value* TCP::Write(uint32_t argc, Value** argv) {
+ if (argc < 3 ||
+ !TCP::HasInstance(argv[0]) ||
+ !Buffer::HasInstance(argv[1]) ||
+ !argv[2]->Is<Function>()) {
+ return Nil::New();
+ }
+
+ TCP* t = CWrapper::Unwrap<TCP>(argv[0]);
+ Buffer* b = CWrapper::Unwrap<Buffer>(argv[1]);
+ Function* cb = argv[2]->As<Function>();
+
+ new TCPWrite(t, b, cb);
+
+ return Nil::New();
}
@@ -106,10 +202,11 @@ Value* TCP::Close(uint32_t argc, Value** argv) {
TCP* t = CWrapper::Unwrap<TCP>(argv[0]);
// Already closing/closed - just return
- if (uv_is_closing(reinterpret_cast<uv_handle_t*>(&t->handle_))) {
+ if (t->closing_) {
return Number::NewIntegral(-1);
}
+ t->closing_ = true;
t->close_cb_.Wrap(cb);
uv_close(reinterpret_cast<uv_handle_t*>(&t->handle_), OnClose);
@@ -122,6 +219,8 @@ void TCP::Init(Object* target) {
target->Set("bind", Function::New(TCP::Bind));
target->Set("listen", Function::New(TCP::Listen));
target->Set("close", Function::New(TCP::Close));
+ target->Set("readStart", Function::New(TCP::ReadStart));
+ target->Set("write", Function::New(TCP::Write));
}
} // namespace can
View
32 src/bindings/tcp.h
@@ -3,14 +3,18 @@
#include <candor.h>
#include <uv.h>
+#include "buffer.h" // Buffer
#include <sys/types.h> // size_t
#include <stdlib.h> // NULL
namespace can {
+// Forward declarations
+class TCPWrite;
+
class TCP : public candor::CWrapper {
public:
- TCP() : candor::CWrapper(&magic) {
+ TCP() : candor::CWrapper(&magic), closing_(false) {
Ref();
}
@@ -24,16 +28,42 @@ class TCP : public candor::CWrapper {
protected:
static void OnConnection(uv_stream_t* server, int status);
static void OnClose(uv_handle_t* server);
+ static uv_buf_t AllocCallback(uv_handle_t* handle, size_t size);
+ static void ReadCallback(uv_stream_t* stream, ssize_t nread, uv_buf_t buf);
static candor::Value* New(uint32_t argc, candor::Value** argv);
static candor::Value* Bind(uint32_t argc, candor::Value** argv);
static candor::Value* Listen(uint32_t argc, candor::Value** argv);
static candor::Value* Close(uint32_t argc, candor::Value** argv);
+ static candor::Value* ReadStart(uint32_t argc, candor::Value** argv);
+ static candor::Value* Write(uint32_t argc, candor::Value** argv);
+ bool closing_;
uv_tcp_t handle_;
candor::Handle<candor::Function> connection_cb_;
candor::Handle<candor::Function> close_cb_;
+ candor::Handle<candor::Function> read_cb_;
+
+ friend class TCPWrite;
+};
+
+class TCPWrite : public candor::CWrapper {
+ public:
+ TCPWrite(TCP* tcp, Buffer* b, candor::Function* cb);
+ ~TCPWrite();
+
+ // Magic word
+ static const int magic;
+
+ protected:
+ static void OnWrite(uv_write_t* req, int status);
+
+ TCP* tcp_;
+ Buffer* b_;
+ candor::Handle<candor::Function> cb_;
+ uv_write_t req_;
+ uv_buf_t buf_;
};
} // namespace can
Please sign in to comment.
Something went wrong with that request. Please try again.