Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(quic): multi streams #9949

Merged
merged 54 commits into from
Feb 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
00b59b4
feat(quic): WIP multi-stream
qzhuyan Nov 1, 2022
2d09a05
chore: add some typing
qzhuyan Nov 2, 2022
a51c886
fix: prepare for multi stream
qzhuyan Nov 2, 2022
9f69692
feat(quic): multi streams
qzhuyan Nov 25, 2022
9b52bea
fix(quic): handle fast_close while handshake fail
qzhuyan Dec 5, 2022
7d9bd33
feat(quic): bump quicer version to 0.0.100
qzhuyan Dec 7, 2022
04a8a49
test: update testcase for new emqtt
qzhuyan Dec 13, 2022
5bdcb05
feat(quic): workaround to flushing the send buffer after conn shutdown
qzhuyan Dec 14, 2022
1840a7f
test(quic): improve coverage
qzhuyan Dec 15, 2022
ceac5a0
feat(quic): bump to quicer 0.0.101
qzhuyan Dec 20, 2022
0544a3c
fix(quic): setops on stream and handle peer needs stream
qzhuyan Dec 22, 2022
0173121
feat(quic): improve coverage and remove unused code
qzhuyan Dec 22, 2022
71d3148
feat(quic): stream use active_n 10
qzhuyan Jan 5, 2023
1e8b2e2
feat(quic): 0-RTT multi-streams data
qzhuyan Jan 6, 2023
5764994
feat(quic): bump to quicer 0.0.103
qzhuyan Jan 6, 2023
f65ac54
test(quic): improve coverage
qzhuyan Jan 8, 2023
22dcf59
feat(quic): bump to quicer 0.0.104
qzhuyan Jan 8, 2023
00f615a
chore(quic): clean code
qzhuyan Jan 9, 2023
2a6cdd9
test(quic): enhance large payload test
qzhuyan Jan 10, 2023
1692a16
feat(quic): handle ctrl stream normal shutdown
qzhuyan Jan 11, 2023
98a72d4
fix(emqx_connection): do not raise an exception for normal shutdown
qzhuyan Jan 11, 2023
de810e0
chore(quic): clean test code
qzhuyan Jan 11, 2023
88cdfcc
test(quic): excl. multistream SUITE when BUILD_WITHOUT_QUIC
qzhuyan Jan 11, 2023
9e9ae50
chore: qzhuyan/emqtt vsn 534541b
qzhuyan Jan 11, 2023
282d1a6
ci: build dialyzer PLT with quicer, jq and bcrypt
qzhuyan Jan 12, 2023
381eb8e
chore(quic): fix dialyzer
qzhuyan Jan 13, 2023
38247a9
feat(quic): bump quicer to 0.0.106
qzhuyan Jan 13, 2023
d8fa65e
fix(quic): handle timeout event in data stream
qzhuyan Jan 13, 2023
f8fd201
test(quic): fix flaky test
qzhuyan Jan 18, 2023
dc26790
test(quic): trace why we get verify_peer
qzhuyan Jan 18, 2023
db544cf
fix: emqtt vsn in rebar after rebase
qzhuyan Jan 18, 2023
f4f346e
test(quic): fix flaky test
qzhuyan Jan 18, 2023
0351b32
test(quic): disable shutdown policy for large payload test
qzhuyan Jan 19, 2023
3c73c6b
feat(quic): bump quicer to 0.0.107
qzhuyan Jan 24, 2023
c457c10
fix(quic): show QUIC listeners in dashboard
qzhuyan Jan 25, 2023
c7efccb
chore: bump emqtt 1.7.1-pre2 & quicer 0.0.108
qzhuyan Feb 3, 2023
04f502f
feat(quic): support mTLS with 'verify' and 'cacertfile'
qzhuyan Feb 3, 2023
fc3e871
feat(quic): bump to emqtt 1.8.0
qzhuyan Feb 8, 2023
0e40f6c
feat(quic): listener use common server ssl_options
qzhuyan Feb 8, 2023
e8380e0
ci: forked repo could run test cases
qzhuyan Feb 8, 2023
4de27d8
chore(quic): changelogs
qzhuyan Feb 9, 2023
c6c3bd0
chore(quic): schema format fix
qzhuyan Feb 10, 2023
8a5db51
chore: fix changelog
qzhuyan Feb 10, 2023
f106f30
chore: fix comments in emqx_connection
qzhuyan Feb 10, 2023
45718dd
chore(quic): debug flaky large payload tc.
qzhuyan Feb 10, 2023
b81b62c
chore(quic): doc about deprecated fields.
qzhuyan Feb 14, 2023
fef0a93
chore(quic): make spell check happy
qzhuyan Feb 15, 2023
3f7032f
chore(quic): troubleshooting large payload
qzhuyan Feb 15, 2023
ebd0fb7
test(quic): by default, bind to port not IPv4
qzhuyan Feb 16, 2023
cf72947
test(quic): use quic.ssl_options
qzhuyan Feb 16, 2023
296e271
fix(quic): bump to emqtt 1.8.1
qzhuyan Feb 17, 2023
3486943
chore(quic): move changelog dir
qzhuyan Feb 20, 2023
bd4a84a
test(quic): adapt to new emqtt reconnect mechanism.
qzhuyan Feb 20, 2023
31cfd72
ci(quic): bump to quicer 0.0.109 for ubuntu22.04 prebuilds
qzhuyan Feb 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/run_test_cases.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
echo "runs-on=${RUNS_ON}" | tee -a $GITHUB_OUTPUT

