Skip to content

Commit

Permalink
Merge pull request #9949 from qzhuyan/dev/william/e5.0.1/multi-stream
Browse files Browse the repository at this point in the history
feat(quic): multi streams
  • Loading branch information
qzhuyan committed Feb 20, 2023
2 parents 291755f + 31cfd72 commit a2762e5
Show file tree
Hide file tree
Showing 22 changed files with 3,089 additions and 116 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/run_test_cases.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
echo "runs-on=${RUNS_ON}" | tee -a $GITHUB_OUTPUT
prepare:
runs-on: aws-amd64
runs-on: ${{ needs.build-matrix.outputs.runs-on }}
needs: [build-matrix]
strategy:
fail-fast: false
Expand Down
10 changes: 7 additions & 3 deletions apps/emqx/etc/emqx.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ listeners.wss.default {
# enabled = true
# bind = "0.0.0.0:14567"
# max_connections = 1024000
# keyfile = "{{ platform_etc_dir }}/certs/key.pem"
# certfile = "{{ platform_etc_dir }}/certs/cert.pem"
#}
# ssl_options {
# verify = verify_none
# keyfile = "{{ platform_etc_dir }}/certs/key.pem"
# certfile = "{{ platform_etc_dir }}/certs/cert.pem"
# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
# }
# }
19 changes: 15 additions & 4 deletions apps/emqx/i18n/emqx_schema_i18n.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1815,8 +1815,8 @@ fields_listener_enabled {

fields_mqtt_quic_listener_certfile {
desc {
en: """Path to the certificate file."""
zh: """证书文件。"""
en: """Path to the certificate file. Will be deprecated in 5.1, use .ssl_options.certfile instead."""
zh: """证书文件。在 5.1 中会被废弃,使用 .ssl_options.certfile 代替。"""
}
label: {
en: "Certificate file"
Expand All @@ -1826,8 +1826,8 @@ fields_mqtt_quic_listener_certfile {

fields_mqtt_quic_listener_keyfile {
desc {
en: """Path to the secret key file."""
zh: """私钥文件。"""
en: """Path to the secret key file. Will be deprecated in 5.1, use .ssl_options.keyfile instead."""
zh: """私钥文件。在 5.1 中会被废弃,使用 .ssl_options.keyfile 代替。"""
}
label: {
en: "Key file"
Expand Down Expand Up @@ -1868,6 +1868,17 @@ fields_mqtt_quic_listener_keep_alive_interval {
}
}

fields_mqtt_quic_listener_ssl_options {
desc {
en: """TLS options for QUIC transport"""
zh: """QUIC 传输层的 TLS 选项"""
}
label: {
en: "TLS Options"
zh: "TLS 选项"
}
}

base_listener_bind {
desc {
en: """IP address and port for the listening socket."""
Expand Down
25 changes: 25 additions & 0 deletions apps/emqx/include/emqx_quic.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-ifndef(EMQX_QUIC_HRL).
-define(EMQX_QUIC_HRL, true).

%% MQTT Over QUIC Shutdown Error code.
-define(MQTT_QUIC_CONN_NOERROR, 0).
-define(MQTT_QUIC_CONN_ERROR_CTRL_STREAM_DOWN, 1).
-define(MQTT_QUIC_CONN_ERROR_OVERLOADED, 2).

-endif.
2 changes: 1 addition & 1 deletion apps/emqx/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
{meck, "0.9.2"},
{proper, "1.4.0"},
{bbmustache, "1.10.0"},
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.7.0"}}}
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.1"}}}
]},
{extra_src_dirs, [{"test", [recursive]}]}
]}
Expand Down
17 changes: 15 additions & 2 deletions apps/emqx/rebar.config.script
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,20 @@ IsQuicSupp = fun() ->
end,

Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}},
Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.16"}}}.
Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.109"}}}.

Dialyzer = fun(Config) ->
{dialyzer, OldDialyzerConfig} = lists:keyfind(dialyzer, 1, Config),
{plt_extra_apps, OldExtra} = lists:keyfind(plt_extra_apps, 1, OldDialyzerConfig),
Extra = OldExtra ++ [quicer || IsQuicSupp()],
NewDialyzerConfig = [{plt_extra_apps, Extra} | OldDialyzerConfig],
lists:keystore(
dialyzer,
1,
Config,
{dialyzer, NewDialyzerConfig}
)
end.

ExtraDeps = fun(C) ->
{deps, Deps0} = lists:keyfind(deps, 1, C),
Expand All @@ -43,4 +56,4 @@ ExtraDeps = fun(C) ->
)
end,

ExtraDeps(CONFIG).
Dialyzer(ExtraDeps(CONFIG)).
83 changes: 60 additions & 23 deletions apps/emqx/src/emqx_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@
%% limitations under the License.
%%--------------------------------------------------------------------

%% MQTT/TCP|TLS Connection
%% This module interacts with the transport layer of MQTT
%% Transport:
%% - TCP connection
%% - TCP/TLS connection
%% - QUIC Stream
%%
%% for WebSocket @see emqx_ws_connection.erl
-module(emqx_connection).

-include("emqx.hrl").
Expand Down Expand Up @@ -111,7 +117,10 @@
limiter_buffer :: queue:queue(pending_req()),

%% limiter timers
limiter_timer :: undefined | reference()
limiter_timer :: undefined | reference(),

%% QUIC conn owner pid if in use.
quic_conn_pid :: maybe(pid())
}).

