More fixes and improvements for new fluentbit logging setup.
GUI committed Feb 11, 2024
1 parent 9faa9d5 commit 5ccda17
Showing 9 changed files with 60 additions and 58 deletions.
8 changes: 8 additions & 0 deletions config/schema.cue
@@ -421,6 +421,14 @@ import "path"
fluent_bit: {
host: string | *"127.0.0.1"
port: uint16 | *14014
service: {
flush: float | *1
storage_max_chunks_up: uint | *32
storage_backlog_mem_limit: string | *"16M"
}
outputs: {
storage_total_limit_size: string | *"128M"
}
}

log: {
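For context, the new fluent_bit service/outputs options added to the schema above can be overridden per environment the same way the existing host and port settings are, as config/test.yml does below for flush. A minimal sketch of a fuller override in an API Umbrella config file (the values here are illustrative assumptions, not recommendations):

    fluent_bit:
      service:
        # Flush buffered input records to the outputs every 5 seconds.
        flush: 5
        # Roughly 64 * 2MB of chunks may sit in memory before spilling to disk.
        storage_max_chunks_up: 64
        # Memory ceiling when replaying backlogged chunks from disk after a restart.
        storage_backlog_mem_limit: "32M"
      outputs:
        # Cap the on-disk buffer for the OpenSearch output.
        storage_total_limit_size: "256M"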
2 changes: 2 additions & 0 deletions config/test.yml
@@ -93,6 +93,8 @@ router:
refresh_local_cache_interval: 0
fluent_bit:
port: 13014
service:
flush: 0.1
geoip:
db_update_frequency: false
postgresql:
4 changes: 2 additions & 2 deletions src/api-umbrella/proxy/hooks/log_initial_proxy.lua
@@ -93,13 +93,13 @@ local function build_log_data()
set_computed_url_fields(data, ngx_ctx)
set_computed_user_agent_fields(data)

return normalized_data(data)
return data
end

local function log_request()
-- Build the log message and send to Fluent Bit for processing.
local data = build_log_data()
local message = json_encode(data)
local message = json_encode(normalized_data(data))
local _, err = send_message(message)
if err then
ngx.log(ngx.ERR, "failed to log message: ", err)
2 changes: 0 additions & 2 deletions src/api-umbrella/proxy/log_utils.lua
@@ -294,8 +294,6 @@ function _M.normalized_data(data)
request_ip = lowercase_truncate(data["request_ip"], 45),
request_ip_city = truncate(data["request_ip_city"], 200),
request_ip_country = uppercase_truncate(data["request_ip_country"], 2),
request_ip_lat = tonumber(data["request_ip_lat"]),
request_ip_lon = tonumber(data["request_ip_lon"]),
request_ip_region = uppercase_truncate(data["request_ip_region"], 2),
request_method = uppercase_truncate(data["request_method"], 10),
request_origin = truncate(data["request_origin"], 200),
5 changes: 3 additions & 2 deletions src/api-umbrella/proxy/opensearch_templates_data.lua
@@ -22,8 +22,8 @@ else
if ok then
opensearch_templates = data

-- In the test environment, disable replicas and reduce shards to speed
-- things up.
-- In the test environment, disable replicas, reduce shards, and
-- increase the refresh interval to speed things up.
if config["app_env"] == "test" then
for _, template in pairs(opensearch_templates) do
if not template["template"]["settings"] then
@@ -33,6 +33,7 @@ else
template["template"]["settings"]["index"] = {}
end

template["template"]["settings"]["index"]["refresh_interval"] = "50ms"
template["template"]["settings"]["index"]["number_of_shards"] = 1
template["template"]["settings"]["index"]["number_of_replicas"] = 0
end
74 changes: 34 additions & 40 deletions templates/etc/fluent-bit/fluent-bit.yaml.etlua
@@ -1,42 +1,46 @@
service:
log_level: info
# How often to flush inputs to the outputs.
flush: <%- json_encode(config["fluent_bit"]["service"]["flush"]) %>
storage.path: /tmp/fluentbit
storage.sync: normal
storage.checksum: off
storage.max_chunks_up: 256
storage.backlog.mem_limit: 5M
# max_chunks_up * 2MB corresponds to roughly the maximum memory use before
# things are buffered to disk for `storage.type: filesystem` inputs.
storage.max_chunks_up: <%- json_encode(config["fluent_bit"]["service"]["storage_max_chunks_up"]) %>
# Determines how many MB of data can be read from disk in the event fluent-bit
# is restarted.
storage.backlog.mem_limit: <%- json_encode(config["fluent_bit"]["service"]["storage_backlog_mem_limit"]) %>

pipeline:
inputs:
- name: tcp
tag: all
listen: <%- json_encode(config["fluent_bit"]["host"]) %>
port: <%- json_encode(config["fluent_bit"]["port"]) %>
format: json
tag: logs
# mem_buf_limit: 500M
# storage.type: filesystem
# storage.pause_on_chunks_overlimit: on
storage.type: filesystem

- name: fluentbit_metrics
scrape_interval: 10
tag: internal_metrics
scrape_interval: 60
scrape_on_start: true

filters:
- name: rewrite_tag
match: logs
rule: "$gatekeeper_denied_code ^. denied_logs false"
match: all
rule: "$gatekeeper_denied_code ^. denied false"

outputs:
# Print what we would log to stdout for extra redundancy.
- name: stdout
# match: logs denied_logs
match: "*"
json_date_key: false
format: json_lines

# Send API logs to OpenSearch analytics DB.
- name: opensearch
match: logs
# aws_auth: on
# aws_region: us-west-2
match_regex: "^(all|denied)$"
host: <%- json_encode(config["opensearch"]["_first_server"]["host"]) %>
port: <%- json_encode(config["opensearch"]["_first_server"]["port"]) %>
tls: <%- config["opensearch"]["_first_server"]["_https?"] and "on" or "off" %>
@@ -46,38 +50,28 @@ pipeline:
<% if config["opensearch"]["_first_server"]["password"] then %>
http_passwd: <%- json_encode(config["opensearch"]["_first_server"]["password"]) %>
<% end %>
index: <%- json_encode(config["opensearch"]["index_name_prefix"] .. "-logs-v" .. config["opensearch"]["template_version"] .. "-all") %>
suppress_type_name: on
id_key: request_id
# workers: 6
compress: gzip
trace_error: on
# storage.total_limit_size: 4GB
# generate_id: on
# buffer_size: false

- name: opensearch
match: denied_logs
# aws_auth: on
# aws_region: us-west-2
host: <%- json_encode(config["opensearch"]["_first_server"]["host"]) %>
port: <%- json_encode(config["opensearch"]["_first_server"]["port"]) %>
tls: <%- config["opensearch"]["_first_server"]["_https?"] and "on" or "off" %>
<% if config["opensearch"]["_first_server"]["user"] then %>
http_user: <%- json_encode(config["opensearch"]["_first_server"]["user"]) %>
<% end %>
<% if config["opensearch"]["_first_server"]["password"] then %>
http_passwd: <%- json_encode(config["opensearch"]["_first_server"]["password"]) %>
<% end %>
index: <%- json_encode(config["opensearch"]["index_name_prefix"] .. "-logs-v" .. config["opensearch"]["template_version"] .. "-denied") %>
index: <%- json_encode(config["opensearch"]["index_name_prefix"] .. "-logs-v" .. config["opensearch"]["template_version"] .. "-$TAG") %>
# Data streams require "create" operations.
write_operation: create
# _type field is no longer accepted for OpenSearch.
suppress_type_name: on
# Retry failed requests in the event the server is temporarily down.
retry_limit: 30
# Use our request ID for the document ID to help reduce the possibility
# of duplicate data when retries are attempted (note that duplicate data
# can still occur if the data stream index is rotated).
id_key: request_id
# workers: 6
compress: gzip
# Ensure the record is passed through without adding any extra metadata.
logstash_format: off
include_tag_key: off
# Limit the on-disk buffer size.
storage.total_limit_size: <%- json_encode(config["fluent_bit"]["outputs"]["storage_total_limit_size"]) %>
# Read and report errors, increasing buffer size so more complete errors
# can be read in.
trace_error: on
# storage.total_limit_size: 4GB
# generate_id: on
# buffer_size: false
buffer_size: 16KB

# - name: s3
# match: logs denied_logs
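Taken together, the template now renders a single TCP input tagged "all", a rewrite_tag filter that retags denied requests as "denied", and one OpenSearch output whose "-$TAG" index suffix routes records to either the "...-all" or "...-denied" data stream. A trimmed sketch of roughly what the rendered fluent-bit.yaml could look like with the default schema values (the OpenSearch host, port, and index prefix shown are hypothetical; the metrics input, stdout output, and commented s3 output are omitted):

    service:
      log_level: info
      flush: 1
      storage.path: /tmp/fluentbit
      storage.max_chunks_up: 32
      storage.backlog.mem_limit: 16M

    pipeline:
      inputs:
        - name: tcp
          tag: all
          listen: 127.0.0.1
          port: 14014
          format: json
          storage.type: filesystem

      filters:
        - name: rewrite_tag
          match: all
          # Any record with a non-empty gatekeeper_denied_code is re-emitted with
          # the "denied" tag; the trailing "false" drops the original copy.
          rule: "$gatekeeper_denied_code ^. denied false"

      outputs:
        - name: opensearch
          match_regex: "^(all|denied)$"
          host: 127.0.0.1   # hypothetical OpenSearch server
          port: 9200        # hypothetical port
          # $TAG expands to "all" or "denied", e.g. api-umbrella-logs-v1-all
          # (index prefix and template version are hypothetical).
          index: api-umbrella-logs-v1-$TAG
          write_operation: create
          suppress_type_name: on
          retry_limit: 30
          id_key: request_id
          compress: gzip
          logstash_format: off
          include_tag_key: off
          storage.total_limit_size: 128M
          trace_error: on
          buffer_size: 16KB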
8 changes: 4 additions & 4 deletions test/proxy/logging/test_basics.rb
@@ -125,8 +125,8 @@ def test_logs_expected_fields_for_non_chunked_non_gzip
assert_equal("via_upstream", record["api_backend_response_code_details"])
assert_equal("text/plain; q=0.5, text/html", record["request_accept"])
assert_equal("compress, gzip", record["request_accept_encoding"])
assert_kind_of(Numeric, record["request_at"])
assert_match(/\A\d{13}\z/, record["request_at"].to_s)
assert_kind_of(Numeric, record["@timestamp"])
assert_match(/\A\d{13}\z/, record["@timestamp"].to_s)
assert_equal("basic-auth-username-example", record["request_basic_auth_username"])
assert_equal("close", record["request_connection"])
assert_equal("application/x-www-form-urlencoded", record["request_content_type"])
@@ -284,7 +284,7 @@ def test_requests_with_duplicate_query_params
assert_logged_url(url, record)
end

def test_logs_request_at_as_date
def test_logs_timestamp_as_date
response = Typhoeus.get("http://127.0.0.1:9080/api/hello", log_http_options)
assert_response_code(200, response)

@@ -295,7 +295,7 @@ def test_logs_request_at_as_date
}
result = LogItem.client.indices.get_mapping(mapping_options)

property = result[hit["_index"]]["mappings"]["properties"]["request_at"]
property = result[hit["_index"]]["mappings"]["properties"]["@timestamp"]
assert_equal({
"type" => "date",
}, property)
5 changes: 3 additions & 2 deletions test/support/api_umbrella_test_helpers/logging.rb
@@ -64,8 +64,9 @@ def wait_for_log(response, options = {})
end

def assert_logs_base_fields(record, user = nil)
assert_kind_of(Numeric, record["request_at"])
assert_match(/\A\d{13}\z/, record["request_at"].to_s)
assert_kind_of(Numeric, record.fetch("@timestamp"))
assert_match(/\A\d{13}\z/, record.fetch("@timestamp").to_s)
assert_match(/\A[a-z0-9]{20}\z/, record.fetch("request_id"))
assert_equal("127.0.0.1:9080", record["request_host"])
assert_match(/\A\d+\.\d+\.\d+\.\d+\z/, record["request_ip"])
assert_equal("GET", record["request_method"])
10 changes: 4 additions & 6 deletions test/support/api_umbrella_test_helpers/setup.rb
@@ -93,12 +93,10 @@ def setup_server

# Wipe opensearch indices before beginning.
#
# Note, that we don't want to wipe the current day's index that is
# setup as part of the API Umbrella startup. So we're only clearing
# these older indices for previous dates, which we'll assume our test
# suite will work with. We completely delete the indices here (rather
# than relying on LogItem.clean_indices!), so that we're sure each
# test gets fresh index template and mappings setup.
# We completely delete the indices here (rather than relying on
# LogItem.clean_indices!), so that we're sure each test gets fresh
# index template and mappings setup.
client.perform_request "DELETE", "_data_stream/api-umbrella-test-*"
client.indices.delete :index => "api-umbrella-test-*"

self.setup_complete = true
