Permalink
Browse files

add skynet.watch for skynet.call, read http://blog.codingnow.com/2013…

  • Loading branch information...
1 parent 340148c commit f7242294dd0f1d5bc678f19d618327ea6895391c @cloudwu committed Dec 9, 2013
Showing with 128 additions and 22 deletions.
  1. +80 −18 lualib/skynet.lua
  2. +4 −0 service/pingserver.lua
  3. +27 −1 service/simplemonitor.lua
  4. +9 −0 service/testping.lua
  5. +2 −2 skynet-src/skynet.h
  6. +6 −1 skynet-src/skynet_server.c
View
@@ -26,9 +26,56 @@ local session_coroutine_address = {}
local wakeup_session = {}
local sleep_session = {}
+local watching_service = {}
+local watching_session = {}
+local exit_queue = {}
+
+-- suspend is function
+local suspend
+
local trace_handle
local trace_func = function() end
+local function string_to_handle(str)
+ return tonumber("0x" .. string.sub(str , 2))
+end
+
+----- monitor exit
+
+local function dispatch_exit()
+ local session = table.remove(exit_queue,1)
+ if session then
+ local co = session_id_coroutine[session]
+ session_id_coroutine[session] = nil
+ return suspend(co, coroutine.resume(co, false))
+ end
+end
+
+local function _exit_dispatch(session, monitor, service)
+-- Don't remove from watching_service , because user may call dead service
+ watching_service[service] = false
+ for session, srv in pairs(watching_session) do
+ if srv == service then
+ table.insert(exit_queue, session)
+ end
+ end
+end
+
+local watch_monitor
+
+function skynet.watch(service)
+ assert(type(service) == "number")
+ if watch_monitor == nil then
+ watch_monitor = string_to_handle(c.command("MONITOR"))
+ assert(watch_monitor, "Need a monitor")
+ end
+ if watching_service[service] == nil then
+ watching_service[service] = true
+ -- read lualib/simplemonitor.lua
+ assert(skynet.call(watch_monitor, "lua", "WATCH", service), "watch a dead service")
+ end
+end
+
-- coroutine reuse
local coroutine_pool = {}
@@ -52,9 +99,6 @@ local function co_create(f)
return co
end
--- suspend is function
-local suspend
-
local function dispatch_wakeup()
local co = next(wakeup_session)
if co then
@@ -111,6 +155,7 @@ function suspend(co, result, command, param, size)
end
trace_count()
dispatch_wakeup()
+ dispatch_exit()
end
function skynet.timeout(ti, func)
@@ -155,10 +200,6 @@ function skynet.name(name, handle)
c.command("NAME", name .. " " .. handle)
end
-local function string_to_handle(str)
- return tonumber("0x" .. string.sub(str , 2))
-end
-
local self_handle
function skynet.self()
if self_handle then
@@ -210,6 +251,9 @@ end
function skynet.send(addr, typename, ...)
local p = proto[typename]
+ if watching_service[addr] == false then
+ error("Service is dead")
+ end
return c.send(addr, p.id, 0 , p.pack(...))
end
@@ -231,10 +275,21 @@ skynet.pack = assert(c.pack)
skynet.unpack = assert(c.unpack)
skynet.tostring = assert(c.tostring)
+local function yield_call(service, session)
+ watching_session[session] = service
+ local succ, msg, sz = coroutine_yield("CALL", session)
+ watching_session[session] = nil
+ assert(succ, "Service is dead")
+ return msg,sz
+end
+
function skynet.call(addr, typename, ...)
local p = proto[typename]
- local session = c.send(addr, p.id , nil , p.pack(...))
- return p.unpack(coroutine_yield("CALL", assert(session, "call to invalid address")))
+ if watching_service[addr] == false then
+ error("Service is dead")
+ end
+ local session = assert(c.send(addr, p.id , nil , p.pack(...)),"call to invalid address")
+ return p.unpack(yield_call(addr, session))
end
function skynet.blockcall(addr, typename , ...)
@@ -245,13 +300,13 @@ function skynet.blockcall(addr, typename , ...)
c.command("UNLOCK")
error("call to invalid address")
end
- return p.unpack(coroutine_yield("CALL", session))
+ return p.unpack(yield_call(addr, session))
end
function skynet.rawcall(addr, typename, msg, sz)
local p = proto[typename]
- local session = c.send(addr, p.id , nil , msg, sz)
- return coroutine_yield("CALL", assert(session, "call to invalid address"))
+ local session = assert(c.send(addr, p.id , nil , msg, sz), "call to invalid address")
+ return yield_call(addr, session)
end
function skynet.ret(msg, sz)
@@ -306,7 +361,7 @@ local function raw_dispatch_message(prototype, msg, sz, session, source, ...)
else
c.trace_switch(trace_handle, session)
session_id_coroutine[session] = nil
- suspend(co, coroutine.resume(co, msg, sz))
+ suspend(co, coroutine.resume(co, true, msg, sz))
end
else
local p = assert(proto[prototype], prototype)
@@ -419,7 +474,6 @@ do
local remote_call_func = setmetatable({}, weak_meta)
local _send = assert(c.send)
- local _yield = coroutine.yield
local _pack = assert(c.pack)
local _unpack = assert(c.unpack)
local _local = skynet.self()
@@ -431,7 +485,7 @@ do
local addr = remote_query(t.__remote)
-- the proto is 11 (lua is 10)
local session = assert(_send(addr, 11 , nil, _pack(t,method,...)), "call to invalid address")
- local msg, sz = _yield("CALL", session)
+ local msg, sz = coroutine_yield(session)
return select(2,assert(_unpack(msg,sz)))
end
remote_call_func[method] = f
@@ -463,13 +517,13 @@ do
local function remote_call(obj, method, ...)
if type(obj) ~= "table" or type(method) ~= "string" then
- return _yield("RETURN", _pack(false, "Invalid call"))
+ return coroutine_yield("RETURN", _pack(false, "Invalid call"))
end
local f = obj[method]
if type(f) ~= "function" then
- return _yield("RETURN", _pack(false, "Object has not method " .. method))
+ return coroutine_yield("RETURN", _pack(false, "Object has not method " .. method))
end
- return _yield("RETURN", _pack(pcall(f,...)))
+ return coroutine_yield("RETURN", _pack(pcall(f,...)))
end
function skynet.remote_root()
@@ -575,6 +629,14 @@ do
unpack = skynet.unpack,
dispatch = _debug_dispatch,
}
+
+ REG {
+ name = "exit",
+ id = 7,
+ pack = skynet.pack,
+ unpack = skynet.unpack,
+ dispatch = _exit_dispatch,
+ }
end
local init_func = {}
@@ -11,6 +11,10 @@ function command.HELLO()
skynet.ret(skynet.pack("hello"))
end
+function command.EXIT()
+ skynet.exit()
+end
+
skynet.start(function()
skynet.dispatch("lua", function(session,addr, cmd, ...)
command[cmd](...)
@@ -2,13 +2,39 @@ local skynet = require "skynet"
-- It's a simple service exit monitor, you can do something more when a service exit.
+local service_map = {}
+
skynet.register_protocol {
name = "client",
id = 3,
unpack = function() end,
dispatch = function(_, address)
+ local w = service_map[address]
+ if w then
+ for watcher in pairs(w) do
+ skynet.send(watcher, "exit", address)
+ end
+ service_map[address] = false
+ end
print(string.format("[:%x] exit", address))
end
}
-skynet.start(function() end)
+local function monitor(session, watcher, command, service)
+ assert(command, "WATCH")
+ local w = service_map[service]
+ if not w then
+ if w == false then
+ skynet.ret(skynet.pack(false))
+ return
+ end
+ w = {}
+ service_map[service] = w
+ end
+ w[watcher] = true
+ skynet.ret(skynet.pack(true))
+end
+
+skynet.start(function()
+ skynet.dispatch("lua", monitor)
+end)
@@ -0,0 +1,9 @@
+local skynet = require "skynet"
+
+skynet.start(function()
+ local ps = skynet.uniqueservice("pingserver")
+ skynet.watch(ps)
+ print(skynet.call(ps, "lua", "PING", "hello"))
+ skynet.send(ps, "lua", "EXIT")
+ print(skynet.call(ps, "lua", "PING", "hay"))
+end)
View
@@ -11,8 +11,8 @@
#define PTYPE_SYSTEM 4
#define PTYPE_HARBOR 5
#define PTYPE_SOCKET 6
-// don't use these id
-#define PTYPE_RESERVED_0 7
+// read lualib/skynet.lua lualib/simplemonitor.lua
+#define PTYPE_EXIT 7
// read lualib/skynet.lua lualib/mqueue.lua
#define PTYPE_RESERVED_QUEUE 8
#define PTYPE_RESERVED_DEBUG 9
@@ -540,7 +540,12 @@ skynet_command(struct skynet_context * context, const char * cmd , const char *
if (strcmp(cmd,"MONITOR") == 0) {
uint32_t handle=0;
if (param == NULL || param[0] == '\0') {
- handle = context->handle;
+ if (G_NODE.monitor_exit) {
+ // return current monitor serivce
+ sprintf(context->result, ":%x", G_NODE.monitor_exit);
+ return context->result;
+ }
+ return NULL;
} else {
if (param[0] == ':') {
handle = strtoul(param+1, NULL, 16);

0 comments on commit f724229

Please sign in to comment.