Skip to content

Commit

Permalink
fix(stomp): enrich sub-opts if sub-id/ack absent
Browse files Browse the repository at this point in the history
  • Loading branch information
HJianBo committed Nov 3, 2021
1 parent 14515e6 commit 0a7f04c
Showing 1 changed file with 33 additions and 8 deletions.
41 changes: 33 additions & 8 deletions apps/emqx_stomp/src/emqx_stomp_protocol.erl
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@
default_user :: maybe(list())
}).

-define(DEFAULT_SUB_ACK, <<"auto">>).

-define(TIMER_TABLE, #{
incoming_timer => incoming,
outgoing_timer => outgoing,
Expand Down Expand Up @@ -287,12 +289,17 @@ received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers},
State = #pstate{subscriptions = Subs}) ->
Id = header(<<"id">>, Headers),
Topic = header(<<"destination">>, Headers),
Ack = header(<<"ack">>, Headers, <<"auto">>),
Ack = header(<<"ack">>, Headers, ?DEFAULT_SUB_ACK),

case find_sub_by_id(Id, Subs) of
{Topic, #{sub_props := #{id := Id, ack := Ack}}} ->
{Topic, #{sub_props := #{id := Id}}} ->
?LOG(info, "Subscription has established: ~s", [Topic]),
maybe_send_receipt(receipt_id(Headers), State);
{InuseTopic, #{sub_props := #{id := InuseId}}} ->
?LOG(info, "Subscription id ~p inused by topic: ~s, "
"request topic: ~s", [InuseId, InuseTopic, Topic]),
send(error_frame(receipt_id(Headers),
["Request sub-id ", Id, " inused "]), State);
undefined ->
case check_acl(subscribe, Topic, State) of
allow ->
Expand Down Expand Up @@ -466,8 +473,9 @@ timeout(_TRef, clean_trans, State = #pstate{transaction = Trans}) ->

handle_info({subscribe, TopicFilters}, State) ->
NState = lists:foldl(
fun({TopicFilter, SubOpts}, StateAcc) ->
do_subscribe(TopicFilter, SubOpts#{is_new => true}, StateAcc)
fun({TopicFilter, SubOpts}, StateAcc = #pstate{subscriptions = Subs}) ->
NSubOpts = enrich_sub_opts(SubOpts, Subs),
do_subscribe(TopicFilter, NSubOpts, StateAcc)
end, State, parse_topic_filters(TopicFilters)),
{ok, NState};

Expand Down Expand Up @@ -646,15 +654,15 @@ do_subscribe(TopicFilter, SubOpts,
NSubOpts = SubOpts#{is_new => true},
_ = run_hooks('session.subscribed',
[ClientInfo, TopicFilter, NSubOpts]),

send_event_to_self(updated),
State#pstate{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}.

do_unsubscribe(TopicFilter, SubOpts,
State = #pstate{clientinfo = ClientInfo, subscriptions = Subs}) ->
ok = emqx_broker:unsubscribe(TopicFilter),
_ = run_hooks('session.unsubscribe',
[ClientInfo, TopicFilter, SubOpts]),

send_event_to_self(updated),
State#pstate{subscriptions = maps:remove(TopicFilter, Subs)}.

find_sub_by_topic(Topic, Subs) ->
Expand All @@ -677,6 +685,21 @@ is_acl_enabled(_) ->
%% TODO: configs from somewhere
true.

%% automaticly fill the next sub-id and ack if sub-id is absent
enrich_sub_opts(SubOpts0, Subs) ->
SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0),
SubProps = maps:get(sub_props, SubOpts, #{}),
SubOpts#{sub_props =>
maps:merge(#{id => next_sub_id(Subs),
ack => ?DEFAULT_SUB_ACK}, SubProps)}.

next_sub_id(Subs) ->
Ids = maps:fold(fun(_, SubOpts, Acc) ->
[binary_to_integer(
maps:get(id, maps:get(sub_props, SubOpts, #{}), <<"0">>)) | Acc]
end, [], Subs),
integer_to_binary(lists:max(Ids) + 1).

%%--------------------------------------------------------------------
%% helpers

Expand All @@ -691,8 +714,7 @@ ensure_connected(State = #pstate{conninfo = ConnInfo,
connected => true,
connected_at => erlang:system_time(millisecond)
},
%% send connected event
self() ! {event, connected},
send_event_to_self(connected),
ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
State#pstate{conninfo = NConnInfo,
connected = true
Expand All @@ -703,6 +725,9 @@ ensure_disconnected(Reason, State = #pstate{conninfo = ConnInfo, clientinfo = Cl
ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]),
State#pstate{conninfo = NConnInfo, connected = false}.

send_event_to_self(Name) ->
self() ! {event, Name}, ok.

run_hooks(Name, Args) ->
emqx_hooks:run(Name, Args).

Expand Down

0 comments on commit 0a7f04c

Please sign in to comment.