prepare:
runs-on: aws-amd64
runs-on: ${{ needs.build-matrix.outputs.runs-on }}
needs: [build-matrix]
strategy:
fail-fast: false
Expand Down
10 changes: 7 additions & 3 deletions apps/emqx/etc/emqx.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ listeners.wss.default {
# enabled = true
# bind = "0.0.0.0:14567"
# max_connections = 1024000
# keyfile = "{{ platform_etc_dir }}/certs/key.pem"
# certfile = "{{ platform_etc_dir }}/certs/cert.pem"
#}
# ssl_options {
# verify = verify_none
# keyfile = "{{ platform_etc_dir }}/certs/key.pem"
# certfile = "{{ platform_etc_dir }}/certs/cert.pem"
# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
# }
# }
19 changes: 15 additions & 4 deletions apps/emqx/i18n/emqx_schema_i18n.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1815,8 +1815,8 @@ fields_listener_enabled {

fields_mqtt_quic_listener_certfile {
desc {
en: """Path to the certificate file."""
zh: """证书文件。"""
en: """Path to the certificate file. Will be deprecated in 5.1, use .ssl_options.certfile instead."""
zh: """证书文件。在 5.1 中会被废弃,使用 .ssl_options.certfile 代替。"""
}
label: {
en: "Certificate file"
Expand All @@ -1826,8 +1826,8 @@ fields_mqtt_quic_listener_certfile {

fields_mqtt_quic_listener_keyfile {
desc {
en: """Path to the secret key file."""
zh: """私钥文件。"""
en: """Path to the secret key file. Will be deprecated in 5.1, use .ssl_options.keyfile instead."""
zh: """私钥文件。在 5.1 中会被废弃,使用 .ssl_options.keyfile 代替。"""
}
label: {
en: "Key file"
Expand Down Expand Up @@ -1868,6 +1868,17 @@ fields_mqtt_quic_listener_keep_alive_interval {
}
}

fields_mqtt_quic_listener_ssl_options {
desc {
en: """TLS options for QUIC transport"""
zh: """QUIC 传输层的 TLS 选项"""
}
label: {
en: "TLS Options"
zh: "TLS 选项"
}
}

base_listener_bind {
desc {
en: """IP address and port for the listening socket."""
Expand Down
25 changes: 25 additions & 0 deletions apps/emqx/include/emqx_quic.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-ifndef(EMQX_QUIC_HRL).
-define(EMQX_QUIC_HRL, true).

%% MQTT Over QUIC Shutdown Error code.
-define(MQTT_QUIC_CONN_NOERROR, 0).
-define(MQTT_QUIC_CONN_ERROR_CTRL_STREAM_DOWN, 1).
-define(MQTT_QUIC_CONN_ERROR_OVERLOADED, 2).

-endif.
2 changes: 1 addition & 1 deletion apps/emqx/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
{meck, "0.9.2"},
{proper, "1.4.0"},
{bbmustache, "1.10.0"},
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.7.0"}}}
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.1"}}}
]},
{extra_src_dirs, [{"test", [recursive]}]}
]}
Expand Down
17 changes: 15 additions & 2 deletions apps/emqx/rebar.config.script
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,20 @@ IsQuicSupp = fun() ->
end,

Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}},
Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.16"}}}.
Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.109"}}}.

Dialyzer = fun(Config) ->
{dialyzer, OldDialyzerConfig} = lists:keyfind(dialyzer, 1, Config),
{plt_extra_apps, OldExtra} = lists:keyfind(plt_extra_apps, 1, OldDialyzerConfig),
Extra = OldExtra ++ [quicer || IsQuicSupp()],
NewDialyzerConfig = [{plt_extra_apps, Extra} | OldDialyzerConfig],
lists:keystore(
dialyzer,
1,
Config,
{dialyzer, NewDialyzerConfig}
)
end.

ExtraDeps = fun(C) ->
{deps, Deps0} = lists:keyfind(deps, 1, C),
Expand All @@ -43,4 +56,4 @@ ExtraDeps = fun(C) ->
)
end,

ExtraDeps(CONFIG).
Dialyzer(ExtraDeps(CONFIG)).
83 changes: 60 additions & 23 deletions apps/emqx/src/emqx_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@
%% limitations under the License.
%%--------------------------------------------------------------------

%% MQTT/TCP|TLS Connection
%% This module interacts with the transport layer of MQTT
%% Transport:
%% - TCP connection
%% - TCP/TLS connection
%% - QUIC Stream
%%
%% for WebSocket @see emqx_ws_connection.erl
-module(emqx_connection).

-include("emqx.hrl").
Expand Down Expand Up @@ -111,7 +117,10 @@
limiter_buffer :: queue:queue(pending_req()),

%% limiter timers
limiter_timer :: undefined | reference()
limiter_timer :: undefined | reference(),

%% QUIC conn owner pid if in use.
quic_conn_pid :: maybe(pid())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need concern about the Hot-upgrade compatibility once e5.0.0 released?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked with @id , We don't support hot upgrade for e5.0.x

}).

