diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index 6dcd243559..8f852e3918 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -747,7 +747,7 @@ schema("/source_types") -> %%------------------------------------------------------------------------------ handle_list(ConfRootKey) -> - Nodes = emqx:running_nodes(), + Nodes = nodes_supporting_bpapi_version(6), NodeReplies = emqx_bridge_proto_v6:v2_list_bridges_on_nodes_v6(Nodes, ConfRootKey), case is_ok(NodeReplies) of {ok, NodeBridges} -> @@ -942,7 +942,7 @@ is_ok(ResL) -> %% bridge helpers -spec lookup_from_all_nodes(emqx_bridge_v2:root_cfg_key(), _, _, _) -> _. lookup_from_all_nodes(ConfRootKey, BridgeType, BridgeName, SuccCode) -> - Nodes = emqx:running_nodes(), + Nodes = nodes_supporting_bpapi_version(6), case is_ok( emqx_bridge_proto_v6:v2_lookup_from_all_nodes_v6( @@ -959,7 +959,7 @@ lookup_from_all_nodes(ConfRootKey, BridgeType, BridgeName, SuccCode) -> end. get_metrics_from_all_nodes(ConfRootKey, Type, Name) -> - Nodes = emqx:running_nodes(), + Nodes = nodes_supporting_bpapi_version(6), Result = maybe_unwrap( emqx_bridge_proto_v6:v2_get_metrics_from_all_nodes_v6(Nodes, ConfRootKey, Type, Name) ), @@ -1058,6 +1058,16 @@ supported_versions(_Call) -> bpapi_version_range(6, 6). bpapi_version_range(From, To) -> lists:seq(From, To). +nodes_supporting_bpapi_version(Vsn) -> + [ + N + || N <- emqx:running_nodes(), + case emqx_bpapi:supported_version(N, ?BPAPI_NAME) of + undefined -> false; + NVsn when is_number(NVsn) -> NVsn >= Vsn + end + ]. + maybe_unwrap({error, not_implemented}) -> {error, not_implemented}; maybe_unwrap(RpcMulticallResult) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index 4f98baebf6..d56b45a172 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -106,7 +106,6 @@ -define(KAFKA_BRIDGE_UPDATE(Name, Connector), maps:without([<<"name">>, <<"type">>], ?KAFKA_BRIDGE(Name, Connector)) ). --define(KAFKA_BRIDGE_UPDATE(Name), ?KAFKA_BRIDGE_UPDATE(Name, ?ACTION_CONNECTOR_NAME)). -define(SOURCE_TYPE_STR, "mqtt"). -define(SOURCE_TYPE, <>). @@ -1477,7 +1476,7 @@ t_cluster_later_join_metrics(Config) -> ?assertMatch( {ok, 200, #{ <<"metrics">> := #{<<"success">> := _}, - <<"node_metrics">> := [#{<<"metrics">> := #{}}, #{<<"metrics">> := #{}} | _] + <<"node_metrics">> := [#{<<"metrics">> := #{}} | _] }}, request_json(get, uri([?ACTIONS_ROOT, ActionID, "metrics"]), Config) ), @@ -1512,3 +1511,47 @@ t_raw_config_response_defaults(Config) -> ) ), ok. + +t_older_version_nodes_in_cluster(matrix) -> + [ + [cluster, actions], + [cluster, sources] + ]; +t_older_version_nodes_in_cluster(Config) -> + [_, Kind | _] = group_path(Config), + PrimaryNode = ?config(node, Config), + OtherNode = maybe_get_other_node(Config), + ?assertNotEqual(OtherNode, PrimaryNode), + Name = atom_to_binary(?FUNCTION_NAME), + ?check_trace( + begin + #{api_root_key := APIRootKey} = get_common_values(Kind, Name), + erpc:call(PrimaryNode, fun() -> + meck:new(emqx_bpapi, [no_history, passthrough, no_link]), + meck:expect(emqx_bpapi, supported_version, fun(N, Api) -> + case N =:= OtherNode of + true -> 1; + false -> meck:passthrough([N, Api]) + end + end) + end), + erpc:call(OtherNode, fun() -> + meck:new(emqx_bridge_v2, [no_history, passthrough, no_link]), + meck:expect(emqx_bridge_v2, list, fun(_ConfRootKey) -> + error(should_not_be_called) + end) + end), + ?assertMatch( + {ok, 200, _}, + request_json( + get, + uri([APIRootKey]), + Config + ) + ), + ok + end, + [] + ), + + ok. diff --git a/changes/ce/fix-12472.en.md b/changes/ce/fix-12472.en.md new file mode 100644 index 0000000000..dd5cfe2f96 --- /dev/null +++ b/changes/ce/fix-12472.en.md @@ -0,0 +1 @@ +Fixed an issue that could lead to some read operations on `/api/v5/actions/` and `/api/v5/sources/` to return 500 while rolling upgrades are underway.