Skip to content

Commit

Permalink
使用 sync 替换 mq.await
Browse files Browse the repository at this point in the history
  • Loading branch information
xicilion committed Apr 23, 2017
1 parent a1219dc commit 0796c51
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 110 deletions.
28 changes: 20 additions & 8 deletions fibjs/include/JSHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
namespace fibjs {

class JSHandler : public Handler_base {

public:
JSHandler(v8::Local<v8::Value> proc, bool async = false);

FIBER_FREE();

public:
Expand All @@ -31,10 +35,10 @@ class JSHandler : public Handler_base {
AsyncEvent* ac);

public:
static result_t New(v8::Local<v8::Value> hdlr,
obj_ptr<Handler_base>& retVal)
static result_t New(v8::Local<v8::Value> hdlr, obj_ptr<Handler_base>& retVal)
{
if (hdlr->IsString() || hdlr->IsStringObject() || hdlr->IsNumberObject() || hdlr->IsRegExp() || (!hdlr->IsFunction() && !hdlr->IsObject()))
if (hdlr->IsString() || hdlr->IsStringObject() || hdlr->IsNumberObject() || hdlr->IsRegExp()
|| (!hdlr->IsFunction() && !hdlr->IsObject()))
return CHECK_ERROR(CALL_E_BADVARTYPE);

retVal = Handler_base::getInstance(hdlr);
Expand All @@ -53,9 +57,8 @@ class JSHandler : public Handler_base {
return 0;
}

v8::Local<v8::Object> o = v8::Local<v8::Object>::Cast(hdlr);
if (!hdlr->IsFunction()) {
v8::Local<v8::Object> o = v8::Local<v8::Object>::Cast(hdlr);

obj_ptr<Routing_base> r = new Routing();
result_t hr = r->append(o);
if (hr < 0)
Expand All @@ -65,16 +68,25 @@ class JSHandler : public Handler_base {
return 0;
}

obj_ptr<JSHandler> r = new JSHandler();
r->SetPrivate("handler", hdlr);
Isolate* isolate = Isolate::current();
v8::Local<v8::Value> _async = o->GetPrivate(o->CreationContext(),
v8::Private::ForApi(isolate->m_isolate, isolate->NewFromUtf8("_async")))
.ToLocalChecked();

if (!IsEmpty(_async))
retVal = new JSHandler(_async, true);
else
retVal = new JSHandler(hdlr);

retVal = r;
return 0;
}

public:
static result_t js_invoke(Handler_base* hdlr, object_base* v,
obj_ptr<Handler_base>& retVal, AsyncEvent* ac);

private:
bool m_async;
};

} /* namespace fibjs */
Expand Down
7 changes: 0 additions & 7 deletions fibjs/include/ifs/mq.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class mq_base : public object_base {
// mq_base
static result_t jsHandler(v8::Local<v8::Value> hdlr, obj_ptr<Handler_base>& retVal);
static result_t await(obj_ptr<Handler_base>& retVal);
static result_t await(v8::Local<v8::Function> proc, obj_ptr<Handler_base>& retVal);
static result_t nullHandler(obj_ptr<Handler_base>& retVal);
static result_t invoke(Handler_base* hdlr, object_base* v, AsyncEvent* ac);

Expand Down Expand Up @@ -113,12 +112,6 @@ inline void mq_base::s_await(const v8::FunctionCallbackInfo<v8::Value>& args)

hr = await(vr);

METHOD_OVER(1, 1);

ARG(v8::Local<v8::Function>, 0);

hr = await(v0, vr);

METHOD_RETURN();
}

Expand Down
17 changes: 0 additions & 17 deletions fibjs/include/ifs/mq.idl
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,6 @@ module mq
*/
static Handler await();

/*! @brief 创建一个异步等待处理器
@param proc 指定异步处理事务
@return 返回创建的处理器

异步等待处理器用于需要异步处理的消息处理模式,示例如下:
@code
var hdr = mq.await((v, done) => {
....
....

done();
};
@endcode
示例是一个异步消息处理器,处理器接受任务后,将进入异步运行状态,直到 done 被调用,才继续下一阶段的处理。
*/
static Handler await(Function proc);

/*! @brief 创建一个空处理器对象,次处理对象不做任何处理直接返回
@return 返回空处理函数
*/
Expand Down
7 changes: 6 additions & 1 deletion fibjs/src/global/global.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@ static void sync_stub(const v8::FunctionCallbackInfo<v8::Value>& args)

result_t global_base::sync(v8::Local<v8::Function> func, v8::Local<v8::Function>& retVal)
{
retVal = Isolate::current()->NewFunction("require", sync_stub, func);
Isolate* isolate = Isolate::current();

retVal = isolate->NewFunction("require", sync_stub, func);
retVal->SetPrivate(retVal->CreationContext(),
v8::Private::ForApi(isolate->m_isolate, isolate->NewFromUtf8("_async")), func);

return 0;
}

Expand Down
50 changes: 0 additions & 50 deletions fibjs/src/mq/AsyncWaitHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,54 +50,4 @@ result_t mq_base::await(obj_ptr<Handler_base>& retVal)
return 0;
}

