Skip to content

Commit

Permalink
More fluent bit migration work.
Browse files Browse the repository at this point in the history
  • Loading branch information
GUI committed Feb 12, 2024
1 parent 5ccda17 commit 2095808
Show file tree
Hide file tree
Showing 24 changed files with 208 additions and 265 deletions.
1 change: 0 additions & 1 deletion build/package_dependencies.sh
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ if [[ "$ID_NORMALIZED" == "rhel" ]]; then
)
fi
elif [[ "$ID_NORMALIZED" == "debian" ]]; then
libcurl_version=4
libffi_version=8
libldap_version="2.5-0"

Expand Down
9 changes: 2 additions & 7 deletions config/opensearch_templates_v3.json.etlua
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@
"data_stream": {},
"template": {
"settings": {
"index": {
"refresh_interval": "10s"
},
"index": <%- json_encode(config["opensearch"]["template"]["index"]) %>,
"translog": <%- json_encode(config["opensearch"]["template"]["translog"]) %>,
"analysis": {
"normalizer": {
"lowercase_normalizer": {
Expand All @@ -20,10 +19,6 @@
"filter": ["uppercase"]
}
}
},
"translog": {
"durability": "async",
"sync_interval": "10s"
}
},
"mappings": {
Expand Down
11 changes: 11 additions & 0 deletions config/schema.cue
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,17 @@ import "path"
]
index_name_prefix: string | *"api-umbrella"
template_version: uint | *3
template: {
index: {
refresh_interval: string | *"10s"
number_of_shards: uint | *3
number_of_replicas: uint | *2
}
translog: {
durability: string | *"async"
sync_interval: string | *"10s"
}
}
}

#analytics_output_name: "opensearch"
Expand Down
7 changes: 7 additions & 0 deletions config/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ postgresql:
password: dev_password
opensearch:
index_name_prefix: "api-umbrella-test"
template:
index:
      # In the test environment, disable replicas, reduce shards, and
      # shorten the refresh interval to speed things up.
refresh_interval: 50ms
number_of_shards: 1
number_of_replicas: 0
unbound:
port: 13100
control_port: 13101
Expand Down
6 changes: 3 additions & 3 deletions docker/dev/docker-start
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
set -e -u -x

# Clean files that may be left over if container doesn't shut down cleanly.
rm -f /opt/api-umbrella/var/run/rsyslogd.pid /tmp/.s.PGSQL.*
rm -f /tmp/.s.PGSQL.*

# Clean files that are tailed by rsyslog when using console output to prevent
# Clean files that are tailed by fluent-bit when using console output to prevent
# lots of output from previous logs on startup.
rm -f /var/log/api-umbrella/trafficserver/access.log /var/log/api-umbrella/trafficserver/diags.log /var/log/api-umbrella/trafficserver/error.log /var/log/api-umbrella/trafficserver/manager.log /var/log/api-umbrella/trafficserver/traffic.out
rm -f /var/log/api-umbrella/trafficserver/error.log

make
api-umbrella run
2 changes: 1 addition & 1 deletion src/api-umbrella/cli/write_config_files.lua
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ local function write_templates()

if template_ext == "etlua" then
local render_ok, render_err
render_ok, content, render_err = xpcall(etlua_render, xpcall_error_handler, content, { config = config, json_encode = json_encode })
render_ok, content, render_err = xpcall(etlua_render, xpcall_error_handler, content, { config = config, json_encode = json_encode, path_join = path_join })
if not render_ok or render_err then
print("template compile error in " .. template_path ..": " .. (render_err or content))
os.exit(1)
Expand Down
2 changes: 1 addition & 1 deletion src/api-umbrella/proxy/hooks/init_preload_modules.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ require "api-umbrella.proxy.jobs.api_users_store_refresh_local_cache"
require "api-umbrella.proxy.jobs.db_expirations"
require "api-umbrella.proxy.jobs.distributed_rate_limit_puller"
require "api-umbrella.proxy.jobs.distributed_rate_limit_pusher"
require "api-umbrella.proxy.jobs.opensearch_setup"
require "api-umbrella.proxy.log_utils"
require "api-umbrella.proxy.middleware.api_key_validator"
require "api-umbrella.proxy.middleware.api_matcher"
Expand All @@ -24,6 +23,7 @@ require "api-umbrella.proxy.middleware.role_validator"
require "api-umbrella.proxy.middleware.user_settings"
require "api-umbrella.proxy.middleware.website_matcher"
require "api-umbrella.proxy.opensearch_templates_data"
require "api-umbrella.proxy.startup.opensearch_setup"
require "api-umbrella.proxy.startup.seed_database"
require "api-umbrella.proxy.stores.active_config_store"
require "api-umbrella.proxy.stores.api_users_store"
Expand Down
2 changes: 1 addition & 1 deletion src/api-umbrella/proxy/hooks/init_worker.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ local api_users_store_refresh_local_cache = require "api-umbrella.proxy.jobs.api
local db_expirations = require "api-umbrella.proxy.jobs.db_expirations"
local distributed_rate_limit_puller = require "api-umbrella.proxy.jobs.distributed_rate_limit_puller"
local distributed_rate_limit_pusher = require "api-umbrella.proxy.jobs.distributed_rate_limit_pusher"
local opensearch_setup = require "api-umbrella.proxy.jobs.opensearch_setup"
local opensearch_setup = require "api-umbrella.proxy.startup.opensearch_setup"
local random_seed = require "api-umbrella.utils.random_seed"
local seed_database = require "api-umbrella.proxy.startup.seed_database"

