From 3f9d1dd6efd62a6f5ceba9a55bb9df97a4731bb3 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Wed, 15 Jan 2020 21:34:41 +0100 Subject: [PATCH] src: remove AsyncRequest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove `AsyncRequest` from the source code, and replace its usage with threadsafe `SetImmediate()` calls. This has the advantage of being able to pass in any function, rather than one that is defined when the `AsyncRequest` is “installed”. This necessitates two changes: - The stopping flag (which was only used in one case and ignored in the other) is now a direct member of the `Environment` class. - Workers no longer have their own libuv handles, requiring manual management of their libuv ref count. As a drive-by fix, the `can_call_into_js` variable was turned into an atomic variable. While there have been no bug reports, the flag is set from `Stop(env)` calls, which are supposed to be possible from any thread. PR-URL: https://github.com/nodejs/node/pull/31386 Refs: https://github.com/openjs-foundation/summit/pull/240 Reviewed-By: Gireesh Punathil Reviewed-By: James M Snell Reviewed-By: Colin Ihrig Reviewed-By: Rich Trott --- src/env-inl.h | 23 ++++++++++++++--------- src/env.cc | 46 ++------------------------------------------- src/env.h | 47 ++++++++++++---------------------------------- src/node_worker.cc | 34 +++++++++++++++++++++------------ src/node_worker.h | 5 ++--- 5 files changed, 52 insertions(+), 103 deletions(-) diff --git a/src/env-inl.h b/src/env-inl.h index 895111b408747c..b4fbc192cc21d0 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -881,8 +881,21 @@ inline void Environment::remove_sub_worker_context(worker::Worker* context) { sub_worker_contexts_.erase(context); } +inline void Environment::add_refs(int64_t diff) { + task_queues_async_refs_ += diff; + CHECK_GE(task_queues_async_refs_, 0); + if (task_queues_async_refs_ == 0) + uv_unref(reinterpret_cast(&task_queues_async_)); + else + uv_ref(reinterpret_cast(&task_queues_async_)); +} + inline bool Environment::is_stopping() const { - return thread_stopper_.is_stopped(); + return is_stopping_.load(); +} + +inline void Environment::set_stopping(bool value) { + is_stopping_.store(value); } inline std::list* Environment::extra_linked_bindings() { @@ -1192,14 +1205,6 @@ int64_t Environment::base_object_count() const { return base_object_count_; } -bool AsyncRequest::is_stopped() const { - return stopped_.load(); -} - -void AsyncRequest::set_stopped(bool flag) { - stopped_.store(flag); -} - #define VP(PropertyName, StringValue) V(v8::Private, PropertyName) #define VY(PropertyName, StringValue) V(v8::Symbol, PropertyName) #define VS(PropertyName, StringValue) V(v8::String, PropertyName) diff --git a/src/env.cc b/src/env.cc index 31000d43f00b61..72e2cb09744eba 100644 --- a/src/env.cc +++ b/src/env.cc @@ -473,14 +473,6 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) { uv_unref(reinterpret_cast(&idle_check_handle_)); uv_unref(reinterpret_cast(&task_queues_async_)); - thread_stopper()->Install( - this, static_cast(this), [](uv_async_t* handle) { - Environment* env = static_cast(handle->data); - uv_stop(env->event_loop()); - }); - thread_stopper()->set_stopped(false); - uv_unref(reinterpret_cast(thread_stopper()->GetHandle())); - // Register clean-up cb to be called to clean up the handles // when the environment is freed, note that they are not cleaned in // the one environment per process setup, but will be called in @@ -498,8 +490,9 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) { void Environment::ExitEnv() { set_can_call_into_js(false); - thread_stopper()->Stop(); + set_stopping(true); isolate_->TerminateExecution(); + SetImmediateThreadsafe([](Environment* env) { uv_stop(env->event_loop()); }); } void Environment::RegisterHandleCleanups() { @@ -604,7 +597,6 @@ void Environment::RunCleanup() { started_cleanup_ = true; TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment), "RunCleanup", this); - thread_stopper()->Uninstall(); CleanupHandles(); while (!cleanup_hooks_.empty()) { @@ -993,7 +985,6 @@ inline size_t Environment::SelfSize() const { // TODO(joyeecheung): refactor the MemoryTracker interface so // this can be done for common types within the Track* calls automatically // if a certain scope is entered. - size -= sizeof(thread_stopper_); size -= sizeof(async_hooks_); size -= sizeof(tick_info_); size -= sizeof(immediate_info_); @@ -1015,7 +1006,6 @@ void Environment::MemoryInfo(MemoryTracker* tracker) const { tracker->TrackField("fs_stats_field_array", fs_stats_field_array_); tracker->TrackField("fs_stats_field_bigint_array", fs_stats_field_bigint_array_); - tracker->TrackField("thread_stopper", thread_stopper_); tracker->TrackField("cleanup_hooks", cleanup_hooks_); tracker->TrackField("async_hooks", async_hooks_); tracker->TrackField("immediate_info", immediate_info_); @@ -1094,38 +1084,6 @@ void Environment::CleanupFinalizationGroups() { } } -void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) { - CHECK_NULL(async_); - env_ = env; - async_ = new uv_async_t; - async_->data = data; - CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0); -} - -void AsyncRequest::Uninstall() { - if (async_ != nullptr) { - env_->CloseHandle(async_, [](uv_async_t* async) { delete async; }); - async_ = nullptr; - } -} - -void AsyncRequest::Stop() { - set_stopped(true); - if (async_ != nullptr) uv_async_send(async_); -} - -uv_async_t* AsyncRequest::GetHandle() { - return async_; -} - -void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const { - if (async_ != nullptr) tracker->TrackField("async_request", *async_); -} - -AsyncRequest::~AsyncRequest() { - CHECK_NULL(async_); -} - // Not really any better place than env.cc at this moment. void BaseObject::DeleteMe(void* data) { BaseObject* self = static_cast(data); diff --git a/src/env.h b/src/env.h index 5295d8c7f6a225..ee7fc5a1e8ce1d 100644 --- a/src/env.h +++ b/src/env.h @@ -585,34 +585,6 @@ struct AllocatedBuffer { friend class Environment; }; -class AsyncRequest : public MemoryRetainer { - public: - AsyncRequest() = default; - ~AsyncRequest() override; - - AsyncRequest(const AsyncRequest&) = delete; - AsyncRequest& operator=(const AsyncRequest&) = delete; - AsyncRequest(AsyncRequest&&) = delete; - AsyncRequest& operator=(AsyncRequest&&) = delete; - - void Install(Environment* env, void* data, uv_async_cb target); - void Uninstall(); - void Stop(); - inline void set_stopped(bool flag); - inline bool is_stopped() const; - uv_async_t* GetHandle(); - void MemoryInfo(MemoryTracker* tracker) const override; - - - SET_MEMORY_INFO_NAME(AsyncRequest) - SET_SELF_SIZE(AsyncRequest) - - private: - Environment* env_; - uv_async_t* async_ = nullptr; - std::atomic_bool stopped_ {true}; -}; - class KVStore { public: KVStore() = default; @@ -1058,6 +1030,14 @@ class Environment : public MemoryRetainer { inline bool can_call_into_js() const; inline void set_can_call_into_js(bool can_call_into_js); + // Increase or decrease a counter that manages whether this Environment + // keeps the event loop alive on its own or not. The counter starts out at 0, + // meaning it does not, and any positive value will make it keep the event + // loop alive. + // This is used by Workers to manage their own .ref()/.unref() implementation, + // as Workers aren't directly associated with their own libuv handles. + inline void add_refs(int64_t diff); + inline bool has_run_bootstrapping_code() const; inline void set_has_run_bootstrapping_code(bool has_run_bootstrapping_code); @@ -1078,6 +1058,7 @@ class Environment : public MemoryRetainer { inline void remove_sub_worker_context(worker::Worker* context); void stop_sub_worker_contexts(); inline bool is_stopping() const; + inline void set_stopping(bool value); inline std::list* extra_linked_bindings(); inline node_module* extra_linked_bindings_head(); inline const Mutex& extra_linked_bindings_mutex() const; @@ -1219,8 +1200,6 @@ class Environment : public MemoryRetainer { inline std::shared_ptr options(); inline std::shared_ptr> inspector_host_port(); - inline AsyncRequest* thread_stopper() { return &thread_stopper_; } - inline int32_t stack_trace_limit() const { return 10; } // The BaseObject count is a debugging helper that makes sure that there are @@ -1285,6 +1264,7 @@ class Environment : public MemoryRetainer { uv_prepare_t idle_prepare_handle_; uv_check_t idle_check_handle_; uv_async_t task_queues_async_; + int64_t task_queues_async_refs_ = 0; bool profiler_idle_notifier_started_ = false; AsyncHooks async_hooks_; @@ -1342,7 +1322,7 @@ class Environment : public MemoryRetainer { bool has_run_bootstrapping_code_ = false; bool has_serialized_options_ = false; - bool can_call_into_js_ = true; + std::atomic_bool can_call_into_js_ { true }; Flags flags_; uint64_t thread_id_; std::unordered_set sub_worker_contexts_; @@ -1458,10 +1438,7 @@ class Environment : public MemoryRetainer { bool started_cleanup_ = false; int64_t base_object_count_ = 0; - - // A custom async abstraction (a pair of async handle and a state variable) - // Used by embedders to shutdown running Node instance. - AsyncRequest thread_stopper_; + std::atomic_bool is_stopping_ { false }; typedef std::unordered_set> ArrayBufferAllocatorList; diff --git a/src/node_worker.cc b/src/node_worker.cc index 1cf02b19619984..045fcc74a3a2f6 100644 --- a/src/node_worker.cc +++ b/src/node_worker.cc @@ -281,7 +281,7 @@ void Worker::Run() { stopped_ = true; this->env_ = nullptr; } - env_->thread_stopper()->set_stopped(true); + env_->set_stopping(true); env_->stop_sub_worker_contexts(); env_->RunCleanup(); RunAtExit(env_.get()); @@ -424,7 +424,6 @@ void Worker::JoinThread() { thread_joined_ = true; env()->remove_sub_worker_context(this); - on_thread_finished_.Uninstall(); { HandleScope handle_scope(env()->isolate()); @@ -455,6 +454,8 @@ void Worker::JoinThread() { } Worker::~Worker() { + JoinThread(); + Mutex::ScopedLock lock(mutex_); CHECK(stopped_); @@ -630,18 +631,16 @@ void Worker::StartThread(const FunctionCallbackInfo& args) { w->stopped_ = false; w->thread_joined_ = false; - w->on_thread_finished_.Install(w->env(), w, [](uv_async_t* handle) { - Worker* w_ = static_cast(handle->data); - CHECK(w_->is_stopped()); - w_->parent_port_ = nullptr; - w_->JoinThread(); - delete w_; - }); + if (w->has_ref_) + w->env()->add_refs(1); uv_thread_options_t thread_options; thread_options.flags = UV_THREAD_HAS_STACK_SIZE; thread_options.stack_size = kStackSize; CHECK_EQ(uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) { + // XXX: This could become a std::unique_ptr, but that makes at least + // gcc 6.3 detect undefined behaviour when there shouldn't be any. + // gcc 7+ handles this well. Worker* w = static_cast(arg); const uintptr_t stack_top = reinterpret_cast(&arg); @@ -652,7 +651,12 @@ void Worker::StartThread(const FunctionCallbackInfo& args) { w->Run(); Mutex::ScopedLock lock(w->mutex_); - w->on_thread_finished_.Stop(); + w->env()->SetImmediateThreadsafe( + [w = std::unique_ptr(w)](Environment* env) { + if (w->has_ref_) + env->add_refs(-1); + // implicitly delete w + }); }, static_cast(w)), 0); } @@ -667,13 +671,19 @@ void Worker::StopThread(const FunctionCallbackInfo& args) { void Worker::Ref(const FunctionCallbackInfo& args) { Worker* w; ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); - uv_ref(reinterpret_cast(w->on_thread_finished_.GetHandle())); + if (!w->has_ref_) { + w->has_ref_ = true; + w->env()->add_refs(1); + } } void Worker::Unref(const FunctionCallbackInfo& args) { Worker* w; ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); - uv_unref(reinterpret_cast(w->on_thread_finished_.GetHandle())); + if (w->has_ref_) { + w->has_ref_ = false; + w->env()->add_refs(-1); + } } void Worker::GetResourceLimits(const FunctionCallbackInfo& args) { diff --git a/src/node_worker.h b/src/node_worker.h index ff2864e3193174..86de4d493e2480 100644 --- a/src/node_worker.h +++ b/src/node_worker.h @@ -42,7 +42,6 @@ class Worker : public AsyncWrap { void MemoryInfo(MemoryTracker* tracker) const override { tracker->TrackField("parent_port", parent_port_); - tracker->TrackInlineField(&on_thread_finished_, "on_thread_finished_"); } SET_MEMORY_INFO_NAME(Worker) @@ -109,14 +108,14 @@ class Worker : public AsyncWrap { // instance refers to it via its [kPort] property. MessagePort* parent_port_ = nullptr; - AsyncRequest on_thread_finished_; - // A raw flag that is used by creator and worker threads to // sync up on pre-mature termination of worker - while in the // warmup phase. Once the worker is fully warmed up, use the // async handle of the worker's Environment for the same purpose. bool stopped_ = true; + bool has_ref_ = true; + // The real Environment of the worker object. It has a lesser // lifespan than the worker object itself - comes to life // when the worker thread creates a new Environment, and gets