Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kafka-logger): support for specified the log formats via admin API. #4483

Merged
merged 3 commits into from Jun 30, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
50 changes: 4 additions & 46 deletions apisix/plugins/http-logger.lua
Expand Up @@ -26,15 +26,11 @@ local ngx = ngx
local tostring = tostring
local pairs = pairs
local ipairs = ipairs
local str_byte = string.byte
local timer_at = ngx.timer.at

local plugin_name = "http-logger"
local stale_timer_running = false
local buffers = {}
local lru_log_format = core.lrucache.new({
ttl = 300, count = 512
})

local schema = {
type = "object",
Expand All @@ -59,14 +55,7 @@ local schema = {
local metadata_schema = {
type = "object",
properties = {
log_format = {
type = "object",
default = {
["host"] = "$host",
["@timestamp"] = "$time_iso8601",
["client_ip"] = "$remote_addr",
},
},
log_format = log_util.metadata_schema_log_format,
},
additionalProperties = false,
}
Expand Down Expand Up @@ -157,24 +146,6 @@ local function send_http_data(conf, log_message)
end


local function gen_log_format(metadata)
local log_format = {}
if metadata == nil then
return log_format
end

for k, var_name in pairs(metadata.value.log_format) do
if var_name:byte(1, 1) == str_byte("$") then
log_format[k] = {true, var_name:sub(2)}
else
log_format[k] = {false, var_name}
end
end
core.log.info("log_format: ", core.json.delay_encode(log_format))
return log_format
end


-- remove stale objects from the memory after timer expires
local function remove_stale_objects(premature)
if premature then
Expand All @@ -198,23 +169,10 @@ function _M.log(conf, ctx)
core.log.info("metadata: ", core.json.delay_encode(metadata))

local entry
local log_format = lru_log_format(metadata or "", nil, gen_log_format,
metadata)
if core.table.nkeys(log_format) > 0 then
entry = core.table.new(0, core.table.nkeys(log_format))
for k, var_attr in pairs(log_format) do
if var_attr[1] then
entry[k] = ctx.var[var_attr[2]]
else
entry[k] = var_attr[2]
end
end

local matched_route = ctx.matched_route and ctx.matched_route.value
if matched_route then
entry.service_id = matched_route.service_id
entry.route_id = matched_route.id
end
if metadata and metadata.value.log_format
and core.table.nkeys(metadata.value.log_format) > 0 then
chzhuo marked this conversation as resolved.
Show resolved Hide resolved
entry = log_util.get_custom_format_log(metadata.value.log_format)
else
entry = log_util.get_full_log(ngx, conf)
end
Expand Down
27 changes: 24 additions & 3 deletions apisix/plugins/kafka-logger.lua
Expand Up @@ -18,6 +18,8 @@ local core = require("apisix.core")
local log_util = require("apisix.utils.log-util")
local producer = require ("resty.kafka.producer")
local batch_processor = require("apisix.utils.batch-processor")
local plugin = require("apisix.plugin")

local math = math
local pairs = pairs
local type = type
Expand Down Expand Up @@ -63,15 +65,27 @@ local schema = {
required = {"broker_list", "kafka_topic"}
}

local metadata_schema = {
type = "object",
properties = {
log_format = log_util.metadata_schema_log_format,
},
additionalProperties = false,
}

local _M = {
version = 0.1,
priority = 403,
name = plugin_name,
schema = schema,
metadata_schema = metadata_schema,
}


function _M.check_schema(conf)
function _M.check_schema(conf, schema_type)
if schema_type == core.schema.TYPE_METADATA then
return core.schema.check(metadata_schema, conf)
end
return core.schema.check(schema, conf)
end

Expand Down Expand Up @@ -152,8 +166,15 @@ function _M.log(conf, ctx)
-- core.log.info("origin entry: ", entry)

else
entry = log_util.get_full_log(ngx, conf)
core.log.info("full log entry: ", core.json.delay_encode(entry))
local metadata = plugin.plugin_metadata(plugin_name)
core.log.info("metadata: ", core.json.delay_encode(metadata))
if metadata and metadata.value.log_format
and core.table.nkeys(metadata.value.log_format) > 0 then
chzhuo marked this conversation as resolved.
Show resolved Hide resolved
entry = log_util.get_custom_format_log(metadata.value.log_format)
else
entry = log_util.get_full_log(ngx, conf)
core.log.info("full log entry: ", core.json.delay_encode(entry))
chzhuo marked this conversation as resolved.
Show resolved Hide resolved
end
end

if not stale_timer_running then
Expand Down
48 changes: 48 additions & 0 deletions apisix/utils/log-util.lua
Expand Up @@ -17,10 +17,58 @@
local core = require("apisix.core")
local ngx = ngx
local pairs = pairs
local str_byte = string.byte
local req_get_body_data = ngx.req.get_body_data

local lru_log_format = core.lrucache.new({
ttl = 300, count = 512
})

local _M = {}
_M.metadata_schema_log_format = {
type = "object",
default = {
["host"] = "$host",
["@timestamp"] = "$time_iso8601",
["client_ip"] = "$remote_addr",
},
}


local function gen_log_format(format)
local log_format = {}
for k, var_name in pairs(format) do
if var_name:byte(1, 1) == str_byte("$") then
log_format[k] = {true, var_name:sub(2)}
else
log_format[k] = {false, var_name}
end
end
core.log.info("log_format: ", core.json.delay_encode(log_format))
return log_format
end

local function get_custom_format_log(format)
chzhuo marked this conversation as resolved.
Show resolved Hide resolved
local ctx = ngx.ctx.api_ctx

local log_format = lru_log_format(format or "", nil, gen_log_format, format)
local entry = core.table.new(0, core.table.nkeys(log_format))
for k, var_attr in pairs(log_format) do
if var_attr[1] then
entry[k] = ctx.var[var_attr[2]]
else
entry[k] = var_attr[2]
end
end

local matched_route = ctx.matched_route and ctx.matched_route.value
if matched_route then
entry.service_id = matched_route.service_id
entry.route_id = matched_route.id
end
return entry
end
_M.get_custom_format_log = get_custom_format_log

local function get_full_log(ngx, conf)
local ctx = ngx.ctx.api_ctx
Expand Down
29 changes: 28 additions & 1 deletion docs/zh/latest/plugins/kafka-logger.md
Expand Up @@ -138,7 +138,7 @@ title: kafka-logger
1. 为特定路由启用 kafka-logger 插件。

```shell
curl http://127.0.0.1:9080/apisix/admin/routes/5 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"plugins": {
"kafka-logger": {
Expand Down Expand Up @@ -171,6 +171,33 @@ HTTP/1.1 200 OK
hello, world
```

## 插件元数据设置

| 名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述 |
| ---------------- | ------- | ------ | ------------- | ------- | ------------------------------------------------ |
| log_format | object | 可选 | | | 以 Hash 对象方式声明日志格式。对 value 部分,仅支持字符串。如果是以`$`开头,则表明是要获取 [Nginx 内置变量](http://nginx.org/en/docs/varindex.html)。特别的,该设置是全局生效的,意味着指定 log_format 后,将对所有绑定 kafka-logger 的 Route 或 Service 生效。 |
chzhuo marked this conversation as resolved.
Show resolved Hide resolved

### 设置日志格式示例

```shell
curl http://127.0.0.1:9080/apisix/admin/plugin_metadata/kafka-logger -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"log_format": {
"host": "$host",
"@timestamp": "$time_iso8601",
"client_ip": "$remote_addr"
}
}'
```

在日志收集处,将得到类似下面的日志:

```shell
{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}
{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}
```


## 禁用插件

当您要禁用`kafka-logger`插件时,这很简单,您可以在插件配置中删除相应的 json 配置,无需重新启动服务,它将立即生效:
Expand Down
40 changes: 40 additions & 0 deletions t/plugin/kafka-logger.t
Expand Up @@ -722,3 +722,43 @@ GET /t
[qr/partition_id: 1/,
qr/partition_id: 0/,
qr/partition_id: 2/]



=== TEST 20: add plugin metadata
chzhuo marked this conversation as resolved.
Show resolved Hide resolved
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/plugin_metadata/kafka-logger',
ngx.HTTP_PUT,
[[{
"log_format": {
"host": "$host",
"@timestamp": "$time_iso8601",
"client_ip": "$remote_addr"
}
}]],
[[{
"node": {
"value": {
"log_format": {
"host": "$host",
"@timestamp": "$time_iso8601",
"client_ip": "$remote_addr"
}
}
},
"action": "set"
}]]
)
ngx.status = code
ngx.say(body)
}
}
--- request
GET /t
--- response_body
passed
--- no_error_log
[error]
chzhuo marked this conversation as resolved.
Show resolved Hide resolved