-record(retry, {
Expand Down Expand Up @@ -189,12 +198,16 @@
]}
).

-spec start_link(
esockd:transport(),
esockd:socket() | {pid(), quicer:connection_handler()},
emqx_channel:opts()
) ->
{ok, pid()}.
-spec start_link
(esockd:transport(), esockd:socket(), emqx_channel:opts()) ->
{ok, pid()};
(
emqx_quic_stream,
{ConnOwner :: pid(), quicer:connection_handle(), quicer:new_conn_props()},
emqx_quic_connection:cb_state()
) ->
{ok, pid()}.

start_link(Transport, Socket, Options) ->
Args = [self(), Transport, Socket, Options],
CPid = proc_lib:spawn_link(?MODULE, init, Args),
Expand Down Expand Up @@ -329,6 +342,7 @@ init_state(
},
ParseState = emqx_frame:initial_parse_state(FrameOpts),
Serialize = emqx_frame:serialize_opts(),
%% Init Channel
Channel = emqx_channel:init(ConnInfo, Opts),
GcState =
case emqx_config:get_zone_conf(Zone, [force_gc]) of
Expand Down Expand Up @@ -359,7 +373,9 @@ init_state(
zone = Zone,
listener = Listener,
limiter_buffer = queue:new(),
limiter_timer = undefined
limiter_timer = undefined,
%% for quic streams to inherit
quic_conn_pid = maps:get(conn_pid, Opts, undefined)
}.

