Skip to content

Commit

Permalink
Merge pull request #11587 from thalesmg/sync-m-r52-20230911
Browse files Browse the repository at this point in the history
sync `master` to `release-52` (prepare for `v5.2.0-build.1`)
  • Loading branch information
thalesmg committed Sep 11, 2023
2 parents 2691d27 + bbdcf09 commit f68c75a
Show file tree
Hide file tree
Showing 22 changed files with 638 additions and 79 deletions.
2 changes: 1 addition & 1 deletion apps/emqx/include/emqx_release.hrl
Expand Up @@ -32,7 +32,7 @@
%% `apps/emqx/src/bpapi/README.md'

%% Opensource edition
-define(EMQX_RELEASE_CE, "5.2.0").
-define(EMQX_RELEASE_CE, "5.2.0-build.1").

%% Enterprise edition
-define(EMQX_RELEASE_EE, "5.2.0").
Expand Down
6 changes: 1 addition & 5 deletions apps/emqx/include/emqx_session.hrl
Expand Up @@ -49,11 +49,7 @@
%% Awaiting PUBREL Timeout (Unit: millisecond)
await_rel_timeout :: timeout(),
%% Created at
created_at :: pos_integer(),
%% Topic filter to iterator ID mapping.
%% Note: we shouldn't serialize this when persisting sessions, as this information
%% also exists in the `?ITERATOR_REF_TAB' table.
iterators = #{} :: #{emqx_topic:topic() => emqx_ds:iterator_id()}
created_at :: pos_integer()
}).

-endif.
21 changes: 4 additions & 17 deletions apps/emqx/integration_test/emqx_ds_SUITE.erl
Expand Up @@ -108,12 +108,6 @@ get_all_iterator_ids(Node) ->
emqx_ds_storage_layer:foldl_iterator_prefix(?DS_SHARD, <<>>, Fn, [])
end).

get_session_iterators(Node, ClientId) ->
erpc:call(Node, fun() ->
[ConnPid] = emqx_cm:lookup_channels(ClientId),
emqx_connection:info({channel, {session, iterators}}, sys:get_state(ConnPid))
end).

wait_nodeup(Node) ->
?retry(
_Sleep0 = 500,
Expand Down Expand Up @@ -209,18 +203,14 @@ t_session_subscription_idempotency(Config) ->
{ok, _} = emqtt:connect(Client1),
ct:pal("subscribing 2"),
{ok, _, [2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2),
SessionIterators = get_session_iterators(Node1, ClientId),

ok = emqtt:stop(Client1),

#{session_iterators => SessionIterators}
ok
end,
fun(Res, Trace) ->
fun(Trace) ->
ct:pal("trace:\n ~p", [Trace]),
#{session_iterators := SessionIterators} = Res,
%% Exactly one iterator should have been opened.
?assertEqual(1, map_size(SessionIterators), #{iterators => SessionIterators}),
?assertMatch(#{SubTopicFilter := _}, SessionIterators),
SubTopicFilterWords = emqx_topic:words(SubTopicFilter),
?assertEqual([{ClientId, SubTopicFilterWords}], get_all_iterator_refs(Node1)),
?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)),
Expand Down Expand Up @@ -321,17 +311,14 @@ t_session_unsubscription_idempotency(Config) ->
},
15_000
),
SessionIterators = get_session_iterators(Node1, ClientId),

ok = emqtt:stop(Client1),

