Skip to content

Commit

Permalink
Merge pull request #8443 from terry-xiaoyu/fix_issues
Browse files Browse the repository at this point in the history
Fix some issues
  • Loading branch information
terry-xiaoyu committed Jul 8, 2022
2 parents 166231b + 23be285 commit 1ea2fbc
Show file tree
Hide file tree
Showing 22 changed files with 167 additions and 153 deletions.
10 changes: 10 additions & 0 deletions CHANGES-5.0.md
@@ -1,3 +1,13 @@
# 5.0.4

## Bug fixes

* The `data/configs/cluster-override.conf` is cleared to 0KB if `hocon_pp:do/2` failed [commits/71f64251](https://github.com/emqx/emqx/pull/8443/commits/71f642518a683cc91a32fd542aafaac6ef915720)
* Improve the health_check for webhooks. [commits/6b45d2ea](https://github.com/emqx/emqx/commit/6b45d2ea9fde6d3b4a5b007f7a8c5a1c573d141e)
Prior to this change, the webhook only checks the connectivity of the TCP port using `gen_tcp:connect/2`, so
if it's a HTTPs server, we didn't check if TLS handshake was successful.
* The `create_at` field of rules is missing after emqx restarts. [commits/5fc09e6b](https://github.com/emqx/emqx/commit/5fc09e6b950c340243d7be627a0ce1700691221c)

# 5.0.3

## Bug fixes
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx/src/emqx.app.src
Expand Up @@ -3,7 +3,7 @@
{id, "emqx"},
{description, "EMQX Core"},
% strict semver, bump manually!
{vsn, "5.0.3"},
{vsn, "5.0.4"},
{modules, []},
{registered, []},
{applications, [
Expand Down
11 changes: 6 additions & 5 deletions apps/emqx/src/emqx_config.erl
Expand Up @@ -508,14 +508,15 @@ get_schema_mod(RootName) ->
get_root_names() ->
maps:get(names, persistent_term:get(?PERSIS_SCHEMA_MODS, #{names => []})).

-spec save_configs(app_envs(), config(), raw_config(), raw_config(), update_opts()) ->
ok | {error, term()}.
-spec save_configs(app_envs(), config(), raw_config(), raw_config(), update_opts()) -> ok.
save_configs(_AppEnvs, Conf, RawConf, OverrideConf, Opts) ->
%% We first try to save to override.conf, because saving to files is more error prone
%% than saving into memory.
ok = save_to_override_conf(OverrideConf, Opts),
%% We may need also support hot config update for the apps that use application envs.
%% If that is the case uncomment the following line to update the configs to app env
%save_to_app_env(AppEnvs),
save_to_config_map(Conf, RawConf),
save_to_override_conf(OverrideConf, Opts).
%save_to_app_env(_AppEnvs),
save_to_config_map(Conf, RawConf).

-spec save_to_app_env([tuple()]) -> ok.
save_to_app_env(AppEnvs) ->
Expand Down
30 changes: 6 additions & 24 deletions apps/emqx/src/emqx_config_handler.erl
Expand Up @@ -278,22 +278,9 @@ check_and_save_configs(
case do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, #{}) of
{ok, Result0} ->
remove_from_local_if_cluster_change(ConfKeyPath, Opts),
case
save_configs(
ConfKeyPath,
AppEnvs,
NewConf,
NewRawConf,
OverrideConf,
UpdateArgs,
Opts
)
of
{ok, Result1} ->
{ok, Result1#{post_config_update => Result0}};
Error ->
Error
end;
ok = emqx_config:save_configs(AppEnvs, NewConf, NewRawConf, OverrideConf, Opts),
Result1 = return_change_result(ConfKeyPath, UpdateArgs),
{ok, Result1#{post_config_update => Result0}};
Error ->
Error
end.
Expand Down Expand Up @@ -432,12 +419,6 @@ call_post_config_update(
) ->
{ok, Result}.

save_configs(ConfKeyPath, AppEnvs, CheckedConf, NewRawConf, OverrideConf, UpdateArgs, Opts) ->
case emqx_config:save_configs(AppEnvs, CheckedConf, NewRawConf, OverrideConf, Opts) of
ok -> {ok, return_change_result(ConfKeyPath, UpdateArgs)};
{error, Reason} -> {error, {save_configs, Reason}}
end.

%% The default callback of config handlers
%% the behaviour is overwriting the old config if:
%% 1. the old config is undefined
Expand All @@ -452,8 +433,9 @@ merge_to_old_config(UpdateReq, _RawConf) ->
%% local-override.conf priority is higher than cluster-override.conf
%% If we want cluster to take effect, we must remove the local.
remove_from_local_if_cluster_change(BinKeyPath, #{override_to := cluster} = Opts) ->
Local = remove_from_override_config(BinKeyPath, Opts#{override_to => local}),
_ = emqx_config:save_to_override_conf(Local, Opts),
Opts1 = Opts#{override_to => local},
Local = remove_from_override_config(BinKeyPath, Opts1),
_ = emqx_config:save_to_override_conf(Local, Opts1),
ok;
remove_from_local_if_cluster_change(_BinKeyPath, _Opts) ->
ok.
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_authn/src/emqx_authn.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_authn, [
{description, "EMQX Authentication"},
{vsn, "0.1.2"},
{vsn, "0.1.3"},
{modules, []},
{registered, [emqx_authn_sup, emqx_authn_registry]},
{applications, [kernel, stdlib, emqx_resource, ehttpc, epgsql, mysql, jose]},
Expand Down
14 changes: 8 additions & 6 deletions apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl
Expand Up @@ -71,15 +71,17 @@ on_query(_InstId, {update, Opts}, AfterQuery, #{pool_name := PoolName}) ->
ok.

on_get_status(_InstId, #{pool_name := PoolName}) ->
emqx_plugin_libs_pool:get_status(
PoolName,
fun(Pid) ->
case emqx_authn_jwks_client:get_jwks(Pid) of
Func =
fun(Conn) ->
case emqx_authn_jwks_client:get_jwks(Conn) of
{ok, _} -> true;
_ -> false
end
end
).
end,
case emqx_plugin_libs_pool:health_check_ecpool_workers(PoolName, Func) of
true -> connected;
false -> disconnected
end.

connect(Opts) ->
ConnectorOpts = proplists:get_value(connector_opts, Opts),
Expand Down
19 changes: 10 additions & 9 deletions apps/emqx_authn/test/emqx_authn_enable_flag_SUITE.erl
Expand Up @@ -71,16 +71,17 @@ end_per_testcase(_Case, Config) ->
Config.

listener_mqtt_tcp_conf(Port, EnableAuthn) ->
PortS = integer_to_binary(Port),
#{
acceptors => 16,
zone => default,
access_rules => ["allow all"],
bind => {{0, 0, 0, 0}, Port},
max_connections => 1024000,
mountpoint => <<>>,
proxy_protocol => false,
proxy_protocol_timeout => 3000,
enable_authn => EnableAuthn
<<"acceptors">> => 16,
<<"zone">> => <<"default">>,
<<"access_rules">> => ["allow all"],
<<"bind">> => <<"0.0.0.0:", PortS/binary>>,
<<"max_connections">> => 1024000,
<<"mountpoint">> => <<>>,
<<"proxy_protocol">> => false,
<<"proxy_protocol_timeout">> => 3000,
<<"enable_authn">> => EnableAuthn
}.

t_enable_authn(_Config) ->
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_connector/src/emqx_connector.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_connector, [
{description, "An OTP application"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{mod, {emqx_connector_app, []}},
{applications, [
Expand Down
45 changes: 30 additions & 15 deletions apps/emqx_connector/src/emqx_connector_http.erl
Expand Up @@ -309,27 +309,42 @@ on_query(
end,
Result.

on_get_status(_InstId, #{host := Host, port := Port, connect_timeout := Timeout} = State) ->
case do_get_status(Host, Port, Timeout) of
ok ->
on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
case do_get_status(PoolName, Timeout) of
true ->
connected;
{error, Reason} ->
false ->
?SLOG(error, #{
msg => "http_connector_get_status_failed",
reason => Reason,
host => Host,
port => Port
state => State
}),
{disconnected, State, Reason}
disconnected
end.

do_get_status(Host, Port, Timeout) ->
case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), Timeout) of
{ok, Sock} ->
gen_tcp:close(Sock),
ok;
{error, Reason} ->
{error, Reason}
do_get_status(PoolName, Timeout) ->
Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(PoolName)],
DoPerWorker =
fun(Worker) ->
case ehttpc:health_check(Worker, Timeout) of
ok ->
true;
{error, Reason} ->
?SLOG(error, #{
msg => "ehttpc_health_check_failed",
reason => Reason,
worker => Worker
}),
false
end
end,
try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of
[_ | _] = Status ->
lists:all(fun(St) -> St =:= true end, Status);
[] ->
false
catch
exit:timeout ->
false
end.

%%--------------------------------------------------------------------
Expand Down
59 changes: 18 additions & 41 deletions apps/emqx_connector/src/emqx_connector_mongo.erl
Expand Up @@ -240,52 +240,29 @@ on_get_status(InstId, #{poolname := PoolName} = _State) ->
end.

health_check(PoolName) ->
Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
try
emqx_misc:pmap(
fun check_worker_health/1, Workers, ?HEALTH_CHECK_TIMEOUT + timer:seconds(1)
)
of
[_ | _] = Status ->
lists:all(fun(St) -> St =:= true end, Status);
[] ->
false
catch
exit:timeout ->
false
end.
emqx_plugin_libs_pool:health_check_ecpool_workers(
PoolName, fun ?MODULE:check_worker_health/1, ?HEALTH_CHECK_TIMEOUT + timer:seconds(1)
).

%% ===================================================================

check_worker_health(Worker) ->
case ecpool_worker:client(Worker) of
{ok, Conn} ->
%% we don't care if this returns something or not, we just to test the connection
try do_test_query(Conn) of
{error, Reason} ->
?SLOG(warning, #{
msg => "mongo_connection_get_status_error",
worker => Worker,
reason => Reason
}),
false;
_ ->
true
catch
Class:Error ->
?SLOG(warning, #{
msg => "mongo_connection_get_status_exception",
worker => Worker,
class => Class,
error => Error
}),
false
end;
_ ->
check_worker_health(Conn) ->
%% we don't care if this returns something or not, we just to test the connection
try do_test_query(Conn) of
{error, Reason} ->
?SLOG(warning, #{
msg => "mongo_connection_get_status_error",
worker => Worker,
reason => worker_not_found
reason => Reason
}),
false;
_ ->
true
catch
Class:Error ->
?SLOG(warning, #{
msg => "mongo_connection_get_status_exception",
class => Class,
error => Error
}),
false
end.
Expand Down
20 changes: 9 additions & 11 deletions apps/emqx_connector/src/emqx_connector_mysql.erl
Expand Up @@ -169,9 +169,9 @@ on_query(
mysql_function(sql) -> query;
mysql_function(prepared_query) -> execute.

on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn} = State) ->
case emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn) of
connected ->
on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State) ->
case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
true ->
case do_check_prepares(State) of
ok ->
connected;
Expand All @@ -180,15 +180,10 @@ on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn} = S
{connected, NState};
{error, _Reason} ->
%% do not log error, it is logged in prepare_sql_to_conn
case AutoReconn of
true ->
connecting;
false ->
disconnected
end
conn_status(AutoReconn)
end;
ConnectStatus ->
ConnectStatus
false ->
conn_status(AutoReconn)
end.

do_get_status(Conn) ->
Expand All @@ -207,6 +202,9 @@ do_check_prepares(State = #{poolname := PoolName, prepare_statement := {error, P
end.

%% ===================================================================
conn_status(_AutoReconn = true) -> connecting;
conn_status(_AutoReconn = false) -> disconnected.

reconn_interval(true) -> 15;
reconn_interval(false) -> false.

Expand Down
10 changes: 8 additions & 2 deletions apps/emqx_connector/src/emqx_connector_pgsql.erl
Expand Up @@ -139,13 +139,19 @@ on_query(InstId, {Type, NameOrSQL, Params}, AfterQuery, #{poolname := PoolName}
end,
Result.

on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn}) ->
emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn).
on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn}) ->
case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
true -> connected;
false -> conn_status(AutoReconn)
end.

do_get_status(Conn) ->
ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")).

%% ===================================================================
conn_status(_AutoReconn = true) -> connecting;
conn_status(_AutoReconn = false) -> disconnected.

reconn_interval(true) -> 15;
reconn_interval(false) -> false.

Expand Down
5 changes: 3 additions & 2 deletions apps/emqx_connector/src/emqx_connector_redis.erl
Expand Up @@ -225,8 +225,9 @@ on_get_status(_InstId, #{type := cluster, poolname := PoolName, auto_reconnect :
false ->
disconnect
end;
on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn}) ->
emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn).
on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn}) ->
Health = emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1),
status_result(Health, AutoReconn).

do_get_status(Conn) ->
case eredis:q(Conn, ["PING"]) of
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_plugin_libs, [
{description, "EMQX Plugin utility libs"},
{vsn, "4.3.1"},
{vsn, "4.3.2"},
{modules, []},
{applications, [kernel, stdlib]},
{env, []}
Expand Down

0 comments on commit 1ea2fbc

Please sign in to comment.