Expand Down
20 changes: 2 additions & 18 deletions src/api-umbrella/proxy/opensearch_templates_data.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
local config = require("api-umbrella.utils.load_config")()
local etlua_render = require("etlua").render
local json_decode = require("cjson").decode
local json_encode = require "api-umbrella.utils.json_encode"
local xpcall_error_handler = require "api-umbrella.utils.xpcall_error_handler"

local opensearch_templates
Expand All @@ -13,31 +14,14 @@ else
local content = f:read("*all")
if content then
local render_ok, render_err
render_ok, content, render_err = xpcall(etlua_render, xpcall_error_handler, content, { config = config })
render_ok, content, render_err = xpcall(etlua_render, xpcall_error_handler, content, { config = config, json_encode = json_encode })
if not render_ok or render_err then
ngx.log(ngx.ERR, "template compile error in " .. path ..": " .. (render_err or content))
end

local ok, data = xpcall(json_decode, xpcall_error_handler, content)
if ok then
opensearch_templates = data

-- In the test environment, disable replicas, reduce shards, and
-- increasing refresh interval to speed things up.
if config["app_env"] == "test" then
for _, template in pairs(opensearch_templates) do
if not template["template"]["settings"] then
template["template"]["settings"] = {}
end
if not template["template"]["settings"]["index"] then
template["template"]["settings"]["index"] = {}
end

template["template"]["settings"]["index"]["refresh_interval"] = "50ms"
template["template"]["settings"]["index"]["number_of_shards"] = 1
template["template"]["settings"]["index"]["number_of_replicas"] = 0
end
end
else
ngx.log(ngx.ERR, "failed to parse json for " .. (path or "") .. ": " .. (data or ""))
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
local interval_lock = require "api-umbrella.utils.interval_lock"
local opensearch = require "api-umbrella.utils.opensearch"
local opensearch_templates = require "api-umbrella.proxy.opensearch_templates_data"
local shared_dict_retry_set = require("api-umbrella.utils.shared_dict_retry").set

local opensearch_query = opensearch.query
local jobs_dict = ngx.shared.jobs
local opensearch_query = opensearch.query
local sleep = ngx.sleep

local delay = 3600 -- in seconds
local timer_at = ngx.timer.at

local _M = {}

Expand Down Expand Up @@ -53,25 +53,35 @@ function _M.create_templates()
end
end

local set_ok, set_err = jobs_dict:safe_set("opensearch_templates_created", true)
local set_ok, set_err, set_forcible = shared_dict_retry_set(jobs_dict, "opensearch_templates_created", true)
if not set_ok then
ngx.log(ngx.ERR, "failed to set 'opensearch_templates_created' in 'active_config' shared dict: ", set_err)
ngx.log(ngx.ERR, "failed to set 'opensearch_templates_created' in 'jobs' shared dict: ", set_err)
elseif set_forcible then
ngx.log(ngx.WARN, "forcibly set 'opensearch_templates_created' in 'jobs' shared dict (shared dict may be too small)")
end
end

local function setup()
local _, err = _M.wait_for_opensearch()
if not err then
_M.create_templates()
else
ngx.log(ngx.ERR, "timed out waiting for eleasticsearch before setup, rerunning...")
if err then
ngx.log(ngx.ERR, "timed out waiting for opensearch before setup, rerunning...")
sleep(5)
setup()
return setup()
end

_M.create_templates()
end

function _M.setup_once()
interval_lock.mutex_exec("opensearch_index_setup", setup)
end

function _M.spawn()
interval_lock.repeat_with_mutex('opensearch_index_setup', delay, setup)
local ok, err = timer_at(0, _M.setup_once)
if not ok then
ngx.log(ngx.ERR, "failed to create timer: ", err)
return
end
end

return _M
7 changes: 4 additions & 3 deletions src/api-umbrella/web-app/actions/admin/stats.lua
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ function _M.logs(self)
for _, hit in ipairs(hits) do
local row = hit["_source"]
ngx.say(csv.row_to_csv({
time.opensearch_to_csv(row["request_at"]) or null,
time.opensearch_to_csv(row["@timestamp"]) or null,
row["request_method"] or null,
row["request_host"] or null,
sanitized_full_url(row) or null,
Expand Down Expand Up @@ -409,7 +409,7 @@ function _M.logs(self)
row["api_backend_resolved_host"] or null,
row["api_backend_response_code_details"] or null,
row["api_backend_response_flags"] or null,
hit["_id"] or null,
hit["request_id"] or null,
}))
end
ngx.flush(true)
Expand All @@ -431,7 +431,8 @@ function _M.logs(self)
row["_type"] = nil
row["_score"] = nil
row["_index"] = nil
row["request_id"] = hit["_id"]
row["request_at"] = hit["@timestamp"]
row["@timestamp"] = nil
row["request_url"] = sanitized_url_path_and_query(row)
row["request_url_query"] = strip_api_key_from_query(row["request_url_query"])
if row["request_query"] then
Expand Down
Loading

0 comments on commit 2095808

Please sign in to comment.