Permalink
Browse files

* 0.2.4: Switch to BSON instead of JSON for message serialization.

  • Loading branch information...
1 parent ce86f8a commit 0c36267e4363973325cbe3c2962284ccef8c1d73 @audreyt committed Nov 4, 2012
Showing with 1,462 additions and 36 deletions.
  1. +4 −0 CHANGES.md
  2. +6 −0 LICENSE
  3. +10 −1 binding.gyp
  4. +11 −2 package.json
  5. +5 −2 package.ls
  6. +125 −17 src/WebWorkerThreads.cc
  7. +1,016 −0 src/bson.cc
  8. +273 −0 src/bson.h
  9. +1 −3 src/thread_nextTick.js
  10. +1 −1 src/thread_nextTick.js.c
  11. +1 −1 src/thread_nextTick.ls
  12. +5 −5 src/worker.js
  13. +1 −1 src/worker.js.c
  14. +3 −3 src/worker.ls
View
@@ -1,3 +1,7 @@
+## 0.2.4
+
+* Switch to BSON instead of JSON for message serialization.
+
## 0.2.3
* Add SunOS to supported OSs; tested on Linux.
View
@@ -7,6 +7,12 @@ This work is published from Taiwan.
http://creativecommons.org/publicdomain/zero/1.0
+# This module includes src/bson.cc and src/bson.h, under the Apache License v2.0:
+
+https://github.com/mongodb/js-bson/
+
+Copyright by Christian Amor Kvalheim and other contributors.
+
# This module is based on Threads_a_gogo, under the MIT license:
https://github.com/xk/node-threads-a-gogo/blob/2665efb9d1d3cc0934ac2f5b6265b37a684d2e07/package.json#L24
View
@@ -2,7 +2,16 @@
'targets': [
{
'target_name': 'WebWorkerThreads',
- 'sources': [ 'src/WebWorkerThreads.cc' ]
+ 'sources': [ 'src/WebWorkerThreads.cc' ],
+ 'cflags!': [ '-fno-exceptions' ],
+ 'cflags_cc!': [ '-fno-exceptions' ],
+ 'conditions': [
+ ['OS=="mac"', {
+ 'xcode_settings': {
+ 'GCC_ENABLE_CPP_EXCEPTIONS': 'YES'
+ }
+ }]
+ ]
}
]
}
View
@@ -1,6 +1,6 @@
{
"name": "webworker-threads",
- "version": "0.3.0",
+ "version": "0.2.4",
"main": "build/Release/WebWorkerThreads.node",
"description": "Lightweight Web Worker API implementation with POSIX threads",
"keywords": [
@@ -18,7 +18,16 @@
"url": "http://github.com/audreyt/node-webworker-threads/issues",
"email": "audreyt@audreyt.org"
},
- "license": "MIT",
+ "licenses": [
+ {
+ "type": "Apache License, Version 2.0",
+ "url": "http://www.apache.org/licenses/LICENSE-2.0"
+ },
+ {
+ "type": "MIT",
+ "url": "file:LICENSE"
+ }
+ ],
"repository": {
"type": "git",
"url": "http://github.com/audreyt/node-webworker-threads.git"
View
@@ -1,5 +1,5 @@
name: 'webworker-threads'
-version: '0.3.0'
+version: '0.2.4'
main: 'build/Release/WebWorkerThreads.node'
description: 'Lightweight Web Worker API implementation with POSIX threads'
keywords: [ 'threads' 'web worker' 'a gogo' ]
@@ -11,7 +11,10 @@ homepage: 'https://github.com/audreyt/node-webworker-threads'
bugs:
url: 'http://github.com/audreyt/node-webworker-threads/issues'
email: 'audreyt@audreyt.org'
-license: 'MIT'
+licenses: [
+ { type: "Apache License, Version 2.0", url: "http://www.apache.org/licenses/LICENSE-2.0" }
+ { type: 'MIT', url: "file:LICENSE" }
+]
repository:
type: 'git'
url: 'http://github.com/audreyt/node-webworker-threads.git'
View
@@ -18,11 +18,13 @@ static int debug_allocs= 0;
*/
#include "queues_a_gogo.cc"
+#include "bson.cc"
//using namespace node;
using namespace v8;
static Persistent<String> id_symbol;
+static Persistent<String> require_symbol;
static Persistent<ObjectTemplate> threadTemplate;
static bool useLocker;
@@ -55,7 +57,8 @@ typedef struct {
enum jobTypes {
kJobTypeEval,
- kJobTypeEvent
+ kJobTypeEvent,
+ kJobTypeEventSerialized
};
typedef struct {
@@ -68,6 +71,12 @@ typedef struct {
String::Utf8Value** argumentos;
} typeEvent;
struct {
+ int length;
+ String::Utf8Value* eventName;
+ char* buffer;
+ size_t bufferSize;
+ } typeEventSerialized;
+ struct {
int error;
int tiene_callBack;
int useStringObject;
@@ -240,7 +249,6 @@ static void eventLoop (typeThread* thread) {
Local<Object> global= thread->context->Global();
global->Set(String::NewSymbol("self"), global);
-
global->Set(String::NewSymbol("puts"), FunctionTemplate::New(Puts)->GetFunction());
global->Set(String::NewSymbol("print"), FunctionTemplate::New(Print)->GetFunction());
@@ -335,6 +343,29 @@ static void eventLoop (typeThread* thread) {
queue_push(qitem, freeJobsQueue);
dispatchEvents->CallAsFunction(global, 2, args);
}
+ else if (job->jobType == kJobTypeEventSerialized) {
+ Local<Value> args[2];
+ str= job->typeEventSerialized.eventName;
+ args[0]= String::New(**str, (*str).length());
+ delete str;
+
+ int len = job->typeEventSerialized.length;
+ Local<Array> array= Array::New(len);
+ args[1]= array;
+
+ {
+ BSON *bson = new BSON();
+ char* data = job->typeEventSerialized.buffer;
+ size_t size = job->typeEventSerialized.bufferSize;
+ BSONDeserializer deserializer(bson, data, size);
+ Local<Object> result = deserializer.DeserializeDocument()->ToObject();
+ int i = 0; do { array->Set(i, result->Get(i)); } while (++i < len);
+ free(data);
+ }
+
+ queue_push(qitem, freeJobsQueue);
+ dispatchEvents->CallAsFunction(global, 2, args);
+ }
}
if (_ntq->Length()) {
@@ -476,6 +507,30 @@ static void Callback (uv_async_t *watcher, int revents) {
queue_push(qitem, freeJobsQueue);
thread->dispatchEvents->CallAsFunction(thread->JSObject, 2, args);
}
+ else if (job->jobType == kJobTypeEventSerialized) {
+ Local<Value> args[2];
+
+ str= job->typeEventSerialized.eventName;
+ args[0]= String::New(**str, (*str).length());
+ delete str;
+
+ int len = job->typeEventSerialized.length;
+ Local<Array> array= Array::New(len);
+ args[1]= array;
+
+ {
+ BSON *bson = new BSON();
+ char* data = job->typeEventSerialized.buffer;
+ size_t size = job->typeEventSerialized.bufferSize;
+ BSONDeserializer deserializer(bson, data, size);
+ Local<Object> result = deserializer.DeserializeDocument()->ToObject();
+ int i = 0; do { array->Set(i, result->Get(i)); } while (++i < len);
+ free(data);
+ }
+
+ queue_push(qitem, freeJobsQueue);
+ thread->dispatchEvents->CallAsFunction(thread->JSObject, 2, args);
+ }
}
}
@@ -639,36 +694,83 @@ static Handle<Value> processEmit (const Arguments &args) {
return scope.Close(args.This());
}
+static Handle<Value> processEmitSerialized (const Arguments &args) {
+ HandleScope scope;
+
+ //fprintf(stdout, "*** processEmit\n");
+ int len = args.Length();
+
+ if (!len) return scope.Close(args.This());
+
+ typeThread* thread= isAThread(args.This());
+ if (!thread) {
+ return ThrowException(Exception::TypeError(String::New("thread.emit(): the receiver must be a thread object")));
+ }
+
+ typeQueueItem* qitem= nuJobQueueItem();
+ typeJob* job= (typeJob*) qitem->asPtr;
+
+ job->jobType= kJobTypeEventSerialized;
+ job->typeEventSerialized.length= len-1;
+ job->typeEventSerialized.eventName= new String::Utf8Value(args[0]);
+ Local<Array> array= Array::New(len-1);
+ int i = 1; do { array->Set(i-1, args[i]); } while (++i < len);
+
+ {
+ char* buffer;
+ BSON *bson = new BSON();
+ size_t object_size;
+ Local<Object> object = bson->GetSerializeObject(array);
+ BSONSerializer<CountStream> counter(bson, false, false);
+ counter.SerializeDocument(object);
+ object_size = counter.GetSerializeSize();
+ buffer = (char *)malloc(object_size);
+ BSONSerializer<DataStream> data(bson, false, false, buffer);
+ data.SerializeDocument(object);
+ job->typeEventSerialized.buffer= buffer;
+ job->typeEventSerialized.bufferSize= object_size;
+ }
+ pushToInQueue(qitem, thread);
+ return scope.Close(args.This());
+}
static Handle<Value> postMessage (const Arguments &args) {
HandleScope scope;
+ int len = args.Length();
//fprintf(stdout, "*** threadEmit\n");
- if (!args.Length()) return scope.Close(args.This());
+ if (!len) return scope.Close(args.This());
typeThread* thread= (typeThread*) Isolate::GetCurrent()->GetData();
typeQueueItem* qitem= nuJobQueueItem();
typeJob* job= (typeJob*) qitem->asPtr;
- job->jobType= kJobTypeEvent;
- job->typeEvent.length= args.Length();
- job->typeEvent.eventName= new String::Utf8Value(String::New("message"));
- job->typeEvent.argumentos= (v8::String::Utf8Value**) malloc(job->typeEvent.length* sizeof(void*));
-
- Handle<Object> json = Handle<Object>::Cast(
- thread->context->Global()->Get(String::New("JSON")));
- Handle<Function> func = Handle<Function>::Cast(
- json->GetRealNamedProperty(String::New("stringify")));
- Handle<Value> jsonArgs[1];
- jsonArgs[0] = args[0];
- job->typeEvent.argumentos[0] = new String::Utf8Value(
- func->Call(thread->context->Global(), 1, jsonArgs)->ToString()
- );
+ job->jobType= kJobTypeEventSerialized;
+ job->typeEventSerialized.eventName= new String::Utf8Value(String::New("message"));
+ job->typeEventSerialized.length= len;
+
+ Local<Array> array= Array::New(len);
+ int i = 0; do { array->Set(i, args[i]); } while (++i < len);
+
+ {
+ char* buffer;
+ BSON *bson = new BSON();
+ size_t object_size;
+ Local<Object> object = bson->GetSerializeObject(array);
+ BSONSerializer<CountStream> counter(bson, false, false);
+ counter.SerializeDocument(object);
+ object_size = counter.GetSerializeSize();
+ buffer = (char *)malloc(object_size);
+ BSONSerializer<DataStream> data(bson, false, false, buffer);
+ data.SerializeDocument(object);
+ job->typeEventSerialized.buffer= buffer;
+ job->typeEventSerialized.bufferSize= object_size;
+ }
queue_push(qitem, &thread->outQueue);
if (!(thread->inQueue.length)) uv_async_send(&thread->async_watcher); // wake up callback
@@ -741,6 +843,11 @@ static Handle<Value> Create (const Arguments &args) {
Local<Value> dispatchEvents= Script::Compile(String::New(kEvents_js))->Run()->ToObject()->CallAsFunction(thread->JSObject, 0, NULL);
thread->dispatchEvents= Persistent<Object>::New(dispatchEvents->ToObject());
+ if (args.Length() > 0) {
+ thread->JSObject->Set(require_symbol, args[0]);
+ }
+
+
uv_async_init(uv_default_loop(), &thread->async_watcher, Callback);
uv_ref((uv_handle_t*)&thread->async_watcher);
@@ -787,6 +894,7 @@ void Init (Handle<Object> target) {
threadTemplate->Set(String::NewSymbol("eval"), FunctionTemplate::New(Eval));
threadTemplate->Set(String::NewSymbol("load"), FunctionTemplate::New(Load));
threadTemplate->Set(String::NewSymbol("emit"), FunctionTemplate::New(processEmit));
+ threadTemplate->Set(String::NewSymbol("emitSerialized"), FunctionTemplate::New(processEmitSerialized));
threadTemplate->Set(String::NewSymbol("destroy"), FunctionTemplate::New(Destroy));
}
Oops, something went wrong.

0 comments on commit 0c36267

Please sign in to comment.