-record(retry, {
Expand Down Expand Up @@ -189,12 +198,16 @@
]}
).

-spec start_link(
esockd:transport(),
esockd:socket() | {pid(), quicer:connection_handler()},
emqx_channel:opts()
) ->
{ok, pid()}.
-spec start_link
(esockd:transport(), esockd:socket(), emqx_channel:opts()) ->
{ok, pid()};
(
emqx_quic_stream,
{ConnOwner :: pid(), quicer:connection_handle(), quicer:new_conn_props()},
emqx_quic_connection:cb_state()
) ->
{ok, pid()}.

start_link(Transport, Socket, Options) ->
Args = [self(), Transport, Socket, Options],
CPid = proc_lib:spawn_link(?MODULE, init, Args),
Expand Down Expand Up @@ -329,6 +342,7 @@ init_state(
},
ParseState = emqx_frame:initial_parse_state(FrameOpts),
Serialize = emqx_frame:serialize_opts(),
%% Init Channel
Channel = emqx_channel:init(ConnInfo, Opts),
GcState =
case emqx_config:get_zone_conf(Zone, [force_gc]) of
Expand Down Expand Up @@ -359,7 +373,9 @@ init_state(
zone = Zone,
listener = Listener,
limiter_buffer = queue:new(),
limiter_timer = undefined
limiter_timer = undefined,
%% for quic streams to inherit
quic_conn_pid = maps:get(conn_pid, Opts, undefined)
}.

