Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0517 fix bridge update timeout issue #10755

Merged
merged 7 commits into from
May 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,12 @@ t_write_failure(Config) ->
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
?assertMatch({error, Reason} when Reason =:= econnrefused; Reason =:= closed, Result),
case Result of
{error, Reason} when Reason =:= econnrefused; Reason =:= closed ->
ok;
_ ->
throw({unexpected, Result})
end,
ok
end),
ok.
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_connector/src/emqx_connector.app.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_connector, [
{description, "EMQX Data Integration Connectors"},
{vsn, "0.1.22"},
{vsn, "0.1.23"},
{registered, []},
{mod, {emqx_connector_app, []}},
{applications, [
Expand Down
4 changes: 2 additions & 2 deletions apps/emqx_connector/src/emqx_connector_mqtt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ bridge_spec(Config) ->
id => Name,
start => {emqx_connector_mqtt_worker, start_link, [Name, NConfig]},
restart => temporary,
shutdown => 5000
shutdown => 1000
}.

-spec bridges() -> [{_Name, _Status}].
Expand Down Expand Up @@ -181,7 +181,7 @@ on_stop(_InstId, #{name := InstanceId}) ->
ok;
{error, Reason} ->
?SLOG(error, #{
msg => "stop_mqtt_connector",
msg => "stop_mqtt_connector_error",
connector => InstanceId,
reason => Reason
})
Expand Down
6 changes: 3 additions & 3 deletions apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,13 @@ connect(Name) ->
Error
end;
{error, Reason} = Error ->
?SLOG(error, #{
?SLOG(warning, #{
msg => "client_connect_failed",
reason => Reason
reason => Reason,
name => Name
}),
Error
end.

subscribe_remote_topics(Ref, #{remote := #{topic := FromTopic, qos := QoS}}) ->
emqtt:subscribe(ref(Ref), FromTopic, QoS);
subscribe_remote_topics(_Ref, undefined) ->
Expand Down
14 changes: 7 additions & 7 deletions apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ ensure_worker_started(ResId, Idx, Opts) ->
id => ?CHILD_ID(Mod, ResId, Idx),
start => {Mod, start_link, [ResId, Idx, Opts]},
restart => transient,
shutdown => 5000,
%% if we delay shutdown, when the pool is big, it will take a long time
shutdown => brutal_kill,
thalesmg marked this conversation as resolved.
Show resolved Hide resolved
type => worker,
modules => [Mod]
},
Expand All @@ -130,13 +131,12 @@ ensure_worker_removed(ResId, Idx) ->
ChildId = ?CHILD_ID(emqx_resource_buffer_worker, ResId, Idx),
case supervisor:terminate_child(?SERVER, ChildId) of
ok ->
Res = supervisor:delete_child(?SERVER, ChildId),
_ = gproc_pool:remove_worker(ResId, {ResId, Idx}),
Res;
{error, not_found} ->
_ = supervisor:delete_child(?SERVER, ChildId),
%% no need to remove worker from the pool,
%% because the entire pool will be force deleted later
ok;
{error, Reason} ->
{error, Reason}
{error, not_found} ->
ok
end.

ensure_disk_queue_dir_absent(ResourceId, Index) ->
Expand Down
27 changes: 24 additions & 3 deletions apps/emqx_resource/src/emqx_resource_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,18 @@

% State record
-record(data, {
id, group, mod, callback_mode, query_mode, config, opts, status, state, error, pid
id,
group,
mod,
callback_mode,
query_mode,
config,
opts,
status,
state,
error,
pid,
extra
thalesmg marked this conversation as resolved.
Show resolved Hide resolved
}).
-type data() :: #data{}.

Expand Down Expand Up @@ -181,7 +192,15 @@ remove(ResId) when is_binary(ResId) ->
%% @doc Stops a running resource_manager and optionally clears the metrics for the resource
-spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}.
remove(ResId, ClearMetrics) when is_binary(ResId) ->
safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION).
ResourceManagerPid = gproc:whereis_name(?NAME(ResId)),
try
safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION)
after
%% Ensure the supervisor has it removed, otherwise the immediate re-add will see a stale process
%% If the 'remove' call above had succeeded, this is mostly a no-op but still needed to avoid a race condition.
%% Otherwise this is an 'infinity' shutdown, so it may take arbitrarily long.
emqx_resource_manager_sup:delete_child(ResourceManagerPid)
end.

%% @doc Stops and then starts an instance that was already running
-spec restart(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}.
Expand Down Expand Up @@ -439,8 +458,10 @@ health_check_actions(Data) ->
[{state_timeout, health_check_interval(Data#data.opts), health_check}].

handle_remove_event(From, ClearMetrics, Data) ->
_ = stop_resource(Data),
%% stop the buffer workers first, brutal_kill, so it should be fast
ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts),
%% now stop the resource, this can be slow
_ = stop_resource(Data),
case ClearMetrics of
true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id);
false -> ok
Expand Down
12 changes: 10 additions & 2 deletions apps/emqx_resource/src/emqx_resource_manager_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

-behaviour(supervisor).

-export([ensure_child/5]).
-export([ensure_child/5, delete_child/1]).

-export([start_link/0]).

Expand All @@ -27,6 +27,11 @@ ensure_child(ResId, Group, ResourceType, Config, Opts) ->
_ = supervisor:start_child(?MODULE, [ResId, Group, ResourceType, Config, Opts]),
ok.

%% @doc Best-effort removal of a resource manager child from this supervisor.
%% Asks the supervisor to terminate the child identified by `Pid' and then to
%% delete it. The results of both supervisor calls are deliberately ignored,
%% so this function always returns `ok', even if the child was already gone.
%% NOTE(review): callers may pass whatever the process lookup returned, which
%% is presumably not always a live pid -- confirm the supervisor calls
%% tolerate that.
delete_child(Pid) ->
_ = supervisor:terminate_child(?MODULE, Pid),
_ = supervisor:delete_child(?MODULE, Pid),
ok.

%% @doc Starts the supervisor, registered locally under this module's name,
%% with this module as the callback module and `[]' as the init argument.
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

Expand All @@ -36,7 +41,10 @@ init([]) ->
id => emqx_resource_manager,
start => {emqx_resource_manager, start_link, []},
restart => transient,
shutdown => brutal_kill,
%% never force kill a resource manager.
%% because otherwise it may lead to release leak,
%% resource_manager's terminate callback calls resource on_stop
shutdown => infinity,
type => worker,
modules => [emqx_resource_manager]
}
Expand Down
10 changes: 10 additions & 0 deletions changes/ce/fix-10755.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Fixed data bridge resource update race condition.

In the 'delete + create' process for EMQX resource updates,
long bridge creation times could cause dashboard request timeouts.
If a bridge resource update was initiated before completion of its creation,
it led to an erroneous deletion from the runtime, despite being present in the config file.

This fix addresses the race condition in bridge resource updates,
ensuring the accurate identification and addition of new resources,
maintaining consistency between runtime and configuration file statuses.