Skip to content

Commit

Permalink
Merge pull request #8887 from thalesmg/bugfix-will-msg-not-connected
Browse files Browse the repository at this point in the history
bugfix: do not publish last will when authentication failed
  • Loading branch information
thalesmg committed Sep 5, 2022
2 parents bf87889 + e0fcf07 commit 09d640c
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGES-5.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* Fix exhook `client.authorize` never being execauted. [#8780](https://github.com/emqx/emqx/pull/8780)
* Fix JWT plugin don't support non-integer timestamp claims. [#8867](https://github.com/emqx/emqx/pull/8867)
* Avoid publishing will message when client fails to auhtenticate. [#8887](https://github.com/emqx/emqx/pull/8887)

## Enhancements

Expand Down
14 changes: 10 additions & 4 deletions apps/emqx/src/emqx_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
-include("logger.hrl").
-include("types.hrl").

-include_lib("snabbkaffe/include/snabbkaffe.hrl").

-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
Expand Down Expand Up @@ -1423,15 +1425,17 @@ interval(will_timer, #channel{will_msg = WillMsg}) ->
%%--------------------------------------------------------------------

-spec terminate(any(), channel()) -> ok.
terminate(_, #channel{conn_state = idle}) ->
terminate(_, #channel{conn_state = idle} = _Channel) ->
?tp(channel_terminated, #{channel => _Channel}),
ok;
terminate(normal, Channel) ->
run_terminate_hook(normal, Channel);
terminate({shutdown, kicked}, Channel) ->
run_terminate_hook(kicked, Channel);
terminate({shutdown, Reason}, Channel) when
Reason =:= discarded;
Reason =:= takenover
Reason =:= takenover;
Reason =:= not_authorized
->
run_terminate_hook(Reason, Channel);
terminate(Reason, Channel = #channel{will_msg = WillMsg}) ->
Expand All @@ -1452,9 +1456,11 @@ persist_if_session(#channel{session = Session} = Channel) ->
ok
end.

run_terminate_hook(_Reason, #channel{session = undefined}) ->
run_terminate_hook(_Reason, #channel{session = undefined} = _Channel) ->
?tp(channel_terminated, #{channel => _Channel}),
ok;
run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session}) ->
run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session} = _Channel) ->
?tp(channel_terminated, #{channel => _Channel}),
emqx_session:terminate(ClientInfo, Reason, Session).

%%--------------------------------------------------------------------
Expand Down
117 changes: 117 additions & 0 deletions apps/emqx_authn/test/emqx_authn_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_authn_SUITE).

-compile(export_all).
-compile(nowarn_export_all).

-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").

-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").

%%=================================================================================
%% CT boilerplate
%%=================================================================================

all() ->
emqx_common_test_helpers:all(?MODULE).

init_per_suite(Config) ->
Config.

end_per_suite(_Config) ->
ok.

init_per_testcase(Case, Config) ->
?MODULE:Case({init, Config}).

end_per_testcase(Case, Config) ->
?MODULE:Case({'end', Config}).

%%=================================================================================
%% Helpers fns
%%=================================================================================

%%=================================================================================
%% Testcases
%%=================================================================================

t_will_message_connection_denied({init, Config}) ->
emqx_common_test_helpers:start_apps([emqx_conf, emqx_authn]),
mria:clear_table(emqx_authn_mnesia),
AuthnConfig = #{
<<"mechanism">> => <<"password_based">>,
<<"backend">> => <<"built_in_database">>,
<<"user_id_type">> => <<"clientid">>
},
Chain = 'mqtt:global',
emqx:update_config(
[authentication],
{create_authenticator, Chain, AuthnConfig}
),
User = #{user_id => <<"subscriber">>, password => <<"p">>},
AuthenticatorID = <<"password_based:built_in_database">>,
{ok, _} = emqx_authentication:add_user(
Chain,
AuthenticatorID,
User
),
Config;
t_will_message_connection_denied({'end', _Config}) ->
emqx:update_config(
[authentication],
{delete_authenticator, 'mqtt:global', <<"password_based:built_in_database">>}
),
emqx_common_test_helpers:stop_apps([emqx_authn, emqx_conf]),
mria:clear_table(emqx_authn_mnesia),
ok;
t_will_message_connection_denied(Config) when is_list(Config) ->
{ok, Subscriber} = emqtt:start_link([
{clientid, <<"subscriber">>},
{password, <<"p">>}
]),
{ok, _} = emqtt:connect(Subscriber),
{ok, _, [?RC_SUCCESS]} = emqtt:subscribe(Subscriber, <<"lwt">>),

process_flag(trap_exit, true),

{ok, Publisher} = emqtt:start_link([
{clientid, <<"publisher">>},
{will_topic, <<"lwt">>},
{will_payload, <<"should not be published">>}
]),
snabbkaffe:start_trace(),
?wait_async_action(
{error, _} = emqtt:connect(Publisher),
#{?snk_kind := channel_terminated}
),
snabbkaffe:stop(),

receive
{publish, #{
topic := <<"lwt">>,
payload := <<"should not be published">>
}} ->
ct:fail("should not publish will message")
after 0 ->
ok
end,

ok.

0 comments on commit 09d640c

Please sign in to comment.