Skip to content

Commit

Permalink
Merge pull request #1213 from esl/reimplement_shaper
Browse files Browse the repository at this point in the history
Reimplement shaper as a token bucket.
  • Loading branch information
michalwski committed Mar 23, 2017
2 parents 7d59e2c + 381cf2c commit 524ae59
Show file tree
Hide file tree
Showing 12 changed files with 403 additions and 147 deletions.
30 changes: 22 additions & 8 deletions apps/ejabberd/src/ejabberd_c2s.erl
Expand Up @@ -831,7 +831,20 @@ do_open_session_common(JID, #state{user = U, resource = R} = NewStateData0) ->
Conn = get_conn_type(NewStateData0),
Info = [{ip, NewStateData0#state.ip}, {conn, Conn},
{auth_module, NewStateData0#state.auth_module}],
ejabberd_sm:open_session(SID, U, NewStateData0#state.server, R, Info),
ReplacedPids = ejabberd_sm:open_session(SID, U, NewStateData0#state.server, R, Info),

MonitorRefs = ordsets:from_list([{monitor(process, PID), PID} || PID <- ReplacedPids]),
lists:foreach(
fun({MonitorRef, PID}) ->
receive
{'DOWN', MonitorRef, _, _, _} -> ok
after 100 ->
?WARNING_MSG("C2S process ~p for ~s replaced by ~p has not stopped before timeout",
[PID, jid:to_binary(NewStateData0#state.jid), self()])
end
end,
MonitorRefs),

NewStateData =
NewStateData0#state{sid = SID,
conn = Conn,
Expand Down Expand Up @@ -1126,7 +1139,11 @@ handle_info({route, From, To, Packet}, StateName, StateData) ->
ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]),
Name = Packet#xmlel.name,
process_incoming_stanza(Name, From, To, Packet, StateName, StateData);

handle_info(new_offline_messages, session_established,
#state{pres_last = Presence, pres_invis = Invisible} = StateData)
when Presence =/= undefined orelse Invisible ->
resend_offline_messages(mongoose_acc:new(), StateData),
{next_state, session_established, StateData};
handle_info({'DOWN', Monitor, _Type, _Object, _Info}, _StateName, StateData)
when Monitor == StateData#state.socket_monitor ->
maybe_enter_resume_session(StateData#state.stream_mgmt_id, StateData);
Expand Down Expand Up @@ -2282,11 +2299,9 @@ resend_offline_messages(Acc, StateData) ->
Acc,
[StateData#state.user, StateData#state.server]),
Rs = mongoose_acc:get(offline_messages, Acc1, []),
Acc2 = lists:foldl(fun({route, From, To, #xmlel{} = Packet}, A) ->
check_privacy_and_route_or_ignore(A, StateData, From, To, Packet, in)
end,
Acc1, Rs),
mongoose_acc:remove(offline_messages, Acc2). % they are gone from db backend and sent
[check_privacy_and_route_or_ignore(StateData, From, To, Packet, in)
|| {route, From, To, #xmlel{} = Packet} <- Rs],
mongoose_acc:remove(offline_messages, Acc1). % they are gone from db backend and sent


-spec check_privacy_and_route_or_ignore(StateData :: state(),
Expand Down Expand Up @@ -3169,4 +3184,3 @@ terminate_when_tls_required_but_not_enabled(true, false, StateData, _El) ->
terminate_when_tls_required_but_not_enabled(_, _, StateData, El) ->
process_unauthenticated_stanza(StateData, El),
fsm_next_state(wait_for_feature_before_auth, StateData).

70 changes: 39 additions & 31 deletions apps/ejabberd/src/ejabberd_sm.erl
Expand Up @@ -138,29 +138,31 @@ route(From, To, Packet) ->
_ -> ok
end.

-spec open_session(SID, User, Server, Resource, Info) -> ok when
-spec open_session(SID, User, Server, Resource, Info) -> ReplacedPids when
SID :: 'undefined' | sid(),
User :: ejabberd:user(),
Server :: ejabberd:server(),
Resource :: binary(),
Info :: 'undefined' | [any()].
Info :: 'undefined' | [any()],
ReplacedPids :: [pid()].
open_session(SID, User, Server, Resource, Info) ->
open_session(SID, User, Server, Resource, undefined, Info).

-spec open_session(SID, User, Server, Resource, Priority, Info) -> ok when
-spec open_session(SID, User, Server, Resource, Priority, Info) -> ReplacedPids when
SID :: 'undefined' | sid(),
User :: ejabberd:user(),
Server :: ejabberd:server(),
Resource :: binary(),
Priority :: integer() | undefined,
Info :: 'undefined' | [any()].
Info :: 'undefined' | [any()],
ReplacedPids :: [pid()].
open_session(SID, User, Server, Resource, Priority, Info) ->
set_session(SID, User, Server, Resource, Priority, Info),
check_for_sessions_to_replace(User, Server, Resource),
ReplacedPIDs = check_for_sessions_to_replace(User, Server, Resource),
JID = jid:make(User, Server, Resource),
ejabberd_hooks:run(sm_register_connection_hook, JID#jid.lserver,
[SID, JID, Info]),
ok.
ReplacedPIDs.

-spec close_session(SID, User, Server, Resource, Reason) -> ok when
SID :: 'undefined' | sid(),
Expand Down Expand Up @@ -835,54 +837,60 @@ get_user_present_resources(LUser, LServer) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% @doc On new session, check if some existing connections need to be replace
-spec check_for_sessions_to_replace(User, Server, Resource) -> ok | replaced when
-spec check_for_sessions_to_replace(User, Server, Resource) -> ReplacedPids when
User :: ejabberd:user(),
Server :: ejabberd:server(),
Resource :: ejabberd:resource().
Resource :: ejabberd:resource(),
ReplacedPids :: [pid()].
check_for_sessions_to_replace(User, Server, Resource) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),

%% TODO: Depending on how this is executed, there could be an unneeded
%% replacement for max_sessions. We need to check this at some point.
check_existing_resources(LUser, LServer, LResource),
check_max_sessions(LUser, LServer).
ReplacedRedundantSessions = check_existing_resources(LUser, LServer, LResource),
AllReplacedSessionPids = check_max_sessions(LUser, LServer, ReplacedRedundantSessions),
[Pid ! replaced || Pid <- AllReplacedSessionPids],
AllReplacedSessionPids.

-spec check_existing_resources(LUser, LServer, LResource) -> ok when
-spec check_existing_resources(LUser, LServer, LResource) -> ReplacedSessionsPIDs when
LUser :: 'error' | ejabberd:luser() | tuple(),
LServer :: 'error' | ejabberd:lserver() | tuple(),
LResource :: 'error' | ejabberd:lresource() | [byte()] | tuple().
LResource :: 'error' | ejabberd:lresource() | [byte()] | tuple(),
ReplacedSessionsPIDs :: ordsets:ordset(pid()).
check_existing_resources(LUser, LServer, LResource) ->
%% A connection exist with the same resource. We replace it:
Sessions = ?SM_BACKEND:get_sessions(LUser, LServer, LResource),
SIDs = [S#session.sid || S <- Sessions],
if
SIDs == [] ->
ok;
true ->
case [S#session.sid || S <- Sessions] of
[] -> [];
SIDs ->
MaxSID = lists:max(SIDs),
lists:foreach(
fun({_, Pid} = S) when S /= MaxSID ->
Pid ! replaced;
(_) -> ok
end, SIDs)
ordsets:from_list([Pid || {_, Pid} = S <- SIDs, S /= MaxSID])
end.


-spec check_max_sessions(LUser :: ejabberd:user(), LServer :: ejabberd:server()) -> ok | replaced.
check_max_sessions(LUser, LServer) ->
-spec check_max_sessions(LUser :: ejabberd:user(), LServer :: ejabberd:server(),
ReplacedPIDs :: [pid()]) -> AllReplacedPIDs :: ordsets:ordset(pid()).
check_max_sessions(LUser, LServer, ReplacedPIDs) ->
%% If the max number of sessions for a given is reached, we replace the
%% first one
Sessions = ?SM_BACKEND:get_sessions(LUser, LServer),
SIDs = [S#session.sid || S <- Sessions],
SIDs = lists:filtermap(
fun(Session) ->
{_, Pid} = SID = Session#session.sid,
case ordsets:is_element(Pid, ReplacedPIDs) of
true -> false;
false -> {true, SID}
end
end,
?SM_BACKEND:get_sessions(LUser, LServer)),

MaxSessions = get_max_user_sessions(LUser, LServer),
if
length(SIDs) =< MaxSessions ->
ok;
true ->
case length(SIDs) =< MaxSessions of
true -> ordsets:to_list(ReplacedPIDs);
false ->
{_, Pid} = lists:min(SIDs),
Pid ! replaced
[Pid | ordsets:to_list(ReplacedPIDs)]
end.


Expand Down
19 changes: 5 additions & 14 deletions apps/ejabberd/src/mod_muc.erl
Expand Up @@ -35,7 +35,7 @@
-export([start_link/2,
start/2,
stop/1,
room_destroyed/4,
room_destroyed/3,
store_room/3,
restore_room/2,
forget_room/2,
Expand Down Expand Up @@ -176,11 +176,10 @@ stop(Host) ->
%% C) mod_muc:stop was called, and each room is being terminated
%% In this case, the mod_muc process died before the room processes
%% So the message sending must be catched
-spec room_destroyed(ejabberd:server(), room(), pid(),
ejabberd:server()) -> 'ok'.
room_destroyed(Host, Room, Pid, ServerHost) ->
catch gen_mod:get_module_proc(ServerHost, ?PROCNAME) !
{room_destroyed, {Room, Host}, Pid},
-spec room_destroyed(ejabberd:server(), room(), pid()) -> 'ok'.
room_destroyed(Host, Room, Pid) ->
F = fun() -> mnesia:delete_object(#muc_online_room{name_host = {Room, Host}, pid = Pid}) end,
{atomic, ok} = mnesia:transaction(F),
ok.


Expand Down Expand Up @@ -393,13 +392,6 @@ handle_info({route, From, To, Packet}, State) ->
ok
end,
{noreply, State};
handle_info({room_destroyed, RoomHost, Pid}, State) ->
F = fun() ->
mnesia:delete_object(#muc_online_room{name_host = RoomHost,
pid = Pid})
end,
mnesia:transaction(F),
{noreply, State};
handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
clean_table_from_bad_node(Node),
{noreply, State};
Expand Down Expand Up @@ -1246,4 +1238,3 @@ ensure_metrics(_Host) ->
mongoose_metrics:ensure_metric(global, [mod_muc, online_rooms],
{function, mod_muc, online_rooms_number, [],
eval, ?EX_EVAL_SINGLE_VALUE}).

3 changes: 1 addition & 2 deletions apps/ejabberd/src/mod_muc_room.erl
Expand Up @@ -744,8 +744,7 @@ terminate(Reason, _StateName, StateData) ->
tab_remove_online_user(LJID, StateData)
end, [], StateData#state.users),
add_to_log(room_existence, stopped, StateData),
mod_muc:room_destroyed(StateData#state.host, StateData#state.room, self(),
StateData#state.server_host),
mod_muc:room_destroyed(StateData#state.host, StateData#state.room, self()),
ok.

%%%----------------------------------------------------------------------
Expand Down
26 changes: 21 additions & 5 deletions apps/ejabberd/src/mod_offline.erl
Expand Up @@ -77,7 +77,12 @@

-export_type([msg/0]).

-record(state, {host, access_max_user_messages}).
-record(state, {
host :: ejabberd:server(),
access_max_user_messages,
message_poppers = monitored_map:new() ::
monitored_map:t({LUser :: binary(), LServer :: binary}, pid())
}).

%% ------------------------------------------------------------------
%% Backend callbacks
Expand Down Expand Up @@ -285,6 +290,10 @@ init([Host, AccessMaxOfflineMsgs]) ->
%% {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------
handle_call({pop_offline_messages, LUser, LServer}, {Pid, _}, State) ->
Result = ?BACKEND:pop_messages(LUser, LServer),
NewPoppers = monitored_map:put({LUser, LServer}, Pid, Pid, State#state.message_poppers),
{reply, Result, State#state{message_poppers = NewPoppers}};
handle_call(_, _, State) ->
{reply, ok, State}.

Expand All @@ -294,7 +303,6 @@ handle_call(_, _, State) ->
%% {stop, Reason, State}
%% Description: Handling cast messages
%%--------------------------------------------------------------------

handle_cast(Msg, State) ->
?WARNING_MSG("Strange message ~p.", [Msg]),
{noreply, State}.
Expand All @@ -305,9 +313,17 @@ handle_cast(Msg, State) ->
%% {stop, Reason, State}
%% Description: Handling all non call/cast messages
%%--------------------------------------------------------------------
handle_info(Msg=#offline_msg{},
State=#state{access_max_user_messages = AccessMaxOfflineMsgs}) ->
handle_info({'DOWN', _MonitorRef, _Type, _Object, _Info} = Msg, State) ->
NewPoppers = monitored_map:handle_info(Msg, State#state.message_poppers),
{noreply, State#state{message_poppers = NewPoppers}};
handle_info(Msg = #offline_msg{us = US},
State = #state{access_max_user_messages = AccessMaxOfflineMsgs}) ->
handle_offline_msg(Msg, AccessMaxOfflineMsgs),
case monitored_map:find(US, State#state.message_poppers) of
{ok, Pid} ->
Pid ! new_offline_messages;
error -> ok
end,
{noreply, State};
handle_info(Msg, State) ->
?WARNING_MSG("Strange message ~p.", [Msg]),
Expand Down Expand Up @@ -492,7 +508,7 @@ pop_offline_messages(User, Server) ->
end.

pop_messages(LUser, LServer) ->
case ?BACKEND:pop_messages(LUser, LServer) of
case gen_server:call(srv_name(LServer), {pop_offline_messages, LUser, LServer}) of
{ok, RsAll} ->
TimeStamp = os:timestamp(),
Rs = skip_expired_messages(TimeStamp, lists:keysort(#offline_msg.timestamp, RsAll)),
Expand Down
3 changes: 1 addition & 2 deletions apps/ejabberd/src/mongoose_client_api_sse.erl
Expand Up @@ -23,7 +23,7 @@ maybe_init(true, Req, #{jid := JID} = State) ->
UUID = uuid:uuid_to_string(uuid:get_v4(), binary_standard),
Resource = <<"sse-", UUID/binary>>,

ok = ejabberd_sm:open_session(SID, User, Server, Resource, 1, []),
ejabberd_sm:open_session(SID, User, Server, Resource, 1, []),

{ok, Req, State#{sid => SID, jid => jid:replace_resource(JID, Resource)}};
maybe_init(true, Req, State) ->
Expand Down Expand Up @@ -66,4 +66,3 @@ maybe_send_message_event(<<"groupchat">>, Packet, Timestamp, #{id := ID} = State
{send, Event, State#{id := ID + 1}};
maybe_send_message_event(_, _, _, State) ->
{nosend, State}.

0 comments on commit 524ae59

Please sign in to comment.