Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support retain as published in subscription options #1788

Merged
merged 11 commits into from Sep 7, 2018
7 changes: 5 additions & 2 deletions src/emqx_frame.erl
Expand Up @@ -76,6 +76,9 @@ parse_remaining_len(_Bin, _Header, _Multiplier, Length,
error(mqtt_frame_too_large);
parse_remaining_len(<<>>, Header, Multiplier, Length, Options) ->
{more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end};
%% Match DISCONNECT without payload
parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, _Options) ->
wrap(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}, Rest);
%% Match PINGREQ.
parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) ->
parse_frame(Rest, Header, 0, Options);
Expand Down Expand Up @@ -233,8 +236,8 @@ parse_will_message(Packet = #mqtt_packet_connect{will_flag = true,
parse_will_message(Packet, Bin) ->
{Packet, Bin}.

protocol_approved(Ver, Name) ->
lists:member({Ver, Name}, ?PROTOCOL_NAMES).
% protocol_approved(Ver, Name) ->
% lists:member({Ver, Name}, ?PROTOCOL_NAMES).

parse_packet_id(<<PacketId:16/big, Rest/binary>>) ->
{PacketId, Rest}.
Expand Down
57 changes: 30 additions & 27 deletions src/emqx_metrics.erl
Expand Up @@ -34,32 +34,33 @@

%% Packets sent and received of broker
-define(PACKET_METRICS, [
{counter, 'packets/received'}, % All Packets received
{counter, 'packets/sent'}, % All Packets sent
{counter, 'packets/connect'}, % CONNECT Packets received
{counter, 'packets/connack'}, % CONNACK Packets sent
{counter, 'packets/publish/received'}, % PUBLISH packets received
{counter, 'packets/publish/sent'}, % PUBLISH packets sent
{counter, 'packets/puback/received'}, % PUBACK packets received
{counter, 'packets/puback/sent'}, % PUBACK packets sent
{counter, 'packets/puback/missed'}, % PUBACK packets missed
{counter, 'packets/pubrec/received'}, % PUBREC packets received
{counter, 'packets/pubrec/sent'}, % PUBREC packets sent
{counter, 'packets/pubrec/missed'}, % PUBREC packets missed
{counter, 'packets/pubrel/received'}, % PUBREL packets received
{counter, 'packets/pubrel/sent'}, % PUBREL packets sent
{counter, 'packets/pubrel/missed'}, % PUBREL packets missed
{counter, 'packets/pubcomp/received'}, % PUBCOMP packets received
{counter, 'packets/pubcomp/sent'}, % PUBCOMP packets sent
{counter, 'packets/pubcomp/missed'}, % PUBCOMP packets missed
{counter, 'packets/subscribe'}, % SUBSCRIBE Packets received
{counter, 'packets/suback'}, % SUBACK packets sent
{counter, 'packets/unsubscribe'}, % UNSUBSCRIBE Packets received
{counter, 'packets/unsuback'}, % UNSUBACK Packets sent
{counter, 'packets/pingreq'}, % PINGREQ packets received
{counter, 'packets/pingresp'}, % PINGRESP Packets sent
{counter, 'packets/disconnect'}, % DISCONNECT Packets received
{counter, 'packets/auth'} % Auth Packets received
{counter, 'packets/received'}, % All Packets received
{counter, 'packets/sent'}, % All Packets sent
{counter, 'packets/connect'}, % CONNECT Packets received
{counter, 'packets/connack'}, % CONNACK Packets sent
{counter, 'packets/publish/received'}, % PUBLISH packets received
{counter, 'packets/publish/sent'}, % PUBLISH packets sent
{counter, 'packets/puback/received'}, % PUBACK packets received
{counter, 'packets/puback/sent'}, % PUBACK packets sent
{counter, 'packets/puback/missed'}, % PUBACK packets missed
{counter, 'packets/pubrec/received'}, % PUBREC packets received
{counter, 'packets/pubrec/sent'}, % PUBREC packets sent
{counter, 'packets/pubrec/missed'}, % PUBREC packets missed
{counter, 'packets/pubrel/received'}, % PUBREL packets received
{counter, 'packets/pubrel/sent'}, % PUBREL packets sent
{counter, 'packets/pubrel/missed'}, % PUBREL packets missed
{counter, 'packets/pubcomp/received'}, % PUBCOMP packets received
{counter, 'packets/pubcomp/sent'}, % PUBCOMP packets sent
{counter, 'packets/pubcomp/missed'}, % PUBCOMP packets missed
{counter, 'packets/subscribe'}, % SUBSCRIBE Packets received
{counter, 'packets/suback'}, % SUBACK packets sent
{counter, 'packets/unsubscribe'}, % UNSUBSCRIBE Packets received
{counter, 'packets/unsuback'}, % UNSUBACK Packets sent
{counter, 'packets/pingreq'}, % PINGREQ packets received
{counter, 'packets/pingresp'}, % PINGRESP Packets sent
{counter, 'packets/disconnect/received'}, % DISCONNECT Packets received
{counter, 'packets/disconnect/sent'}, % DISCONNECT Packets sent
{counter, 'packets/auth'} % Auth Packets received
]).

%% Messages sent and received of broker
Expand Down Expand Up @@ -194,7 +195,7 @@ received2(?UNSUBSCRIBE) ->
received2(?PINGREQ) ->
inc('packets/pingreq');
received2(?DISCONNECT) ->
inc('packets/disconnect');
inc('packets/disconnect/received');
received2(_) ->
ignore.
qos_received(?QOS_0) ->
Expand Down Expand Up @@ -233,6 +234,8 @@ sent2(?UNSUBACK) ->
inc('packets/unsuback');
sent2(?PINGRESP) ->
inc('packets/pingresp');
sent2(?DISCONNECT) ->
inc('packets/disconnect/sent');
sent2(_Type) ->
ignore.
qos_sent(?QOS_0) ->
Expand Down
4 changes: 2 additions & 2 deletions src/emqx_mqtt_types.erl
Expand Up @@ -32,8 +32,8 @@
-type(reason_code() :: 0..16#FF).
-type(packet_id() :: 1..16#FFFF).
-type(properties() :: #{atom() => term()}).
-type(subopts() :: #{rh := 0 | 1,
rap := 0 | 1 | 2,
-type(subopts() :: #{rh := 0 | 1 | 2,
rap := 0 | 1,
nl := 0 | 1,
qos := qos(),
rc => reason_code()
Expand Down
37 changes: 19 additions & 18 deletions src/emqx_protocol.erl
Expand Up @@ -208,6 +208,9 @@ received(Packet = ?PACKET(Type), PState) ->
true ->
{Packet1, PState1} = preprocess_properties(Packet, PState),
process_packet(Packet1, inc_stats(recv, Type, PState1));
{'EXIT', {topic_filters_invalid, _Stacktrace}} ->
deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState),
{error, topic_filters_invalid, PState};
{'EXIT', {Reason, _Stacktrace}} ->
deliver({disconnect, ?RC_MALFORMED_PACKET}, PState),
{error, Reason, PState}
Expand Down Expand Up @@ -356,9 +359,17 @@ process_packet(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session =
{ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState};

process_packet(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
PState = #pstate{session = SPid, mountpoint = Mountpoint}) ->
PState = #pstate{session = SPid, mountpoint = Mountpoint, proto_ver = ProtoVer, is_bridge = IsBridge}) ->
RawTopicFilters1 = if ProtoVer < ?MQTT_PROTO_V5 ->
case IsBridge of
true -> [{RawTopic, SubOpts#{rap => 1}} || {RawTopic, SubOpts} <- RawTopicFilters];
false -> [{RawTopic, SubOpts#{rap => 0}} || {RawTopic, SubOpts} <- RawTopicFilters]
end;
true ->
RawTopicFilters
end,
case check_subscribe(
parse_topic_filters(?SUBSCRIBE, RawTopicFilters), PState) of
parse_topic_filters(?SUBSCRIBE, RawTopicFilters1), PState) of
{ok, TopicFilters} ->
case emqx_hooks:run('client.subscribe', [credentials(PState)], TopicFilters) of
{ok, TopicFilters1} ->
Expand Down Expand Up @@ -396,9 +407,11 @@ process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
process_packet(?PACKET(?PINGREQ), PState) ->
send(?PACKET(?PINGRESP), PState);

process_packet(?PACKET(?DISCONNECT), PState) ->
process_packet(?DISCONNECT_PACKET(?RC_SUCCESS), PState) ->
%% Clean willmsg
{stop, normal, PState#pstate{will_msg = undefined}}.
{stop, normal, PState#pstate{will_msg = undefined}};
process_packet(?DISCONNECT_PACKET(_), PState) ->
{stop, normal, PState}.

%%------------------------------------------------------------------------------
%% ConnAck --> Client
Expand Down Expand Up @@ -485,10 +498,10 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone,
deliver({connack, ReasonCode, SP}, PState) ->
send(?CONNACK_PACKET(ReasonCode, SP), PState);

deliver({publish, PacketId, Msg}, PState = #pstate{is_bridge = IsBridge, mountpoint = MountPoint}) ->
deliver({publish, PacketId, Msg}, PState = #pstate{mountpoint = MountPoint}) ->
_ = emqx_hooks:run('message.delivered', [credentials(PState)], Msg),
Msg1 = emqx_message:update_expiry(Msg),
Msg2 = emqx_mountpoint:unmount(MountPoint, clean_retain(IsBridge, Msg1)),
Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1),
send(emqx_packet:from_message(PacketId, Msg2), PState);

deliver({puback, PacketId, ReasonCode}, PState) ->
Expand Down Expand Up @@ -736,18 +749,6 @@ parse_topic_filters(?SUBSCRIBE, RawTopicFilters) ->
parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters) ->
lists:map(fun emqx_topic:parse/1, RawTopicFilters).

%%-----------------------------------------------------------------------------
%% The retained flag should be propagated for bridge.
%%-----------------------------------------------------------------------------

clean_retain(false, Msg = #message{flags = #{retain := true}, headers = Headers}) ->
case maps:get(retained, Headers, false) of
true -> Msg;
false -> emqx_message:set_flag(retain, false, Msg)
end;
clean_retain(_IsBridge, Msg) ->
Msg.

%%------------------------------------------------------------------------------
%% Update mountpoint

Expand Down
17 changes: 11 additions & 6 deletions src/emqx_session.erl
Expand Up @@ -448,11 +448,11 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
{ok, _SubOpts} ->
emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts),
%% Why???
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]),
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => false}]),
maps:put(Topic, SubOpts, SubMap);
error ->
emqx_broker:subscribe(Topic, ClientId, SubOpts),
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]),
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => true}]),
maps:put(Topic, SubOpts, SubMap)
end}
end, {[], Subscriptions}, TopicFilters),
Expand Down Expand Up @@ -548,10 +548,10 @@ handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) ->
%% Dispatch message
handle_info({dispatch, Topic, Msg}, State = #state{subscriptions = SubMap}) when is_record(Msg, message) ->
noreply(case maps:find(Topic, SubMap) of
{ok, #{nl := Nl, qos := QoS, subid := SubId}} ->
run_dispatch_steps([{nl, Nl},{qos, QoS}, {subid, SubId}], Msg, State);
{ok, #{nl := Nl, qos := QoS}} ->
run_dispatch_steps([{nl, Nl},{qos, QoS}], Msg, State);
{ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} ->
run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}], Msg, State);
{ok, #{nl := Nl, qos := QoS, rap := Rap}} ->
run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}], Msg, State);
error ->
dispatch(emqx_message:unset_flag(dup, Msg), State)
end);
Expand Down Expand Up @@ -726,6 +726,11 @@ run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State =
run_dispatch_steps(Steps, Msg#message{qos = min(SubQoS, PubQoS)}, State);
run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = true}) ->
run_dispatch_steps(Steps, Msg#message{qos = max(SubQoS, PubQoS)}, State);
run_dispatch_steps([{rap, 0}|Steps], Msg = #message{flags = Flags}, State = #state{}) ->
Flags1 = maps:put(retain, false, Flags),
run_dispatch_steps(Steps, Msg#message{flags = Flags1}, State);
run_dispatch_steps([{rap, _}|Steps], Msg, State) ->
run_dispatch_steps(Steps, Msg, State);
run_dispatch_steps([{subid, SubId}|Steps], Msg, State) ->
run_dispatch_steps(Steps, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State).

Expand Down
13 changes: 10 additions & 3 deletions src/emqx_topic.erl
Expand Up @@ -184,9 +184,16 @@ parse(Topic = <<"$share/", _/binary>>, #{share := _Group}) ->
error({invalid_topic, Topic});
parse(<<"$queue/", Topic1/binary>>, Options) ->
parse(Topic1, maps:put(share, <<"$queue">>, Options));
parse(<<"$share/", Topic1/binary>>, Options) ->
[Group, Topic2] = binary:split(Topic1, <<"/">>),
{Topic2, maps:put(share, Group, Options)};
parse(Topic = <<"$share/", Topic1/binary>>, Options) ->
case binary:split(Topic1, <<"/">>) of
[<<>>] -> error({invalid_topic, Topic});
[_] -> error({invalid_topic, Topic});
[Group, Topic2] ->
case binary:match(Group, [<<"/">>, <<"+">>, <<"#">>]) of
nomatch -> {Topic2, maps:put(share, Group, Options)};
_ -> error({invalid_topic, Topic})
end
end;
parse(Topic, Options) ->
{Topic, Options}.