Skip to content

Commit

Permalink
Merge pull request #11077 from zhongwencool/fix-crash-ip-port-type
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongwencool committed Jun 21, 2023
2 parents 010c2fb + 856de78 commit f50d733
Show file tree
Hide file tree
Showing 35 changed files with 139 additions and 128 deletions.
2 changes: 1 addition & 1 deletion apps/emqx/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.2"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.8"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.9"}}},
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}},
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
Expand Down
32 changes: 16 additions & 16 deletions apps/emqx/src/emqx_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1758,7 +1758,7 @@ base_listener(Bind) ->
)},
{"bind",
sc(
hoconsc:union([ip_port(), integer()]),
ip_port(),
#{
default => Bind,
required => true,
Expand Down Expand Up @@ -2418,21 +2418,21 @@ mk_duration(Desc, OverrideMeta) ->
to_duration(Str) ->
case hocon_postprocess:duration(Str) of
I when is_integer(I) -> {ok, I};
_ -> {error, Str}
_ -> to_integer(Str)
end.

to_duration_s(Str) ->
case hocon_postprocess:duration(Str) of
I when is_number(I) -> {ok, ceiling(I / 1000)};
_ -> {error, Str}
_ -> to_integer(Str)
end.

-spec to_duration_ms(Input) -> {ok, integer()} | {error, Input} when
Input :: string() | binary().
to_duration_ms(Str) ->
case hocon_postprocess:duration(Str) of
I when is_number(I) -> {ok, ceiling(I)};
_ -> {error, Str}
_ -> to_integer(Str)
end.

-spec to_timeout_duration(Input) -> {ok, timeout_duration()} | {error, Input} when
Expand Down Expand Up @@ -2473,7 +2473,7 @@ do_to_timeout_duration(Str, Fn, Max, Unit) ->
to_bytesize(Str) ->
case hocon_postprocess:bytesize(Str) of
I when is_integer(I) -> {ok, I};
_ -> {error, Str}
_ -> to_integer(Str)
end.

to_wordsize(Str) ->
Expand All @@ -2483,6 +2483,13 @@ to_wordsize(Str) ->
Error -> Error
end.

to_integer(Str) ->
case string:to_integer(Str) of
{Int, []} -> {ok, Int};
{Int, <<>>} -> {ok, Int};
_ -> {error, Str}
end.

to_percent(Str) ->
{ok, hocon_postprocess:percent(Str)}.

Expand Down Expand Up @@ -2525,9 +2532,9 @@ to_ip_port(Str) ->
case split_ip_port(Str) of
{"", Port} ->
%% this is a local address
{ok, list_to_integer(Port)};
{ok, parse_port(Port)};
{MaybeIp, Port} ->
PortVal = list_to_integer(Port),
PortVal = parse_port(Port),
case inet:parse_address(MaybeIp) of
{ok, IpTuple} ->
{ok, {IpTuple, PortVal}};
Expand All @@ -2543,18 +2550,11 @@ split_ip_port(Str0) ->
case lists:split(string:rchr(Str, $:), Str) of
%% no colon
{[], Str} ->
try
%% if it's just a port number, then return as-is
_ = list_to_integer(Str),
{"", Str}
catch
_:_ ->
error
end;
{"", Str};
{IpPlusColon, PortString} ->
IpStr0 = lists:droplast(IpPlusColon),
case IpStr0 of
%% dropp head/tail brackets
%% drop head/tail brackets
[$[ | S] ->
case lists:last(S) of
$] -> {lists:droplast(S), PortString};
Expand Down
21 changes: 14 additions & 7 deletions apps/emqx/test/emqx_crl_cache_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -497,17 +497,24 @@ t_update_config(_Config) ->
emqx_config_handler:start_link(),
{ok, Pid} = emqx_crl_cache:start_link(),
Conf = #{
refresh_interval => timer:minutes(5),
http_timeout => timer:minutes(10),
refresh_interval => <<"5m">>,
http_timeout => <<"10m">>,
capacity => 123
},
?assertMatch({ok, _}, emqx:update_config([<<"crl_cache">>], Conf)),
State = sys:get_state(Pid),
?assertEqual(Conf, #{
refresh_interval => element(3, State),
http_timeout => element(4, State),
capacity => element(7, State)
}),
?assertEqual(
#{
refresh_interval => timer:minutes(5),
http_timeout => timer:minutes(10),
capacity => 123
},
#{
refresh_interval => element(3, State),
http_timeout => element(4, State),
capacity => element(7, State)
}
),
emqx_config:erase(<<"crl_cache">>),
emqx_config_handler:stop(),
ok.
Expand Down
16 changes: 8 additions & 8 deletions apps/emqx/test/emqx_flapping_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ init_per_suite(Config) ->
<<"enable">> => true,
<<"max_count">> => 3,
% 0.1s
<<"window_time">> => 100,
<<"window_time">> => <<"100ms">>,
%% 2s
<<"ban_time">> => "2s"
<<"ban_time">> => <<"2s">>
}
),
Config.
Expand Down Expand Up @@ -119,16 +119,16 @@ t_conf_update(_) ->
?assertEqual(Global#{window_time := 100}, emqx_flapping:get_policy(zone_1)),

Zones = #{
<<"zone_1">> => #{<<"flapping_detect">> => #{<<"window_time">> => 123}},
<<"zone_2">> => #{<<"flapping_detect">> => #{<<"window_time">> => 456}}
<<"zone_1">> => #{<<"flapping_detect">> => #{<<"window_time">> => <<"123s">>}},
<<"zone_2">> => #{<<"flapping_detect">> => #{<<"window_time">> => <<"456s">>}}
},
?assertMatch({ok, _}, emqx:update_config([zones], Zones)),
%% new_zone is already deleted
?assertError({config_not_found, _}, get_policy(new_zone)),
%% update zone(zone_1) has default.
?assertEqual(Global#{window_time := 123}, emqx_flapping:get_policy(zone_1)),
?assertEqual(Global#{window_time := 123000}, emqx_flapping:get_policy(zone_1)),
%% create zone(zone_2) has default
?assertEqual(Global#{window_time := 456}, emqx_flapping:get_policy(zone_2)),
?assertEqual(Global#{window_time := 456000}, emqx_flapping:get_policy(zone_2)),
%% reset to default(empty) andalso get default from global
?assertMatch({ok, _}, emqx:update_config([zones], #{})),
?assertEqual(Global, emqx:get_config([zones, default, flapping_detect])),
Expand Down Expand Up @@ -172,13 +172,13 @@ validate_timer(Lists) ->
ok.

t_window_compatibility_check(_Conf) ->
Flapping = emqx:get_config([flapping_detect]),
Flapping = emqx:get_raw_config([flapping_detect]),
ok = emqx_config:init_load(emqx_schema, <<"flapping_detect {window_time = disable}">>),
?assertMatch(#{window_time := 60000, enable := false}, emqx:get_config([flapping_detect])),
%% reset
FlappingBin = iolist_to_binary(["flapping_detect {", hocon_pp:do(Flapping, #{}), "}"]),
ok = emqx_config:init_load(emqx_schema, FlappingBin),
?assertEqual(Flapping, emqx:get_config([flapping_detect])),
?assertEqual(Flapping, emqx:get_raw_config([flapping_detect])),
ok.

get_policy(Zone) ->
Expand Down
24 changes: 16 additions & 8 deletions apps/emqx/test/emqx_ocsp_cache_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,19 @@ init_per_testcase(t_ocsp_responder_error_responses, Config) ->
enable_ocsp_stapling => true,
responder_url => <<"http://localhost:9877/">>,
issuer_pem => filename:join(DataDir, "ocsp-issuer.pem"),
refresh_http_timeout => 15_000,
refresh_interval => 1_000
refresh_http_timeout => <<"15s">>,
refresh_interval => <<"1s">>
}
}
},
Conf = #{listeners => #{Type => #{Name => ListenerOpts}}},
ConfBin = emqx_utils_maps:binary_key_map(Conf),
hocon_tconf:check_plain(emqx_schema, ConfBin, #{required => false, atom_keys => false}),
emqx_config:put_listener_conf(Type, Name, [], ListenerOpts),
CheckedConf = hocon_tconf:check_plain(emqx_schema, ConfBin, #{
required => false, atom_keys => false
}),
Conf2 = emqx_utils_maps:unsafe_atom_key_map(CheckedConf),
ListenerOpts2 = emqx_utils_maps:deep_get([listeners, Type, Name], Conf2),
emqx_config:put_listener_conf(Type, Name, [], ListenerOpts2),
snabbkaffe:start_trace(),
_Heir = spawn_dummy_heir(),
{ok, CachePid} = emqx_ocsp_cache:start_link(),
Expand Down Expand Up @@ -179,15 +183,19 @@ init_per_testcase(_TestCase, Config) ->
enable_ocsp_stapling => true,
responder_url => <<"http://localhost:9877/">>,
issuer_pem => filename:join(DataDir, "ocsp-issuer.pem"),
refresh_http_timeout => 15_000,
refresh_interval => 1_000
refresh_http_timeout => <<"15s">>,
refresh_interval => <<"1s">>
}
}
},
Conf = #{listeners => #{Type => #{Name => ListenerOpts}}},
ConfBin = emqx_utils_maps:binary_key_map(Conf),
hocon_tconf:check_plain(emqx_schema, ConfBin, #{required => false, atom_keys => false}),
emqx_config:put_listener_conf(Type, Name, [], ListenerOpts),
CheckedConf = hocon_tconf:check_plain(emqx_schema, ConfBin, #{
required => false, atom_keys => false
}),
Conf2 = emqx_utils_maps:unsafe_atom_key_map(CheckedConf),
ListenerOpts2 = emqx_utils_maps:deep_get([listeners, Type, Name], Conf2),
emqx_config:put_listener_conf(Type, Name, [], ListenerOpts2),
[
{cache_pid, CachePid}
| Config
Expand Down
4 changes: 2 additions & 2 deletions apps/emqx_authn/src/emqx_authn_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1316,8 +1316,8 @@ authenticator_examples() ->
<<"password">> => ?PH_PASSWORD
},
pool_size => 8,
connect_timeout => 5000,
request_timeout => 5000,
connect_timeout => <<"5s">>,
request_timeout => <<"5s">>,
enable_pipelining => 100,
ssl => #{enable => false}
}
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_authn/test/emqx_authn_enable_flag_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ listener_mqtt_tcp_conf(Port, EnableAuthn) ->
<<"max_connections">> => 1024000,
<<"mountpoint">> => <<>>,
<<"proxy_protocol">> => false,
<<"proxy_protocol_timeout">> => 3000,
<<"proxy_protocol_timeout">> => <<"3s">>,
<<"enable_authn">> => EnableAuthn
}.

Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_authz/test/emqx_authz_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ set_special_configs(_App) ->
<<"headers">> => #{},
<<"ssl">> => #{<<"enable">> => true},
<<"method">> => <<"get">>,
<<"request_timeout">> => 5000
<<"request_timeout">> => <<"5s">>
}).
-define(SOURCE2, #{
<<"type">> => <<"mongodb">>,
Expand Down
4 changes: 2 additions & 2 deletions apps/emqx_authz/test/emqx_authz_api_settings_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ t_api(_) ->
<<"cache">> => #{
<<"enable">> => false,
<<"max_size">> => 32,
<<"ttl">> => 60000
<<"ttl">> => <<"60s">>
}
},

