Skip to content

Commit

Permalink
child_process, feat: support ipc message.
Browse files Browse the repository at this point in the history
  • Loading branch information
xicilion committed Apr 8, 2022
1 parent 061f483 commit f9fd64f
Show file tree
Hide file tree
Showing 20 changed files with 722 additions and 36 deletions.
35 changes: 30 additions & 5 deletions fibjs/include/ChildProcess.h
Expand Up @@ -14,9 +14,25 @@
namespace fibjs {

class ChildProcess : public ChildProcess_base {
public:
class Ipc {
public:
Ipc(Isolate* _isolate, v8::Local<v8::Object> _o, obj_ptr<Stream_base>& stream);

static result_t send(Stream_base* stream, v8::Local<v8::Value> msg);
static result_t sync_delete(Ipc* pThis);

public:
Isolate* m_isolate;
v8::Global<v8::Object> m_o;
obj_ptr<Stream_base> m_stream;
obj_ptr<Stream_base>& m_channel;
};

public:
ChildProcess()
: m_exitCode(-1)
, m_ipc(-1)
, m_pty(false)
{
memset(&uv_options, 0, sizeof(uv_process_options_t));
Expand All @@ -31,6 +47,9 @@ class ChildProcess : public ChildProcess_base {
virtual result_t kill(int32_t signal);
virtual result_t kill(exlib::string signal);
virtual result_t join(AsyncEvent* ac);
virtual result_t get_connected(bool& retVal);
virtual result_t disconnect();
virtual result_t send(v8::Local<v8::Value> msg);
virtual result_t usage(v8::Local<v8::Object>& retVal);
virtual result_t get_pid(int32_t& retVal);
virtual result_t get_exitCode(int32_t& retVal);
Expand All @@ -40,16 +59,18 @@ class ChildProcess : public ChildProcess_base {

public:
static int32_t spawn(uv_process_t* process, const uv_process_options_t* options);
result_t spawn(exlib::string command, v8::Local<v8::Array> args, v8::Local<v8::Object> options, bool fork);

public:
EVENT_FUNC(exit);
EVENT_FUNC(message);

public:
result_t fill_stdio(v8::Local<v8::Object> options);
private:
result_t create_pipe(int32_t idx);
result_t fill_stdio(v8::Local<v8::Object> options, bool fork);
result_t fill_env(v8::Local<v8::Object> options);
result_t fill_arg(exlib::string command, v8::Local<v8::Array> args);
result_t fill_opt(v8::Local<v8::Object> options);
result_t spawn(exlib::string command, v8::Local<v8::Array> args, v8::Local<v8::Object> options);

public:
static void on_uv_close(uv_handle_t* handle);
Expand All @@ -59,12 +80,16 @@ class ChildProcess : public ChildProcess_base {
exlib::Event m_ev;
obj_ptr<ValueHolder> m_vholder;

obj_ptr<UVStream> m_stdio[3];
obj_ptr<UVStream> m_stdio[4];

uv_stdio_container_t stdios[3];
uv_stdio_container_t stdios[4];
uv_process_options_t uv_options;
uv_process_t m_process;

obj_ptr<Stream_base> m_channel;

int32_t m_ipc;

bool m_pty;

int32_t m_exitCode;
Expand Down
11 changes: 8 additions & 3 deletions fibjs/include/Isolate.h
Expand Up @@ -149,9 +149,14 @@ class Isolate : public exlib::linkitem {
obj_ptr<SandBox> m_topSandbox;
obj_ptr<HttpClient> m_httpclient;

obj_ptr<Stream_base> m_stdin;
obj_ptr<Stream_base> m_stdout;
obj_ptr<Stream_base> m_stderr;
obj_ptr<Stream_base> m_stdio[3];

obj_ptr<Stream_base>& m_stdin = m_stdio[0];
obj_ptr<Stream_base>& m_stdout = m_stdio[1];
obj_ptr<Stream_base>& m_stderr = m_stdio[2];

obj_ptr<Stream_base> m_channel;
int32_t m_ipc_mode;

exlib::List<exlib::linkitem> m_fibers;

Expand Down
8 changes: 4 additions & 4 deletions fibjs/include/UVStream.h
Expand Up @@ -362,7 +362,7 @@ class UVStream : public UVStream_tmpl<Stream_base> {
FIBER_FREE();

public:
UVStream(int32_t fd)
UVStream(int32_t fd, int32_t ipc = 0)
: UVStream_tmpl<Stream_base>(fd)
{
uv_call([&] {
Expand All @@ -371,17 +371,17 @@ class UVStream : public UVStream_tmpl<Stream_base> {
return uv_stream_set_blocking(&m_stream, 1);
}

uv_pipe_init(s_uv_loop, &m_pipe, 0);
uv_pipe_init(s_uv_loop, &m_pipe, ipc);
return uv_pipe_open(&m_pipe, fd);
});
}

public:
static result_t create_pipe(obj_ptr<UVStream>& retVal)
static result_t create_pipe(obj_ptr<UVStream>& retVal, int32_t ipc = 0)
{
obj_ptr<UVStream> stream = new UVStream();
result_t hr = uv_call([&] {
return uv_pipe_init(s_uv_loop, &stream->m_pipe, 0);
return uv_pipe_init(s_uv_loop, &stream->m_pipe, ipc);
});
if (hr < 0)
return hr;
Expand Down
82 changes: 81 additions & 1 deletion fibjs/include/ifs/ChildProcess.h
Expand Up @@ -27,6 +27,9 @@ class ChildProcess_base : public EventEmitter_base {
virtual result_t kill(int32_t signal) = 0;
virtual result_t kill(exlib::string signal) = 0;
virtual result_t join(AsyncEvent* ac) = 0;
virtual result_t get_connected(bool& retVal) = 0;
virtual result_t disconnect() = 0;
virtual result_t send(v8::Local<v8::Value> msg) = 0;
virtual result_t usage(v8::Local<v8::Object>& retVal) = 0;
virtual result_t get_pid(int32_t& retVal) = 0;
virtual result_t get_exitCode(int32_t& retVal) = 0;
Expand All @@ -35,6 +38,8 @@ class ChildProcess_base : public EventEmitter_base {
virtual result_t get_stderr(obj_ptr<Stream_base>& retVal) = 0;
virtual result_t get_onexit(v8::Local<v8::Function>& retVal) = 0;
virtual result_t set_onexit(v8::Local<v8::Function> newVal) = 0;
virtual result_t get_onmessage(v8::Local<v8::Function>& retVal) = 0;
virtual result_t set_onmessage(v8::Local<v8::Function> newVal) = 0;

public:
static void s__new(const v8::FunctionCallbackInfo<v8::Value>& args)
Expand All @@ -50,6 +55,9 @@ class ChildProcess_base : public EventEmitter_base {
public:
static void s_kill(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_join(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_get_connected(v8::Local<v8::Name> property, const v8::PropertyCallbackInfo<v8::Value>& args);
static void s_disconnect(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_send(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_usage(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_get_pid(v8::Local<v8::Name> property, const v8::PropertyCallbackInfo<v8::Value>& args);
static void s_get_exitCode(v8::Local<v8::Name> property, const v8::PropertyCallbackInfo<v8::Value>& args);
Expand All @@ -58,6 +66,8 @@ class ChildProcess_base : public EventEmitter_base {
static void s_get_stderr(v8::Local<v8::Name> property, const v8::PropertyCallbackInfo<v8::Value>& args);
static void s_get_onexit(v8::Local<v8::Name> property, const v8::PropertyCallbackInfo<v8::Value>& args);
static void s_set_onexit(v8::Local<v8::Name> property, v8::Local<v8::Value> value, const v8::PropertyCallbackInfo<void>& args);
static void s_get_onmessage(v8::Local<v8::Name> property, const v8::PropertyCallbackInfo<v8::Value>& args);
static void s_set_onmessage(v8::Local<v8::Name> property, v8::Local<v8::Value> value, const v8::PropertyCallbackInfo<void>& args);

public:
ASYNC_MEMBER0(ChildProcess_base, join);
Expand All @@ -73,16 +83,20 @@ inline ClassInfo& ChildProcess_base::class_info()
{ "kill", s_kill, false },
{ "join", s_join, false },
{ "joinSync", s_join, false },
{ "disconnect", s_disconnect, false },
{ "send", s_send, false },
{ "usage", s_usage, false }
};

static ClassData::ClassProperty s_property[] = {
{ "connected", s_get_connected, block_set, false },
{ "pid", s_get_pid, block_set, false },
{ "exitCode", s_get_exitCode, block_set, false },
{ "stdin", s_get_stdin, block_set, false },
{ "stdout", s_get_stdout, block_set, false },
{ "stderr", s_get_stderr, block_set, false },
{ "onexit", s_get_onexit, s_set_onexit, false }
{ "onexit", s_get_onexit, s_set_onexit, false },
{ "onmessage", s_get_onmessage, s_set_onmessage, false }
};

static ClassData s_cd = {
Expand Down Expand Up @@ -132,6 +146,47 @@ inline void ChildProcess_base::s_join(const v8::FunctionCallbackInfo<v8::Value>&
METHOD_VOID();
}

inline void ChildProcess_base::s_get_connected(v8::Local<v8::Name> property, const v8::PropertyCallbackInfo<v8::Value>& args)
{
bool vr;

METHOD_NAME("ChildProcess.connected");
METHOD_INSTANCE(ChildProcess_base);
PROPERTY_ENTER();

hr = pInst->get_connected(vr);

METHOD_RETURN();
}

inline void ChildProcess_base::s_disconnect(const v8::FunctionCallbackInfo<v8::Value>& args)
{
METHOD_NAME("ChildProcess.disconnect");
METHOD_INSTANCE(ChildProcess_base);
METHOD_ENTER();

METHOD_OVER(0, 0);

hr = pInst->disconnect();

METHOD_VOID();
}

inline void ChildProcess_base::s_send(const v8::FunctionCallbackInfo<v8::Value>& args)
{
METHOD_NAME("ChildProcess.send");
METHOD_INSTANCE(ChildProcess_base);
METHOD_ENTER();

METHOD_OVER(1, 1);

ARG(v8::Local<v8::Value>, 0);

hr = pInst->send(v0);

METHOD_VOID();
}

inline void ChildProcess_base::s_usage(const v8::FunctionCallbackInfo<v8::Value>& args)
{
v8::Local<v8::Object> vr;
Expand Down Expand Up @@ -236,4 +291,29 @@ inline void ChildProcess_base::s_set_onexit(v8::Local<v8::Name> property, v8::Lo

PROPERTY_SET_LEAVE();
}

inline void ChildProcess_base::s_get_onmessage(v8::Local<v8::Name> property, const v8::PropertyCallbackInfo<v8::Value>& args)
{
v8::Local<v8::Function> vr;

METHOD_NAME("ChildProcess.onmessage");
METHOD_INSTANCE(ChildProcess_base);
PROPERTY_ENTER();

hr = pInst->get_onmessage(vr);

METHOD_RETURN();
}

inline void ChildProcess_base::s_set_onmessage(v8::Local<v8::Name> property, v8::Local<v8::Value> value, const v8::PropertyCallbackInfo<void>& args)
{
METHOD_NAME("ChildProcess.onmessage");
METHOD_INSTANCE(ChildProcess_base);
PROPERTY_ENTER();
PROPERTY_VAL(v8::Local<v8::Function>);

hr = pInst->set_onmessage(v0);

PROPERTY_SET_LEAVE();
}
}
51 changes: 49 additions & 2 deletions fibjs/include/ifs/process.h
Expand Up @@ -52,6 +52,9 @@ class process_base : public EventEmitter_base {
static result_t memoryUsage(v8::Local<v8::Object>& retVal);
static result_t nextTick(v8::Local<v8::Function> func, OptArgs args);
static result_t binding(exlib::string name, v8::Local<v8::Value>& retVal);
static result_t get_connected(bool& retVal);
static result_t disconnect();
static result_t send(v8::Local<v8::Value> msg);

public:
static void s__new(const v8::FunctionCallbackInfo<v8::Value>& args)
Expand Down Expand Up @@ -90,6 +93,9 @@ class process_base : public EventEmitter_base {
static void s_static_memoryUsage(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_static_nextTick(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_static_binding(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_static_get_connected(v8::Local<v8::Name> property, const v8::PropertyCallbackInfo<v8::Value>& args);
static void s_static_disconnect(const v8::FunctionCallbackInfo<v8::Value>& args);
static void s_static_send(const v8::FunctionCallbackInfo<v8::Value>& args);
};
}

Expand All @@ -108,7 +114,9 @@ inline ClassInfo& process_base::class_info()
{ "cpuUsage", s_static_cpuUsage, true },
{ "memoryUsage", s_static_memoryUsage, true },
{ "nextTick", s_static_nextTick, true },
{ "binding", s_static_binding, true }
{ "binding", s_static_binding, true },
{ "disconnect", s_static_disconnect, true },
{ "send", s_static_send, true }
};

static ClassData::ClassProperty s_property[] = {
Expand All @@ -125,7 +133,8 @@ inline ClassInfo& process_base::class_info()
{ "stdin", s_static_get_stdin, block_set, true },
{ "stdout", s_static_get_stdout, block_set, true },
{ "stderr", s_static_get_stderr, block_set, true },
{ "exitCode", s_static_get_exitCode, s_static_set_exitCode, true }
{ "exitCode", s_static_get_exitCode, s_static_set_exitCode, true },
{ "connected", s_static_get_connected, block_set, true }
};

static ClassData s_cd = {
Expand Down Expand Up @@ -479,4 +488,42 @@ inline void process_base::s_static_binding(const v8::FunctionCallbackInfo<v8::Va

METHOD_RETURN();
}

inline void process_base::s_static_get_connected(v8::Local<v8::Name> property, const v8::PropertyCallbackInfo<v8::Value>& args)
{
bool vr;

METHOD_NAME("process.connected");
PROPERTY_ENTER();

hr = get_connected(vr);

METHOD_RETURN();
}

inline void process_base::s_static_disconnect(const v8::FunctionCallbackInfo<v8::Value>& args)
{
METHOD_NAME("process.disconnect");
METHOD_ENTER();

METHOD_OVER(0, 0);

hr = disconnect();

METHOD_VOID();
}

inline void process_base::s_static_send(const v8::FunctionCallbackInfo<v8::Value>& args)
{
METHOD_NAME("process.send");
METHOD_ENTER();

METHOD_OVER(1, 1);

ARG(v8::Local<v8::Value>, 0);

hr = send(v0);

METHOD_VOID();
}
}
5 changes: 5 additions & 0 deletions fibjs/src/base/Isolate.cpp
Expand Up @@ -25,6 +25,8 @@ namespace fibjs {

static std::unique_ptr<v8::Platform> g_default_platform;

void init_process_ipc(Isolate* isolate);

void Isolate::init_default_platform(platform_creator get_platform)
{
g_default_platform = get_platform ? get_platform() : v8::platform::NewDefaultPlatform();
Expand Down Expand Up @@ -107,6 +109,7 @@ class ShellArrayBufferAllocator : public v8::ArrayBuffer::Allocator {
Isolate::Isolate(exlib::string jsFilename)
: m_id((int32_t)s_iso_id.inc())
, m_hr(0)
, m_ipc_mode(0)
, m_test(NULL)
, m_currentFibers(0)
, m_idleFibers(0)
Expand Down Expand Up @@ -216,6 +219,8 @@ void Isolate::init()
v8::MaybeLocal<v8::Value> result = script->Run(_context);
v8::Local<v8::Object> AssertionError = result.ToLocalChecked().As<v8::Object>();
m_AssertionError.Reset(m_isolate, AssertionError);

init_process_ipc(this);
}

static result_t syncExit(Isolate* isolate)
Expand Down

0 comments on commit f9fd64f

Please sign in to comment.