Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix some issues #8443

Merged
merged 6 commits into from
Jul 8, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/emqx/src/emqx.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
{id, "emqx"},
{description, "EMQX Core"},
% strict semver, bump manually!
{vsn, "5.0.3"},
{vsn, "5.0.4"},
{modules, []},
{registered, []},
{applications, [
Expand Down
11 changes: 6 additions & 5 deletions apps/emqx/src/emqx_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -508,14 +508,15 @@ get_schema_mod(RootName) ->
get_root_names() ->
maps:get(names, persistent_term:get(?PERSIS_SCHEMA_MODS, #{names => []})).

-spec save_configs(app_envs(), config(), raw_config(), raw_config(), update_opts()) ->
ok | {error, term()}.
-spec save_configs(app_envs(), config(), raw_config(), raw_config(), update_opts()) -> ok.
save_configs(_AppEnvs, Conf, RawConf, OverrideConf, Opts) ->
%% We first try to save to override.conf, because saving to files is more error prone
%% than saving into memory.
ok = save_to_override_conf(OverrideConf, Opts),
%% We may need also support hot config update for the apps that use application envs.
%% If that is the case uncomment the following line to update the configs to app env
%save_to_app_env(AppEnvs),
save_to_config_map(Conf, RawConf),
save_to_override_conf(OverrideConf, Opts).
%save_to_app_env(_AppEnvs),
save_to_config_map(Conf, RawConf).

-spec save_to_app_env([tuple()]) -> ok.
save_to_app_env(AppEnvs) ->
Expand Down
30 changes: 6 additions & 24 deletions apps/emqx/src/emqx_config_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -278,22 +278,9 @@ check_and_save_configs(
case do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, #{}) of
{ok, Result0} ->
remove_from_local_if_cluster_change(ConfKeyPath, Opts),
case
save_configs(
ConfKeyPath,
AppEnvs,
NewConf,
NewRawConf,
OverrideConf,
UpdateArgs,
Opts
)
of
{ok, Result1} ->
{ok, Result1#{post_config_update => Result0}};
Error ->
Error
end;
ok = emqx_config:save_configs(AppEnvs, NewConf, NewRawConf, OverrideConf, Opts),
Result1 = return_change_result(ConfKeyPath, UpdateArgs),
{ok, Result1#{post_config_update => Result0}};
Error ->
Error
end.
Expand Down Expand Up @@ -432,12 +419,6 @@ call_post_config_update(
) ->
{ok, Result}.

save_configs(ConfKeyPath, AppEnvs, CheckedConf, NewRawConf, OverrideConf, UpdateArgs, Opts) ->
case emqx_config:save_configs(AppEnvs, CheckedConf, NewRawConf, OverrideConf, Opts) of
ok -> {ok, return_change_result(ConfKeyPath, UpdateArgs)};
{error, Reason} -> {error, {save_configs, Reason}}
end.

%% The default callback of config handlers
%% the behaviour is overwriting the old config if:
%% 1. the old config is undefined
Expand All @@ -452,8 +433,9 @@ merge_to_old_config(UpdateReq, _RawConf) ->
%% local-override.conf priority is higher than cluster-override.conf
%% If we want cluster to take effect, we must remove the local.
remove_from_local_if_cluster_change(BinKeyPath, #{override_to := cluster} = Opts) ->
Local = remove_from_override_config(BinKeyPath, Opts#{override_to => local}),
_ = emqx_config:save_to_override_conf(Local, Opts),
Opts1 = Opts#{override_to => local},
Local = remove_from_override_config(BinKeyPath, Opts1),
_ = emqx_config:save_to_override_conf(Local, Opts1),
ok;
remove_from_local_if_cluster_change(_BinKeyPath, _Opts) ->
ok.
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_authn/src/emqx_authn.app.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_authn, [
{description, "EMQX Authentication"},
{vsn, "0.1.2"},
{vsn, "0.1.3"},
{modules, []},
{registered, [emqx_authn_sup, emqx_authn_registry]},
{applications, [kernel, stdlib, emqx_resource, ehttpc, epgsql, mysql, jose]},
Expand Down
14 changes: 8 additions & 6 deletions apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,17 @@ on_query(_InstId, {update, Opts}, AfterQuery, #{pool_name := PoolName}) ->
ok.

on_get_status(_InstId, #{pool_name := PoolName}) ->
emqx_plugin_libs_pool:get_status(
PoolName,
fun(Pid) ->
case emqx_authn_jwks_client:get_jwks(Pid) of
Func =
fun(Conn) ->
case emqx_authn_jwks_client:get_jwks(Conn) of
{ok, _} -> true;
_ -> false
end
end
).
end,
case emqx_plugin_libs_pool:health_check_ecpool_workers(PoolName, Func) of
true -> connected;
false -> disconnected
end.

connect(Opts) ->
ConnectorOpts = proplists:get_value(connector_opts, Opts),
Expand Down
19 changes: 10 additions & 9 deletions apps/emqx_authn/test/emqx_authn_enable_flag_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,17 @@ end_per_testcase(_Case, Config) ->
Config.

listener_mqtt_tcp_conf(Port, EnableAuthn) ->
PortS = integer_to_binary(Port),
#{
acceptors => 16,
zone => default,
access_rules => ["allow all"],
bind => {{0, 0, 0, 0}, Port},
max_connections => 1024000,
mountpoint => <<>>,
proxy_protocol => false,
proxy_protocol_timeout => 3000,
enable_authn => EnableAuthn
<<"acceptors">> => 16,
<<"zone">> => <<"default">>,
<<"access_rules">> => ["allow all"],
<<"bind">> => <<"0.0.0.0:", PortS/binary>>,
<<"max_connections">> => 1024000,
<<"mountpoint">> => <<>>,
<<"proxy_protocol">> => false,
<<"proxy_protocol_timeout">> => 3000,
<<"enable_authn">> => EnableAuthn
}.

t_enable_authn(_Config) ->
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_connector/src/emqx_connector.app.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_connector, [
{description, "An OTP application"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{mod, {emqx_connector_app, []}},
{applications, [
Expand Down
45 changes: 30 additions & 15 deletions apps/emqx_connector/src/emqx_connector_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -309,27 +309,42 @@ on_query(
end,
Result.

on_get_status(_InstId, #{host := Host, port := Port, connect_timeout := Timeout} = State) ->
case do_get_status(Host, Port, Timeout) of
ok ->
on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
case do_get_status(PoolName, Timeout) of
true ->
connected;
{error, Reason} ->
false ->
?SLOG(error, #{
msg => "http_connector_get_status_failed",
reason => Reason,
host => Host,
port => Port
state => State
}),
{disconnected, State, Reason}
disconnected
end.

do_get_status(Host, Port, Timeout) ->
case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), Timeout) of
{ok, Sock} ->
gen_tcp:close(Sock),
ok;
{error, Reason} ->
{error, Reason}
do_get_status(PoolName, Timeout) ->
Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(PoolName)],
DoPerWorker =
fun(Worker) ->
case ehttpc:health_check(Worker, Timeout) of
ok ->
true;
{error, Reason} ->
?SLOG(error, #{
msg => "ehttpc_health_check_failed",
reason => Reason,
worker => Worker
}),
false
end
end,
try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of
[_ | _] = Status ->
lists:all(fun(St) -> St =:= true end, Status);
[] ->
false
catch
exit:timeout ->
false
end.

%%--------------------------------------------------------------------
Expand Down
59 changes: 18 additions & 41 deletions apps/emqx_connector/src/emqx_connector_mongo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -240,52 +240,29 @@ on_get_status(InstId, #{poolname := PoolName} = _State) ->
end.

health_check(PoolName) ->
Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
try
emqx_misc:pmap(
fun check_worker_health/1, Workers, ?HEALTH_CHECK_TIMEOUT + timer:seconds(1)
)
of
[_ | _] = Status ->
lists:all(fun(St) -> St =:= true end, Status);
[] ->
false
catch
exit:timeout ->
false
end.
emqx_plugin_libs_pool:health_check_ecpool_workers(
PoolName, fun ?MODULE:check_worker_health/1, ?HEALTH_CHECK_TIMEOUT + timer:seconds(1)
).

%% ===================================================================

check_worker_health(Worker) ->
case ecpool_worker:client(Worker) of
{ok, Conn} ->
%% we don't care if this returns something or not, we just to test the connection
try do_test_query(Conn) of
{error, Reason} ->
?SLOG(warning, #{
msg => "mongo_connection_get_status_error",
worker => Worker,
reason => Reason
}),
false;
_ ->
true
catch
Class:Error ->
?SLOG(warning, #{
msg => "mongo_connection_get_status_exception",
worker => Worker,
class => Class,
error => Error
}),
false
end;
_ ->
check_worker_health(Conn) ->
%% we don't care if this returns something or not, we just to test the connection
try do_test_query(Conn) of
{error, Reason} ->
?SLOG(warning, #{
msg => "mongo_connection_get_status_error",
worker => Worker,
reason => worker_not_found
reason => Reason
}),
false;
_ ->
true
catch
Class:Error ->
?SLOG(warning, #{
msg => "mongo_connection_get_status_exception",
class => Class,
error => Error
}),
false
end.
Expand Down
20 changes: 9 additions & 11 deletions apps/emqx_connector/src/emqx_connector_mysql.erl
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,9 @@ on_query(
mysql_function(sql) -> query;
mysql_function(prepared_query) -> execute.

on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn} = State) ->
case emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn) of
connected ->
on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State) ->
case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
true ->
case do_check_prepares(State) of
ok ->
connected;
Expand All @@ -180,15 +180,10 @@ on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn} = S
{connected, NState};
{error, _Reason} ->
%% do not log error, it is logged in prepare_sql_to_conn
case AutoReconn of
true ->
connecting;
false ->
disconnected
end
conn_status(AutoReconn)
end;
ConnectStatus ->
ConnectStatus
false ->
conn_status(AutoReconn)
end.

do_get_status(Conn) ->
Expand All @@ -207,6 +202,9 @@ do_check_prepares(State = #{poolname := PoolName, prepare_statement := {error, P
end.

%% ===================================================================
conn_status(_AutoReconn = true) -> connecting;
conn_status(_AutoReconn = false) -> disconnected.

reconn_interval(true) -> 15;
reconn_interval(false) -> false.

Expand Down
10 changes: 8 additions & 2 deletions apps/emqx_connector/src/emqx_connector_pgsql.erl
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,19 @@ on_query(InstId, {Type, NameOrSQL, Params}, AfterQuery, #{poolname := PoolName}
end,
Result.

on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn}) ->
emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn).
on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn}) ->
case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
true -> connected;
false -> conn_status(AutoReconn)
end.

do_get_status(Conn) ->
ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")).

%% ===================================================================
conn_status(_AutoReconn = true) -> connecting;
conn_status(_AutoReconn = false) -> disconnected.

reconn_interval(true) -> 15;
reconn_interval(false) -> false.

Expand Down
5 changes: 3 additions & 2 deletions apps/emqx_connector/src/emqx_connector_redis.erl
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,9 @@ on_get_status(_InstId, #{type := cluster, poolname := PoolName, auto_reconnect :
false ->
disconnect
end;
on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn}) ->
emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn).
on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn}) ->
Health = emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1),
status_result(Health, AutoReconn).

do_get_status(Conn) ->
case eredis:q(Conn, ["PING"]) of
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_plugin_libs, [
{description, "EMQX Plugin utility libs"},
{vsn, "4.3.1"},
{vsn, "4.3.2"},
{modules, []},
{applications, [kernel, stdlib]},
{env, []}
Expand Down