Expand All @@ -84,7 +84,7 @@ t_api(_) ->
<<"cache">> => #{
<<"enable">> => true,
<<"max_size">> => 32,
<<"ttl">> => 60000
<<"ttl">> => <<"60s">>
}
},

Expand Down
20 changes: 10 additions & 10 deletions apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -178,21 +178,21 @@ bridge_async_config(#{port := Port} = Config) ->
Name = maps:get(name, Config, ?BRIDGE_NAME),
PoolSize = maps:get(pool_size, Config, 1),
QueryMode = maps:get(query_mode, Config, "async"),
ConnectTimeout = maps:get(connect_timeout, Config, 1),
RequestTimeout = maps:get(request_timeout, Config, 10000),
ConnectTimeout = maps:get(connect_timeout, Config, "1s"),
RequestTimeout = maps:get(request_timeout, Config, "10s"),
ResumeInterval = maps:get(resume_interval, Config, "1s"),
ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"),
ConfigString = io_lib:format(
"bridges.~s.~s {\n"
" url = \"http://localhost:~p\"\n"
" connect_timeout = \"~ps\"\n"
" connect_timeout = \"~p\"\n"
" enable = true\n"
" enable_pipelining = 100\n"
" max_retries = 2\n"
" method = \"post\"\n"
" pool_size = ~p\n"
" pool_type = \"random\"\n"
" request_timeout = \"~ps\"\n"
" request_timeout = \"~s\"\n"
" body = \"${id}\""
" resource_opts {\n"
" inflight_window = 100\n"
Expand Down Expand Up @@ -257,8 +257,8 @@ t_send_async_connection_timeout(Config) ->
port => Port,
pool_size => 1,
query_mode => "async",
connect_timeout => 10_000,
request_timeout => ResponseDelayMS * 2,
connect_timeout => integer_to_list(ResponseDelayMS * 2) ++ "s",
request_timeout => "10s",
resource_request_ttl => "infinity"
}),
NumberOfMessagesToSend = 10,
Expand All @@ -278,8 +278,8 @@ t_async_free_retries(Config) ->
port => Port,
pool_size => 1,
query_mode => "sync",
connect_timeout => 1_000,
request_timeout => 10_000,
connect_timeout => "1s",
request_timeout => "10s",
resource_request_ttl => "10000s"
}),
%% Fail 5 times then succeed.
Expand All @@ -304,8 +304,8 @@ t_async_common_retries(Config) ->
pool_size => 1,
query_mode => "sync",
resume_interval => "100ms",
connect_timeout => 1_000,
request_timeout => 10_000,
connect_timeout => "1s",
request_timeout => "10s",
resource_request_ttl => "10000s"
}),
%% Keeps failing until connector gives up.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,9 +635,9 @@ t_bad_sql_parameter(Config) ->
Config,
#{
<<"resource_opts">> => #{
<<"request_ttl">> => 500,
<<"resume_interval">> => 100,
<<"health_check_interval">> => 100
<<"request_ttl">> => <<"500ms">>,
<<"resume_interval">> => <<"100ms">>,
<<"health_check_interval">> => <<"100ms">>
}
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ clickhouse_config() ->
]
)
),
connect_timeout => 10000
connect_timeout => <<"10s">>
},
#{<<"config">> => Config}.

