Skip to content

Commit

Permalink
feat: refactored some bridges to avoid leaking resources during crash…
Browse files Browse the repository at this point in the history
…es at creation
  • Loading branch information
lafirest committed May 31, 2023
1 parent e6049df commit 6081ce8
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 19 deletions.
9 changes: 6 additions & 3 deletions apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ start_ingress(ResourceId, Ingress, ClientOpts) ->
{ingress, Ingress},
{client_opts, ClientOpts}
],
ok = emqx_resource:allocate_resource(ResourceId, ingress_pool_name, PoolName),
case emqx_resource_pool:start(PoolName, emqx_bridge_mqtt_ingress, Options) of
ok ->
{ok, #{ingress_pool_name => PoolName}};
Expand Down Expand Up @@ -132,6 +133,7 @@ start_egress(ResourceId, Egress, ClientOpts) ->
{pool_size, PoolSize},
{client_opts, ClientOpts}
],
ok = emqx_resource:allocate_resource(ResourceId, egress_pool_name, PoolName),
case emqx_resource_pool:start(PoolName, emqx_bridge_mqtt_egress, Options) of
ok ->
{ok, #{
Expand All @@ -142,13 +144,14 @@ start_egress(ResourceId, Egress, ClientOpts) ->
{error, Reason}
end.

on_stop(ResourceId, State) ->
on_stop(ResourceId, _State) ->
?SLOG(info, #{
msg => "stopping_mqtt_connector",
connector => ResourceId
}),
ok = stop_ingress(State),
ok = stop_egress(State).
Allocated = emqx_resource:get_allocated_resources(ResourceId),
ok = stop_ingress(Allocated),
ok = stop_egress(Allocated).

