More fixes for new opensearch analytics setup.
GUI committed Feb 12, 2024
1 parent 2095808 commit a148d7e
Showing 7 changed files with 389 additions and 30 deletions.
15 changes: 12 additions & 3 deletions src/api-umbrella/web-app/actions/admin/stats.lua
@@ -176,7 +176,7 @@ local function fetch_city_locations(buckets, country, region)

local city_names = {}
for _, bucket in ipairs(buckets) do
table.insert(city_names, bucket["key"])
table.insert(city_names, string.lower(bucket["key"]))
end

local conditions = {}
@@ -185,23 +185,32 @@ local function fetch_city_locations(buckets, country, region)
table.insert(conditions, "region = " .. db.escape_literal(region))
end
if not is_empty(city_names) then
table.insert(conditions, "city IN " .. db.escape_literal(db.list(city_names)))
table.insert(conditions, "lower(city) IN " .. db.escape_literal(db.list(city_names)))
end

local cities = AnalyticsCity:select("WHERE " .. table.concat(conditions, " AND "), {
fields = "city, location[0] AS lon, location[1] AS lat",
})

local data = {}
local city_names = {}
for _, city in ipairs(cities) do
if city.city then
city_names[string.lower(city.city)] = city.city
data[city.city] = {
lat = city.lat,
lon = city.lon,
}
end
end

for _, bucket in ipairs(buckets) do
local city_name = city_names[bucket["key"]]
if city_name then
bucket["key"] = city_name
end
end

return data
end
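As a rough illustration of what this hunk does (the sample values below are invented, not from the commit): the city keys coming back from the new OpenSearch aggregation are assumed to be lowercased, so the analytics_cities lookup now matches on lower(city) and the buckets are rewritten back to the canonical capitalization stored in the database.

```lua
-- Illustrative sketch only; "boulder"/"Boulder" are placeholder values.
local buckets = { { key = "boulder" } }    -- lowercased key from the aggregation (assumed)
local canonical = { boulder = "Boulder" }  -- lower(city) -> city, as built from the DB rows

for _, bucket in ipairs(buckets) do
  local city_name = canonical[string.lower(bucket["key"])]
  if city_name then
    bucket["key"] = city_name              -- report "Boulder" rather than "boulder"
  end
end

print(buckets[1].key)                      --> Boulder
```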

@@ -431,7 +440,7 @@ function _M.logs(self)
row["_type"] = nil
row["_score"] = nil
row["_index"] = nil
row["request_at"] = hit["@timestamp"]
row["request_at"] = row["@timestamp"]
row["@timestamp"] = nil
row["request_url"] = sanitized_url_path_and_query(row)
row["request_url_query"] = strip_api_key_from_query(row["request_url_query"])
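As a small usage sketch of the change above (the row contents are made up, not from the commit): with the new setup the timestamp is read off the row itself under "@timestamp", copied into the "request_at" field the admin logs response exposes, and the raw key is then dropped.

```lua
-- Hypothetical row as it might come back from the search; values are placeholders.
local row = {
  ["@timestamp"] = 947746800000,
  request_path = "/api/example",
}

row["request_at"] = row["@timestamp"]  -- expose the timestamp under the expected name
row["@timestamp"] = nil                -- drop the raw OpenSearch field

print(row["request_at"])               --> 947746800000
print(row["@timestamp"])               --> nil
```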
65 changes: 43 additions & 22 deletions src/api-umbrella/web-app/models/analytics_search_opensearch.lua
@@ -17,9 +17,6 @@ local xpcall_error_handler = require "api-umbrella.utils.xpcall_error_handler"

local opensearch_query = opensearch.query

local date_utc = icu_date.new({
zone_id = "UTC"
})
local date_tz = icu_date.new({
zone_id = config["analytics"]["timezone"],
})
@@ -39,36 +36,59 @@ local UPPERCASE_FIELDS = {
local _M = {}
_M.__index = _M

local function index_names(start_time, end_time, body)
local names = {
config["opensearch"]["index_name_prefix"] .. "-logs-v" .. config["opensearch"]["template_version"] .."-allowed",
config["opensearch"]["index_name_prefix"] .. "-logs-v" .. config["opensearch"]["template_version"] .."-errored",
}
local function index_names(body)
local names = {}

local only_denied = false
local exclude_denied = false
local filters = body["query"]["bool"]["filter"]["bool"]["must"]
for _, filter in ipairs(filters) do
if filter["bool"] and filter["bool"]["must"] then
local sub_filters = filter["bool"]["must"]
for _, sub_filter in ipairs(sub_filters) do
if sub_filter["exists"] and sub_filter["exists"]["field"] == "gatekeeper_denied_code" then
only_denied = true
break
end

if sub_filter["term"] and sub_filter["term"]["gatekeeper_denied_code"] then
only_denied = true
break
end

if sub_filter["bool"] and sub_filter["bool"]["must_not"] and sub_filter["bool"]["must_not"]["exists"] and sub_filter["bool"]["must_not"]["exists"]["field"] == "gatekeeper_denied_code" then
exclude_denied = true
break
end
end

if exclude_denied then
if only_denied or exclude_denied then
break
end
end
end

if not only_denied then
table.insert(names, config["opensearch"]["index_name_prefix"] .. "-logs-v" .. config["opensearch"]["template_version"] .."-allowed")
table.insert(names, config["opensearch"]["index_name_prefix"] .. "-logs-v" .. config["opensearch"]["template_version"] .."-errored")
end

if not exclude_denied then
table.insert(names, config["opensearch"]["index_name_prefix"] .. "-logs-v" .. config["opensearch"]["template_version"] .."-denied")
end

return names
end
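A hedged illustration of how the reworked index_names(body) is meant to behave (the index names below are placeholders, and the body is shaped like the query-builder output the traversal above inspects): a query that requires gatekeeper_denied_code should only hit the "-denied" index, a query that excludes it should skip that index, and anything else searches all three.

```lua
-- Illustration only: placeholder index names, sample body structure.
local body_only_denied = {
  query = { bool = { filter = { bool = { must = {
    { bool = { must = {
      { exists = { field = "gatekeeper_denied_code" } },
    } } },
  } } } } },
}
-- index_names(body_only_denied) would be expected to return just
--   { "api-umbrella-logs-v3-denied" }
-- A sub-filter of { bool = { must_not = { exists = { field = "gatekeeper_denied_code" } } } }
-- would instead skip the "-denied" index, and a body with no gatekeeper_denied_code
-- filter at all would search "-allowed", "-errored", and "-denied" together.
```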

local function translate_db_field_name(field)
local db_field = field
if field == "request_at" then
db_field = "@timestamp"
end

return db_field
end

local function parse_query_builder(query)
local query_filter
if not is_empty(query) then
@@ -78,6 +98,7 @@ local function parse_query_builder(query)
local operator = rule["operator"]
local field = rule["field"]
local value = rule["value"]
local db_field = translate_db_field_name(field)

if not CASE_SENSITIVE_FIELDS[field] and type(value) == "string" then
if UPPERCASE_FIELDS[field] then
@@ -90,63 +111,63 @@
if operator == "equal" or operator == "not_equal" then
filter = {
term = {
[field] = value,
[db_field] = value,
},
}
elseif operator == "begins_with" or operator == "not_begins_with" then
filter = {
prefix = {
[field] = value,
[db_field] = value,
},
}
elseif operator == "contains" or operator == "not_contains" then
filter = {
regexp = {
[field] = ".*" .. escape_regex(value) .. ".*",
[db_field] = ".*" .. escape_regex(value) .. ".*",
},
}
elseif operator == "is_null" or operator == "is_not_null" then
filter = {
exists = {
field = field,
field = db_field,
},
}
elseif operator == "less" then
filter = {
range = {
[field] = {
[db_field] = {
lt = tonumber(value),
},
},
}
elseif operator == "less_or_equal" then
filter = {
range = {
[field] = {
[db_field] = {
lte = tonumber(value),
},
},
}
elseif operator == "greater" then
filter = {
range = {
[field] = {
[db_field] = {
gt = tonumber(value),
},
},
}
elseif operator == "greater_or_equal" then
filter = {
range = {
[field] = {
[db_field] = {
gte = tonumber(value),
},
},
}
elseif operator == "between" then
filter = {
range = {
[field] = {
[db_field] = {
gte = tonumber(value[1]),
lte = tonumber(value[2]),
},
@@ -224,10 +245,10 @@ function _M:set_sort(order_fields)
if not is_empty(order_fields) then
self.body["sort"] = {}
for _, order_field in ipairs(order_fields) do
local column_name = order_field[1]
local db_field = translate_db_field_name(order_field[1])
local dir = order_field[2]
if not is_empty(column_name) and not is_empty(dir) then
table.insert(self.body["sort"], { [column_name] = string.lower(dir) })
if not is_empty(db_field) and not is_empty(dir) then
table.insert(self.body["sort"], { [db_field] = string.lower(dir) })
end
end
end
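A minimal standalone sketch of the field-name translation this file now applies (the helper is copied from the one added above; the sort usage is invented): the admin API keeps speaking in terms of "request_at", while the documents index the value as "@timestamp", so query-builder filters and sort columns are mapped just before the search body is built.

```lua
-- Same mapping as the helper added in this commit, shown standalone so it runs on its own.
local function translate_db_field_name(field)
  local db_field = field
  if field == "request_at" then
    db_field = "@timestamp"
  end
  return db_field
end

-- Invented usage: a "request_at DESC" sort from the UI becomes an "@timestamp" sort.
local order_fields = { { "request_at", "DESC" } }
local sort = {}
for _, order_field in ipairs(order_fields) do
  table.insert(sort, { [translate_db_field_name(order_field[1])] = string.lower(order_field[2]) })
end
-- sort[1] is now { ["@timestamp"] = "desc" }
```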
@@ -616,7 +637,7 @@ end

function _M:query_header()
local header = deepcopy(self.query)
header["index"] = table.concat(index_names(self.start_time, self.end_time, self.body), ",")
header["index"] = table.concat(index_names(self.body), ",")

return header
end
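For completeness, a small sketch (placeholder index names, not the configured ones) of what the comma-joined index list in the msearch header looks like once index_names(self.body) has decided which indices apply:

```lua
-- Placeholder names; the real ones come from the configured prefix and template version.
local names = {
  "api-umbrella-logs-v3-allowed",
  "api-umbrella-logs-v3-errored",
  "api-umbrella-logs-v3-denied",
}
print(table.concat(names, ","))
--> api-umbrella-logs-v3-allowed,api-umbrella-logs-v3-errored,api-umbrella-logs-v3-denied
```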
4 changes: 2 additions & 2 deletions tasks/deps/fluent-bit
@@ -150,7 +150,7 @@ cmake \
-DFLB_PROCESSOR_ATTRIBUTES="Off" \
-DFLB_PROCESSOR_LABELS="Off" \
../
make
make install
make -j"$NPROC"
make install DESTDIR="$STAGE_DIR"

stamp
39 changes: 38 additions & 1 deletion test/apis/admin/stats/test_search.rb
@@ -198,7 +198,44 @@ def test_no_results_non_existent_indices
assert_response_code(200, response)
data = MultiJson.load(response.body)
assert_equal({
"hits_over_time" => [],
"hits_over_time" => [
{
"c" => [
{ "f" => "Thu, Jan 13, 2000", "v" => 947746800000 },
{ "f" => "0", "v" => 0 },
],
},
{
"c" => [
{ "f" => "Fri, Jan 14, 2000", "v" => 947833200000 },
{ "f" => "0", "v" => 0 },
],
},
{
"c" => [
{ "f" => "Sat, Jan 15, 2000", "v" => 947919600000 },
{ "f" => "0", "v" => 0 },
],
},
{
"c" => [
{ "f" => "Sun, Jan 16, 2000", "v" => 948006000000 },
{ "f" => "0", "v" => 0 },
],
},
{
"c" => [
{ "f" => "Mon, Jan 17, 2000", "v" => 948092400000 },
{ "f" => "0", "v" => 0 }
],
},
{
"c" => [
{ "f" => "Tue, Jan 18, 2000", "v" => 948178800000 },
{ "f" => "0", "v" => 0 },
],
},
],
"stats" => {
"total_users" => 0,
"total_ips" => 0,
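To sanity-check the new expected values, a small worked sketch (not part of the commit): each "v" is the start of a day expressed in epoch milliseconds, consecutive buckets are exactly 86,400,000 ms apart, and the -7 hour offset used below is an assumption about the analytics timezone configured for the test suite.

```lua
-- Assumed offset: the test's analytics timezone appears to sit at UTC-7 for these January dates.
local day_ms = 24 * 60 * 60 * 1000
local offset_ms = -7 * 60 * 60 * 1000
local first_bucket_ms = 947746800000       -- expected to render as "Thu, Jan 13, 2000"

for i = 0, 5 do
  local ms = first_bucket_ms + i * day_ms
  local local_secs = math.floor((ms + offset_ms) / 1000)
  -- os.date("!...") formats in UTC; the timezone offset was already applied above.
  print(ms, os.date("!%a, %b %d, %Y", local_secs))
end
--> 947746800000  Thu, Jan 13, 2000
--  ...
--> 948178800000  Tue, Jan 18, 2000
```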