Expand Down
4 changes: 2 additions & 2 deletions apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ bridge_config(TestCase, _TestGroup, Config) ->
" iotdb_version = \"~s\"\n"
" pool_size = 1\n"
" resource_opts = {\n"
" health_check_interval = 5000\n"
" request_ttl = 30000\n"
" health_check_interval = \"5s\"\n"
" request_ttl = 30s\n"
" query_mode = \"async\"\n"
" worker_pool_size = 1\n"
" }\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ kafka_config(TestCase, _KafkaType, Config) ->
" kafka {\n"
" max_batch_bytes = 896KB\n"
" max_rejoin_attempts = 5\n"
" offset_commit_interval_seconds = 3\n"
" offset_commit_interval_seconds = 3s\n"
%% todo: matrix this
" offset_reset_policy = latest\n"
" }\n"
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ bridges.kafka_consumer.my_consumer {
kafka {
max_batch_bytes = 896KB
max_rejoin_attempts = 5
offset_commit_interval_seconds = 3
offset_commit_interval_seconds = 3s
offset_reset_policy = latest
}
topic_mapping = [
Expand Down
6 changes: 3 additions & 3 deletions apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,9 @@ t_write_timeout(Config) ->
Config,
#{
<<"resource_opts">> => #{
<<"request_ttl">> => 500,
<<"resume_interval">> => 100,
<<"health_check_interval">> => 100
<<"request_ttl">> => <<"500ms">>,
<<"resume_interval">> => <<"100ms">>,
<<"health_check_interval">> => <<"100ms">>
}
}
),
Expand Down

0 comments on commit f50d733

Please sign in to comment.