Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

wip: writable onebyte v8 strings

  • Loading branch information...
commit 2d605ba4308d7028d0a4a6cd25c463a9a9eb01f2 1 parent b9655fc
@bnoordhuis authored
View
123 benchmark/net/tcps-raw-pipe.js
@@ -0,0 +1,123 @@
+// In this benchmark, we connect a client to the server, and write
+// as many bytes as we can in the specified time (default = 10s)
+
+var common = require('../common.js');
+
+// if there are --dur=N and --len=N args, then
+// run the function with those settings.
+// if not, then queue up a bunch of child processes.
+var bench = common.createBenchmark(main, {
+ len: [102400, 1024 * 1024 * 16],
+ type: ['asc'],
+ dur: [5]
+});
+
+var TCP = process.binding('tcps_wrap').TCPS;
+var PORT = common.PORT;
+
+var dur;
+var len;
+var type;
+
+function main(conf) {
+ dur = +conf.dur;
+ len = +conf.len;
+ type = conf.type;
+ server();
+}
+
+
+function fail(syscall) {
+ var errno = process._errno;
+ var e = new Error(syscall + ' ' + errno);
+ e.errno = e.code = errno;
+ e.syscall = syscall;
+ throw e;
+}
+
+function server() {
+ var serverHandle = new TCP();
+ var rc;
+
+ rc = serverHandle.bind('127.0.0.1', PORT);
+ if (rc)
+ fail('bind');
+
+ rc = serverHandle.listen(511);
+ if (rc)
+ fail('listen');
+
+ serverHandle.onconnection = function(clientHandle) {
+ if (!clientHandle)
+ fail('connect');
+
+ clientHandle.onwrite = function(status) {
+ if (status)
+ fail('write');
+ };
+
+ clientHandle.onread = function(string, length) {
+ // we're not expecting to ever get an EOF from the client.
+ // just lots of data forever.
+ if (length < 0)
+ fail('read');
+
+ console.error('' + string);
+ var rc = clientHandle.write(string, length);
+ if (!rc)
+ fail('write');
+ console.error('server write');
+ };
+
+ clientHandle.readStart();
+ };
+
+ client();
+}
+
+function client() {
+ var chunk;
+ switch (type) {
+ case 'asc':
+ // latin1 strings are contigous chunks of memory when freshly allocated
+ chunk = Buffer(new Array(len + 1).join('x')).toString('latin1');
+ break;
+ default:
+ throw new Error('invalid type: ' + type);
+ break;
+ }
+
+ var clientHandle = new TCP();
+ var bytes = 0;
+ var rc;
+
+ rc = clientHandle.connect('127.0.0.1', PORT);
+ if (rc)
+ fail('connect');
+
+ rc = clientHandle.readStart();
+ if (rc)
+ fail('readStart');
+
+ clientHandle.onread = function(string, length) {
+ bytes += length;
+ };
+
+ clientHandle.onconnect = function() {
+ bench.start();
+
+ setTimeout(function() {
+ // multiply by 2 since we're sending it first one way
+ // then then back again.
+ bench.end(2 * (bytes * 8) / (1024 * 1024 * 1024));
+ }, dur * 1000);
+
+ write();
+ };
+
+ clientHandle.onwrite = write;
+
+ function write() {
+ clientHandle.write(chunk, chunk.length);
+ }
+}
View
6 deps/v8/include/v8.h
@@ -1179,6 +1179,7 @@ class V8EXPORT String : public Primitive {
int length = -1,
int* nchars_ref = NULL,
int options = NO_OPTIONS) const;
+ uint8_t* UnsafeMutablePointer();
/**
* A zero length string.
@@ -1300,6 +1301,11 @@ class V8EXPORT String : public Primitive {
V8_INLINE(static String* Cast(v8::Value* obj));
/**
+ * Allocates raw, unitialized memory for a string.
+ */
+ static Local<String> New(int length);
+
+ /**
* Allocates a new string from either UTF-8 encoded or ASCII data.
* The second parameter 'length' gives the buffer length. If omitted,
* the function calls 'strlen' to determine the buffer length.
View
21 deps/v8/src/api.cc
@@ -5193,6 +5193,27 @@ Local<String> v8::String::New(const char* data, int length) {
}
+Local<String> v8::String::New(int length) {
+ ASSERT(length >= 0);
+ i::Isolate* isolate = i::Isolate::Current();
+ EnsureInitializedForIsolate(isolate, "v8::String::New()");
+ LOG_API(isolate, "String::New(int)");
+ if (length == 0) return Empty();
+ ENTER_V8(isolate);
+ i::Handle<i::String> result =
+ isolate->factory()->NewStringFromOneByte(length);
+ return Utils::ToLocal(result);
+}
+
+
+uint8_t* v8::String::UnsafeMutablePointer() {
+ i::Isolate* isolate = Utils::OpenHandle(this)->GetIsolate();
+ if (IsDeadCheck(isolate, "v8::String::UnsafeMutablePointer()")) return 0;
+ i::Handle<i::Object> obj = Utils::OpenHandle(this);
+ return i::SeqOneByteString::cast(*obj)->GetChars();
+}
+
+
Local<String> v8::String::Concat(Handle<String> left, Handle<String> right) {
i::Handle<i::String> left_string = Utils::OpenHandle(*left);
i::Isolate* isolate = left_string->GetIsolate();
View
10 deps/v8/src/factory.cc
@@ -199,6 +199,15 @@ Handle<String> Factory::InternalizeTwoByteString(Vector<const uc16> string) {
}
+Handle<String> Factory::NewStringFromOneByte(int length,
+ PretenureFlag pretenure) {
+ CALL_HEAP_FUNCTION(
+ isolate(),
+ isolate()->heap()->AllocateRawOneByteString(length, pretenure),
+ String);
+}
+
+
Handle<String> Factory::NewStringFromOneByte(Vector<const uint8_t> string,
PretenureFlag pretenure) {
CALL_HEAP_FUNCTION(
@@ -207,6 +216,7 @@ Handle<String> Factory::NewStringFromOneByte(Vector<const uint8_t> string,
String);
}
+
Handle<String> Factory::NewStringFromUtf8(Vector<const char> string,
PretenureFlag pretenure) {
CALL_HEAP_FUNCTION(
View
5 deps/v8/src/factory.h
@@ -113,6 +113,11 @@ class Factory {
// two byte.
//
// ASCII strings are pretenured when used as keys in the SourceCodeCache.
+
+ // Allocates raw uninitialized memory for a one byte string.
+ Handle<String> NewStringFromOneByte(
+ int length,
+ PretenureFlag pretenure = NOT_TENURED);
Handle<String> NewStringFromOneByte(
Vector<const uint8_t> str,
PretenureFlag pretenure = NOT_TENURED);
View
11 lib/buffer.js
@@ -60,6 +60,9 @@ SlowBuffer.prototype.toString = function(encoding, start, end) {
case 'hex':
return this.hexSlice(start, end);
+ case 'latin1':
+ return this.latin1Slice(start, end);
+
case 'utf8':
case 'utf-8':
return this.utf8Slice(start, end);
@@ -409,6 +412,9 @@ Buffer.prototype.toString = function(encoding, start, end) {
case 'hex':
return this.parent.hexSlice(start, end);
+ case 'latin1':
+ return this.parent.latin1Slice(start, end);
+
case 'utf8':
case 'utf-8':
return this.parent.utf8Slice(start, end);
@@ -568,6 +574,11 @@ Buffer.prototype.asciiWrite = function(string, offset) {
return this.write(string, offset, 'ascii');
};
+// Not legacy but here for consistency.
+Buffer.prototype.latin1Slice = function(start, end) {
+ return this.toString('latin1', start, end);
+};
+
/*
* Need to make sure that buffer isn't trying to write out of bounds.
View
1  node.gyp
@@ -99,6 +99,7 @@
'src/stream_wrap.cc',
'src/slab_allocator.cc',
'src/tcp_wrap.cc',
+ 'src/tcps_wrap.cc',
'src/timer_wrap.cc',
'src/tty_wrap.cc',
'src/process_wrap.cc',
View
16 src/node_buffer.cc
@@ -370,6 +370,19 @@ Handle<Value> Buffer::AsciiSlice(const Arguments &args) {
}
+Handle<Value> Buffer::Latin1Slice(const Arguments &args) {
+ HandleScope scope(node_isolate);
+ Buffer* parent = ObjectWrap::Unwrap<Buffer>(args.This());
+ SLICE_ARGS(args[0], args[1])
+ char* data = parent->data_ + start;
+ int length = end - start;
+ Local<String> string = String::New(length);
+ uint8_t* ptr = string->UnsafeMutablePointer();
+ memcpy(ptr, data, length);
+ return scope.Close(string);
+}
+
+
Handle<Value> Buffer::Utf8Slice(const Arguments &args) {
HandleScope scope(node_isolate);
Buffer *parent = ObjectWrap::Unwrap<Buffer>(args.This());
@@ -1108,6 +1121,9 @@ void Buffer::Initialize(Handle<Object> target) {
// TODO NODE_SET_PROTOTYPE_METHOD(t, "utf16Slice", Utf16Slice);
// copy
NODE_SET_PROTOTYPE_METHOD(constructor_template, "utf8Slice", Buffer::Utf8Slice);
+ NODE_SET_PROTOTYPE_METHOD(constructor_template,
+ "latin1Slice",
+ Buffer::Latin1Slice);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "utf8Write", Buffer::Utf8Write);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "asciiWrite", Buffer::AsciiWrite);
View
1  src/node_buffer.h
@@ -116,6 +116,7 @@ class NODE_EXTERN Buffer: public ObjectWrap {
static v8::Handle<v8::Value> BinarySlice(const v8::Arguments &args);
static v8::Handle<v8::Value> AsciiSlice(const v8::Arguments &args);
static v8::Handle<v8::Value> Base64Slice(const v8::Arguments &args);
+ static v8::Handle<v8::Value> Latin1Slice(const v8::Arguments &args);
static v8::Handle<v8::Value> Utf8Slice(const v8::Arguments &args);
static v8::Handle<v8::Value> Ucs2Slice(const v8::Arguments &args);
static v8::Handle<v8::Value> HexSlice(const v8::Arguments &args);
View
1  src/node_extensions.h
@@ -34,6 +34,7 @@ NODE_EXT_LIST_ITEM(node_zlib)
// libuv rewrite
NODE_EXT_LIST_ITEM(node_timer_wrap)
NODE_EXT_LIST_ITEM(node_tcp_wrap)
+NODE_EXT_LIST_ITEM(node_tcps_wrap) // string instead of buffer based
NODE_EXT_LIST_ITEM(node_udp_wrap)
NODE_EXT_LIST_ITEM(node_pipe_wrap)
NODE_EXT_LIST_ITEM(node_cares_wrap)
View
254 src/tcps_wrap.cc
@@ -0,0 +1,254 @@
+#include "node.h"
+
+namespace node {
+
+#define SET_ERRNO() \
+ do { \
+ uv_err_t err = uv_last_error(uv_default_loop()); \
+ SetErrno(err); \
+ } \
+ while (0)
+
+using namespace v8;
+
+class TCPSWrap {
+public:
+ static void Initialize(Handle<Object> target);
+ static Handle<Value> New(const Arguments& args);
+ static Handle<Value> Bind(const Arguments& args);
+ static Handle<Value> Listen(const Arguments& args);
+ static Handle<Value> Connect(const Arguments& args);
+ static Handle<Value> ReadStart(const Arguments& args);
+ static Handle<Value> Write(const Arguments& args);
+ static Handle<Value> Close(const Arguments& args);
+private:
+ static void OnConnection(uv_stream_t* handle, int status);
+ static uv_buf_t OnAlloc(uv_handle_t* handle, size_t suggested_size);
+ static void OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf);
+ static void OnWrite(uv_write_t* req, int status);
+ static void OnConnect(uv_connect_t* req, int status);
+ static void OnClose(uv_handle_t* handle);
+ static void WeakCallback(Isolate*, Persistent<Value>, void* arg);
+ static Persistent<Function> constructor_;
+ static Persistent<String> onconnection_sym_;
+ static Persistent<String> onconnect_sym_;
+ static Persistent<String> onwrite_sym_;
+ static Persistent<String> onread_sym_;
+ Persistent<String> write_str_;
+ Persistent<String> read_str_;
+ Persistent<Object> object_;
+ uv_connect_t connect_req_;
+ uv_write_t write_req_;
+ uv_tcp_t handle_;
+};
+
+Persistent<Function> TCPSWrap::constructor_;
+Persistent<String> TCPSWrap::onconnection_sym_;
+Persistent<String> TCPSWrap::onconnect_sym_;
+Persistent<String> TCPSWrap::onwrite_sym_;
+Persistent<String> TCPSWrap::onread_sym_;
+
+
+void TCPSWrap::Initialize(Handle<Object> target) {
+ HandleScope handle_scope(node_isolate);
+ Local<FunctionTemplate> t = FunctionTemplate::New(New);
+ t->InstanceTemplate()->SetInternalFieldCount(1);
+ t->SetClassName(String::New("TCPS"));
+ t->PrototypeTemplate()->Set(String::New("bind"),
+ FunctionTemplate::New(Bind)->GetFunction());
+ t->PrototypeTemplate()->Set(String::New("listen"),
+ FunctionTemplate::New(Listen)->GetFunction());
+ t->PrototypeTemplate()->Set(String::New("connect"),
+ FunctionTemplate::New(Connect)->GetFunction());
+ t->PrototypeTemplate()->Set(String::New("readStart"),
+ FunctionTemplate::New(ReadStart)->GetFunction());
+ t->PrototypeTemplate()->Set(String::New("write"),
+ FunctionTemplate::New(Write)->GetFunction());
+ t->PrototypeTemplate()->Set(String::New("close"),
+ FunctionTemplate::New(Close)->GetFunction());
+ target->Set(String::New("TCPS"), t->GetFunction());
+ constructor_ = Persistent<Function>::New(node_isolate, t->GetFunction());
+ onconnection_sym_ = Persistent<String>::New(node_isolate,
+ String::New("onconnection"));
+ onconnect_sym_ = Persistent<String>::New(node_isolate,
+ String::New("onconnect"));
+ onwrite_sym_ = Persistent<String>::New(node_isolate, String::New("onwrite"));
+ onread_sym_ = Persistent<String>::New(node_isolate, String::New("onread"));
+}
+
+
+Handle<Value> TCPSWrap::New(const Arguments& args) {
+ HandleScope handle_scope(node_isolate);
+ assert(args.IsConstructCall() == true);
+ TCPSWrap* w = new TCPSWrap;
+ uv_tcp_init(uv_default_loop(), &w->handle_);
+ w->object_ = Persistent<Object>::New(node_isolate, args.This());
+ w->object_->SetAlignedPointerInInternalField(0, w);
+ return w->object_;
+}
+
+
+Handle<Value> TCPSWrap::Bind(const Arguments& args) {
+ HandleScope handle_scope(node_isolate);
+ void* ptr = args.This()->GetAlignedPointerFromInternalField(0);
+ TCPSWrap* w = static_cast<TCPSWrap*>(ptr);
+ assert(args.Length() == 2);
+ String::Utf8Value address(args[0]);
+ uint32_t port = args[1]->Uint32Value();
+ assert(address.length() != 0);
+ assert(port <= 0xffff);
+ sockaddr_in sa = uv_ip4_addr(*address, port);
+ int rc = uv_tcp_bind(&w->handle_, sa);
+ if (rc) SET_ERRNO();
+ return handle_scope.Close(Integer::New(rc));
+}
+
+
+Handle<Value> TCPSWrap::Listen(const Arguments& args) {
+ HandleScope handle_scope(node_isolate);
+ void* ptr = args.This()->GetAlignedPointerFromInternalField(0);
+ TCPSWrap* w = static_cast<TCPSWrap*>(ptr);
+ assert(args.Length() == 1);
+ uint32_t backlog = args[0]->Uint32Value();
+ int rc = uv_listen(reinterpret_cast<uv_stream_t*>(&w->handle_),
+ backlog,
+ OnConnection);
+ if (rc) SET_ERRNO();
+ return handle_scope.Close(Integer::New(rc));
+}
+
+
+Handle<Value> TCPSWrap::Connect(const Arguments& args) {
+ HandleScope handle_scope(node_isolate);
+ void* ptr = args.This()->GetAlignedPointerFromInternalField(0);
+ TCPSWrap* w = static_cast<TCPSWrap*>(ptr);
+ assert(args.Length() == 2);
+ String::Utf8Value address(args[0]);
+ uint32_t port = args[1]->Uint32Value();
+ assert(address.length() != 0);
+ assert(port <= 0xffff);
+ sockaddr_in sa = uv_ip4_addr(*address, port);
+ int rc = uv_tcp_connect(&w->connect_req_, &w->handle_, sa, OnConnect);
+ if (rc) SET_ERRNO();
+ return handle_scope.Close(Integer::New(rc));
+}
+
+
+Handle<Value> TCPSWrap::ReadStart(const Arguments& args) {
+ HandleScope handle_scope(node_isolate);
+ void* ptr = args.This()->GetAlignedPointerFromInternalField(0);
+ TCPSWrap* w = static_cast<TCPSWrap*>(ptr);
+ int rc = uv_read_start(reinterpret_cast<uv_stream_t*>(&w->handle_),
+ OnAlloc,
+ OnRead);
+ if (rc) SET_ERRNO();
+ return handle_scope.Close(Integer::New(rc));
+}
+
+
+Handle<Value> TCPSWrap::Write(const Arguments& args) {
+ HandleScope handle_scope(node_isolate);
+ void* ptr = args.This()->GetAlignedPointerFromInternalField(0);
+ TCPSWrap* w = static_cast<TCPSWrap*>(ptr);
+ assert(args.Length() == 2);
+ assert(args[0]->IsString());
+ assert(args[1]->IsUint32());
+ assert(w->write_str_.IsEmpty()); // Only one write at a time.
+ w->write_str_ = Persistent<String>::New(node_isolate, args[0].As<String>());
+ uint8_t* data = w->write_str_->UnsafeMutablePointer();
+ uint32_t length = args[1]->Uint32Value();
+ uv_buf_t buf = { reinterpret_cast<char*>(data), length };
+ int rc = uv_write(&w->write_req_,
+ reinterpret_cast<uv_stream_t*>(&w->handle_),
+ &buf,
+ 1,
+ OnWrite);
+ if (rc) SET_ERRNO();
+ return handle_scope.Close(Integer::New(rc));
+}
+
+
+Handle<Value> TCPSWrap::Close(const Arguments& args) {
+ HandleScope handle_scope(node_isolate);
+ void* ptr = args.This()->GetAlignedPointerFromInternalField(0);
+ TCPSWrap* w = static_cast<TCPSWrap*>(ptr);
+ uv_close(reinterpret_cast<uv_handle_t*>(&w->handle_), OnClose);
+ w->object_->SetAlignedPointerInInternalField(0, NULL);
+ return Undefined();
+}
+
+
+void TCPSWrap::OnConnection(uv_stream_t* handle, int status) {
+ HandleScope handle_scope(node_isolate);
+ TCPSWrap* w = container_of(handle, TCPSWrap, handle_);
+ Local<Object> conn = constructor_->NewInstance();
+ void* ptr = conn->GetAlignedPointerFromInternalField(0);
+ TCPSWrap* nw = static_cast<TCPSWrap*>(ptr);
+ int rc = uv_accept(reinterpret_cast<uv_stream_t*>(&w->handle_),
+ reinterpret_cast<uv_stream_t*>(&nw->handle_));
+ assert(rc == 0);
+ MakeCallback(w->object_,
+ onconnection_sym_,
+ 1,
+ reinterpret_cast<Handle<Value>*>(&conn));
+}
+
+
+uv_buf_t TCPSWrap::OnAlloc(uv_handle_t* handle, size_t suggested_size) {
+ HandleScope handle_scope(node_isolate);
+ TCPSWrap* w = container_of(handle, TCPSWrap, handle_);
+ // TODO(bnoordhuis) Limit suggested_size to something reasonable.
+ w->read_str_ = Persistent<String>::New(node_isolate,
+ String::New(suggested_size));
+ uint8_t* data = w->read_str_->UnsafeMutablePointer();
+ uv_buf_t buf = { reinterpret_cast<char*>(data), suggested_size };
+ return buf;
+}
+
+
+void TCPSWrap::OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
+ HandleScope handle_scope(node_isolate);
+ TCPSWrap* w = container_of(handle, TCPSWrap, handle_);
+ Local<Integer> read_len = Integer::New(nread);
+ Local<String> read_str = *w->read_str_;
+ w->read_str_.Dispose(node_isolate);
+ w->read_str_.Clear();
+ Local<Value> argv[] = { read_str, read_len };
+ MakeCallback(w->object_, onread_sym_, ARRAY_SIZE(argv), argv);
+}
+
+
+void TCPSWrap::OnWrite(uv_write_t* req, int status) {
+ HandleScope handle_scope(node_isolate);
+ TCPSWrap* w = container_of(req, TCPSWrap, write_req_);
+ w->write_str_.Dispose(node_isolate);
+ w->write_str_.Clear();
+ Local<Value> argv[] = { Integer::New(status) };
+ MakeCallback(w->object_, onwrite_sym_, ARRAY_SIZE(argv), argv);
+}
+
+
+void TCPSWrap::OnConnect(uv_connect_t* req, int status) {
+ HandleScope handle_scope(node_isolate);
+ TCPSWrap* w = container_of(req, TCPSWrap, connect_req_);
+ Local<Value> argv[] = { Integer::New(status) };
+ MakeCallback(w->object_, onconnect_sym_, ARRAY_SIZE(argv), argv);
+}
+
+
+void TCPSWrap::OnClose(uv_handle_t* handle) {
+ TCPSWrap* w = container_of(handle, TCPSWrap, handle_);
+ w->object_.MakeWeak(node_isolate, w, WeakCallback);
+}
+
+
+void TCPSWrap::WeakCallback(Isolate*, Persistent<Value>, void* arg) {
+ TCPSWrap* w = static_cast<TCPSWrap*>(arg);
+ w->object_.Dispose(node_isolate);
+ delete w;
+}
+
+
+} // namespace node
+
+NODE_MODULE(node_tcps_wrap, node::TCPSWrap::Initialize)
Please sign in to comment.
Something went wrong with that request. Please try again.