stop_ingress(#{ingress_pool_name := PoolName}) ->
emqx_resource_pool:stop(PoolName);
Expand Down
10 changes: 8 additions & 2 deletions apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl
Original file line number Diff line number Diff line change
Expand Up @@ -108,19 +108,25 @@ on_start(

Prepares = parse_prepare_sql(Config),
State = Prepares#{pool_name => InstanceId, query_opts => query_opts(Config)},
ok = emqx_resource:allocate_resource(InstanceId, pool_name, InstanceId),
case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of
ok ->
{ok, State};
Error ->
Error
end.

on_stop(InstanceId, #{pool_name := PoolName}) ->
on_stop(InstanceId, _State) ->
?SLOG(info, #{
msg => "stopping_tdengine_connector",
connector => InstanceId
}),
emqx_resource_pool:stop(PoolName).
case emqx_resource:get_allocated_resources(InstanceId) of
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.

on_query(InstanceId, {query, SQL}, State) ->
do_query(InstanceId, SQL, State);
Expand Down
10 changes: 8 additions & 2 deletions apps/emqx_connector/src/emqx_connector_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -219,18 +219,24 @@ on_start(
base_path => BasePath,
request => preprocess_request(maps:get(request, Config, undefined))
},
ok = emqx_resource:allocate_resource(InstId, pool_name, InstId),
case ehttpc_sup:start_pool(InstId, PoolOpts) of
{ok, _} -> {ok, State};
{error, {already_started, _}} -> {ok, State};
{error, Reason} -> {error, Reason}
end.

on_stop(InstId, #{pool_name := PoolName}) ->
on_stop(InstId, _State) ->
?SLOG(info, #{
msg => "stopping_http_connector",
connector => InstId
}),
ehttpc_sup:stop_pool(PoolName).
case emqx_resource:get_allocated_resources(InstId) of
#{pool_name := PoolName} ->
ehttpc_sup:stop_pool(PoolName);
_ ->
ok
end.

on_query(InstId, {send_message, Msg}, State) ->
case maps:get(request, State, undefined) of
Expand Down
10 changes: 8 additions & 2 deletions apps/emqx_connector/src/emqx_connector_ldap.erl
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,23 @@ on_start(
{pool_size, PoolSize},
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL}
],
ok = emqx_resource:allocate_resource(InstId, pool_name, InstId),
case emqx_resource_pool:start(InstId, ?MODULE, Opts ++ SslOpts) of
ok -> {ok, #{pool_name => InstId}};
{error, Reason} -> {error, Reason}
end.

on_stop(InstId, #{pool_name := PoolName}) ->
on_stop(InstId, _State) ->
?SLOG(info, #{
msg => "stopping_ldap_connector",
connector => InstId
}),
emqx_resource_pool:stop(PoolName).
case emqx_resource:get_allocated_resources(InstId) of
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.

on_query(InstId, {search, Base, Filter, Attributes}, #{pool_name := PoolName} = State) ->
Request = {Base, Filter, Attributes},
Expand Down
10 changes: 8 additions & 2 deletions apps/emqx_connector/src/emqx_connector_mongo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ on_start(
{worker_options, init_worker_options(maps:to_list(NConfig), SslOpts)}
],
Collection = maps:get(collection, Config, <<"mqtt">>),
ok = emqx_resource:allocate_resource(InstId, pool_name, InstId),
case emqx_resource_pool:start(InstId, ?MODULE, Opts) of
ok ->
{ok, #{
Expand All @@ -194,12 +195,17 @@ on_start(
{error, Reason}
end.

on_stop(InstId, #{pool_name := PoolName}) ->
on_stop(InstId, _State) ->
?SLOG(info, #{
msg => "stopping_mongodb_connector",
connector => InstId
}),
emqx_resource_pool:stop(PoolName).
case emqx_resource:get_allocated_resources(InstId) of
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.

on_query(
InstId,
Expand Down
10 changes: 8 additions & 2 deletions apps/emqx_connector/src/emqx_connector_mysql.erl
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ on_start(
]
),
State = parse_prepare_sql(Config),
ok = emqx_resource:allocate_resource(InstId, pool_name, InstId),
case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
ok ->
{ok, init_prepare(State#{pool_name => InstId})};
Expand All @@ -140,12 +141,17 @@ maybe_add_password_opt(undefined, Options) ->
maybe_add_password_opt(Password, Options) ->
[{password, Password} | Options].

on_stop(InstId, #{pool_name := PoolName}) ->
on_stop(InstId, _State) ->
?SLOG(info, #{
msg => "stopping_mysql_connector",
connector => InstId
}),
emqx_resource_pool:stop(PoolName).
case emqx_resource:get_allocated_resources(InstId) of
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.

on_query(InstId, {TypeOrKey, SQLOrKey}, State) ->
on_query(InstId, {TypeOrKey, SQLOrKey, [], default_timeout}, State);
Expand Down
10 changes: 8 additions & 2 deletions apps/emqx_connector/src/emqx_connector_pgsql.erl
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ on_start(
{pool_size, PoolSize}
],
State = parse_prepare_sql(Config),
ok = emqx_resource:allocate_resource(InstId, pool_name, InstId),
case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
ok ->
{ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})};
Expand All @@ -132,12 +133,17 @@ on_start(
{error, Reason}
end.

on_stop(InstId, #{pool_name := PoolName}) ->
on_stop(InstId, _State) ->
?SLOG(info, #{
msg => "stopping postgresql connector",
connector => InstId
}),
emqx_resource_pool:stop(PoolName).
case emqx_resource:get_allocated_resources(InstId) of
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.

on_query(InstId, {TypeOrKey, NameOrSQL}, State) ->
on_query(InstId, {TypeOrKey, NameOrSQL, []}, State);
Expand Down
14 changes: 10 additions & 4 deletions apps/emqx_connector/src/emqx_connector_redis.erl
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ on_start(
[{ssl, false}]
end ++ [{sentinel, maps:get(sentinel, Config, undefined)}],
State = #{pool_name => InstId, type => Type},
ok = emqx_resource:allocate_resource(InstId, type, Type),
ok = emqx_resource:allocate_resource(InstId, pool_name, InstId),
case Type of
cluster ->
case eredis_cluster:start_pool(InstId, Opts ++ [{options, Options}]) of
Expand All @@ -177,14 +179,18 @@ on_start(
end
end.

on_stop(InstId, #{pool_name := PoolName, type := Type}) ->
on_stop(InstId, _State) ->
?SLOG(info, #{
msg => "stopping_redis_connector",
connector => InstId
}),
case Type of
cluster -> eredis_cluster:stop_pool(PoolName);
_ -> emqx_resource_pool:stop(PoolName)
case emqx_resource:get_allocated_resources(InstId) of
#{pool_name := PoolName, type := cluster} ->
eredis_cluster:stop_pool(PoolName);
#{pool_name := PoolName, type := _} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.

on_query(InstId, {cmd, _} = Query, State) ->
Expand Down

0 comments on commit 6081ce8

Please sign in to comment.