From 5a9fa01698eeb4abb3bf0fce901c1009a8cc57b9 Mon Sep 17 00:00:00 2001
From: Datong Sun
Date: Thu, 17 Jun 2021 07:23:14 -0700
Subject: [PATCH] add heartbeat check

---
NOTE(review): the line structure of this patch was restored from a
whitespace-collapsed copy. Hunk line counts were checked against the @@
headers and the diffstat; leading indentation inside the hunks was
reconstructed assuming 2-space indents and should be confirmed against the
target tree (or applied with `git apply --ignore-whitespace`).

 kong/hybrid/data_plane.lua |  2 ++
 kong/hybrid/event_loop.lua | 46 ++++++++++++++++++++++++++++++++++----
 2 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/kong/hybrid/data_plane.lua b/kong/hybrid/data_plane.lua
index 6bb45030c625..5b0e35a35d40 100644
--- a/kong/hybrid/data_plane.lua
+++ b/kong/hybrid/data_plane.lua
@@ -128,6 +128,8 @@ function _M:communicate(premature)
     self:start_timer()
     return
   end
+
+  self:start_timer()
 end
 
 
diff --git a/kong/hybrid/event_loop.lua b/kong/hybrid/event_loop.lua
index 2610b0c344fc..a8d23b6de9aa 100644
--- a/kong/hybrid/event_loop.lua
+++ b/kong/hybrid/event_loop.lua
@@ -3,15 +3,20 @@ local _M = {}
 
 local queue = require("kong.hybrid.queue")
 local message = require("kong.hybrid.message")
+local constants = require("kong.constants")
 
 
 local exiting = ngx.worker.exiting
 local pcall = pcall
 local ngx_log = ngx.log
+local ngx_time = ngx.time
 local ngx_WARN = ngx.WARN
+local ngx_DEBUG = ngx.DEBUG
 
 
 local _MT = { __index = _M, }
+local PING_INTERVAL = constants.CLUSTERING_PING_INTERVAL
+local PING_WAIT = PING_INTERVAL * 1.5
 
 
 function _M.new(node_id)
@@ -21,22 +26,48 @@ function _M.new(node_id)
     node_id = assert(node_id),
   }
 
-  return setmetatable(self, _MT)
+  self = setmetatable(self, _MT)
+
+  self:register_callback("kong:hybrid:ping", function(m)
+    local pong = message.new(node_id, m.src, "kong:hybrid:pong", "")
+    self:send(pong)
+    ngx_log(ngx_DEBUG, "sent pong to: ", m.src)
+  end)
+
+  self:register_callback("kong:hybrid:pong", function(m)
+    ngx_log(ngx_DEBUG, "received pong from: ", m.src)
+  end)
+
+  return self
 end
 
 
 function _M:handle_peer(peer_id, sock)
   if self.clients[peer_id] then
-    return nil, "duplicate client: " .. node_id
+    return nil, "duplicate client: " .. peer_id
   end
 
   local q = queue.new()
   self.clients[peer_id] = q
 
+  local ping_thread = ngx.thread.spawn(function()
+    while not exiting() do
+      ngx.sleep(PING_INTERVAL)
+
+      local m = message.new(self.node_id, peer_id, "kong:hybrid:ping", "")
+      q:enqueue(m)
+      ngx_log(ngx_DEBUG, "sent ping to: ", peer_id)
+    end
+  end)
+
   local read_thread = ngx.thread.spawn(function()
+    local last_seen = ngx_time()
+
     while not exiting() do
       local m, err = message.unpack_from_socket(sock)
       if m then
+        last_seen = ngx_time()
+
         local callback = self.callbacks[m.topic]
         if callback then
           local succ, err = pcall(callback, m)
@@ -52,7 +83,14 @@ function _M:handle_peer(peer_id, sock)
                             "for topic \"", m.topic, "\" doesn't exist")
         end
 
-      elseif err ~= "timeout" then
+      elseif err == "timeout" then
+        local waited = ngx_time() - last_seen
+        if waited > PING_WAIT then
+          return nil, "did not receive PING frame from " .. peer_id ..
+                      " within " .. PING_WAIT .. " seconds"
+        end
+
+      else
         return nil, "failed to receive message from DP: " .. err
       end
     end
@@ -79,6 +117,7 @@ function _M:handle_peer(peer_id, sock)
 
   ngx.thread.kill(write_thread)
   ngx.thread.kill(read_thread)
+  ngx.thread.kill(ping_thread)
 
   self.clients[peer_id] = nil
 
@@ -100,7 +139,6 @@ function _M:send(message)
   end
 
   local q = self.clients[message.dest]
-  print("message.dest = ", message.dest)
   if not q then
     return nil, "node " .. message.dest .. " is disconnected"
   end