Skip to content

Commit

Permalink
Merge pull request #10746 from lafirest/fix/events_api
Browse files Browse the repository at this point in the history
fix: supports test the `$events/delivery_dropped` event by API
  • Loading branch information
lafirest committed May 22, 2023
2 parents 5f41c49 + 087dc59 commit 42f9c9a
Show file tree
Hide file tree
Showing 3 changed files with 289 additions and 1 deletion.
12 changes: 11 additions & 1 deletion apps/emqx_rule_engine/src/emqx_rule_api_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ fields("rule_test") ->
ref("ctx_disconnected"),
ref("ctx_connack"),
ref("ctx_check_authz_complete"),
ref("ctx_bridge_mqtt")
ref("ctx_bridge_mqtt"),
ref("ctx_delivery_dropped")
]),
#{
desc => ?DESC("test_context"),
Expand Down Expand Up @@ -276,6 +277,15 @@ fields("ctx_bridge_mqtt") ->
{"retain", sc(binary(), #{desc => ?DESC("event_retain")})},
{"message_received_at", publish_received_at_sc()},
qos()
];
fields("ctx_delivery_dropped") ->
[
{"event_type", event_type_sc(delivery_dropped)},
{"id", sc(binary(), #{desc => ?DESC("event_id")})},
{"reason", sc(binary(), #{desc => ?DESC("event_ctx_dropped")})},
{"from_clientid", sc(binary(), #{desc => ?DESC("event_from_clientid")})},
{"from_username", sc(binary(), #{desc => ?DESC("event_from_username")})}
| msg_event_common_fields()
].

qos() ->
Expand Down
277 changes: 277 additions & 0 deletions apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_test_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_rule_engine_api_rule_test_SUITE).

-compile(nowarn_export_all).
-compile(export_all).

-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").

-define(CONF_DEFAULT, <<"rule_engine {rules {}}">>).

all() ->
emqx_common_test_helpers:all(?MODULE).

init_per_suite(Config) ->
application:load(emqx_conf),
ok = emqx_common_test_helpers:load_config(emqx_rule_engine_schema, ?CONF_DEFAULT),
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_rule_engine]),
Config.

end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine]),
ok.

t_ctx_pub(_) ->
SQL = <<"SELECT payload.msg as msg, clientid, username, payload, topic, qos FROM \"t/#\"">>,
Context = #{
clientid => <<"c_emqx">>,
event_type => message_publish,
payload => <<"{\"msg\": \"hello\"}">>,
qos => 1,
topic => <<"t/a">>,
username => <<"u_emqx">>
},
Expected = Context#{msg => <<"hello">>},
do_test(SQL, Context, Expected).

t_ctx_sub(_) ->
SQL = <<"SELECT clientid, username, topic, qos FROM \"$events/session_subscribed\"">>,
Context = #{
clientid => <<"c_emqx">>,
event_type => session_subscribed,
qos => 1,
topic => <<"t/a">>,
username => <<"u_emqx">>
},

do_test(SQL, Context, Context).

t_ctx_unsub(_) ->
SQL = <<"SELECT clientid, username, topic, qos FROM \"$events/session_unsubscribed\"">>,
Context = #{
clientid => <<"c_emqx">>,
event_type => session_unsubscribed,
qos => 1,
topic => <<"t/a">>,
username => <<"u_emqx">>
},
do_test(SQL, Context, Context).

t_ctx_delivered(_) ->
SQL =
<<"SELECT from_clientid, from_username, topic, qos, node, timestamp FROM \"$events/message_delivered\"">>,
Context = #{
clientid => <<"c_emqx_2">>,
event_type => message_delivered,
from_clientid => <<"c_emqx_1">>,
from_username => <<"u_emqx_1">>,
payload => <<"{\"msg\": \"hello\"}">>,
qos => 1,
topic => <<"t/a">>,
username => <<"u_emqx_2">>
},
Expected = check_result([from_clientid, from_username, topic, qos], [node, timestamp], Context),
do_test(SQL, Context, Expected).

t_ctx_acked(_) ->
SQL =
<<"SELECT from_clientid, from_username, topic, qos, node, timestamp FROM \"$events/message_acked\"">>,

Context = #{
clientid => <<"c_emqx_2">>,
event_type => message_acked,
from_clientid => <<"c_emqx_1">>,
from_username => <<"u_emqx_1">>,
payload => <<"{\"msg\": \"hello\"}">>,
qos => 1,
topic => <<"t/a">>,
username => <<"u_emqx_2">>
},

Expected = with_node_timestampe([from_clientid, from_username, topic, qos], Context),

do_test(SQL, Context, Expected).

