
Commit 20d9dd2

feat(kafka-logger): support for specifying the log format via admin API. (#4483)

* feat(kafka-logger): support for specifying the log format via admin API.

* add kafka-logger log-format test case

* add English docs

Co-authored-by: zhuo.chen <zhuo.chen@upai.com>
chzhuo and zhuo.chen committed Jun 30, 2021
1 parent d0bc723 commit 20d9dd2
Showing 8 changed files with 277 additions and 53 deletions.
52 changes: 5 additions & 47 deletions apisix/plugins/http-logger.lua
@@ -24,17 +24,12 @@ local plugin = require("apisix.plugin")

local ngx = ngx
local tostring = tostring
local pairs = pairs
local ipairs = ipairs
local str_byte = string.byte
local timer_at = ngx.timer.at

local plugin_name = "http-logger"
local stale_timer_running = false
local buffers = {}
local lru_log_format = core.lrucache.new({
ttl = 300, count = 512
})

local schema = {
type = "object",
@@ -59,14 +54,7 @@ local schema = {
local metadata_schema = {
type = "object",
properties = {
log_format = {
type = "object",
default = {
["host"] = "$host",
["@timestamp"] = "$time_iso8601",
["client_ip"] = "$remote_addr",
},
},
log_format = log_util.metadata_schema_log_format,
},
additionalProperties = false,
}
@@ -157,24 +145,6 @@ local function send_http_data(conf, log_message)
end


local function gen_log_format(metadata)
local log_format = {}
if metadata == nil then
return log_format
end

for k, var_name in pairs(metadata.value.log_format) do
if var_name:byte(1, 1) == str_byte("$") then
log_format[k] = {true, var_name:sub(2)}
else
log_format[k] = {false, var_name}
end
end
core.log.info("log_format: ", core.json.delay_encode(log_format))
return log_format
end


-- remove stale objects from the memory after timer expires
local function remove_stale_objects(premature)
if premature then
@@ -198,23 +168,11 @@ function _M.log(conf, ctx)
core.log.info("metadata: ", core.json.delay_encode(metadata))

local entry
local log_format = lru_log_format(metadata or "", nil, gen_log_format,
metadata)
if core.table.nkeys(log_format) > 0 then
entry = core.table.new(0, core.table.nkeys(log_format))
for k, var_attr in pairs(log_format) do
if var_attr[1] then
entry[k] = ctx.var[var_attr[2]]
else
entry[k] = var_attr[2]
end
end

local matched_route = ctx.matched_route and ctx.matched_route.value
if matched_route then
entry.service_id = matched_route.service_id
entry.route_id = matched_route.id
end
if metadata and metadata.value.log_format
and core.table.nkeys(metadata.value.log_format) > 0
then
entry = log_util.get_custom_format_log(ctx, metadata.value.log_format)
else
entry = log_util.get_full_log(ngx, conf)
end
29 changes: 26 additions & 3 deletions apisix/plugins/kafka-logger.lua
@@ -18,6 +18,8 @@ local core = require("apisix.core")
local log_util = require("apisix.utils.log-util")
local producer = require ("resty.kafka.producer")
local batch_processor = require("apisix.utils.batch-processor")
local plugin = require("apisix.plugin")

local math = math
local pairs = pairs
local type = type
@@ -63,15 +65,27 @@ local schema = {
required = {"broker_list", "kafka_topic"}
}

local metadata_schema = {
type = "object",
properties = {
log_format = log_util.metadata_schema_log_format,
},
additionalProperties = false,
}

local _M = {
version = 0.1,
priority = 403,
name = plugin_name,
schema = schema,
metadata_schema = metadata_schema,
}


function _M.check_schema(conf)
function _M.check_schema(conf, schema_type)
if schema_type == core.schema.TYPE_METADATA then
return core.schema.check(metadata_schema, conf)
end
return core.schema.check(schema, conf)
end

@@ -152,8 +166,17 @@ function _M.log(conf, ctx)
-- core.log.info("origin entry: ", entry)

else
entry = log_util.get_full_log(ngx, conf)
core.log.info("full log entry: ", core.json.delay_encode(entry))
local metadata = plugin.plugin_metadata(plugin_name)
core.log.info("metadata: ", core.json.delay_encode(metadata))
if metadata and metadata.value.log_format
and core.table.nkeys(metadata.value.log_format) > 0
then
entry = log_util.get_custom_format_log(ctx, metadata.value.log_format)
core.log.info("custom log format entry: ", core.json.delay_encode(entry))
else
entry = log_util.get_full_log(ngx, conf)
core.log.info("full log entry: ", core.json.delay_encode(entry))
end
end

if not stale_timer_running then
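Since `metadata_schema` sets `additionalProperties = false`, the new `check_schema` dispatch rejects plugin metadata carrying unknown keys. A hedged sketch of such a rejected request (the key `some_key` is hypothetical, and the default Admin API key is assumed):

```shell
# Expected to be rejected by check_schema: "some_key" is not declared in
# metadata_schema, and additionalProperties is false.
curl http://127.0.0.1:9080/apisix/admin/plugin_metadata/kafka-logger \
  -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "log_format": {"host": "$host"},
    "some_key": "not-allowed"
}'
```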
46 changes: 46 additions & 0 deletions apisix/utils/log-util.lua
@@ -17,10 +17,56 @@
local core = require("apisix.core")
local ngx = ngx
local pairs = pairs
local str_byte = string.byte
local req_get_body_data = ngx.req.get_body_data

local lru_log_format = core.lrucache.new({
ttl = 300, count = 512
})

local _M = {}
_M.metadata_schema_log_format = {
type = "object",
default = {
["host"] = "$host",
["@timestamp"] = "$time_iso8601",
["client_ip"] = "$remote_addr",
},
}


local function gen_log_format(format)
local log_format = {}
for k, var_name in pairs(format) do
if var_name:byte(1, 1) == str_byte("$") then
log_format[k] = {true, var_name:sub(2)}
else
log_format[k] = {false, var_name}
end
end
core.log.info("log_format: ", core.json.delay_encode(log_format))
return log_format
end

local function get_custom_format_log(ctx, format)
local log_format = lru_log_format(format or "", nil, gen_log_format, format)
local entry = core.table.new(0, core.table.nkeys(log_format))
for k, var_attr in pairs(log_format) do
if var_attr[1] then
entry[k] = ctx.var[var_attr[2]]
else
entry[k] = var_attr[2]
end
end

local matched_route = ctx.matched_route and ctx.matched_route.value
if matched_route then
entry.service_id = matched_route.service_id
entry.route_id = matched_route.id
end
return entry
end
_M.get_custom_format_log = get_custom_format_log

local function get_full_log(ngx, conf)
local ctx = ngx.ctx.api_ctx
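`gen_log_format` is memoized through `lru_log_format` (keyed by the format table, with a 300-second TTL and up to 512 entries), so a format is only re-parsed when the plugin metadata changes. Below is a minimal, self-contained sketch of the `$`-prefix convention it implements; the name `resolve_entry` and the sample `vars` table are hypothetical, for illustration only:

```lua
local str_byte = string.byte

-- Values starting with "$" are looked up in the request variables;
-- anything else is copied through as a literal string.
local function resolve_entry(format, vars)
    local entry = {}
    for key, value in pairs(format) do
        if value:byte(1, 1) == str_byte("$") then
            entry[key] = vars[value:sub(2)]   -- e.g. "$host" -> vars.host
        else
            entry[key] = value                -- literal, kept as-is
        end
    end
    return entry
end

-- Mirrors the default metadata log_format, plus one literal field.
local vars = { host = "example.com", remote_addr = "127.0.0.1" }
local entry = resolve_entry(
    { host = "$host", client_ip = "$remote_addr", tag = "web" }, vars)
-- entry => { host = "example.com", client_ip = "127.0.0.1", tag = "web" }
```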
2 changes: 1 addition & 1 deletion docs/en/latest/plugins/http-logger.md
@@ -91,7 +91,7 @@ hello, world
| ---------------- | ------- | ----------- | ------------- | ------- | ---------------------------------------------------------------------------------------- |
| log_format | object | optional | {"host": "$host", "@timestamp": "$time_iso8601", "client_ip": "$remote_addr"} | | Log format declared as a JSON object. Only string is supported in the `value` part. If a value starts with `$`, an `APISIX` variable or [Nginx variable](http://nginx.org/en/docs/varindex.html) is fetched. |

Note that the metadata configuration is applied in global scope, which means it will take effect on all Routes or Services which use the http-logger plugin.
Note that **the metadata configuration is applied in global scope**, which means it will take effect on all Routes or Services which use the http-logger plugin.
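
For illustration, a hedged sketch of setting this metadata through the Admin API, mirroring the kafka-logger example added later in this commit (the default Admin API key is assumed):

```shell
curl http://127.0.0.1:9080/apisix/admin/plugin_metadata/http-logger \
  -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "log_format": {
        "host": "$host",
        "@timestamp": "$time_iso8601",
        "client_ip": "$remote_addr"
    }
}'
```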

**APISIX Variables**

38 changes: 38 additions & 0 deletions docs/en/latest/plugins/kafka-logger.md
@@ -176,6 +176,44 @@ HTTP/1.1 200 OK
hello, world
```

## Metadata

| Name | Type | Requirement | Default | Valid | Description |
| ---------------- | ------- | ----------- | ------------- | ------- | ---------------------------------------------------------------------------------------- |
| log_format | object | optional | {"host": "$host", "@timestamp": "$time_iso8601", "client_ip": "$remote_addr"} | | Log format declared as a JSON object. Only string is supported in the `value` part. If a value starts with `$`, an `APISIX` variable or [Nginx variable](http://nginx.org/en/docs/varindex.html) is fetched. |

Note that **the metadata configuration is applied in global scope**, which means it will take effect on all Routes or Services which use the kafka-logger plugin.

**APISIX Variables**

| Variable Name | Description | Usage Example |
|------------------|-------------------------|----------------|
| route_id | id of `route` | $route_id |
| route_name | name of `route` | $route_name |
| service_id | id of `service` | $service_id |
| service_name | name of `service` | $service_name |
| consumer_name | username of `consumer` | $consumer_name |

### Example

```shell
curl http://127.0.0.1:9080/apisix/admin/plugin_metadata/kafka-logger -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"log_format": {
"host": "$host",
"@timestamp": "$time_iso8601",
"client_ip": "$remote_addr"
}
}'
```

It is expected to see logs like the following (note that `route_id` is appended automatically when the request matched a Route, even though it is not declared in `log_format`):

```shell
{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}
{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}
```
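
The APISIX variables listed above can be mixed into the same format. A hedged variant sketch (`$route_name` only resolves when the matched Route has a name; `env` is a hypothetical literal field):

```shell
curl http://127.0.0.1:9080/apisix/admin/plugin_metadata/kafka-logger \
  -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "log_format": {
        "host": "$host",
        "route_name": "$route_name",
        "env": "production"
    }
}'
```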

## Disable Plugin

Remove the corresponding JSON configuration from the plugin configuration to disable the `kafka-logger` plugin. APISIX plugins are hot-reloaded, so no restart is required.
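
For example, a hedged sketch of re-publishing the Route without the plugin (the Route id, uri, and upstream are assumptions carried over from the enable example):

```shell
curl http://127.0.0.1:9080/apisix/admin/routes/1 \
  -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "methods": ["GET"],
    "uri": "/hello",
    "plugins": {},
    "upstream": {
        "type": "roundrobin",
        "nodes": {
            "127.0.0.1:1980": 1
        }
    }
}'
```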
2 changes: 1 addition & 1 deletion docs/zh/latest/plugins/http-logger.md
@@ -89,7 +89,7 @@ hello, world

| Name | Type | Requirement | Default | Valid | Description |
| ---------------- | ------- | ------ | ------------- | ------- | ------------------------------------------------ |
| log_format | object | optional | {"host": "$host", "@timestamp": "$time_iso8601", "client_ip": "$remote_addr"} | | Log format declared as a JSON object. Only string is supported in the `value` part. If a value starts with `$`, an __APISIX__ variable or [Nginx variable](http://nginx.org/en/docs/varindex.html) is fetched. In particular, this setting applies globally: once log_format is specified, it takes effect on all Routes or Services bound to the http-logger plugin. |
| log_format | object | optional | {"host": "$host", "@timestamp": "$time_iso8601", "client_ip": "$remote_addr"} | | Log format declared as a JSON object. Only string is supported in the `value` part. If a value starts with `$`, an __APISIX__ variable or [Nginx variable](http://nginx.org/en/docs/varindex.html) is fetched. In particular, **this setting applies globally**: once log_format is specified, it takes effect on all Routes or Services bound to the http-logger plugin. |

**APISIX Variables**

38 changes: 37 additions & 1 deletion docs/zh/latest/plugins/kafka-logger.md
@@ -138,7 +138,7 @@ title: kafka-logger
1. Enable the kafka-logger plugin on a specific Route.

```shell
curl http://127.0.0.1:9080/apisix/admin/routes/5 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"plugins": {
"kafka-logger": {
@@ -171,6 +171,42 @@ HTTP/1.1 200 OK
hello, world
```

## Plugin Metadata

| Name | Type | Requirement | Default | Valid | Description |
| ---------------- | ------- | ------ | ------------- | ------- | ------------------------------------------------ |
| log_format | object | optional | {"host": "$host", "@timestamp": "$time_iso8601", "client_ip": "$remote_addr"} | | Log format declared as a JSON object. Only string is supported in the `value` part. If a value starts with `$`, an __APISIX__ variable or [Nginx variable](http://nginx.org/en/docs/varindex.html) is fetched. In particular, **this setting applies globally**: once log_format is specified, it takes effect on all Routes or Services bound to the kafka-logger plugin. |

**APISIX Variables**

| Variable Name | Description | Usage Example |
|------------------|-------------------------|----------------|
| route_id | id of `route` | $route_id |
| route_name | name of `route` | $route_name |
| service_id | id of `service` | $service_id |
| service_name | name of `service` | $service_name |
| consumer_name | username of `consumer` | $consumer_name |

### Example of Setting the Log Format

```shell
curl http://127.0.0.1:9080/apisix/admin/plugin_metadata/kafka-logger -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"log_format": {
"host": "$host",
"@timestamp": "$time_iso8601",
"client_ip": "$remote_addr"
}
}'
```

On the log collection side, logs like the following are expected:

```shell
{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}
{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}
```

## Disable Plugin

Disabling the `kafka-logger` plugin is simple: just remove the corresponding JSON configuration from the plugin configuration. There is no need to restart the service; the change takes effect immediately:
