Skip to content

Commit

Permalink
Merge pull request #9938 from keynslug/feat/mqtt-bridge-async-errors
Browse files Browse the repository at this point in the history
feat(mqtt-bridge): report recoverable errors of async queries
  • Loading branch information
keynslug committed Feb 10, 2023
2 parents 2ed54e5 + 1b19541 commit fe450ca
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 1 deletion.
21 changes: 20 additions & 1 deletion apps/emqx_connector/src/emqx_connector_mqtt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
on_get_status/2
]).

-export([on_async_result/2]).

-behaviour(hocon_schema).

-import(hoconsc, [mk/2]).
Expand Down Expand Up @@ -194,8 +196,9 @@ on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) ->
classify_error(Reason)
end.

on_query_async(_InstId, {send_message, Msg}, Callback, #{name := InstanceId}) ->
on_query_async(_InstId, {send_message, Msg}, CallbackIn, #{name := InstanceId}) ->
?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
Callback = {fun on_async_result/2, [CallbackIn]},
case emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, Callback) of
ok ->
ok;
Expand All @@ -205,6 +208,20 @@ on_query_async(_InstId, {send_message, Msg}, Callback, #{name := InstanceId}) ->
classify_error(Reason)
end.

on_async_result(Callback, ok) ->
apply_callback_function(Callback, ok);
on_async_result(Callback, {ok, _} = Ok) ->
apply_callback_function(Callback, Ok);
on_async_result(Callback, {error, Reason}) ->
apply_callback_function(Callback, classify_error(Reason)).

apply_callback_function(F, Result) when is_function(F) ->
erlang:apply(F, [Result]);
apply_callback_function({F, A}, Result) when is_function(F), is_list(A) ->
erlang:apply(F, A ++ [Result]);
apply_callback_function({M, F, A}, Result) when is_atom(M), is_atom(F), is_list(A) ->
erlang:apply(M, F, A ++ [Result]).

on_get_status(_InstId, #{name := InstanceId}) ->
emqx_connector_mqtt_worker:status(InstanceId).

Expand All @@ -214,6 +231,8 @@ classify_error({disconnected, _RC, _} = Reason) ->
{error, {recoverable_error, Reason}};
classify_error({shutdown, _} = Reason) ->
{error, {recoverable_error, Reason}};
classify_error(shutdown = Reason) ->
{error, {recoverable_error, Reason}};
classify_error(Reason) ->
{error, {unrecoverable_error, Reason}}.

Expand Down
1 change: 1 addition & 0 deletions changes/v5.0.17/fix-9938.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Report some egress MQTT bridge errors as recoverable, and thus retryable.
1 change: 1 addition & 0 deletions changes/v5.0.17/fix-9938.zh.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
将一些出口 MQTT 网桥错误报告为可恢复,因此可重试。

0 comments on commit fe450ca

Please sign in to comment.