Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[hotfix/analytics] more robust buffer #471

Merged
merged 2 commits into from
Aug 13, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions kong/plugins/log_serializers/alf.lua
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ function _M.serialize_entry(ngx)

local alf_req_body = analytics_data.req_body or ""
local alf_res_body = analytics_data.res_body or ""
local alf_req_post_args = analytics_data.req_post_args or {}

-- timers
local proxy_started_at, proxy_ended_at = ngx.ctx.proxy_started_at, ngx.ctx.proxy_ended_at
Expand Down Expand Up @@ -128,7 +129,7 @@ function _M.serialize_entry(ngx)
bodySize = string.len(alf_req_body),
postData = {
mimeType = alf_req_mimeType,
params = dic_to_array(ngx.req.get_post_args()),
params = dic_to_array(alf_req_post_args),
text = alf_req_body
}
},
Expand Down Expand Up @@ -182,7 +183,7 @@ function _M.new_alf(ngx, token, environment)
version = "1.2",
creator = {
name = "mashape-analytics-agent-kong",
version = "1.0.1"
version = "1.0.2"
},
entries = {_M.serialize_entry(ngx)}
}
Expand Down
48 changes: 33 additions & 15 deletions kong/plugins/mashape-analytics/buffer.lua
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-- ALF buffer module
--
-- This module contains a buffered array of ALF objects. When the buffer is full (max number of entries
-- or max payload size), it is converted to a JSON payload and moved to another buffer of payloads to be
-- or max payload size), it is converted to a JSON payload and moved a queue of payloads to be
-- sent to the server.
--
-- 1 buffer of ALFs (gets flushed once it reached the mmax size)
Expand Down Expand Up @@ -30,6 +30,7 @@ local ANALYTICS_SOCKET = {

local buffer_mt = {}
buffer_mt.__index = buffer_mt
buffer_mt.MAX_BUFFER_SIZE = MAX_BUFFER_SIZE

-- A handler for delayed batch sending. When no call has been made for X seconds
-- (X being conf.delay), we send the batch to keep analytics as close to real-time
Expand Down Expand Up @@ -85,8 +86,15 @@ function buffer_mt:add_alf(alf)
local next_n_entries = #self.entries + 1
local alf_size = string.len(str)

-- If the alf_size exceeds the payload limit by itself, we have a big problem
if alf_size > self.MAX_SIZE then
ngx.log(ngx.ERR, string.format("[mashape-analytics] ALF size exceeded the maximum size (%sMB) accepted by the socket server. Dropping it.",
self.MAX_SIZE / MB))
return
end

-- If size or entries exceed the max limits
local full = next_n_entries > self.MAX_ENTRIES or self:get_size() > self.MAX_SIZE
local full = next_n_entries > self.MAX_ENTRIES or (self:get_size() + alf_size) > self.MAX_SIZE
if full then
self:flush()
-- Batch size reached, let's send the data
Expand Down Expand Up @@ -129,7 +137,11 @@ end
-- 3. Empty the buffer and reset the current buffer size
function buffer_mt:flush()
local payload = self:payload_string()
table.insert(self.sending_queue, payload)
table.insert(self.sending_queue, {
payload = payload,
n_entries = #self.entries,
size = self:get_size()
})
self.entries = {}
self.entries_size = 0
end
Expand All @@ -145,23 +157,29 @@ function buffer_mt.send_batch(premature, self)
return
end

-- Let's send the oldest payload in our buffer
local message = self.sending_queue[1]
-- Let's send the oldest batch in our queue
local batch_to_send = table.remove(self.sending_queue, 1)

local batch_saved = false
local drop_batch = false
local client = http:new()
client:set_timeout(50000) -- 5 sec

local ok, err = client:connect(ANALYTICS_SOCKET.host, ANALYTICS_SOCKET.port)
if ok then
local res, err = client:request({path = ANALYTICS_SOCKET.path, body = message})
local res, err = client:request({path = ANALYTICS_SOCKET.path, body = batch_to_send.payload})
if not res then
ngx.log(ngx.ERR, "[mashape-analytics] failed to send batch: "..err)
ngx.log(ngx.ERR, string.format("[mashape-analytics] failed to send batch (%s ALFs %s bytes): %s",
batch_to_send.n_entries, batch_to_send.size, err))
elseif res.status == 200 then
batch_saved = true
drop_batch = true
ngx.log(ngx.DEBUG, string.format("[mashape-analytics] successfully saved the batch. (%s)", res.body))
elseif res.status == 400 then
ngx.log(ngx.ERR, string.format("[mashape-analytics] socket server refused the batch (%s ALFs %s bytes). Dropping batch. Status: (%s) Error: (%s)",
batch_to_send.n_entries, batch_to_send.size, res.status, res.body))
drop_batch = true
else
ngx.log(ngx.ERR, string.format("[mashape-analytics] socket server refused the batch. Status: (%s) Error: (%s)", res.status, res.body))
ngx.log(ngx.ERR, string.format("[mashape-analytics] socket server could not save the batch (%s ALFs %s bytes). Status: (%s) Error: (%s)",
batch_to_send.n_entries, batch_to_send.size, res.status, res.body))
end

-- close connection, or put it into the connection pool
Expand All @@ -177,16 +195,16 @@ function buffer_mt.send_batch(premature, self)
ngx.log(ngx.ERR, "[mashape-analytics] failed to connect to the socket server: "..err)
end

if batch_saved then
-- Remove the payload that was sent
table.remove(self.sending_queue, 1)
if not drop_batch then
-- If the batch is not dropped, then add it back to the end of the queue and it will be tried again later
table.insert(self.sending_queue, batch_to_send)
end

self.lock_sending = false

-- Keep sendind data if the buffer is not yet emptied
-- Keep sendind data if the queue is not yet emptied
if #self.sending_queue > 0 then
local ok, err = ngx.timer.at(0, self.send_batch, self)
local ok, err = ngx.timer.at(2, self.send_batch, self)
if not ok then
ngx.log(ngx.ERR, "[mashape-analytics] failed to create batch retry timer: ", err)
end
Expand Down
17 changes: 15 additions & 2 deletions kong/plugins/mashape-analytics/handler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,24 @@ function AnalyticsHandler:access(conf)
-- Retrieve and keep in memory the bodies for this request
ngx.ctx.analytics = {
req_body = "",
res_body = ""
res_body = "",
req_post_args = {}
}

ngx.req.read_body()

local status, res = pcall(ngx.req.get_post_args)
if not status then
if res == "requesty body in temp file not supported" then
ngx.log(ngx.ERR, "[mashape-analytics] cannot read request body from temporary file. Try increasing the client_body_buffer_size directive.")
else
ngx.log(ngx.ERR, res)
end
else
ngx.ctx.analytics.req_post_args = res
end

if conf.log_body then
ngx.req.read_body()
ngx.ctx.analytics.req_body = ngx.req.get_body_data()
end
end
Expand Down
2 changes: 1 addition & 1 deletion spec/plugins/mashape-analytics/alf_serializer_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ describe("ALF serializer", function()
assert.equal("1.2", alf.har.log.version)
assert.truthy(alf.har.log.creator)
assert.equal("mashape-analytics-agent-kong", alf.har.log.creator.name)
assert.equal("1.0.1", alf.har.log.creator.version)
assert.equal("1.0.2", alf.har.log.creator.version)
assert.truthy(alf.har.log.entries)
assert.equal(1, #(alf.har.log.entries))
end)
Expand Down
47 changes: 46 additions & 1 deletion spec/plugins/mashape-analytics/buffer_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,56 @@ describe("ALFBuffer", function()
alf_buffer.flush:revert()
end)
end)
it("should call :flush() when reaching its max size", function()
-- How many stubs to reach the limit?
local COMMA_LEN = string.len(",")
local JSON_ARR_LEN = string.len("[]")
local max_n_stubs = math.ceil(ALFBuffer.MAX_BUFFER_SIZE / (STUB_LEN + COMMA_LEN)) -- + the comma after each ALF in the JSON payload

-- Create a new buffer
local buffer = ALFBuffer.new({batch_size = max_n_stubs + 100, delay = 2})

local s = spy.on(buffer, "flush")

-- Add max_n_stubs - 1 entries
for i = 1, max_n_stubs - 1 do
buffer:add_alf(ALF_STUB)
end

assert.spy(s).was_not_called()

-- We should have `(max_n_stubs - 1) * (STUB_LEN + COMMA_LEN) + JSON_ARR_LEN - COMMA_LEN` because no comma for latest object`
-- as our current buffer size.
assert.equal((max_n_stubs - 1) * (STUB_LEN + COMMA_LEN) + JSON_ARR_LEN - COMMA_LEN, buffer:get_size())

-- adding one more entry
buffer:add_alf(ALF_STUB)
assert.spy(s).was.called()
end)
it("should drop an ALF if it is too big by itself", function()
local str = string.rep(".", ALFBuffer.MAX_BUFFER_SIZE)
local huge_alf = {foo = str}

local buffer = ALFBuffer.new(CONF_STUB)

local s = spy.on(_G.ngx, "log")

buffer:add_alf(huge_alf)

assert.spy(s).was.called()
assert.equal(0, buffer.entries_size)
assert.equal(0, #buffer.entries)

finally(function()
_G.ngx.log:revert()
end)
end)
describe(":flush()", function()
it("should have emptied the current buffer and added a payload to be sent", function()
assert.equal(1, #alf_buffer.entries)
assert.equal(1, #alf_buffer.sending_queue)
assert.equal("string", type(alf_buffer.sending_queue[1]))
assert.equal("table", type(alf_buffer.sending_queue[1]))
assert.equal("string", type(alf_buffer.sending_queue[1].payload))
assert.equal(STUB_LEN, alf_buffer.entries_size)
end)
end)
Expand Down
8 changes: 4 additions & 4 deletions spec/plugins/mashape-analytics/fixtures/requests.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ return {
get_method = function() return "GET" end,
http_version = function() return 1.1 end,
get_headers = function() return {["Accept"] = "/*/", ["Host"] = "mockbin.com"} end,
get_uri_args = function() return {["hello"] = "world", ["foo"] = "bar", ["number"] = 2} end,
get_post_args = function() return {["hello"] = {"world", "earth"}} end
get_uri_args = function() return {["hello"] = "world", ["foo"] = "bar", ["number"] = 2} end
},
resp = {
get_headers = function() return {["Connection"] = "close", ["Content-Type"] = "application/json", ["Content-Length"] = "934"} end
Expand All @@ -30,6 +29,7 @@ return {
analytics = {
req_body = "hello=world&hello=earth",
res_body = "{\"message\":\"response body\"}",
req_post_args = {["hello"] = {"world", "earth"}},
response_received = 143284457211
}
}
Expand Down Expand Up @@ -100,8 +100,7 @@ return {
get_method = function() return "GET" end,
http_version = function() return 1.1 end,
get_headers = function() return {["Accept"] = "/*/", ["Host"] = "mockbin.com"} end,
get_uri_args = function() return {["hello"] = "world", ["foo"] = "bar"} end,
get_post_args = function() return {["hello"] = {"world", "earth"}} end
get_uri_args = function() return {["hello"] = "world", ["foo"] = "bar"} end
},
resp = {
get_headers = function() return {["Connection"] = "close", ["Content-Type"] = "application/json", ["Content-Length"] = "934"} end
Expand All @@ -122,6 +121,7 @@ return {
analytics = {
req_body = "hello=world&hello=earth",
res_body = "{\"message\":\"response body\"}",
req_post_args = {["hello"] = {"world", "earth"}},
response_received = 143284457211
}
}
Expand Down