Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ip_port schema type crash #11077

Merged
merged 5 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/emqx/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.2"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.8"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.9"}}},
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}},
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
Expand Down
32 changes: 16 additions & 16 deletions apps/emqx/src/emqx_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1758,7 +1758,7 @@ base_listener(Bind) ->
)},
{"bind",
sc(
hoconsc:union([ip_port(), integer()]),
ip_port(),
#{
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove integer: ip_port() also support setting as 1883.

default => Bind,
required => true,
Expand Down Expand Up @@ -2418,21 +2418,21 @@ mk_duration(Desc, OverrideMeta) ->
to_duration(Str) ->
case hocon_postprocess:duration(Str) of
I when is_integer(I) -> {ok, I};
_ -> {error, Str}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hocon 0.39.9 has fixed a issue: typerelf:from_string conversion is not executed when value is integer emqx/hocon#259
In order to be compatible with the previous format.

%% {error, "1"} = hocon_postprocess:duration("1")
%% we should ensure:
to_duration("12") -> 12.
to_duration_s("12") -> 12s
to_duration_ms("12") -> 12ms

to_timeout_duration("12") -> 12
to_timeout_duration_ms("12") ->  12ms
to_timeout_duration_s("12") -> 12s

_ -> to_integer(Str)
end.

to_duration_s(Str) ->
case hocon_postprocess:duration(Str) of
I when is_number(I) -> {ok, ceiling(I / 1000)};
_ -> {error, Str}
_ -> to_integer(Str)
end.

-spec to_duration_ms(Input) -> {ok, integer()} | {error, Input} when
Input :: string() | binary().
to_duration_ms(Str) ->
case hocon_postprocess:duration(Str) of
I when is_number(I) -> {ok, ceiling(I)};
_ -> {error, Str}
_ -> to_integer(Str)
end.

-spec to_timeout_duration(Input) -> {ok, timeout_duration()} | {error, Input} when
Expand Down Expand Up @@ -2473,7 +2473,7 @@ do_to_timeout_duration(Str, Fn, Max, Unit) ->
to_bytesize(Str) ->
case hocon_postprocess:bytesize(Str) of
I when is_integer(I) -> {ok, I};
_ -> {error, Str}
_ -> to_integer(Str)
end.

to_wordsize(Str) ->
Expand All @@ -2483,6 +2483,13 @@ to_wordsize(Str) ->
Error -> Error
end.

to_integer(Str) ->
case string:to_integer(Str) of
{Int, []} -> {ok, Int};
{Int, <<>>} -> {ok, Int};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have another function clause to_integer(Str) when is_binary(Str) -> ... to make this case clause reachable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, I will remove is_list

_ -> {error, Str}
end.

to_percent(Str) ->
{ok, hocon_postprocess:percent(Str)}.

Expand Down Expand Up @@ -2525,9 +2532,9 @@ to_ip_port(Str) ->
case split_ip_port(Str) of
{"", Port} ->
%% this is a local address
{ok, list_to_integer(Port)};
{ok, parse_port(Port)};
{MaybeIp, Port} ->
PortVal = list_to_integer(Port),
PortVal = parse_port(Port),
case inet:parse_address(MaybeIp) of
{ok, IpTuple} ->
{ok, {IpTuple, PortVal}};
Expand All @@ -2543,18 +2550,11 @@ split_ip_port(Str0) ->
case lists:split(string:rchr(Str, $:), Str) of
%% no colon
{[], Str} ->
try
%% if it's just a port number, then return as-is
_ = list_to_integer(Str),
{"", Str}
catch
_:_ ->
error
end;
{"", Str};
{IpPlusColon, PortString} ->
IpStr0 = lists:droplast(IpPlusColon),
case IpStr0 of
%% dropp head/tail brackets
%% drop head/tail brackets
[$[ | S] ->
case lists:last(S) of
$] -> {lists:droplast(S), PortString};
Expand Down
21 changes: 14 additions & 7 deletions apps/emqx/test/emqx_crl_cache_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -497,17 +497,24 @@ t_update_config(_Config) ->
emqx_config_handler:start_link(),
{ok, Pid} = emqx_crl_cache:start_link(),
Conf = #{
refresh_interval => timer:minutes(5),
http_timeout => timer:minutes(10),
refresh_interval => <<"5m">>,
http_timeout => <<"10m">>,
capacity => 123
},
?assertMatch({ok, _}, emqx:update_config([<<"crl_cache">>], Conf)),
State = sys:get_state(Pid),
?assertEqual(Conf, #{
refresh_interval => element(3, State),
http_timeout => element(4, State),
capacity => element(7, State)
}),
?assertEqual(
#{
refresh_interval => timer:minutes(5),
http_timeout => timer:minutes(10),
capacity => 123
},
#{
refresh_interval => element(3, State),
http_timeout => element(4, State),
capacity => element(7, State)
}
),
emqx_config:erase(<<"crl_cache">>),
emqx_config_handler:stop(),
ok.
Expand Down
16 changes: 8 additions & 8 deletions apps/emqx/test/emqx_flapping_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ init_per_suite(Config) ->
<<"enable">> => true,
<<"max_count">> => 3,
% 0.1s
<<"window_time">> => 100,
<<"window_time">> => <<"100ms">>,
%% 2s
<<"ban_time">> => "2s"
<<"ban_time">> => <<"2s">>
}
),
Config.
Expand Down Expand Up @@ -119,16 +119,16 @@ t_conf_update(_) ->
?assertEqual(Global#{window_time := 100}, emqx_flapping:get_policy(zone_1)),

Zones = #{
<<"zone_1">> => #{<<"flapping_detect">> => #{<<"window_time">> => 123}},
<<"zone_2">> => #{<<"flapping_detect">> => #{<<"window_time">> => 456}}
<<"zone_1">> => #{<<"flapping_detect">> => #{<<"window_time">> => <<"123s">>}},
<<"zone_2">> => #{<<"flapping_detect">> => #{<<"window_time">> => <<"456s">>}}
},
?assertMatch({ok, _}, emqx:update_config([zones], Zones)),
%% new_zone is already deleted
?assertError({config_not_found, _}, get_policy(new_zone)),
%% update zone(zone_1) has default.
?assertEqual(Global#{window_time := 123}, emqx_flapping:get_policy(zone_1)),
?assertEqual(Global#{window_time := 123000}, emqx_flapping:get_policy(zone_1)),
%% create zone(zone_2) has default
?assertEqual(Global#{window_time := 456}, emqx_flapping:get_policy(zone_2)),
?assertEqual(Global#{window_time := 456000}, emqx_flapping:get_policy(zone_2)),
%% reset to default(empty) andalso get default from global
?assertMatch({ok, _}, emqx:update_config([zones], #{})),
?assertEqual(Global, emqx:get_config([zones, default, flapping_detect])),
Expand Down Expand Up @@ -172,13 +172,13 @@ validate_timer(Lists) ->
ok.

t_window_compatibility_check(_Conf) ->
Flapping = emqx:get_config([flapping_detect]),
Flapping = emqx:get_raw_config([flapping_detect]),
ok = emqx_config:init_load(emqx_schema, <<"flapping_detect {window_time = disable}">>),
?assertMatch(#{window_time := 60000, enable := false}, emqx:get_config([flapping_detect])),
%% reset
FlappingBin = iolist_to_binary(["flapping_detect {", hocon_pp:do(Flapping, #{}), "}"]),
ok = emqx_config:init_load(emqx_schema, FlappingBin),
?assertEqual(Flapping, emqx:get_config([flapping_detect])),
?assertEqual(Flapping, emqx:get_raw_config([flapping_detect])),
ok.

get_policy(Zone) ->
Expand Down
24 changes: 16 additions & 8 deletions apps/emqx/test/emqx_ocsp_cache_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,19 @@ init_per_testcase(t_ocsp_responder_error_responses, Config) ->
enable_ocsp_stapling => true,
responder_url => <<"http://localhost:9877/">>,
issuer_pem => filename:join(DataDir, "ocsp-issuer.pem"),
refresh_http_timeout => 15_000,
refresh_interval => 1_000
refresh_http_timeout => <<"15s">>,
refresh_interval => <<"1s">>
}
}
},
Conf = #{listeners => #{Type => #{Name => ListenerOpts}}},
ConfBin = emqx_utils_maps:binary_key_map(Conf),
hocon_tconf:check_plain(emqx_schema, ConfBin, #{required => false, atom_keys => false}),
emqx_config:put_listener_conf(Type, Name, [], ListenerOpts),
CheckedConf = hocon_tconf:check_plain(emqx_schema, ConfBin, #{
required => false, atom_keys => false
}),
Conf2 = emqx_utils_maps:unsafe_atom_key_map(CheckedConf),
ListenerOpts2 = emqx_utils_maps:deep_get([listeners, Type, Name], Conf2),
emqx_config:put_listener_conf(Type, Name, [], ListenerOpts2),
snabbkaffe:start_trace(),
_Heir = spawn_dummy_heir(),
{ok, CachePid} = emqx_ocsp_cache:start_link(),
Expand Down Expand Up @@ -179,15 +183,19 @@ init_per_testcase(_TestCase, Config) ->
enable_ocsp_stapling => true,
responder_url => <<"http://localhost:9877/">>,
issuer_pem => filename:join(DataDir, "ocsp-issuer.pem"),
refresh_http_timeout => 15_000,
refresh_interval => 1_000
refresh_http_timeout => <<"15s">>,
refresh_interval => <<"1s">>
}
}
},
Conf = #{listeners => #{Type => #{Name => ListenerOpts}}},
ConfBin = emqx_utils_maps:binary_key_map(Conf),
hocon_tconf:check_plain(emqx_schema, ConfBin, #{required => false, atom_keys => false}),
emqx_config:put_listener_conf(Type, Name, [], ListenerOpts),
CheckedConf = hocon_tconf:check_plain(emqx_schema, ConfBin, #{
required => false, atom_keys => false
}),
Conf2 = emqx_utils_maps:unsafe_atom_key_map(CheckedConf),
ListenerOpts2 = emqx_utils_maps:deep_get([listeners, Type, Name], Conf2),
emqx_config:put_listener_conf(Type, Name, [], ListenerOpts2),
[
{cache_pid, CachePid}
| Config
Expand Down
4 changes: 2 additions & 2 deletions apps/emqx_authn/src/emqx_authn_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1316,8 +1316,8 @@ authenticator_examples() ->
<<"password">> => ?PH_PASSWORD
},
pool_size => 8,
connect_timeout => 5000,
request_timeout => 5000,
connect_timeout => <<"5s">>,
request_timeout => <<"5s">>,
enable_pipelining => 100,
ssl => #{enable => false}
}
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_authn/test/emqx_authn_enable_flag_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ listener_mqtt_tcp_conf(Port, EnableAuthn) ->
<<"max_connections">> => 1024000,
<<"mountpoint">> => <<>>,
<<"proxy_protocol">> => false,
<<"proxy_protocol_timeout">> => 3000,
<<"proxy_protocol_timeout">> => <<"3s">>,
<<"enable_authn">> => EnableAuthn
}.

Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_authz/test/emqx_authz_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ set_special_configs(_App) ->
<<"headers">> => #{},
<<"ssl">> => #{<<"enable">> => true},
<<"method">> => <<"get">>,
<<"request_timeout">> => 5000
<<"request_timeout">> => <<"5s">>
}).
-define(SOURCE2, #{
<<"type">> => <<"mongodb">>,
Expand Down
4 changes: 2 additions & 2 deletions apps/emqx_authz/test/emqx_authz_api_settings_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ t_api(_) ->
<<"cache">> => #{
<<"enable">> => false,
<<"max_size">> => 32,
<<"ttl">> => 60000
<<"ttl">> => <<"60s">>
}
},

