Skip to content

Commit

Permalink
add heartbeat check
Browse files Browse the repository at this point in the history
  • Loading branch information
dndx committed Jun 17, 2021
1 parent 78fc104 commit 5a9fa01
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 4 deletions.
2 changes: 2 additions & 0 deletions kong/hybrid/data_plane.lua
Expand Up @@ -128,6 +128,8 @@ function _M:communicate(premature)
self:start_timer()
return
end

self:start_timer()
end


Expand Down
46 changes: 42 additions & 4 deletions kong/hybrid/event_loop.lua
Expand Up @@ -3,15 +3,20 @@ local _M = {}

local queue = require("kong.hybrid.queue")
local message = require("kong.hybrid.message")
local constants = require("kong.constants")


local exiting = ngx.worker.exiting
local pcall = pcall
local ngx_log = ngx.log
local ngx_time = ngx.time


local ngx_WARN = ngx.WARN
local ngx_DEBUG = ngx.DEBUG
local _MT = { __index = _M, }
local PING_INTERVAL = constants.CLUSTERING_PING_INTERVAL
local PING_WAIT = PING_INTERVAL * 1.5


function _M.new(node_id)
Expand All @@ -21,22 +26,48 @@ function _M.new(node_id)
node_id = assert(node_id),
}

return setmetatable(self, _MT)
self = setmetatable(self, _MT)

self:register_callback("kong:hybrid:ping", function(m)
local pong = message.new(node_id, m.src, "kong:hybrid:pong", "")
self:send(pong)
ngx_log(ngx_DEBUG, "sent pong to: ", m.src)
end)

self:register_callback("kong:hybrid:pong", function(m)
ngx_log(ngx_DEBUG, "received pong from: ", m.src)
end)

return self
end


function _M:handle_peer(peer_id, sock)
if self.clients[peer_id] then
return nil, "duplicate client: " .. node_id
return nil, "duplicate client: " .. peer_id
end

local q = queue.new()
self.clients[peer_id] = q

local ping_thread = ngx.thread.spawn(function()
while not exiting() do
ngx.sleep(PING_INTERVAL)

local m = message.new(self.node_id, peer_id, "kong:hybrid:ping", "")
q:enqueue(m)
ngx_log(ngx_DEBUG, "sent ping to: ", peer_id)
end
end)

local read_thread = ngx.thread.spawn(function()
local last_seen = ngx_time()

while not exiting() do
local m, err = message.unpack_from_socket(sock)
if m then
last_seen = ngx_time()

local callback = self.callbacks[m.topic]
if callback then
local succ, err = pcall(callback, m)
Expand All @@ -52,7 +83,14 @@ function _M:handle_peer(peer_id, sock)
"for topic \"", m.topic, "\" doesn't exist")
end

elseif err ~= "timeout" then
elseif err == "timeout" then
local waited = ngx_time() - last_seen
if waited > PING_WAIT then
return nil, "did not receive PING frame from " .. peer_id ..
" within " .. PING_WAIT .. " seconds"
end

else
return nil, "failed to receive message from DP: " .. err
end
end
Expand All @@ -79,6 +117,7 @@ function _M:handle_peer(peer_id, sock)

ngx.thread.kill(write_thread)
ngx.thread.kill(read_thread)
ngx.thread.kill(ping_thread)

self.clients[peer_id] = nil

Expand All @@ -100,7 +139,6 @@ function _M:send(message)
end

local q = self.clients[message.dest]
print("message.dest = ", message.dest)
if not q then
return nil, "node " .. message.dest .. " is disconnected"
end
Expand Down

0 comments on commit 5a9fa01

Please sign in to comment.