#{session_iterators => SessionIterators}
ok
end,
fun(Res, Trace) ->
fun(Trace) ->
ct:pal("trace:\n ~p", [Trace]),
#{session_iterators := SessionIterators} = Res,
%% No iterators remaining
?assertEqual(#{}, SessionIterators),
?assertEqual([], get_all_iterator_refs(Node1)),
?assertEqual({ok, []}, get_all_iterator_ids(Node1)),
ok
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx/src/emqx.app.src
Expand Up @@ -2,7 +2,7 @@
{application, emqx, [
{id, "emqx"},
{description, "EMQX Core"},
{vsn, "5.1.9"},
{vsn, "5.1.10"},
{modules, []},
{registered, []},
{applications, [
Expand Down
25 changes: 15 additions & 10 deletions apps/emqx/src/emqx_persistent_session_ds.erl
Expand Up @@ -24,7 +24,7 @@
persist_message/1,
open_session/1,
add_subscription/2,
del_subscription/3
del_subscription/2
]).

-export([
Expand Down Expand Up @@ -139,21 +139,26 @@ do_open_iterator(TopicFilter, StartMS, IteratorID) ->
{ok, _It} = emqx_ds_storage_layer:ensure_iterator(?DS_SHARD, IteratorID, Replay),
ok.

-spec del_subscription(emqx_ds:iterator_id() | undefined, emqx_types:topic(), emqx_ds:session_id()) ->
-spec del_subscription(emqx_types:topic(), emqx_ds:session_id()) ->
ok | {skipped, disabled}.
del_subscription(IteratorID, TopicFilterBin, DSSessionID) ->
del_subscription(TopicFilterBin, DSSessionID) ->
?WHEN_ENABLED(
begin
TopicFilter = emqx_topic:words(TopicFilterBin),
Ctx = #{iterator_id => IteratorID},
?tp_span(
persistent_session_ds_close_iterators,
Ctx,
ok = ensure_iterator_closed_on_all_shards(IteratorID)
),
case emqx_ds:session_get_iterator_id(DSSessionID, TopicFilter) of
{error, not_found} ->
%% already gone
ok;
{ok, IteratorID} ->
?tp_span(
persistent_session_ds_close_iterators,
#{iterator_id => IteratorID},
ok = ensure_iterator_closed_on_all_shards(IteratorID)
)
end,
?tp_span(
persistent_session_ds_iterator_delete,
Ctx,
#{},
emqx_ds:session_del_iterator(DSSessionID, TopicFilter)
)
end
Expand Down
3 changes: 2 additions & 1 deletion apps/emqx/src/emqx_router.erl
Expand Up @@ -117,7 +117,8 @@ mnesia(boot) ->
{storage_properties, [
{ets, [
{read_concurrency, true},
{write_concurrency, auto}
{write_concurrency, true},
{decentralized_counters, true}
]}
]}
]).
Expand Down
24 changes: 5 additions & 19 deletions apps/emqx/src/emqx_session.erl
Expand Up @@ -269,9 +269,7 @@ info(awaiting_rel_max, #session{max_awaiting_rel = Max}) ->
info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
Timeout;
info(created_at, #session{created_at = CreatedAt}) ->
CreatedAt;
info(iterators, #session{iterators = Iterators}) ->
Iterators.
CreatedAt.

%% @doc Get stats of the session.
-spec stats(session()) -> emqx_types:stats().
Expand Down Expand Up @@ -320,13 +318,8 @@ is_subscriptions_full(#session{
-spec add_persistent_subscription(emqx_types:topic(), emqx_types:clientid(), session()) ->
session().
add_persistent_subscription(TopicFilterBin, ClientId, Session) ->
case emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId) of
{ok, IteratorId, _IsNew} ->
Iterators = Session#session.iterators,
Session#session{iterators = Iterators#{TopicFilterBin => IteratorId}};
_ ->
Session
end.
_ = emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId),
Session.

%%--------------------------------------------------------------------
%% Client -> Broker: UNSUBSCRIBE
Expand Down Expand Up @@ -356,15 +349,8 @@ unsubscribe(
-spec remove_persistent_subscription(session(), emqx_types:topic(), emqx_types:clientid()) ->
session().
remove_persistent_subscription(Session, TopicFilterBin, ClientId) ->
Iterators = Session#session.iterators,
case maps:get(TopicFilterBin, Iterators, undefined) of
undefined ->
ok;
IteratorId ->
_ = emqx_persistent_session_ds:del_subscription(IteratorId, TopicFilterBin, ClientId),
ok
end,
Session#session{iterators = maps:remove(TopicFilterBin, Iterators)}.
_ = emqx_persistent_session_ds:del_subscription(TopicFilterBin, ClientId),
Session.

%%--------------------------------------------------------------------
%% Client -> Broker: PUBLISH
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl
Expand Up @@ -640,7 +640,7 @@ t_ingress_mqtt_bridge_with_rules(_) ->
#{
<<"name">> => <<"A_rule_get_messages_from_a_source_mqtt_bridge">>,
<<"enable">> => true,
<<"actions">> => [#{<<"function">> => "emqx_bridge_mqtt_SUITE:inspect"}],
<<"actions">> => [#{<<"function">> => <<"emqx_bridge_mqtt_SUITE:inspect">>}],
<<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">>
}
),
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_dashboard/src/emqx_dashboard.app.src
Expand Up @@ -2,7 +2,7 @@
{application, emqx_dashboard, [
{description, "EMQX Web Dashboard"},
% strict semver, bump manually!
{vsn, "5.0.26"},
{vsn, "5.0.27"},
{modules, []},
{registered, [emqx_dashboard_sup]},
{applications, [kernel, stdlib, mnesia, minirest, emqx, emqx_ctl, emqx_bridge_http]},
Expand Down
103 changes: 100 additions & 3 deletions apps/emqx_rule_engine/src/emqx_rule_actions.erl
Expand Up @@ -20,6 +20,7 @@
-include("rule_engine.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqtt/include/emqtt.hrl").

%% APIs
-export([parse_action/1]).
Expand Down Expand Up @@ -60,16 +61,23 @@ pre_process_action_args(
qos := QoS,
retain := Retain,
payload := Payload,
user_properties := UserProperties
mqtt_properties := MQTTPropertiesTemplate0,
user_properties := UserPropertiesTemplate
} = Args
) ->
MQTTPropertiesTemplate =
maps:map(
fun(_Key, V) -> emqx_placeholder:preproc_tmpl(V) end,
MQTTPropertiesTemplate0
),
Args#{
preprocessed_tmpl => #{
topic => emqx_placeholder:preproc_tmpl(Topic),
qos => preproc_vars(QoS),
retain => preproc_vars(Retain),
payload => emqx_placeholder:preproc_tmpl(Payload),
user_properties => preproc_user_properties(UserProperties)
mqtt_properties => MQTTPropertiesTemplate,
user_properties => preproc_user_properties(UserPropertiesTemplate)
}
};
pre_process_action_args(_, Args) ->
Expand Down Expand Up @@ -106,6 +114,7 @@ republish(
retain := RetainTks,
topic := TopicTks,
payload := PayloadTks,
mqtt_properties := MQTTPropertiesTemplate,
user_properties := UserPropertiesTks
}
}
Expand All @@ -118,7 +127,9 @@ republish(
%% events such as message.acked and message.dropped
Flags0 = maps:get(flags, Env, #{}),
Flags = Flags0#{retain => Retain},
PubProps = format_pub_props(UserPropertiesTks, Selected, Env),
PubProps0 = format_pub_props(UserPropertiesTks, Selected, Env),
MQTTProps = format_mqtt_properties(MQTTPropertiesTemplate, Selected, Env),
PubProps = maps:merge(PubProps0, MQTTProps),
?TRACE(
"RULE",
"republish_message",
Expand Down Expand Up @@ -232,3 +243,89 @@ format_pub_props(UserPropertiesTks, Selected, Env) ->
replace_simple_var(UserPropertiesTks, Selected, #{})
end,
#{'User-Property' => UserProperties}.

format_mqtt_properties(MQTTPropertiesTemplate, Selected, Env) ->
#{metadata := #{rule_id := RuleId}} = Env,
MQTTProperties0 =
maps:fold(
fun(K, Template, Acc) ->
try
V = emqx_placeholder:proc_tmpl(Template, Selected),
Acc#{K => V}
catch
Kind:Error ->
?SLOG(
debug,
#{
msg => "bad_mqtt_property_value_ignored",
rule_id => RuleId,
exception => Kind,
reason => Error,
property => K,
selected => Selected
}
),
Acc
end
end,
#{},
MQTTPropertiesTemplate
),
coerce_properties_values(MQTTProperties0, Env).

ensure_int(B) when is_binary(B) ->
try
binary_to_integer(B)
catch
error:badarg ->
throw(bad_integer)
end;
ensure_int(I) when is_integer(I) ->
I.

coerce_properties_values(MQTTProperties, #{metadata := #{rule_id := RuleId}}) ->
maps:fold(
fun(K, V0, Acc) ->
try
V = encode_mqtt_property(K, V0),
Acc#{K => V}
catch
throw:bad_integer ->
?SLOG(
debug,
#{
msg => "bad_mqtt_property_value_ignored",
rule_id => RuleId,
reason => bad_integer,
property => K,
value => V0
}
),
Acc;
Kind:Reason:Stacktrace ->
?SLOG(
debug,
#{
msg => "bad_mqtt_property_value_ignored",
rule_id => RuleId,
exception => Kind,
reason => Reason,
property => K,
value => V0,
stacktrace => Stacktrace
}
),
Acc
end
end,
#{},
MQTTProperties
).

%% Note: currently we do not support `Topic-Alias', which would need to be encoded as an
%% int.
encode_mqtt_property('Payload-Format-Indicator', V) -> ensure_int(V);
encode_mqtt_property('Message-Expiry-Interval', V) -> ensure_int(V);
encode_mqtt_property('Subscription-Identifier', V) -> ensure_int(V);
%% note: `emqx_placeholder:proc_tmpl/2' currently always return a binary.
encode_mqtt_property(_Prop, V) when is_binary(V) -> V.
2 changes: 1 addition & 1 deletion apps/emqx_rule_engine/src/emqx_rule_engine.app.src
Expand Up @@ -2,7 +2,7 @@
{application, emqx_rule_engine, [
{description, "EMQX Rule Engine"},
% strict semver, bump manually!
{vsn, "5.0.24"},
{vsn, "5.0.25"},
{modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_engine]},
{applications, [kernel, stdlib, rulesql, getopt, emqx_ctl, uuid]},
Expand Down

0 comments on commit f68c75a

Please sign in to comment.