Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions apisix/plugins/elasticsearch-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ local log_util = require("apisix.utils.log-util")
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
local plugin = require("apisix.plugin")
local ngx = ngx
local ngx_re = ngx.re
local str_format = core.string.format
local math_random = math.random
local os_date = os.date
local pairs = pairs

local plugin_name = "elasticsearch-logger"
Expand Down Expand Up @@ -200,11 +202,37 @@ local function get_es_major_version(uri, conf)
end


local function get_logger_entry(conf, ctx)
local function replace_time(m)
local time_format = m[1]
local time = os_date(time_format)
if not time then
core.log.error("failed to parse time format: ", time_format)
return ""
end
return time
end


local function resolve_index_vars(index, var)
local new_index, _, err = ngx_re.gsub(index, "(?<!\\$){([^}]*)}", replace_time, "jo")
if not new_index then
core.log.error("failed to substitute time format: ", err)
end

new_index, err = core.utils.resolve_var(new_index or index, var)
if not new_index then
core.log.error("failed to resolve APISIX variable from index: ", err)
end

return new_index or index
end


local function get_logger_entry(conf, ctx, index)
local entry = log_util.get_log_entry(plugin_name, conf, ctx)
local body = {
index = {
_index = conf.field.index
_index = index
}
}
-- for older version type is required
Expand Down Expand Up @@ -303,10 +331,11 @@ end


function _M.log(conf, ctx)
local index = resolve_index_vars(conf.field.index, ctx.var)
local metadata = plugin.plugin_metadata(plugin_name)
local max_pending_entries = metadata and metadata.value and
metadata.value.max_pending_entries or nil
local entry = get_logger_entry(conf, ctx)
local entry = get_logger_entry(conf, ctx, index)

if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
return
Expand All @@ -320,5 +349,6 @@ function _M.log(conf, ctx)
process, max_pending_entries)
end

_M._resolve_index_vars = resolve_index_vars