t_ctx_droped(_) ->
SQL = <<"SELECT reason, topic, qos, node, timestamp FROM \"$events/message_dropped\"">>,
Topic = <<"t/a">>,
QoS = 1,
Reason = <<"no_subscribers">>,
Context = #{
clientid => <<"c_emqx">>,
event_type => message_dropped,
payload => <<"{\"msg\": \"hello\"}">>,
qos => QoS,
reason => Reason,
topic => Topic,
username => <<"u_emqx">>
},

Expected = with_node_timestampe([reason, topic, qos], Context),
do_test(SQL, Context, Expected).

t_ctx_connected(_) ->
SQL =
<<"SELECT clientid, username, keepalive, is_bridge FROM \"$events/client_connected\"">>,

Context =
#{
clean_start => true,
clientid => <<"c_emqx">>,
event_type => client_connected,
is_bridge => false,
peername => <<"127.0.0.1:52918">>,
username => <<"u_emqx">>
},
Expected = check_result([clientid, username, keepalive, is_bridge], [], Context),
do_test(SQL, Context, Expected).

t_ctx_disconnected(_) ->
SQL =
<<"SELECT clientid, username, reason, disconnected_at, node FROM \"$events/client_disconnected\"">>,

Context =
#{
clientid => <<"c_emqx">>,
event_type => client_disconnected,
reason => <<"normal">>,
username => <<"u_emqx">>
},
Expected = check_result([clientid, username, reason], [disconnected_at, node], Context),
do_test(SQL, Context, Expected).

t_ctx_connack(_) ->
SQL =
<<"SELECT clientid, username, reason_code, node FROM \"$events/client_connack\"">>,

Context =
#{
clean_start => true,
clientid => <<"c_emqx">>,
event_type => client_connack,
reason_code => <<"sucess">>,
username => <<"u_emqx">>
},
Expected = check_result([clientid, username, reason_code], [node], Context),
do_test(SQL, Context, Expected).

t_ctx_check_authz_complete(_) ->
SQL =
<<
"SELECT clientid, username, topic, action, result,\n"
"authz_source, node FROM \"$events/client_check_authz_complete\""
>>,

Context =
#{
action => <<"publish">>,
clientid => <<"c_emqx">>,
event_type => client_check_authz_complete,
result => <<"allow">>,
topic => <<"t/1">>,
username => <<"u_emqx">>
},
Expected = check_result(
[clientid, username, topic, action],
[authz_source, node, result],
Context
),

do_test(SQL, Context, Expected).

t_ctx_delivery_dropped(_) ->
SQL =
<<"SELECT from_clientid, from_username, reason, topic, qos FROM \"$events/delivery_dropped\"">>,

Context =
#{
clientid => <<"c_emqx_2">>,
event_type => delivery_dropped,
from_clientid => <<"c_emqx_1">>,
from_username => <<"u_emqx_1">>,
payload => <<"{\"msg\": \"hello\"}">>,
qos => 1,
reason => <<"queue_full">>,
topic => <<"t/a">>,
username => <<"u_emqx_2">>
},
Expected = check_result([from_clientid, from_username, reason, qos, topic], [], Context),
do_test(SQL, Context, Expected).

do_test(SQL, Context, Expected0) ->
Res = emqx_rule_engine_api:'/rule_test'(
post,
test_rule_params(SQL, Context)
),
?assertMatch({200, _}, Res),
{200, Result0} = Res,
Result = emqx_utils_maps:unsafe_atom_key_map(Result0),
case is_function(Expected0) of
false ->
Expected = maps:without([event_type], Expected0),
?assertMatch(Expected, Result, Expected);
_ ->
Expected0(Result)
end,
ok.

test_rule_params(Sql, Context) ->
#{
body => #{
<<"context">> => Context,
<<"sql">> => Sql
}
}.

with_node_timestampe(Keys, Context) ->
check_result(Keys, [node, timestamp], Context).

check_result(Keys, Exists, Context) ->
Log = fun(Format, Args) ->
lists:flatten(io_lib:format(Format, Args))
end,

Base = maps:with(Keys, Context),

fun(Result) ->
maps:foreach(
fun(Key, Value) ->
?assertEqual(
Value,
maps:get(Key, Result, undefined),
Log("Key:~p value error~nResult:~p~n", [Key, Result])
)
end,
Base
),

NotExists = fun(Key) -> Log("Key:~p not exists in result:~p~n", [Key, Result]) end,
lists:foreach(
fun(Key) ->
Find = maps:find(Key, Result),
Formatter = NotExists(Key),
?assertMatch({ok, _}, Find, Formatter),
?assertNotMatch({ok, undefined}, Find, Formatter),
?assertNotMatch({ok, <<"undefined">>}, Find, Formatter)
end,
Exists
),

?assertEqual(erlang:length(Keys) + erlang:length(Exists), maps:size(Result), Result)
end.
1 change: 1 addition & 0 deletions changes/ce/fix-10746.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add missing support of the event `$events/delivery_dropped` into the rule engine test API `rule_test`.

0 comments on commit 42f9c9a

Please sign in to comment.