Permalink
Browse files

worker: add support for resource constraints/OOM tracking

  • Loading branch information...
addaleax committed May 13, 2018
1 parent 1d8b74d commit 9a725557d9229ce7a680675eac4872e783a3376a
Showing with 99 additions and 6 deletions.
  1. +4 −0 doc/api/worker.md
  2. +15 −1 lib/internal/worker.js
  3. +6 −2 src/node_messaging.h
  4. +67 −2 src/node_worker.cc
  5. +7 −1 src/node_worker.h
@@ -346,6 +346,10 @@ if (isMainThread) {
described in the [HTML structured clone algorithm][], and an error will be
thrown if the object can not be cloned (e.g. because it contains
`function`s).
* maxSemiSpaceSize {integer} An optional memory limit in MB for the thread’s
heap’s semi-space, which contains most short-lived objects.
* maxOldSpaceSize {integer} An optional memory limit in MB for the thread’s
main heap.
* stdin {stream.Writable} A writable stream, which is available in the
Worker thread as `process.stdin`. If this is not provided, `process.stdin`
will not contain any data.
@@ -10,6 +10,7 @@ const {
ERR_WORKER_NEED_ABSOLUTE_PATH,
ERR_WORKER_UNSERIALIZABLE_ERROR,
ERR_WORKER_UNSUPPORTED_EXTENSION,
ERR_WORKER_OUT_OF_MEMORY
} = require('internal/errors').codes;

const { internalBinding } = require('internal/bootstrap/loaders');
@@ -26,6 +27,7 @@ const {
kMessageFlagNone,
kMessageFlagCustomOffset,
kMessageFlagCouldNotSerializeError,
kMessageFlagOutOfMemory,
kMessageFlagErrorMessage
} = internalBinding('worker');

@@ -41,6 +43,7 @@ const kDispose = Symbol('kDispose');
const kOnExit = Symbol('kOnExit');
const kOnMessage = Symbol('kOnMessage');
const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr');
const kOnOutOfMemory = Symbol('kOnOutOfMemory');
const kOnErrorMessage = Symbol('kOnErrorMessage');
const kParentSideStdio = Symbol('kParentSideStdio');
const kWritableCallback = Symbol('kWritableCallback');
@@ -204,8 +207,13 @@ class Worker extends EventEmitter {
}
}

const resourceLimits = {
maxSemiSpaceSize: options.maxSemiSpaceSize | 0,
maxOldSpaceSize: options.maxOldSpaceSize | 0
};

// Set up the C++ handle for the worker, as well as some internal wiring.
this[kHandle] = new WorkerImpl();
this[kHandle] = new WorkerImpl(resourceLimits);
this[kHandle].onexit = (code) => this[kOnExit](code);
this[kPort] = this[kHandle].messagePort;
this[kPort].on('message', (payload) => this.emit('message', payload));
@@ -247,6 +255,10 @@ class Worker extends EventEmitter {
this.emit('error', new ERR_WORKER_UNSERIALIZABLE_ERROR());
}

[kOnOutOfMemory](serialized) {
this.emit('error', new ERR_WORKER_OUT_OF_MEMORY());
}

