Skip to content

Commit

Permalink
Merge pull request #8260 from terry-xiaoyu/remove_headers_field_from_…
Browse files Browse the repository at this point in the history
…rule_events

fix: remove the 'headers' field from the rule events
  • Loading branch information
terry-xiaoyu committed Jun 17, 2022
2 parents 0be8fd9 + f132b6f commit 89a51ac
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 174 deletions.
2 changes: 1 addition & 1 deletion apps/emqx_bridge/src/emqx_bridge.erl
Expand Up @@ -118,7 +118,7 @@ unload_hook() ->
on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
case maps:get(sys, Flags, false) of
false ->
Msg = emqx_rule_events:eventmsg_publish(Message),
{Msg, _} = emqx_rule_events:eventmsg_publish(Message),
send_to_matched_egress_bridges(Topic, Msg);
true ->
ok
Expand Down
3 changes: 2 additions & 1 deletion apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl
Expand Up @@ -63,7 +63,8 @@ make_pub_vars(Mountpoint, Conf) when is_map(Conf) ->
exp_msg().
to_remote_msg(#message{flags = Flags0} = Msg, Vars) ->
Retain0 = maps:get(retain, Flags0, false),
MapMsg = maps:put(retain, Retain0, emqx_rule_events:eventmsg_publish(Msg)),
{Columns, _} = emqx_rule_events:eventmsg_publish(Msg),
MapMsg = maps:put(retain, Retain0, Columns),
to_remote_msg(MapMsg, Vars);
to_remote_msg(MapMsg, #{
remote_topic := TopicToken,
Expand Down
2 changes: 2 additions & 0 deletions apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
Expand Up @@ -25,6 +25,8 @@

-import(hoconsc, [mk/2, ref/2, array/1]).

-export([printable_function_name/2]).

%% Swagger specs from hocon schema
-export([api_spec/0, paths/0, schema/1, namespace/0]).

Expand Down
15 changes: 11 additions & 4 deletions apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl
Expand Up @@ -84,10 +84,17 @@ pretty_print_rule(ID) ->
end.

%% erlfmt-ignore
format_action(#{func := Func, args := Args}) ->
io_lib:format("Function:\n ~p\n"
"Args:\n ~p\n"
,[Func, maps:without([preprocessed_tmpl], Args)]
format_action(#{mod := Mod, func := Func, args := Args}) ->
Name = emqx_rule_engine_api:printable_function_name(Mod, Func),
io_lib:format("- Name: ~s\n"
" Type: function\n"
" Args: ~p\n"
,[Name, maps:without([preprocessed_tmpl], Args)]
);
format_action(BridgeChannelId) when is_binary(BridgeChannelId) ->
io_lib:format("- Name: ~s\n"
" Type: data-bridge\n"
,[BridgeChannelId]
).

left_pad(Str) ->
Expand Down
114 changes: 61 additions & 53 deletions apps/emqx_rule_engine/src/emqx_rule_events.erl
Expand Up @@ -107,36 +107,41 @@ unload(Topic) ->
%%--------------------------------------------------------------------
%% Callbacks
%%--------------------------------------------------------------------
on_message_publish(Message = #message{topic = Topic}, _Env) ->
on_message_publish(Message = #message{topic = Topic}, _Conf) ->
case ignore_sys_message(Message) of
true ->
ok;
false ->
case emqx_rule_engine:get_rules_for_topic(Topic) of
[] -> ok;
Rules -> emqx_rule_runtime:apply_rules(Rules, eventmsg_publish(Message))
[] ->
ok;
Rules ->
%% ENVs are the fields that can't be refereced by the SQL, but can be used
%% from actions. e.g. The 'headers' field in the internal record `#message{}`.
{Columns, Envs} = eventmsg_publish(Message),
emqx_rule_runtime:apply_rules(Rules, Columns, Envs)
end
end,
{ok, Message}.

on_bridge_message_received(Message, Env = #{event_topic := BridgeTopic}) ->
apply_event(BridgeTopic, fun() -> with_basic_columns(BridgeTopic, Message) end, Env).
on_bridge_message_received(Message, Conf = #{event_topic := BridgeTopic}) ->
apply_event(BridgeTopic, fun() -> with_basic_columns(BridgeTopic, Message, #{}) end, Conf).

on_client_connected(ClientInfo, ConnInfo, Env) ->
on_client_connected(ClientInfo, ConnInfo, Conf) ->
apply_event(
'client.connected',
fun() -> eventmsg_connected(ClientInfo, ConnInfo) end,
Env
Conf
).

on_client_connack(ConnInfo, Reason, _, Env) ->
on_client_connack(ConnInfo, Reason, _, Conf) ->
apply_event(
'client.connack',
fun() -> eventmsg_connack(ConnInfo, Reason) end,
Env
Conf
).

on_client_check_authz_complete(ClientInfo, PubSub, Topic, Result, AuthzSource, Env) ->
on_client_check_authz_complete(ClientInfo, PubSub, Topic, Result, AuthzSource, Conf) ->
apply_event(
'client.check_authz_complete',
fun() ->
Expand All @@ -148,82 +153,82 @@ on_client_check_authz_complete(ClientInfo, PubSub, Topic, Result, AuthzSource, E
AuthzSource
)
end,
Env
Conf
).

on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) ->
on_client_disconnected(ClientInfo, Reason, ConnInfo, Conf) ->
apply_event(
'client.disconnected',
fun() -> eventmsg_disconnected(ClientInfo, ConnInfo, Reason) end,
Env
Conf
).

on_session_subscribed(ClientInfo, Topic, SubOpts, Env) ->
on_session_subscribed(ClientInfo, Topic, SubOpts, Conf) ->
apply_event(
'session.subscribed',
fun() ->
eventmsg_sub_or_unsub('session.subscribed', ClientInfo, Topic, SubOpts)
end,
Env
Conf
).

on_session_unsubscribed(ClientInfo, Topic, SubOpts, Env) ->
on_session_unsubscribed(ClientInfo, Topic, SubOpts, Conf) ->
apply_event(
'session.unsubscribed',
fun() ->
eventmsg_sub_or_unsub('session.unsubscribed', ClientInfo, Topic, SubOpts)
end,
Env
Conf
).

on_message_dropped(Message, _, Reason, Env) ->
on_message_dropped(Message, _, Reason, Conf) ->
case ignore_sys_message(Message) of
true ->
ok;
false ->
apply_event(
'message.dropped',
fun() -> eventmsg_dropped(Message, Reason) end,
Env
Conf
)
end,
{ok, Message}.

on_message_delivered(ClientInfo, Message, Env) ->
on_message_delivered(ClientInfo, Message, Conf) ->
case ignore_sys_message(Message) of
true ->
ok;
false ->
apply_event(
'message.delivered',
fun() -> eventmsg_delivered(ClientInfo, Message) end,
Env
Conf
)
end,
{ok, Message}.

on_message_acked(ClientInfo, Message, Env) ->
on_message_acked(ClientInfo, Message, Conf) ->
case ignore_sys_message(Message) of
true ->
ok;
false ->
apply_event(
'message.acked',
fun() -> eventmsg_acked(ClientInfo, Message) end,
Env
Conf
)
end,
{ok, Message}.

on_delivery_dropped(ClientInfo, Message, Reason, Env) ->
on_delivery_dropped(ClientInfo, Message, Reason, Conf) ->
case ignore_sys_message(Message) of
true ->
ok;
false ->
apply_event(
'delivery.dropped',
fun() -> eventmsg_delivery_dropped(ClientInfo, Message, Reason) end,
Env
Conf
)
end,
{ok, Message}.
Expand Down Expand Up @@ -256,10 +261,9 @@ eventmsg_publish(
qos => QoS,
flags => Flags,
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
%% the column 'headers' will be removed in the next major release
headers => printable_maps(Headers),
publish_received_at => Timestamp
}
},
#{headers => Headers}
).

eventmsg_connected(
Expand Down Expand Up @@ -299,7 +303,8 @@ eventmsg_connected(
is_bridge => IsBridge,
conn_props => printable_maps(ConnProps),
connected_at => ConnectedAt
}
},
#{}
).

eventmsg_disconnected(
Expand Down Expand Up @@ -328,7 +333,8 @@ eventmsg_disconnected(
proto_ver => ProtoVer,
disconn_props => printable_maps(maps:get(disconn_props, ConnInfo, #{})),
disconnected_at => DisconnectedAt
}
},
#{}
).

eventmsg_connack(
Expand Down Expand Up @@ -360,7 +366,8 @@ eventmsg_connack(
keepalive => Keepalive,
expiry_interval => ExpiryInterval,
conn_props => printable_maps(ConnProps)
}
},
#{}
).

eventmsg_check_authz_complete(
Expand All @@ -384,7 +391,8 @@ eventmsg_check_authz_complete(
action => PubSub,
authz_source => AuthzSource,
result => Result
}
},
#{}
).

eventmsg_sub_or_unsub(
Expand All @@ -407,7 +415,8 @@ eventmsg_sub_or_unsub(
PropKey => printable_maps(maps:get(PropKey, SubOpts, #{})),
topic => Topic,
qos => QoS
}
},
#{}
).

eventmsg_dropped(
Expand Down Expand Up @@ -435,11 +444,10 @@ eventmsg_dropped(
topic => Topic,
qos => QoS,
flags => Flags,
%% the column 'headers' will be removed in the next major release
headers => printable_maps(Headers),
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
publish_received_at => Timestamp
}
},
#{headers => Headers}
).

eventmsg_delivered(
Expand Down Expand Up @@ -472,11 +480,10 @@ eventmsg_delivered(
topic => Topic,
qos => QoS,
flags => Flags,
%% the column 'headers' will be removed in the next major release
headers => printable_maps(Headers),
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
publish_received_at => Timestamp
}
},
#{headers => Headers}
).

eventmsg_acked(
Expand Down Expand Up @@ -509,12 +516,11 @@ eventmsg_acked(
topic => Topic,
qos => QoS,
flags => Flags,
%% the column 'headers' will be removed in the next major release
headers => printable_maps(Headers),
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
puback_props => printable_maps(emqx_message:get_header(puback_props, Message, #{})),
publish_received_at => Timestamp
}
},
#{headers => Headers}
).

eventmsg_delivery_dropped(
Expand Down Expand Up @@ -549,34 +555,37 @@ eventmsg_delivery_dropped(
topic => Topic,
qos => QoS,
flags => Flags,
%% the column 'headers' will be removed in the next major release
headers => printable_maps(Headers),
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
publish_received_at => Timestamp
}
},
#{headers => Headers}
).

sub_unsub_prop_key('session.subscribed') -> sub_props;
sub_unsub_prop_key('session.unsubscribed') -> unsub_props.

with_basic_columns(EventName, Data) when is_map(Data) ->
Data#{
event => EventName,
timestamp => erlang:system_time(millisecond),
node => node()
with_basic_columns(EventName, Columns, Envs) when is_map(Columns) ->
{
Columns#{
event => EventName,
timestamp => erlang:system_time(millisecond),
node => node()
},
Envs
}.

%%--------------------------------------------------------------------
%% rules applying
%%--------------------------------------------------------------------
apply_event(EventName, GenEventMsg, _Env) ->
apply_event(EventName, GenEventMsg, _Conf) ->
EventTopic = event_topic(EventName),
case emqx_rule_engine:get_rules_for_topic(EventTopic) of
[] ->
ok;
Rules ->
%% delay the generating of eventmsg after we have found some rules to apply
emqx_rule_runtime:apply_rules(Rules, GenEventMsg())
{Columns, Envs} = GenEventMsg(),
emqx_rule_runtime:apply_rules(Rules, Columns, Envs)
end.

%%--------------------------------------------------------------------
Expand Down Expand Up @@ -777,7 +786,6 @@ columns_with_exam('message.publish') ->
{<<"topic">>, <<"t/a">>},
{<<"qos">>, 1},
{<<"flags">>, #{}},
{<<"headers">>, undefined},
{<<"publish_received_at">>, erlang:system_time(millisecond)},
columns_example_props(pub_props),
{<<"timestamp">>, erlang:system_time(millisecond)},
Expand Down

0 comments on commit 89a51ac

Please sign in to comment.