diff --git a/include/rule_events.hrl b/include/rule_events.hrl index 18085b664..a3abc01cc 100644 --- a/include/rule_events.hrl +++ b/include/rule_events.hrl @@ -46,6 +46,7 @@ , <<"qos">> , <<"timestamp">> , <<"topic">> + , <<"node">> ]; 'message.deliver' -> [ <<"client_id">> @@ -59,6 +60,7 @@ , <<"topic">> , <<"qos">> , <<"timestamp">> + , <<"node">> ]; 'message.acked' -> [ <<"client_id">> @@ -70,18 +72,19 @@ , <<"topic">> , <<"qos">> , <<"timestamp">> + , <<"node">> ]; 'message.dropped' -> [ <<"client_id">> , <<"username">> , <<"event">> , <<"id">> - , <<"node">> , <<"payload">> , <<"peername">> , <<"qos">> , <<"timestamp">> , <<"topic">> + , <<"node">> ]; 'client.connected' -> [ <<"client_id">> @@ -96,6 +99,8 @@ , <<"mountpoint">> , <<"peername">> , <<"proto_ver">> + , <<"timestamp">> + , <<"node">> ]; 'client.disconnected' -> [ <<"client_id">> @@ -105,6 +110,8 @@ , <<"mountpoint">> , <<"peername">> , <<"reason_code">> + , <<"timestamp">> + , <<"node">> ]; 'client.subscribe' -> [ <<"client_id">> @@ -116,6 +123,8 @@ , <<"topic_filters">> , <<"topic">> , <<"qos">> + , <<"timestamp">> + , <<"node">> ]; 'client.unsubscribe' -> [ <<"client_id">> @@ -127,6 +136,8 @@ , <<"topic_filters">> , <<"topic">> , <<"qos">> + , <<"timestamp">> + , <<"node">> ]; RuleType -> error({unknown_rule_type, RuleType}) @@ -273,6 +284,7 @@ id => <<0,5,137,164,41,233,87,47,180,75,0,0,5,124,0,1>>, payload => <<"{\"id\": 1, \"name\": \"ha\"}">>,qos => 1, timestamp => erlang:timestamp(), + node => node(), topic => <<"t1">>}; 'message.deliver' -> #{anonymous => true,auth_result => success, @@ -290,6 +302,7 @@ peername => {{127,0,0,1},50891}, qos => 1, sockname => {{127,0,0,1},1883}, + node => node(), timestamp => erlang:timestamp(), topic => <<"t1">>,username => <<"u_emqx">>, ws_cookie => undefined,zone => external}; @@ -305,6 +318,7 @@ id => <<0,5,137,164,41,233,87,47,180,75,0,0,5,124,0,1>>, payload => <<"{\"id\": 1, \"name\": \"ha\"}">>,qos => 1, timestamp => erlang:timestamp(), + node => node(), topic => <<"t1">>,username => <<"u_emqx">>}; 'message.dropped' -> #{event => 'message.dropped', @@ -315,7 +329,7 @@ peername => {{127,0,0,1},50891}, username => <<"u_emqx">>}, id => <<0,5,137,164,41,236,124,3,180,75,0,0,5,124,0,2>>, - node => nonode@nohost, + node => node(), payload => <<"{\"id\": 1, \"name\": \"ha\"}">>,qos => 1, timestamp => erlang:timestamp(), topic => <<"t1">>}; @@ -345,6 +359,7 @@ peername => {{127,0,0,1},50891}, sockname => {{127,0,0,1},1883}, username => <<"u_emqx">>,ws_cookie => undefined, + node => node(), zone => external}; 'client.disconnected' -> #{anonymous => true,auth_result => success, @@ -354,6 +369,8 @@ reason_code => closed, sockname => {{127,0,0,1},1883}, username => <<"u_emqx">>,ws_cookie => undefined, + node => node(), + timestamp => erlang:timestamp(), zone => external}; 'client.subscribe' -> #{anonymous => true,auth_result => success, @@ -364,6 +381,8 @@ topic_filters => [{<<"t1">>,#{nl => 0,qos => 1,rap => 0,rc => 1,rh => 0}}], username => <<"u_emqx">>,ws_cookie => undefined, + node => node(), + timestamp => erlang:timestamp(), zone => external}; 'client.unsubscribe' -> #{anonymous => true,auth_result => success, @@ -373,7 +392,9 @@ sockname => {{127,0,0,1},1883}, topic_filters => [{<<"t1">>,#{}}], username => <<"u_emqx">>,ws_cookie => undefined, + node => node(), + timestamp => erlang:timestamp(), zone => external}; RuleType -> - error({unknown_event_type, RuleType}) + error({unknown_event_type, RuleType}) end). \ No newline at end of file diff --git a/src/emqx_rule_runtime.erl b/src/emqx_rule_runtime.erl index 255567d0b..7839d32b5 100644 --- a/src/emqx_rule_runtime.erl +++ b/src/emqx_rule_runtime.erl @@ -62,23 +62,23 @@ hook_rules(Name, Fun, Env) -> on_client_connected(Credentials = #{client_id := ClientId}, ConnAck, ConnAttrs, #{apply_fun := ApplyRules}) -> ?LOG(debug, "[RuleEngine] Client(~s) connected, connack: ~w", [ClientId, ConnAck]), - ApplyRules(maps:merge(Credentials, #{event => 'client.connected', connack => ConnAck, connattrs => ConnAttrs})). + ApplyRules(maps:merge(Credentials, #{event => 'client.connected', connack => ConnAck, connattrs => ConnAttrs, node => node()})). on_client_disconnected(Credentials = #{client_id := ClientId}, ReasonCode, #{apply_fun := ApplyRules}) -> ?LOG(debug, "[RuleEngine] Client(~s) disconnected, reason_code: ~w", [ClientId, ReasonCode]), - ApplyRules(maps:merge(Credentials, #{event => 'client.disconnected', reason_code => ReasonCode})). + ApplyRules(maps:merge(Credentials, #{event => 'client.disconnected', reason_code => ReasonCode, node => node(), timestamp => erlang:timestamp()})). on_client_subscribe(Credentials = #{client_id := ClientId}, TopicFilters, #{apply_fun := ApplyRules}) -> ?LOG(debug, "[RuleEngine] Client(~s) will subscribe: ~p", [ClientId, TopicFilters]), - ApplyRules(maps:merge(Credentials, #{event => 'client.subscribe', topic_filters => TopicFilters})), + ApplyRules(maps:merge(Credentials, #{event => 'client.subscribe', topic_filters => TopicFilters, node => node(), timestamp => erlang:timestamp()})), {ok, TopicFilters}. on_client_unsubscribe(Credentials = #{client_id := ClientId}, TopicFilters, #{apply_fun := ApplyRules}) -> ?LOG(debug, "[RuleEngine] Client(~s) unsubscribe ~p", [ClientId, TopicFilters]), - ApplyRules(maps:merge(Credentials, #{event => 'client.unsubscribe', topic_filters => TopicFilters})), + ApplyRules(maps:merge(Credentials, #{event => 'client.unsubscribe', topic_filters => TopicFilters, node => node(), timestamp => erlang:timestamp()})), {ok, TopicFilters}. on_message_publish(Message = #message{topic = <<"$SYS/", _/binary>>}, @@ -87,7 +87,7 @@ on_message_publish(Message = #message{topic = <<"$SYS/", _/binary>>}, on_message_publish(Message, #{apply_fun := ApplyRules}) -> ?LOG(debug, "[RuleEngine] Publish ~s", [emqx_message:format(Message)]), - ApplyRules(maps:merge(emqx_message:to_map(Message), #{event => 'message.publish'})), + ApplyRules(maps:merge(emqx_message:to_map(Message), #{event => 'message.publish', node => node()})), {ok, Message}. on_message_dropped(_, Message = #message{topic = <<"$SYS/", _/binary>>}, @@ -103,13 +103,13 @@ on_message_dropped(_, Message, #{apply_fun := ApplyRules}) -> on_message_deliver(Credentials = #{client_id := ClientId}, Message, #{apply_fun := ApplyRules}) -> ?LOG(debug, "[RuleEngine] Deliver message to client(~s): ~s", [ClientId, emqx_message:format(Message)]), - ApplyRules(maps:merge(Credentials#{event => 'message.deliver'}, emqx_message:to_map(Message))), + ApplyRules(maps:merge(Credentials#{event => 'message.deliver', node => node()}, emqx_message:to_map(Message))), {ok, Message}. on_message_acked(#{client_id := ClientId, username := Username}, Message, #{apply_fun := ApplyRules}) -> ?LOG(debug, "[RuleEngine] Session(~s) acked message: ~s", [ClientId, emqx_message:format(Message)]), - ApplyRules(maps:merge(emqx_message:to_map(Message), #{event => 'message.acked', client_id => ClientId, username => Username})), + ApplyRules(maps:merge(emqx_message:to_map(Message), #{event => 'message.acked', client_id => ClientId, username => Username, node => node()})), {ok, Message}. %%------------------------------------------------------------------------------