return _M
2 changes: 1 addition & 1 deletion docs/en/latest/plugins/elasticsearch-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ The `elasticsearch-logger` Plugin pushes request and response logs in batches to
| ------------- | ------- | -------- | --------------------------- | ------------ | ------------------------------------------------------------ |
| endpoint_addrs | array[string] | True | | | Elasticsearch API endpoint addresses. If multiple endpoints are configured, they will be written randomly. |
| field | object | True | | | Elasticsearch field configuration. |
| field.index | string | True | | | Elasticsearch [_index field](https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-index-field.html#mapping-index-field). Supports the configuration of a [lua time format](https://www.lua.org/pil/22.1.html) in curly brackets to include the current date, such as `service-{%Y-%m-%d}`. |
| field.index | string | True | | | Elasticsearch [_index field](https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-index-field.html#mapping-index-field). Supports [Lua time format](https://www.lua.org/pil/22.1.html) in curly brackets for date-based indices (e.g., `service-{%Y-%m-%d}`) and [APISIX variables](../apisix-variable.md) prefixed with `$` (e.g., `service-$host-{%Y.%m.%d}`). |
| log_format | object | False | | | Custom log format as key-value pairs in JSON. Values support strings and nested objects (up to five levels deep; deeper fields are truncated). Within strings, [APISIX](../apisix-variable.md) or [NGINX variables](http://nginx.org/en/docs/varindex.html) can be referenced by prefixing with `$`. |
| auth | object | False | | | Elasticsearch [authentication](https://www.elastic.co/guide/en/elasticsearch/reference/current/setting-up-authentication.html) configuration. |
| auth.username | string | False | | | Elasticsearch [authentication](https://www.elastic.co/guide/en/elasticsearch/reference/current/setting-up-authentication.html) username. Required if `auth` is configured. Must be provided together with `auth.password`. |
Expand Down
2 changes: 1 addition & 1 deletion docs/zh/latest/plugins/elasticsearch-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ description: elasticsearch-logger Plugin 将请求和响应日志批量推送到
| ------------- | ------- | -------- | -------------------- | ------------ | ------------------------------------------------------------ |
| endpoint_addrs | array[string] | 是 | | | Elasticsearch API 端点地址。如果配置了多个端点,则会随机写入。 |
| field | object | 是 | | | Elasticsearch 字段配置。 |
| field.index | string | 是 | | | Elasticsearch [_index 字段](https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-index-field.html#mapping-index-field)。支持在花括号中使用 [lua 时间格式](https://www.lua.org/pil/22.1.html) 来包含当前日期例如 `service-{%Y-%m-%d}`。 |
| field.index | string | 是 | | | Elasticsearch [_index 字段](https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-index-field.html#mapping-index-field)。支持在花括号中使用 [Lua 时间格式](https://www.lua.org/pil/22.1.html) 来包含当前日期例如 `service-{%Y-%m-%d}`),以及使用 `$` 前缀引用 [APISIX 变量](../apisix-variable.md)(例如 `service-$host-{%Y.%m.%d}`)。 |
| log_format | object | 否 | | | 自定义日志格式以 JSON 的键值对声明。值支持字符串和嵌套对象(最多五层,超出部分将被截断)。字符串中可通过 `$` 前缀引用 [APISIX](../apisix-variable.md) 或 [NGINX 变量](http://nginx.org/en/docs/varindex.html)。 |
| auth | object | 否 | | | Elasticsearch [身份验证](https://www.elastic.co/guide/en/elasticsearch/reference/current/setting-up-authentication.html) 配置。 |
| auth.username | string | 否 | | | Elasticsearch [身份验证](https://www.elastic.co/guide/en/elasticsearch/reference/current/setting-up-authentication.html) 用户名​​。当配置 `auth` 时必填,需与 `auth.password` 成对配置。 |
Expand Down
273 changes: 273 additions & 0 deletions t/plugin/elasticsearch-logger2.t
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,276 @@ GET /hello
hello world
--- error_log
Batch Processor[elasticsearch-logger] successfully processed the entries



=== TEST 4: resolve_index_vars unit test
--- config
location /t {
content_by_lua_block {
local plugin = require("apisix.plugins.elasticsearch-logger")
local configs = {
["%Y"] = "^\\d{4}$",
["%m"] = "^\\d{2}$",
["%d"] = "^\\d{2}$",
["%Y.%m.%d"] = "^\\d{4}\\.\\d{2}\\.\\d{2}$",
}

for format, regex in pairs(configs) do
local new = plugin._resolve_index_vars("prefix{" .. format .. "}suffix")
local ok = ngx.re.match(new, "^prefix" .. regex:sub(2, -2) .. "suffix$")
if not ok then
ngx.say("error: " .. new)
return
end
end
ngx.say("ok")
}
}
--- response_body
ok



=== TEST 5: test date variable in index
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test

local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, {
uri = "/hello",
upstream = {
type = "roundrobin",
nodes = {
["127.0.0.1:1980"] = 1
}
},
plugins = {
["elasticsearch-logger"] = {
endpoint_addr = "http://127.0.0.1:9201",
field = {
index = "services-{%Y.%m.%d}"
},
auth = {
username = "elastic",
password = "123456"
},
batch_max_size = 1,
inactive_timeout = 1,
}
}
})

if code >= 300 then
ngx.status = code
end

local code, _, body = t("/hello")
}
}
--- error_log eval
qr/body: \{"index":\{"_index":"services-\d\d\d\d\.\d\d\.\d\d"\}\}/



=== TEST 6: test APISIX variable in index
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test

local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, {
uri = "/hello",
upstream = {
type = "roundrobin",
nodes = {
["127.0.0.1:1980"] = 1
}
},
plugins = {
["elasticsearch-logger"] = {
endpoint_addr = "http://127.0.0.1:9201",
field = {
index = "services-$host"
},
auth = {
username = "elastic",
password = "123456"
},
batch_max_size = 1,
inactive_timeout = 1,
}
}
})

if code >= 300 then
ngx.status = code
end

local code, _, body = t("/hello")
}
}
--- error_log eval
qr/body: \{"index":\{"_index":"services-127.0.0.1"\}\}/



=== TEST 7: test both APISIX variable and date variable in index
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test

local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, {
uri = "/hello",
upstream = {
type = "roundrobin",
nodes = {
["127.0.0.1:1980"] = 1
}
},
plugins = {
["elasticsearch-logger"] = {
endpoint_addr = "http://127.0.0.1:9201",
field = {
index = "services-$host-{%Y.%m.%d}"
},
auth = {
username = "elastic",
password = "123456"
},
batch_max_size = 1,
inactive_timeout = 1,
}
}
})

if code >= 300 then
ngx.status = code
end

local code, _, body = t("/hello")
}
}
--- error_log eval
qr/body: \{"index":\{"_index":"services-127.0.0.1-\d\d\d\d\.\d\d\.\d\d"\}\}/



=== TEST 8: dynamic index template should not be mutated across requests
--- config
location /t {
content_by_lua_block {
local http = require "resty.http"
local httpc = http.new()
local t = require("lib.test_admin").test

local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, {
uri = "/hello",
upstream = {
type = "roundrobin",
nodes = {
["127.0.0.1:1980"] = 1
}
},
plugins = {
["elasticsearch-logger"] = {
endpoint_addr = "http://127.0.0.1:9201",
field = {
index = "services-$arg_id-{%Y.%m.%d}"
},
auth = {
username = "elastic",
password = "123456"
},
batch_max_size = 1,
inactive_timeout = 1,
}
}
})

if code >= 300 then
ngx.status = code
ngx.say(body)
return
end

local port = ngx.var.server_port
local res, err = httpc:request_uri("http://127.0.0.1:" .. port .. "/hello?id=first", {method = "GET"})
if not res then
ngx.say("request 1 failed: ", err)
return
end
res, err = httpc:request_uri("http://127.0.0.1:" .. port .. "/hello?id=second", {method = "GET"})
if not res then
ngx.say("request 2 failed: ", err)
return
end
ngx.sleep(2)
ngx.say("done")
}
}
--- response_body
done
--- error_log eval
[qr/body: \{"index":\{"_index":"services-first-\d\d\d\d\.\d\d\.\d\d"\}\}/, qr/body: \{"index":\{"_index":"services-second-\d\d\d\d\.\d\d\.\d\d"\}\}/]
--- timeout: 5



=== TEST 9: ${xx} variable syntax should not trigger time replacement
--- config
location /t {
content_by_lua_block {
local http = require "resty.http"
local httpc = http.new()
local t = require("lib.test_admin").test

local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, {
uri = "/hello",
upstream = {
type = "roundrobin",
nodes = {
["127.0.0.1:1980"] = 1
}
},
plugins = {
["elasticsearch-logger"] = {
endpoint_addr = "http://127.0.0.1:9201",
field = {
index = "services-${arg_id}-{%Y.%m.%d}"
},
auth = {
username = "elastic",
password = "123456"
},
batch_max_size = 1,
inactive_timeout = 1,
}
}
})

if code >= 300 then
ngx.status = code
ngx.say(body)
return
end

local port = ngx.var.server_port
local res, err = httpc:request_uri("http://127.0.0.1:" .. port .. "/hello?id=myservice", {method = "GET"})
if not res then
ngx.say("request failed: ", err)
return
end
ngx.sleep(2)
ngx.say("done")
}
}
--- response_body
done
--- error_log eval
qr/body: \{"index":\{"_index":"services-myservice-\d\d\d\d\.\d\d\.\d\d"\}\}/
--- no_error_log
failed to parse time format
--- timeout: 5
Loading