Skip to content

Commit

Permalink
Merge pull request #10755 from zmstone/0517-fix-bridge-update-timeout…
Browse files Browse the repository at this point in the history
…-issue

0517 fix bridge update timeout issue
  • Loading branch information
zmstone committed May 20, 2023
2 parents bb4fef3 + cb76e5a commit 3e98b3b
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,12 @@ t_write_failure(Config) ->
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
?assertMatch({error, Reason} when Reason =:= econnrefused; Reason =:= closed, Result),
case Result of
{error, Reason} when Reason =:= econnrefused; Reason =:= closed ->
ok;
_ ->
throw({unexpected, Result})
end,
ok
end),
ok.
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_connector/src/emqx_connector.app.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_connector, [
{description, "EMQX Data Integration Connectors"},
{vsn, "0.1.22"},
{vsn, "0.1.23"},
{registered, []},
{mod, {emqx_connector_app, []}},
{applications, [
Expand Down
4 changes: 2 additions & 2 deletions apps/emqx_connector/src/emqx_connector_mqtt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ bridge_spec(Config) ->
id => Name,
start => {emqx_connector_mqtt_worker, start_link, [Name, NConfig]},
restart => temporary,
shutdown => 5000
shutdown => 1000
}.

-spec bridges() -> [{_Name, _Status}].
Expand Down Expand Up @@ -181,7 +181,7 @@ on_stop(_InstId, #{name := InstanceId}) ->
ok;
{error, Reason} ->
?SLOG(error, #{
msg => "stop_mqtt_connector",
msg => "stop_mqtt_connector_error",
connector => InstanceId,
reason => Reason
})
Expand Down
6 changes: 3 additions & 3 deletions apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,13 @@ connect(Name) ->
Error
end;
{error, Reason} = Error ->
?SLOG(error, #{
?SLOG(warning, #{
msg => "client_connect_failed",
reason => Reason
reason => Reason,
name => Name
}),
Error
end.

subscribe_remote_topics(Ref, #{remote := #{topic := FromTopic, qos := QoS}}) ->
emqtt:subscribe(ref(Ref), FromTopic, QoS);
subscribe_remote_topics(_Ref, undefined) ->
Expand Down
14 changes: 7 additions & 7 deletions apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ ensure_worker_started(ResId, Idx, Opts) ->
id => ?CHILD_ID(Mod, ResId, Idx),
start => {Mod, start_link, [ResId, Idx, Opts]},
restart => transient,
shutdown => 5000,
%% if we delay shutdown, when the pool is big, it will take a long time
shutdown => brutal_kill,
type => worker,
modules => [Mod]
},
Expand All @@ -130,13 +131,12 @@ ensure_worker_removed(ResId, Idx) ->
ChildId = ?CHILD_ID(emqx_resource_buffer_worker, ResId, Idx),
case supervisor:terminate_child(?SERVER, ChildId) of
ok ->
Res = supervisor:delete_child(?SERVER, ChildId),
_ = gproc_pool:remove_worker(ResId, {ResId, Idx}),
Res;
{error, not_found} ->
_ = supervisor:delete_child(?SERVER, ChildId),
%% no need to remove worker from the pool,
%% because the entire pool will be force deleted later
ok;
{error, Reason} ->
{error, Reason}
{error, not_found} ->
ok
end.

ensure_disk_queue_dir_absent(ResourceId, Index) ->
Expand Down
27 changes: 24 additions & 3 deletions apps/emqx_resource/src/emqx_resource_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,18 @@

% State record
-record(data, {
id, group, mod, callback_mode, query_mode, config, opts, status, state, error, pid
id,
group,
mod,
callback_mode,
query_mode,
config,
opts,
status,
state,
error,
pid,
extra
}).
-type data() :: #data{}.

Expand Down Expand Up @@ -181,7 +192,15 @@ remove(ResId) when is_binary(ResId) ->
%% @doc Stops a running resource_manager and optionally clears the metrics for the resource
-spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}.
remove(ResId, ClearMetrics) when is_binary(ResId) ->
safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION).
ResourceManagerPid = gproc:whereis_name(?NAME(ResId)),
try
safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION)
after
%% Ensure the supervisor has it removed, otherwise the immediate re-add will see a stale process
%% If the 'remove' call babove had succeeded, this is mostly a no-op but still needed to avoid race condition.
%% Otherwise this is a 'infinity' shutdown, so it may take arbitrary long.
emqx_resource_manager_sup:delete_child(ResourceManagerPid)
end.

%% @doc Stops and then starts an instance that was already running
-spec restart(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}.
Expand Down Expand Up @@ -439,8 +458,10 @@ health_check_actions(Data) ->
[{state_timeout, health_check_interval(Data#data.opts), health_check}].

handle_remove_event(From, ClearMetrics, Data) ->
_ = stop_resource(Data),
%% stop the buffer workers first, brutal_kill, so it should be fast
ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts),
%% now stop the resource, this can be slow
_ = stop_resource(Data),
case ClearMetrics of
true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id);
false -> ok
Expand Down
12 changes: 10 additions & 2 deletions apps/emqx_resource/src/emqx_resource_manager_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

-behaviour(supervisor).

-export([ensure_child/5]).
-export([ensure_child/5, delete_child/1]).

-export([start_link/0]).

Expand All @@ -27,6 +27,11 @@ ensure_child(ResId, Group, ResourceType, Config, Opts) ->
_ = supervisor:start_child(?MODULE, [ResId, Group, ResourceType, Config, Opts]),
ok.

delete_child(Pid) ->
_ = supervisor:terminate_child(?MODULE, Pid),
_ = supervisor:delete_child(?MODULE, Pid),
ok.

start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

Expand All @@ -36,7 +41,10 @@ init([]) ->
id => emqx_resource_manager,
start => {emqx_resource_manager, start_link, []},
restart => transient,
shutdown => brutal_kill,
%% never force kill a resource manager.
%% becasue otherwise it may lead to release leak,
%% resource_manager's terminate callback calls resource on_stop
shutdown => infinity,
type => worker,
modules => [emqx_resource_manager]
}
Expand Down
10 changes: 10 additions & 0 deletions changes/ce/fix-10755.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Fixed data bridge resource update race condition.

In the 'delete + create' process for EMQX resource updates,
long bridge creation times could cause dashboard request timeouts.
If a bridge resource update was initiated before completion of its creation,
it led to an erroneous deletion from the runtime, despite being present in the config file.

This fix addresses the race condition in bridge resource updates,
ensuring the accurate identification and addition of new resources,
maintaining consistency between runtime and configuration file statuses.

0 comments on commit 3e98b3b

Please sign in to comment.