Skip to content

Commit

Permalink
feat(session): introduce session implementation concept
Browse files Browse the repository at this point in the history
  • Loading branch information
keynslug committed Sep 20, 2023
1 parent 780ca15 commit bf16417
Show file tree
Hide file tree
Showing 23 changed files with 2,550 additions and 1,805 deletions.
38 changes: 3 additions & 35 deletions apps/emqx/include/emqx_session.hrl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
Expand All @@ -17,39 +17,7 @@
-ifndef(EMQX_SESSION_HRL).
-define(EMQX_SESSION_HRL, true).

-record(session, {
%% Client's id
clientid :: emqx_types:clientid(),
id :: emqx_session:session_id(),
%% Is this session a persistent session i.e. was it started with Session-Expiry > 0
is_persistent :: boolean(),
%% Client’s Subscriptions.
subscriptions :: map(),
%% Max subscriptions allowed
max_subscriptions :: non_neg_integer() | infinity,
%% Upgrade QoS?
upgrade_qos :: boolean(),
%% Client <- Broker: QoS1/2 messages sent to the client but
%% have not been unacked.
inflight :: emqx_inflight:inflight(),
%% All QoS1/2 messages published to when client is disconnected,
%% or QoS1/2 messages pending transmission to the Client.
%%
%% Optionally, QoS0 messages pending transmission to the Client.
mqueue :: emqx_mqueue:mqueue(),
%% Next packet id of the session
next_pkt_id = 1 :: emqx_types:packet_id(),
%% Retry interval for redelivering QoS1/2 messages (Unit: millisecond)
retry_interval :: timeout(),
%% Client -> Broker: QoS2 messages received from the client, but
%% have not been completely acknowledged
awaiting_rel :: map(),
%% Maximum number of awaiting QoS2 messages allowed
max_awaiting_rel :: non_neg_integer() | infinity,
%% Awaiting PUBREL Timeout (Unit: millisecond)
await_rel_timeout :: timeout(),
%% Created at
created_at :: pos_integer()
}).
-define(IS_SESSION_IMPL_MEM(S), (is_tuple(S) andalso element(1, S) =:= session)).
-define(IS_SESSION_IMPL_DS(S), (is_tuple(S) andalso element(1, S) =:= sessionds)).

-endif.
58 changes: 58 additions & 0 deletions apps/emqx/include/emqx_session_mem.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-ifndef(EMQX_SESSION_MEM_HRL).
-define(EMQX_SESSION_MEM_HRL, true).

-record(session, {
%% Client's id
clientid :: emqx_types:clientid(),
id :: emqx_session:session_id(),
%% Is this session a persistent session i.e. was it started with Session-Expiry > 0
is_persistent :: boolean(),
%% Client’s Subscriptions.
subscriptions :: map(),
%% Max subscriptions allowed
max_subscriptions :: non_neg_integer() | infinity,
%% Upgrade QoS?
upgrade_qos :: boolean(),
%% Client <- Broker: QoS1/2 messages sent to the client but
%% have not been unacked.
inflight :: emqx_inflight:inflight(),
%% All QoS1/2 messages published to when client is disconnected,
%% or QoS1/2 messages pending transmission to the Client.
%%
%% Optionally, QoS0 messages pending transmission to the Client.
mqueue :: emqx_mqueue:mqueue(),
%% Next packet id of the session
next_pkt_id = 1 :: emqx_types:packet_id(),
%% Retry interval for redelivering QoS1/2 messages (Unit: millisecond)
retry_interval :: timeout(),
%% Client -> Broker: QoS2 messages received from the client, but
%% have not been completely acknowledged
awaiting_rel :: map(),
%% Maximum number of awaiting QoS2 messages allowed
max_awaiting_rel :: non_neg_integer() | infinity,
%% Awaiting PUBREL Timeout (Unit: millisecond)
await_rel_timeout :: timeout(),
%% Created at
created_at :: pos_integer(),

%% Timers
timers :: #{_Name => reference()}
}).

-endif.
2 changes: 1 addition & 1 deletion apps/emqx/src/emqx_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

start(_Type, _Args) ->
ok = maybe_load_config(),
_ = emqx_persistent_session_ds:init(),
_ = emqx_persistent_message:init(),
ok = maybe_start_quicer(),
ok = emqx_bpapi:start(),
ok = emqx_alarm_handler:load(),
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx/src/emqx_broker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ publish(Msg) when is_record(Msg, message) ->
}),
[];
Msg1 = #message{topic = Topic} ->
_ = emqx_persistent_session_ds:persist_message(Msg1),
_ = emqx_persistent_message:persist(Msg1),
route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1))
end.