Expand All @@ -84,7 +84,7 @@ t_api(_) ->
<<"cache">> => #{
<<"enable">> => true,
<<"max_size">> => 32,
<<"ttl">> => 60000
<<"ttl">> => <<"60s">>
}
},

Expand Down
20 changes: 10 additions & 10 deletions apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -178,21 +178,21 @@ bridge_async_config(#{port := Port} = Config) ->
Name = maps:get(name, Config, ?BRIDGE_NAME),
PoolSize = maps:get(pool_size, Config, 1),
QueryMode = maps:get(query_mode, Config, "async"),
ConnectTimeout = maps:get(connect_timeout, Config, 1),
RequestTimeout = maps:get(request_timeout, Config, 10000),
ConnectTimeout = maps:get(connect_timeout, Config, "1s"),
RequestTimeout = maps:get(request_timeout, Config, "10s"),
ResumeInterval = maps:get(resume_interval, Config, "1s"),
ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"),
ConfigString = io_lib:format(
"bridges.~s.~s {\n"
" url = \"http://localhost:~p\"\n"
" connect_timeout = \"~ps\"\n"
" connect_timeout = \"~p\"\n"
" enable = true\n"
" enable_pipelining = 100\n"
" max_retries = 2\n"
" method = \"post\"\n"
" pool_size = ~p\n"
" pool_type = \"random\"\n"
" request_timeout = \"~ps\"\n"
" request_timeout = \"~s\"\n"
" body = \"${id}\""
" resource_opts {\n"
" inflight_window = 100\n"
Expand Down Expand Up @@ -257,8 +257,8 @@ t_send_async_connection_timeout(Config) ->
port => Port,
pool_size => 1,
query_mode => "async",
connect_timeout => 10_000,
request_timeout => ResponseDelayMS * 2,
connect_timeout => integer_to_list(ResponseDelayMS * 2) ++ "s",
request_timeout => "10s",
resource_request_ttl => "infinity"
}),
NumberOfMessagesToSend = 10,
Expand All @@ -278,8 +278,8 @@ t_async_free_retries(Config) ->
port => Port,
pool_size => 1,
query_mode => "sync",
connect_timeout => 1_000,
request_timeout => 10_000,
connect_timeout => "1s",
request_timeout => "10s",
resource_request_ttl => "10000s"
}),
%% Fail 5 times then succeed.
Expand All @@ -304,8 +304,8 @@ t_async_common_retries(Config) ->
pool_size => 1,
query_mode => "sync",
resume_interval => "100ms",
connect_timeout => 1_000,
request_timeout => 10_000,
connect_timeout => "1s",
request_timeout => "10s",
resource_request_ttl => "10000s"
}),
%% Keeps failing until connector gives up.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,9 +635,9 @@ t_bad_sql_parameter(Config) ->
Config,
#{
<<"resource_opts">> => #{
<<"request_ttl">> => 500,
<<"resume_interval">> => 100,
<<"health_check_interval">> => 100
<<"request_ttl">> => <<"500ms">>,
<<"resume_interval">> => <<"100ms">>,
<<"health_check_interval">> => <<"100ms">>
}
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ clickhouse_config() ->
]
)
),
connect_timeout => 10000
connect_timeout => <<"10s">>
},
#{<<"config">> => Config}.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ bridge_config(TestCase, _TestGroup, Config) ->
" iotdb_version = \"~s\"\n"
" pool_size = 1\n"
" resource_opts = {\n"
" health_check_interval = 5000\n"
" request_ttl = 30000\n"
" health_check_interval = \"5s\"\n"
" request_ttl = 30s\n"
" query_mode = \"async\"\n"
" worker_pool_size = 1\n"
" }\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ kafka_config(TestCase, _KafkaType, Config) ->
" kafka {\n"
" max_batch_bytes = 896KB\n"
" max_rejoin_attempts = 5\n"
" offset_commit_interval_seconds = 3\n"
" offset_commit_interval_seconds = 3s\n"
%% todo: matrix this
" offset_reset_policy = latest\n"
" }\n"
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ bridges.kafka_consumer.my_consumer {
kafka {
max_batch_bytes = 896KB
max_rejoin_attempts = 5
offset_commit_interval_seconds = 3
offset_commit_interval_seconds = 3s
offset_reset_policy = latest
}
topic_mapping = [
Expand Down
6 changes: 3 additions & 3 deletions apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,9 @@ t_write_timeout(Config) ->
Config,
#{
<<"resource_opts">> => #{
<<"request_ttl">> => 500,
<<"resume_interval">> => 100,
<<"health_check_interval">> => 100
<<"request_ttl">> => <<"500ms">>,
<<"resume_interval">> => <<"100ms">>,
<<"health_check_interval">> => <<"100ms">>
}
}
),
Expand Down