run_loop(
Expand Down Expand Up @@ -476,7 +492,9 @@ process_msg([Msg | More], State) ->
{ok, Msgs, NState} ->
process_msg(append_msg(More, Msgs), NState);
{stop, Reason, NState} ->
{stop, Reason, NState}
{stop, Reason, NState};
{stop, Reason} ->
{stop, Reason, State}
end
catch
exit:normal ->
Expand Down Expand Up @@ -507,7 +525,6 @@ append_msg(Q, Msg) ->

%%--------------------------------------------------------------------
%% Handle a Msg

handle_msg({'$gen_call', From, Req}, State) ->
case handle_call(From, Req, State) of
{reply, Reply, NState} ->
Expand All @@ -525,11 +542,10 @@ handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
inc_counter(incoming_bytes, Oct),
ok = emqx_metrics:inc('bytes.received', Oct),
when_bytes_in(Oct, Data, State);
handle_msg({quic, Data, _Sock, _, _, _}, State) ->
Oct = iolist_size(Data),
inc_counter(incoming_bytes, Oct),
ok = emqx_metrics:inc('bytes.received', Oct),
when_bytes_in(Oct, Data, State);
handle_msg({quic, Data, _Stream, #{len := Len}}, State) when is_binary(Data) ->
inc_counter(incoming_bytes, Len),
ok = emqx_metrics:inc('bytes.received', Len),
when_bytes_in(Len, Data, State);
handle_msg(check_cache, #state{limiter_buffer = Cache} = State) ->
case queue:peek(Cache) of
empty ->
Expand Down Expand Up @@ -595,9 +611,20 @@ handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
handle_msg({connack, ConnAck}, State) ->
handle_outgoing(ConnAck, State);
handle_msg({close, Reason}, State) ->
%% @FIXME here it could be close due to appl error.
?TRACE("SOCKET", "socket_force_closed", #{reason => Reason}),
handle_info({sock_closed, Reason}, close_socket(State));
handle_msg({event, connected}, State = #state{channel = Channel}) ->
handle_msg(
{event, connected},
State = #state{
channel = Channel,
serialize = Serialize,
parse_state = PS,
quic_conn_pid = QuicConnPid
}
) ->
QuicConnPid =/= undefined andalso
emqx_quic_connection:activate_data_streams(QuicConnPid, {PS, Serialize, Channel}),
ClientId = emqx_channel:info(clientid, Channel),
emqx_cm:insert_channel_info(ClientId, info(State), stats(State));
handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
Expand Down Expand Up @@ -654,6 +681,12 @@ maybe_raise_exception(#{
stacktrace := Stacktrace
}) ->
erlang:raise(Exception, Context, Stacktrace);
maybe_raise_exception({shutdown, normal}) ->
ok;
maybe_raise_exception(normal) ->
ok;
maybe_raise_exception(shutdown) ->
ok;
maybe_raise_exception(Reason) ->
exit(Reason).

Expand Down Expand Up @@ -748,6 +781,7 @@ when_bytes_in(Oct, Data, State) ->
NState
).

%% @doc: return a reversed Msg list
-compile({inline, [next_incoming_msgs/3]}).
next_incoming_msgs([Packet], Msgs, State) ->
{ok, [{incoming, Packet} | Msgs], State};
Expand Down Expand Up @@ -870,6 +904,7 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel})
ok;
Error = {error, _Reason} ->
%% Send an inet_reply to postpone handling the error
%% @FIXME: why not just return error?
self() ! {inet_reply, Socket, Error},
ok
end.
Expand All @@ -893,12 +928,14 @@ handle_info({sock_error, Reason}, State) ->
false -> ok
end,
handle_info({sock_closed, Reason}, close_socket(State));
handle_info({quic, peer_send_shutdown, _Stream}, State) ->
handle_info({sock_closed, force}, close_socket(State));
handle_info({quic, closed, _Channel, ReasonFlag}, State) ->
handle_info({sock_closed, ReasonFlag}, State);
handle_info({quic, closed, _Stream}, State) ->
handle_info({sock_closed, force}, State);
%% handle QUIC control stream events
handle_info({quic, Event, Handle, Prop}, State) when is_atom(Event) ->
case emqx_quic_stream:Event(Handle, Prop, State) of
{{continue, Msgs}, NewState} ->
{ok, Msgs, NewState};
Other ->
Other
end;
handle_info(Info, State) ->
with_channel(handle_info, [Info], State).

Expand Down
46 changes: 31 additions & 15 deletions apps/emqx/src/emqx_listeners.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ id_example() -> 'tcp:default'.
list_raw() ->
[
{listener_id(Type, LName), Type, LConf}
|| %% FIXME: quic is not supported update vi dashboard yet
{Type, LName, LConf} <- do_list_raw(),
Type =/= <<"quic">>
|| {Type, LName, LConf} <- do_list_raw()
].

list() ->
Expand Down Expand Up @@ -170,6 +168,11 @@ current_conns(Type, Name, ListenOn) when Type == tcp; Type == ssl ->
esockd:get_current_connections({listener_id(Type, Name), ListenOn});
current_conns(Type, Name, _ListenOn) when Type =:= ws; Type =:= wss ->
proplists:get_value(all_connections, ranch:info(listener_id(Type, Name)));
current_conns(quic, _Name, _ListenOn) ->
case quicer:perf_counters() of
{ok, PerfCnts} -> proplists:get_value(conn_active, PerfCnts);
_ -> 0
end;
current_conns(_, _, _) ->
{error, not_support}.

Expand Down Expand Up @@ -367,16 +370,26 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
case [A || {quicer, _, _} = A <- application:which_applications()] of
[_] ->
DefAcceptors = erlang:system_info(schedulers_online) * 8,
ListenOpts = [
{cert, maps:get(certfile, Opts)},
{key, maps:get(keyfile, Opts)},
{alpn, ["mqtt"]},
{conn_acceptors, lists:max([DefAcceptors, maps:get(acceptors, Opts, 0)])},
{keep_alive_interval_ms, maps:get(keep_alive_interval, Opts, 0)},
{idle_timeout_ms, maps:get(idle_timeout, Opts, 0)},
{handshake_idle_timeout_ms, maps:get(handshake_idle_timeout, Opts, 10000)},
{server_resumption_level, 2}
],
SSLOpts = maps:merge(
maps:with([certfile, keyfile], Opts),
maps:get(ssl_options, Opts, #{})
),
ListenOpts =
[
{certfile, str(maps:get(certfile, SSLOpts))},
{keyfile, str(maps:get(keyfile, SSLOpts))},
{alpn, ["mqtt"]},
{conn_acceptors, lists:max([DefAcceptors, maps:get(acceptors, Opts, 0)])},
{keep_alive_interval_ms, maps:get(keep_alive_interval, Opts, 0)},
{idle_timeout_ms, maps:get(idle_timeout, Opts, 0)},
{handshake_idle_timeout_ms, maps:get(handshake_idle_timeout, Opts, 10000)},
{server_resumption_level, 2},
{verify, maps:get(verify, SSLOpts, verify_none)}
] ++
case maps:get(cacertfile, SSLOpts, undefined) of
undefined -> [];
CaCertFile -> [{cacertfile, binary_to_list(CaCertFile)}]
end,
ConnectionOpts = #{
conn_callback => emqx_quic_connection,
peer_unidi_stream_count => 1,
Expand All @@ -385,13 +398,16 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
listener => {quic, ListenerName},
limiter => limiter(Opts)
},
StreamOpts = [{stream_callback, emqx_quic_stream}],
StreamOpts = #{
stream_callback => emqx_quic_stream,
active => 1
},
Id = listener_id(quic, ListenerName),
add_limiter_bucket(Id, Opts),
quicer:start_listener(
Id,
ListenOn,
{ListenOpts, ConnectionOpts, StreamOpts}
{maps:from_list(ListenOpts), ConnectionOpts, StreamOpts}
);
[] ->
{ok, {skipped, quic_app_missing}}
Expand Down

0 comments on commit a2762e5

Please sign in to comment.