Expand Down
112 changes: 39 additions & 73 deletions apps/emqx/src/emqx_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@
%% Takeover
takeover :: boolean(),
%% Resume
resuming :: boolean(),
resuming :: false | _ReplayContext,
%% Pending delivers when takeovering
pendings :: list()
}).
Expand Down Expand Up @@ -403,7 +403,7 @@ handle_in(
#channel{clientinfo = ClientInfo, session = Session}
) ->
case emqx_session:puback(ClientInfo, PacketId, Session) of
{ok, Msg, NSession} ->
{ok, Msg, [], NSession} ->
ok = after_message_acked(ClientInfo, Msg, Properties),
{ok, Channel#channel{session = NSession}};
{ok, Msg, Publishes, NSession} ->
Expand Down Expand Up @@ -460,7 +460,7 @@ handle_in(
}
) ->
case emqx_session:pubcomp(ClientInfo, PacketId, Session) of
{ok, NSession} ->
{ok, [], NSession} ->
{ok, Channel#channel{session = NSession}};
{ok, Publishes, NSession} ->
handle_out(publish, Publishes, Channel#channel{session = NSession});
Expand Down Expand Up @@ -593,12 +593,10 @@ process_connect(
{ok, #{session := Session, present := false}} ->
NChannel = Channel#channel{session = Session},
handle_out(connack, {?RC_SUCCESS, sp(false), AckProps}, ensure_connected(NChannel));
{ok, #{session := Session, present := true, pendings := Pendings}} ->
Pendings1 = lists:usort(lists:append(Pendings, emqx_utils:drain_deliver())),
{ok, #{session := Session, present := true, replay := ReplayContext}} ->
NChannel = Channel#channel{
session = Session,
resuming = true,
pendings = Pendings1
resuming = ReplayContext
},
handle_out(connack, {?RC_SUCCESS, sp(true), AckProps}, ensure_connected(NChannel));
{error, client_id_unavailable} ->
Expand Down Expand Up @@ -725,9 +723,8 @@ do_publish(
{ok, PubRes, NSession} ->
RC = pubrec_reason_code(PubRes),
NChannel0 = Channel#channel{session = NSession},
NChannel1 = ensure_timer(expire_awaiting_rel, NChannel0),
NChannel2 = ensure_quota(PubRes, NChannel1),
handle_out(pubrec, {PacketId, RC}, NChannel2);
NChannel1 = ensure_quota(PubRes, NChannel0),
handle_out(pubrec, {PacketId, RC}, NChannel1);
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
ok = emqx_metrics:inc('packets.publish.inuse'),
handle_out(pubrec, {PacketId, RC}, Channel);
Expand Down Expand Up @@ -900,8 +897,8 @@ maybe_update_expiry_interval(
%% Check if the client turns off persistence (turning it on is disallowed)
case EI =:= 0 andalso OldEI > 0 of
true ->
NSession = emqx_session:unpersist(NChannel#channel.session),
NChannel#channel{session = NSession};
ok = emqx_session:destroy(NChannel#channel.session),
NChannel#channel{session = undefined};
false ->
NChannel
end
Expand Down Expand Up @@ -937,10 +934,12 @@ handle_deliver(
clientinfo = ClientInfo
}
) ->
% NOTE
% This is essentially part of `emqx_session_mem` logic, thus call it directly.
Delivers1 = maybe_nack(Delivers),
NSession = emqx_session:enqueue(ClientInfo, Delivers1, Session),
NChannel = Channel#channel{session = NSession},
{ok, NChannel};
Messages = emqx_session:enrich_delivers(ClientInfo, Delivers1, Session),
NSession = emqx_session_mem:enqueue(ClientInfo, Messages, Session),
{ok, Channel#channel{session = NSession}};
handle_deliver(
Delivers,
Channel = #channel{
Expand All @@ -950,11 +949,11 @@ handle_deliver(
}
) ->
case emqx_session:deliver(ClientInfo, Delivers, Session) of
{ok, [], NSession} ->
{ok, Channel#channel{session = NSession}};
{ok, Publishes, NSession} ->
NChannel = Channel#channel{session = NSession},
handle_out(publish, Publishes, ensure_timer(retry_delivery, NChannel));
{ok, NSession} ->
{ok, Channel#channel{session = NSession}}
handle_out(publish, Publishes, NChannel)
end.

%% Nack delivers from shared subscription
Expand Down Expand Up @@ -1164,7 +1163,9 @@ handle_call(
conninfo = #{clientid := ClientId}
}
) ->
ok = emqx_session:takeover(Session),
% NOTE
% This is essentially part of `emqx_session_mem` logic, thus call it directly.
ok = emqx_session_mem:takeover(Session),
%% TODO: Should not drain deliver here (side effect)
Delivers = emqx_utils:drain_deliver(),
AllPendings = lists:append(Delivers, Pendings),
Expand Down Expand Up @@ -1222,14 +1223,18 @@ handle_info(
{sock_closed, Reason},
Channel =
#channel{
conn_state = ConnState
conn_state = ConnState,
clientinfo = ClientInfo,
session = Session
}
) when
ConnState =:= connected orelse ConnState =:= reauthenticating
->
{Intent, Session1} = emqx_session:disconnect(ClientInfo, Session),
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)),
case maybe_shutdown(Reason, Channel1) of
{ok, Channel2} -> {ok, ?REPLY_EVENT(disconnected), Channel2};
Channel2 = Channel1#channel{session = Session1},
case maybe_shutdown(Reason, Intent, Channel2) of
{ok, Channel3} -> {ok, ?REPLY_EVENT(disconnected), Channel3};
Shutdown -> Shutdown
end;
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) ->
Expand Down Expand Up @@ -1302,41 +1307,14 @@ handle_timeout(
end;
handle_timeout(
_TRef,
_Name = retry_delivery,
Channel = #channel{conn_state = disconnected}
) ->
{ok, Channel};
handle_timeout(
_TRef,
Name = retry_delivery,
Channel = #channel{session = Session, clientinfo = ClientInfo}
) ->
case emqx_session:retry(ClientInfo, Session) of
{ok, NSession} ->
NChannel = Channel#channel{session = NSession},
{ok, clean_timer(Name, NChannel)};
{ok, Publishes, Timeout, NSession} ->
NChannel = Channel#channel{session = NSession},
handle_out(publish, Publishes, reset_timer(Name, Timeout, NChannel))
end;
handle_timeout(
_TRef,
_Name = expire_awaiting_rel,
Channel = #channel{conn_state = disconnected}
) ->
{ok, Channel};
handle_timeout(
_TRef,
Name = expire_awaiting_rel,
{emqx_session, Name},
Channel = #channel{session = Session, clientinfo = ClientInfo}
) ->
case emqx_session:expire(ClientInfo, awaiting_rel, Session) of
{ok, NSession} ->
NChannel = Channel#channel{session = NSession},
{ok, clean_timer(Name, NChannel)};
{ok, Timeout, NSession} ->
NChannel = Channel#channel{session = NSession},
{ok, reset_timer(Name, Timeout, NChannel)}
case emqx_session:handle_timeout(ClientInfo, Name, Session) of
{ok, [], NSession} ->
{ok, Channel#channel{session = NSession}};
{ok, Replies, NSession} ->
handle_out(publish, Replies, Channel#channel{session = NSession})
end;
handle_timeout(_TRef, expire_session, Channel) ->
shutdown(expired, Channel);
Expand Down Expand Up @@ -1391,18 +1369,11 @@ ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
reset_timer(Name, Channel) ->
ensure_timer(Name, clean_timer(Name, Channel)).

reset_timer(Name, Time, Channel) ->
ensure_timer(Name, Time, clean_timer(Name, Channel)).

clean_timer(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}.

interval(keepalive, #channel{keepalive = KeepAlive}) ->
emqx_keepalive:info(interval, KeepAlive);
interval(retry_delivery, #channel{session = Session}) ->
emqx_session:info(retry_interval, Session);
interval(expire_awaiting_rel, #channel{session = Session}) ->
emqx_session:info(await_rel_timeout, Session);
interval(expire_session, #channel{conninfo = ConnInfo}) ->
maps:get(expiry_interval, ConnInfo);
interval(will_message, #channel{will_msg = WillMsg}) ->
Expand Down Expand Up @@ -2053,30 +2024,25 @@ maybe_resume_session(#channel{resuming = false}) ->
ignore;
maybe_resume_session(#channel{
session = Session,
resuming = true,
pendings = Pendings,
resuming = ReplayContext,
clientinfo = ClientInfo
}) ->
{ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session),
case emqx_session:deliver(ClientInfo, Pendings, Session1) of
{ok, Session2} ->
{ok, Publishes, Session2};
{ok, More, Session2} ->
{ok, lists:append(Publishes, More), Session2}
end.
emqx_session:replay(ClientInfo, ReplayContext, Session).

%%--------------------------------------------------------------------
%% Maybe Shutdown the Channel

maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) ->
maybe_shutdown(Reason, _Intent = idle, Channel = #channel{conninfo = ConnInfo}) ->
case maps:get(expiry_interval, ConnInfo) of
?EXPIRE_INTERVAL_INFINITE ->
{ok, Channel};
I when I > 0 ->
{ok, ensure_timer(expire_session, I, Channel)};
_ ->
shutdown(Reason, Channel)
end.
end;
maybe_shutdown(Reason, _Intent = shutdown, Channel) ->
shutdown(Reason, Channel).

%%--------------------------------------------------------------------
%% Parse Topic Filters
Expand Down

0 comments on commit bf16417

Please sign in to comment.