class AsyncHandler : public Handler_base {
public:
AsyncHandler(v8::Local<v8::Function> proc)
{
SetPrivate("_proc", proc);
}

public:
// Handler_base
virtual result_t invoke(object_base* v, obj_ptr<Handler_base>& retVal,
AsyncEvent* ac)
{
if (ac)
return CHECK_ERROR(CALL_E_NOASYNC);

v8::Local<v8::Value> v1;
v1 = GetPrivate("_proc");
if (IsEmpty(v1))
return CALL_RETURN_NULL;

Isolate* isolate = holder();

retVal = new AsyncWaitHandler();
v8::Local<v8::Function> proc = v8::Local<v8::Function>::Cast(v1);

v8::Local<v8::Value> args[2];

args[0] = v->wrap();
args[1] = isolate->NewFunction("done", _done, retVal->wrap());

proc->Call(args[0], 2, args);
return 0;
}

public:
static void _done(const v8::FunctionCallbackInfo<v8::Value>& args)
{
AsyncWait_base* v = AsyncWait_base::getInstance(args.Data()->ToObject());
if (v)
v->end();
args.GetReturnValue().SetUndefined();
}
};

result_t mq_base::await(v8::Local<v8::Function> proc, obj_ptr<Handler_base>& retVal)
{
retVal = new AsyncHandler(proc);
return 0;
}

} /* namespace fibjs */
51 changes: 30 additions & 21 deletions fibjs/src/mq/JSHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,22 @@
#include "ifs/global.h"
#include "ifs/mq.h"
#include "ifs/console.h"
#include "AsyncWaitHandler.h"

namespace fibjs {

inline result_t msgMethod(Message_base* msg, exlib::string& method)
static void _done(const v8::FunctionCallbackInfo<v8::Value>& args)
{
exlib::string str;
const char *p, *p1;

msg->get_value(str);

p = p1 = str.c_str();
while (true) {
while (*p && *p != '.' && *p != '/' && *p != '\\')
p++;
if (p != p1)
break;
if (!*p)
return CHECK_ERROR(Runtime::setError("JSHandler: method \"" + method + "\" not found."));
p++;
p1 = p;
}

msg->set_value(*p ? p + 1 : "");
method.assign(p1, (int32_t)(p - p1));
AsyncWait_base* v = AsyncWait_base::getInstance(args.Data()->ToObject());
if (v)
v->end();
args.GetReturnValue().SetUndefined();
}

return 0;
JSHandler::JSHandler(v8::Local<v8::Value> proc, bool async)
: m_async(async)
{
SetPrivate("handler", proc);
}

result_t JSHandler::invoke(object_base* v, obj_ptr<Handler_base>& retVal,
Expand All @@ -47,6 +37,25 @@ result_t JSHandler::invoke(object_base* v, obj_ptr<Handler_base>& retVal,
if (ac)
return CHECK_ERROR(CALL_E_NOASYNC);

if (m_async) {
v8::Local<v8::Value> v1 = GetPrivate("handler");
if (IsEmpty(v1))
return CALL_RETURN_NULL;

Isolate* isolate = holder();

retVal = new AsyncWaitHandler();
v8::Local<v8::Function> proc = v8::Local<v8::Function>::Cast(v1);

v8::Local<v8::Value> args[2];

args[0] = v->wrap();
args[1] = isolate->NewFunction("done", _done, retVal->wrap());

proc->Call(args[0], 2, args);
return 0;
}

v8::Local<v8::Object> o = v->wrap();
Isolate* isolate = holder();

Expand Down
12 changes: 6 additions & 6 deletions test/mq_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -459,27 +459,27 @@ describe("mq", () => {
assert.equal(n, 400);
});

it("await(func)", () => {
it("sync(func)", () => {
var n = 100;

mq.invoke(mq.await((v, done) => {
mq.invoke(mq.jsHandler(sync((v, done) => {
function delayend() {
assert.equal(n, 100);
n = 200;
done();
}
setTimeout(delayend, 10);
}), m);
})), m);
assert.equal(n, 200);

n = 300;
mq.invoke(mq.await((v, done) => {
mq.invoke(mq.jsHandler(sync((v, done) => {
assert.equal(n, 300);
n = 400;
done();
}), m);
})), m);
assert.equal(n, 400);
});
});

// test.run(console.DEBUG);
// test.run(console.DEBUG);

0 comments on commit 0796c51

Please sign in to comment.