run_loop(
Expand Down Expand Up @@ -476,7 +492,9 @@ process_msg([Msg | More], State) ->
{ok, Msgs, NState} ->
process_msg(append_msg(More, Msgs), NState);
{stop, Reason, NState} ->
{stop, Reason, NState}
{stop, Reason, NState};
{stop, Reason} ->
{stop, Reason, State}
end
catch
exit:normal ->
Expand Down Expand Up @@ -507,7 +525,6 @@ append_msg(Q, Msg) ->

%%--------------------------------------------------------------------
%% Handle a Msg

handle_msg({'$gen_call', From, Req}, State) ->
case handle_call(From, Req, State) of
{reply, Reply, NState} ->
Expand All @@ -525,11 +542,10 @@ handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
inc_counter(incoming_bytes, Oct),
ok = emqx_metrics:inc('bytes.received', Oct),
when_bytes_in(Oct, Data, State);
handle_msg({quic, Data, _Sock, _, _, _}, State) ->
Oct = iolist_size(Data),
inc_counter(incoming_bytes, Oct),
ok = emqx_metrics:inc('bytes.received', Oct),
when_bytes_in(Oct, Data, State);
handle_msg({quic, Data, _Stream, #{len := Len}}, State) when is_binary(Data) ->
inc_counter(incoming_bytes, Len),
ok = emqx_metrics:inc('bytes.received', Len),
when_bytes_in(Len, Data, State);
handle_msg(check_cache, #state{limiter_buffer = Cache} = State) ->
case queue:peek(Cache) of
empty ->
Expand Down Expand Up @@ -595,9 +611,20 @@ handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
handle_msg({connack, ConnAck}, State) ->
handle_outgoing(ConnAck, State);
handle_msg({close, Reason}, State) ->
%% @FIXME here it could be close due to appl error.
?TRACE("SOCKET", "socket_force_closed", #{reason => Reason}),
handle_info({sock_closed, Reason}, close_socket(State));
handle_msg({event, connected}, State = #state{channel = Channel}) ->
handle_msg(
{event, connected},
State = #state{
channel = Channel,
serialize = Serialize,
parse_state = PS,
quic_conn_pid = QuicConnPid
}
) ->
QuicConnPid =/= undefined andalso
emqx_quic_connection:activate_data_streams(QuicConnPid, {PS, Serialize, Channel}),
ClientId = emqx_channel:info(clientid, Channel),
emqx_cm:insert_channel_info(ClientId, info(State), stats(State));
handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
Expand Down Expand Up @@ -654,6 +681,12 @@ maybe_raise_exception(#{
stacktrace := Stacktrace
}) ->
erlang:raise(Exception, Context, Stacktrace);
maybe_raise_exception({shutdown, normal}) ->
ok;
maybe_raise_exception(normal) ->
ok;
maybe_raise_exception(shutdown) ->
ok;
maybe_raise_exception(Reason) ->
exit(Reason).

Expand Down Expand Up @@ -748,6 +781,7 @@ when_bytes_in(Oct, Data, State) ->
NState
).

%% @doc: return a reversed Msg list
-compile({inline, [next_incoming_msgs/3]}).
next_incoming_msgs([Packet], Msgs, State) ->
{ok, [{incoming, Packet} | Msgs], State};
Expand Down Expand Up @@ -870,6 +904,7 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel})
ok;
Error = {error, _Reason} ->
%% Send an inet_reply to postpone handling the error
%% @FIXME: why not just return error?
self() ! {inet_reply, Socket, Error},
ok
end.
Expand All @@ -893,12 +928,14 @@ handle_info({sock_error, Reason}, State) ->
false -> ok
end,
handle_info({sock_closed, Reason}, close_socket(State));
handle_info({quic, peer_send_shutdown, _Stream}, State) ->
handle_info({sock_closed, force}, close_socket(State));
handle_info({quic, closed, _Channel, ReasonFlag}, State) ->
handle_info({sock_closed, ReasonFlag}, State);
handle_info({quic, closed, _Stream}, State) ->
handle_info({sock_closed, force}, State);
%% handle QUIC control stream events
handle_info({quic, Event, Handle, Prop}, State) when is_atom(Event) ->
case emqx_quic_stream:Event(Handle, Prop, State) of
{{continue, Msgs}, NewState} ->
{ok, Msgs, NewState};
Other ->
Other
end;
handle_info(Info, State) ->
with_channel(handle_info, [Info], State).

Expand Down
46 changes: 31 additions & 15 deletions apps/emqx/src/emqx_listeners.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ id_example() -> 'tcp:default'.
list_raw() ->
[
{listener_id(Type, LName), Type, LConf}
|| %% FIXME: quic is not supported update vi dashboard yet
{Type, LName, LConf} <- do_list_raw(),
Type =/= <<"quic">>
|| {Type, LName, LConf} <- do_list_raw()
].

list() ->
Expand Down Expand Up @@ -170,6 +168,11 @@ current_conns(Type, Name, ListenOn) when Type == tcp; Type == ssl ->
esockd:get_current_connections({listener_id(Type, Name), ListenOn});
current_conns(Type, Name, _ListenOn) when Type =:= ws; Type =:= wss ->
proplists:get_value(all_connections, ranch:info(listener_id(Type, Name)));
current_conns(quic, _Name, _ListenOn) ->
case quicer:perf_counters() of
{ok, PerfCnts} -> proplists:get_value(conn_active, PerfCnts);
_ -> 0
end;
current_conns(_, _, _) ->
{error, not_support}.

Expand Down Expand Up @@ -367,16 +370,26 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
case [A || {quicer, _, _} = A <- application:which_applications()] of
[_] ->
DefAcceptors = erlang:system_info(schedulers_online) * 8,
ListenOpts = [
{cert, maps:get(certfile, Opts)},
{key, maps:get(keyfile, Opts)},
{alpn, ["mqtt"]},
{conn_acceptors, lists:max([DefAcceptors, maps:get(acceptors, Opts, 0)])},
{keep_alive_interval_ms, maps:get(keep_alive_interval, Opts, 0)},
{idle_timeout_ms, maps:get(idle_timeout, Opts, 0)},
{handshake_idle_timeout_ms, maps:get(handshake_idle_timeout, Opts, 10000)},
{server_resumption_level, 2}
],
SSLOpts = maps:merge(
maps:with([certfile, keyfile], Opts),
maps:get(ssl_options, Opts, #{})
),
ListenOpts =
[
{certfile, str(maps:get(certfile, SSLOpts))},
{keyfile, str(maps:get(keyfile, SSLOpts))},
{alpn, ["mqtt"]},
{conn_acceptors, lists:max([DefAcceptors, maps:get(acceptors, Opts, 0)])},
{keep_alive_interval_ms, maps:get(keep_alive_interval, Opts, 0)},
{idle_timeout_ms, maps:get(idle_timeout, Opts, 0)},
{handshake_idle_timeout_ms, maps:get(handshake_idle_timeout, Opts, 10000)},
{server_resumption_level, 2},
{verify, maps:get(verify, SSLOpts, verify_none)}
] ++
case maps:get(cacertfile, SSLOpts, undefined) of
undefined -> [];
CaCertFile -> [{cacertfile, binary_to_list(CaCertFile)}]
end,
ConnectionOpts = #{
conn_callback => emqx_quic_connection,
peer_unidi_stream_count => 1,
Expand All @@ -385,13 +398,16 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
listener => {quic, ListenerName},
limiter => limiter(Opts)
},
StreamOpts = [{stream_callback, emqx_quic_stream}],
StreamOpts = #{
stream_callback => emqx_quic_stream,
active => 1
},
Id = listener_id(quic, ListenerName),
add_limiter_bucket(Id, Opts),
quicer:start_listener(
Id,
ListenOn,
{ListenOpts, ConnectionOpts, StreamOpts}
{maps:from_list(ListenOpts), ConnectionOpts, StreamOpts}
);
[] ->
{ok, {skipped, quic_app_missing}}
Expand Down