Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the massages order when batch dispatching #2650

Merged
merged 1 commit into from Jun 26, 2019

Conversation

@tradingtrace
Copy link
Contributor

tradingtrace commented Jun 23, 2019

handle_dispatch(Msgs, State = #state{inflight = Inflight,            % [1, 2, 3]
                                     client_id = ClientId,
                                     username = Username,
                                     subscriptions = SubMap}) ->
    SessProps = #{client_id => ClientId, username => Username},
    %% Drain the mailbox and batch deliver
    Msgs1 = drain_m(batch_n(Inflight), Msgs),                        % [3, 2, 1, 4, 5, 6]
    %% Ack the messages for shared subscription
    Msgs2 = maybe_ack_shared(Msgs1, State),
    %% Process suboptions
    Msgs3 = lists:foldr(
              fun({Topic, Msg}, Acc) ->
                      SubOpts = find_subopts(Topic, SubMap),
                      case process_subopts(SubOpts, Msg, State) of
                          {ok, Msg1} -> [Msg1|Acc];
                          ignore ->
                              emqx_hooks:run('message.dropped', [SessProps, Msg]),
                              Acc
                      end
              end, [], Msgs2),
    NState = batch_process(Msgs3, State),
    noreply(ensure_stats_timer(NState)).

drain_m(Cnt, Msgs) when Cnt =< 0 ->
    lists:reverse(Msgs);
drain_m(Cnt, Msgs) ->
    receive
        {dispatch, Topic, Msg} when is_record(Msg, message)->
            drain_m(Cnt-1, [{Topic, Msg} | Msgs]);                   % [4, 1, 2, 3]
        {dispatch, Topic, InMsgs} when is_list(InMsgs) ->
            Msgs1 = lists:foldl(
                      fun(Msg, Acc) ->
                              [{Topic, Msg} | Acc]                   % [6, 5, 4, 1, 2, 3]
                      end, Msgs, InMsgs),
            drain_m(Cnt-length(InMsgs), Msgs1)
    after 0 ->
        lists:reverse(Msgs)
    end.

The Msgs has not been sorted in function handle_dispatch, so I think we assume that it has been sorted before passed in. But before returning from drain_m, the Msgs is reversed.

i.e.

%% at first
Msgs = [1, 2, 3]

%% receive 1 message from mailbox ( receive {dispatch, Topic, Msg} )
[4 | [1, 2, 3]] -> [4, 1, 2, 3]

%% receive 2 messages at once ( receive {dispatch, Topic, InMsgs} -> foldl )
[6| [5| [4, 1, 2, 3]]] -> [6, 5, 4, 1, 2, 3]

%% then reverse
[3, 2, 1, 4, 5, 6]
                  
%% but it should be
[1, 2, 3, 4, 5, 6]

And we don't know the length of the passed in Msgs, it has already been a batch, what about draining the mailbox only when handling one message, not a message list? I notice that only emqx_retainer will dispatch a bunch of messages.

@coveralls

This comment has been minimized.

Copy link

coveralls commented Jun 23, 2019

Pull Request Test Coverage Report for Build 5893

  • 2 of 2 (100.0%) changed or added relevant lines in 1 file are covered.
  • 2 unchanged lines in 2 files lost coverage.
  • Overall coverage increased (+0.03%) to 70.555%

Files with Coverage Reduction New Missed Lines %
src/emqx_router.erl 1 91.8%
src/emqx_alarm_handler.erl 1 75.0%
Totals Coverage Status
Change from base Build 5890: 0.03%
Covered Lines: 3647
Relevant Lines: 5169

💛 - Coveralls
@gilbertwong96 gilbertwong96 requested a review from emqplus Jun 23, 2019
@turtleDeng turtleDeng added the BUG label Jun 24, 2019
@turtleDeng turtleDeng added this to the v3.2-rc.2 milestone Jun 24, 2019
@turtleDeng turtleDeng requested review from gilbertwong96 and HJianBo and removed request for emqplus Jun 24, 2019
@tradingtrace tradingtrace force-pushed the tradingtrace:develop branch from 294e599 to b14a060 Jun 24, 2019
@tradingtrace

This comment has been minimized.

Copy link
Contributor Author

tradingtrace commented Jun 24, 2019

Thanks @Gilbert-Wong

@turtleDeng turtleDeng merged commit 0b116bc into emqx:develop Jun 26, 2019
2 checks passed
2 checks passed
continuous-integration/travis-ci/pr The Travis CI build passed
Details
coverage/coveralls Coverage increased (+0.03%) to 70.555%
Details
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants
You can’t perform that action at this time.