-
-
Notifications
You must be signed in to change notification settings - Fork 218
/
Copy pathmodule.h
111 lines (90 loc) · 3.7 KB
/
module.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#pragma once
#include <cstdio>
#include <future>
#include <chrono>
#include "./closable.h"
#include "./outgoing_msg.h"
#include "util/reaper.h"
#include "util/trash.h"
namespace zmq {
/* Forward declarations only: neither class is defined in the visible part of
   this header. */
class Context;
class Socket;
/* Function object that terminates a raw ZMQ context pointer. It is the
   Terminator argument of the ThreadSafeReaper held by Module::Global. If
   termination has not finished after a short grace period, a warning is
   written to stderr before the call continues to wait. */
struct Terminator {
    constexpr Terminator() noexcept = default;

    /* Terminates `context` (must not be null) and returns only once the
       termination call has completed. */
    void operator()(void* context) {
        assert(context != nullptr);

        /* The blocky setting has to be read before termination is started. */
#ifdef ZMQ_BLOCKY
        const bool blocky = zmq_ctx_get(context, ZMQ_BLOCKY) != 0;
#else
        /* Without the option there is nothing the user could change, so the
           hint is never shown. */
        const bool blocky = false;
#endif
        const char* const hint =
            blocky ? " Set 'context.blocky = false' to change this behaviour." : "";

        /* Terminate on a separate thread: this lets us notice when it takes
           long and tell the user about the default blocking behaviour. */
        std::future<void> pending = std::async(std::launch::async, [context] {
            [[maybe_unused]] const auto rc = zmq_ctx_term(context);
            assert(rc == 0);
        });

        const std::chrono::milliseconds grace_period{500};
        const bool still_running =
            pending.wait_for(grace_period) == std::future_status::timeout;

        if (still_running) {
            /* process.emitWarning is unavailable because the Node.js runtime
               has already shut down, so its output format is imitated. */
            (void)fprintf(stderr,
                "(node:%d) WARNING: Waiting for queued ZeroMQ messages to be "
                "delivered.%s\n",
                uv_os_getpid(),
                hint);
        }

        /* Block until termination has really finished. */
        pending.wait();
    }
};
/* Addon state object for the ZeroMQ bindings, built on Napi::Addon<Module>.
   It bundles a handle to the state shared between all agents/threads (Global),
   the cleanup helpers (MsgTrash, ObjectReaper) and the JS references created
   for this instance. The declaration order of the data members is significant
   because members are destroyed in reverse order of declaration; do not
   reorder them. */
class Module : public Napi::Addon<Module> {
/* Contains shared global state that will be accessible by all
agents/threads. */
class Global {
/* Handle type through which the shared state is held (see Module::global). */
using Shared = std::shared_ptr<Global>;
/* Returns a handle to the shared state. Private, so only the befriended
   Module can call it. Declared here, defined out of line.
   NOTE(review): presumably creates the instance on first use and hands out the
   same one afterwards - confirm against the implementation. */
static Shared Instance();
public:
/* Declared here, defined out of line. */
Global();
/* ZMQ pointer to the global shared context which allows agents/threads
to communicate over inproc://. */
void* SharedContext;
/* A list of ZMQ contexts that will be terminated on a clean exit. */
ThreadSafeReaper<void, Terminator> ContextTerminator;
/* Grants Module access to the private Shared alias and Instance(). */
friend class Module;
};
public:
/* Takes the N-API environment and the addon's exports object, as expected by
   Napi::Addon. Declared here, defined out of line. */
explicit Module(Napi::Env env, Napi::Object exports);
/* Accessor for the shared global state. The elaborated `class Global&` return
   type is needed because this member function has the same name as the nested
   class. */
class Global& Global() {
return *global;
}
/* The order of properties defines their destruction in reverse order and is
very important to ensure a clean process exit. During the destruction of
other objects buffers might be released, we must delete trash last. */
/* Declared first among the data members, hence destroyed last. */
Trash<OutgoingMsg::Reference> MsgTrash;
private:
/* Second to last to be deleted is the global state, which also causes
context termination (which might block). */
Global::Shared global = Global::Instance();
public:
/* Reaper that calls ->Close() on objects that have not been GC'ed so far.
Some versions of Node will call destructors on environment shutdown,
while others will *only* call destructors after GC. The reason we need to
call ->Close() is to ensure proper ZMQ cleanup and releasing underlying
resources. The versions of Node that do not call destructors *WILL* of
course leak memory if worker threads are created (in a loop). */
Reaper<Closable> ObjectReaper;
/* A JS reference to the default global context. This is a unique object for
each individual agent/thread, but is in fact a wrapper for the same
global ZMQ context. */
Napi::ObjectReference GlobalContext;
/* JS constructor references. */
/* Declared last among the data members, hence destroyed first. */
Napi::FunctionReference Context;
Napi::FunctionReference Socket;
Napi::FunctionReference Observer;
Napi::FunctionReference Proxy;
};
} // namespace zmq
/* Compile-time guard: zmq::Module must be neither copy- nor
   move-constructible. */
static_assert(!std::is_copy_constructible<zmq::Module>::value, "not copyable");
static_assert(!std::is_move_constructible<zmq::Module>::value, "not movable");