Skip to content

Commit

Permalink
Merge pull request #10743 from thalesmg/fix-bridge-metric-aggregation…
Browse files Browse the repository at this point in the history
…-r50

fix(bridge_api): don't crash when formatting empty/unknown bridge metrics
  • Loading branch information
thalesmg committed May 19, 2023
2 parents 28dda39 + 09ea2e2 commit 8cfcc8f
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 4 deletions.
8 changes: 7 additions & 1 deletion apps/emqx/test/emqx_common_test_helpers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -653,11 +653,17 @@ emqx_cluster(Specs0, CommonOpts) ->
]),
%% Set the default node of the cluster:
CoreNodes = [node_name(Name) || {{core, Name, _}, _} <- Specs],
JoinTo =
JoinTo0 =
case CoreNodes of
[First | _] -> First;
_ -> undefined
end,
JoinTo =
case maps:find(join_to, CommonOpts) of
{ok, true} -> JoinTo0;
{ok, JT} -> JT;
error -> JoinTo0
end,
[
{Name,
merge_opts(Opts, #{
Expand Down
23 changes: 23 additions & 0 deletions apps/emqx_bridge/src/emqx_bridge_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,29 @@ format_metrics(#{
Rate5m,
RateMax,
Rcvd
);
format_metrics(_Metrics) ->
%% Empty metrics: can happen when a node joins another and a
%% bridge is not yet replicated to it, so the counters map is
%% empty.
?METRICS(
_Dropped = 0,
_DroppedOther = 0,
_DroppedExpired = 0,
_DroppedQueueFull = 0,
_DroppedResourceNotFound = 0,
_DroppedResourceStopped = 0,
_Matched = 0,
_Queued = 0,
_Retried = 0,
_LateReply = 0,
_SentFailed = 0,
_SentInflight = 0,
_SentSucc = 0,
_Rate = 0,
_Rate5m = 0,
_RateMax = 0,
_Rcvd = 0
).

fill_defaults(Type, RawConf) ->
Expand Down
66 changes: 63 additions & 3 deletions apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,22 @@
all() ->
[
{group, single},
{group, cluster_later_join},
{group, cluster}
].

groups() ->
AllTCs = emqx_common_test_helpers:all(?MODULE),
SingleOnlyTests = [
t_broken_bpapi_vsn,
t_old_bpapi_vsn,
t_bridges_probe
],
ClusterLaterJoinOnlyTCs = [t_cluster_later_join_metrics],
[
{single, [], emqx_common_test_helpers:all(?MODULE)},
{cluster, [], emqx_common_test_helpers:all(?MODULE) -- SingleOnlyTests}
{single, [], AllTCs -- ClusterLaterJoinOnlyTCs},
{cluster_later_join, [], ClusterLaterJoinOnlyTCs},
{cluster, [], (AllTCs -- SingleOnlyTests) -- ClusterLaterJoinOnlyTCs}
].

suite() ->
Expand All @@ -104,13 +108,27 @@ init_per_group(cluster, Config) ->
ok = erpc:call(NodePrimary, fun() -> init_node(primary) end),
_ = [ok = erpc:call(Node, fun() -> init_node(regular) end) || Node <- NodesRest],
[{group, cluster}, {cluster_nodes, Nodes}, {api_node, NodePrimary} | Config];
init_per_group(cluster_later_join, Config) ->
Cluster = mk_cluster_specs(Config, #{join_to => undefined}),
ct:pal("Starting ~p", [Cluster]),
Nodes = [
emqx_common_test_helpers:start_slave(Name, Opts)
|| {Name, Opts} <- Cluster
],
[NodePrimary | NodesRest] = Nodes,
ok = erpc:call(NodePrimary, fun() -> init_node(primary) end),
_ = [ok = erpc:call(Node, fun() -> init_node(regular) end) || Node <- NodesRest],
[{group, cluster_later_join}, {cluster_nodes, Nodes}, {api_node, NodePrimary} | Config];
init_per_group(_, Config) ->
ok = emqx_mgmt_api_test_util:init_suite(?SUITE_APPS),
ok = load_suite_config(emqx_rule_engine),
ok = load_suite_config(emqx_bridge),
[{group, single}, {api_node, node()} | Config].

mk_cluster_specs(Config) ->
mk_cluster_specs(Config, #{}).

mk_cluster_specs(Config, Opts) ->
Specs = [
{core, emqx_bridge_api_SUITE1, #{}},
{core, emqx_bridge_api_SUITE2, #{}}
Expand All @@ -132,6 +150,7 @@ mk_cluster_specs(Config) ->
load_apps => ?SUITE_APPS ++ [emqx_dashboard],
env_handler => fun load_suite_config/1,
load_schema => false,
join_to => maps:get(join_to, Opts, true),
priv_data_dir => ?config(priv_dir, Config)
},
emqx_common_test_helpers:emqx_cluster(Specs, CommonOpts).
Expand Down Expand Up @@ -164,7 +183,10 @@ load_suite_config(emqx_bridge) ->
load_suite_config(_) ->
ok.

end_per_group(cluster, Config) ->
end_per_group(Group, Config) when
Group =:= cluster;
Group =:= cluster_later_join
->
ok = lists:foreach(
fun(Node) ->
_ = erpc:call(Node, emqx_common_test_helpers, stop_apps, [?SUITE_APPS]),
Expand Down Expand Up @@ -1298,6 +1320,44 @@ t_inconsistent_webhook_request_timeouts(Config) ->
validate_resource_request_timeout(proplists:get_value(group, Config), 1000, Name),
ok.

t_cluster_later_join_metrics(Config) ->
Port = ?config(port, Config),
APINode = ?config(api_node, Config),
ClusterNodes = ?config(cluster_nodes, Config),
[OtherNode | _] = ClusterNodes -- [APINode],
URL1 = ?URL(Port, "path1"),
Name = ?BRIDGE_NAME,
BridgeParams = ?HTTP_BRIDGE(URL1, Name),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
?check_trace(
begin
%% Create a bridge on only one of the nodes.
?assertMatch({ok, 201, _}, request_json(post, uri(["bridges"]), BridgeParams, Config)),
%% Pre-condition.
?assertMatch(
{ok, 200, #{
<<"metrics">> := #{<<"success">> := _},
<<"node_metrics">> := [_ | _]
}},
request_json(get, uri(["bridges", BridgeID, "metrics"]), Config)
),
%% Now join the other node join with the api node.
ok = erpc:call(OtherNode, ekka, join, [APINode]),
%% Check metrics; shouldn't crash even if the bridge is not
%% ready on the node that just joined the cluster.
?assertMatch(
{ok, 200, #{
<<"metrics">> := #{<<"success">> := _},
<<"node_metrics">> := [#{<<"metrics">> := #{}}, #{<<"metrics">> := #{}} | _]
}},
request_json(get, uri(["bridges", BridgeID, "metrics"]), Config)
),
ok
end,
[]
),
ok.

validate_resource_request_timeout(single, Timeout, Name) ->
SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000},
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
Expand Down
1 change: 1 addition & 0 deletions changes/ce/fix-10743.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixes an issue where trying to get a bridge info or metrics could result in a crash when a node is joining a cluster.

0 comments on commit 8cfcc8f

Please sign in to comment.