[kOnErrorMessage](serialized) {
// This is what is called for uncaught exceptions.
const error = deserializeError(serialized);
@@ -262,6 +274,8 @@ class Worker extends EventEmitter {
return this.emit('online');
case kMessageFlagCouldNotSerializeError:
return this[kOnCouldNotSerializeErr](payload);
case kMessageFlagOutOfMemory:
return this[kOnOutOfMemory](payload);
case kMessageFlagErrorMessage:
return this[kOnErrorMessage](payload);
case kStdioPayload:
@@ -32,11 +32,15 @@ enum MessageFlag {
// messages to a FlaggedMessageListener rather than emitting it to JS land.
kMessageFlagMaxHandledInternally = 2,

// Used by the Workers implementation:
// Worker went out of memory.
kMessageFlagOutOfMemory = 3,

// Worker terminated with an error, which is passed along as the payload.
kMessageFlagErrorMessage = 3,
kMessageFlagErrorMessage = 4,

// Worker terminated with an error, and serializing that error failed.
kMessageFlagCouldNotSerializeError = 4,
kMessageFlagCouldNotSerializeError = 5,

// Generic offset for use by the JS core modules.
kMessageFlagCustomOffset = 100
@@ -14,6 +14,7 @@ using v8::Function;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::HandleScope;
using v8::HeapStatistics;
using v8::Integer;
using v8::Isolate;
using v8::Local;
@@ -69,6 +70,37 @@ Worker* Worker::ForIsolate(Isolate* isolate) {
return by_isolate_[isolate];
}

void Worker::AfterGC(bool was_low_memory_notification) {
if (max_old_space_size_ == 0)
return;
HeapStatistics heap_stats;
isolate_->GetHeapStatistics(&heap_stats);
double ratio = static_cast<double>(heap_stats.used_heap_size())
/ max_old_space_size_;
if (ratio >= 0.9) {
if (!was_low_memory_notification) {
isolate_->LowMemoryNotification();
AfterGC(true);
} else {
isolate_->TerminateExecution();
OnOOM();
}
}
}

void Worker::OnOOM() {
// Called by V8 when there is no memory left.
{
Mutex::ScopedLock lock(mutex_);
uv_stop(&loop_);
Mutex::ScopedLock stopped_lock(stopped_mutex_);
if (stopped_) return;
stopped_ = true;
}
if (child_port_->IsSiblingClosed()) return;
child_port_->Send(Message(kMessageFlagOutOfMemory));
}

void Worker::OnErrorMessage(Local<v8::Message> message, Local<Value> error) {
// An uncaught exception happened. Call into JS to do some fancy
// serialization, then send the error report to the parent Environment.
@@ -104,7 +136,9 @@ void Worker::OnErrorMessage(Local<v8::Message> message, Local<Value> error) {
env_->async_hooks()->clear_async_id_stack();
}

Worker::Worker(Environment* env, Local<Object> wrap)
Worker::Worker(Environment* env,
Local<Object> wrap,
const v8::ResourceConstraints& resource_constraints)
: AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER) {
MakeWeak();

@@ -139,6 +173,7 @@ Worker::Worker(Environment* env, Local<Object> wrap)

Isolate::CreateParams params;
params.array_buffer_allocator = array_buffer_allocator_;
params.constraints = resource_constraints;

isolate_ = Isolate::New(params);
CHECK_NE(isolate_, nullptr);
@@ -149,6 +184,14 @@ Worker::Worker(Environment* env, Local<Object> wrap)
by_isolate_[isolate_] = this;
}

max_old_space_size_ = resource_constraints.max_old_space_size() * 1024 * 1024;
isolate_->AddGCEpilogueCallback([](Isolate* isolate,
v8::GCType type,
v8::GCCallbackFlags flags,
void* data) {
static_cast<Worker*>(data)->AfterGC(false);
}, static_cast<void*>(this));

isolate_->AddMessageListener([](Local<v8::Message> message,
Local<Value> data) {
Worker::ForIsolate(Isolate::GetCurrent())->OnErrorMessage(message, data);
@@ -376,7 +419,28 @@ void Worker::New(const FunctionCallbackInfo<Value>& args) {
return;
}

new Worker(env, args.This());
v8::ResourceConstraints rc;
CHECK_EQ(args.Length(), 1);
CHECK(args[0]->IsObject());
Local<Object> options = args[0].As<Object>();
int64_t max_semi_space_size;
int64_t max_old_space_size;

max_semi_space_size =
options->Get(env->context(),
env->max_semi_space_size_string())
.ToLocalChecked()->IntegerValue(env->context())
.FromJust();
max_old_space_size =
options->Get(env->context(),
env->max_old_space_size_string())
.ToLocalChecked()->IntegerValue(env->context())
.FromJust();

rc.set_max_semi_space_size(max_semi_space_size);
rc.set_max_old_space_size(max_old_space_size * 1.1);

new Worker(env, args.This(), rc);
}

void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
@@ -465,6 +529,7 @@ void InitWorker(Local<Object> target,

NODE_DEFINE_CONSTANT(target, kMessageFlagNone);
NODE_DEFINE_CONSTANT(target, kMessageFlagCouldNotSerializeError);
NODE_DEFINE_CONSTANT(target, kMessageFlagOutOfMemory);
NODE_DEFINE_CONSTANT(target, kMessageFlagErrorMessage);
NODE_DEFINE_CONSTANT(target, kMessageFlagCustomOffset);
}
@@ -12,7 +12,9 @@ namespace worker {
// A worker thread, as represented in its parent thread.
class Worker : public AsyncWrap {
public:
Worker(Environment* env, v8::Local<v8::Object> wrap);
Worker(Environment* env,
v8::Local<v8::Object> wrap,
const v8::ResourceConstraints& resource_constraints);
~Worker();

// Run the worker. This is only called from the worker thread.
@@ -42,12 +44,16 @@ class Worker : public AsyncWrap {
static Worker* ForIsolate(v8::Isolate* isolate);

private:
void AfterGC(bool was_low_memory_notification);
void OnOOM();

uv_loop_t loop_;
IsolateData* isolate_data_ = nullptr;
v8::Isolate* isolate_ = nullptr;
Environment* env_ = nullptr;
v8::ArrayBuffer::Allocator* array_buffer_allocator_ = nullptr;
uv_thread_t tid_;
double max_old_space_size_;

// This mutex protects access to all variables listed below it.
mutable Mutex mutex_;

0 comments on commit 9a72555

Please sign in to comment.