Skip to content

Commit

Permalink
Merge pull request #11132 from savonarola/0531-authz-qos-retain
Browse files Browse the repository at this point in the history
feat(authz): use extensible map format for actions in authz rules
  • Loading branch information
savonarola committed Jul 10, 2023
2 parents f9e54ed + 19f9fc5 commit 12e237c
Show file tree
Hide file tree
Showing 57 changed files with 3,203 additions and 1,693 deletions.
14 changes: 14 additions & 0 deletions apps/emqx/include/emqx_access_control.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,17 @@
-define(EMQX_AUTHORIZATION_CONFIG_ROOT_NAME, "authorization").
-define(EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_ATOM, authorization).
-define(EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY, <<"authorization">>).

-define(DEFAULT_ACTION_QOS, 0).
-define(DEFAULT_ACTION_RETAIN, false).

-define(AUTHZ_SUBSCRIBE(QOS), #{action_type => subscribe, qos => QOS}).
-define(AUTHZ_SUBSCRIBE, ?AUTHZ_SUBSCRIBE(?DEFAULT_ACTION_QOS)).

-define(AUTHZ_PUBLISH(QOS, RETAIN), #{action_type => publish, qos => QOS, retain => RETAIN}).
-define(AUTHZ_PUBLISH(QOS), ?AUTHZ_PUBLISH(QOS, ?DEFAULT_ACTION_RETAIN)).
-define(AUTHZ_PUBLISH, ?AUTHZ_PUBLISH(?DEFAULT_ACTION_QOS)).

-define(authz_action(PUBSUB, QOS), #{action_type := PUBSUB, qos := QOS}).
-define(authz_action(PUBSUB), ?authz_action(PUBSUB, _)).
-define(authz_action, ?authz_action(_)).
4 changes: 3 additions & 1 deletion apps/emqx/include/emqx_placeholder.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

-define(PH(Type), <<"${", Type/binary, "}">>).

%% action: publish/subscribe/all
%% action: publish/subscribe
-define(PH_ACTION, <<"${action}">>).

%% cert
Expand Down Expand Up @@ -79,6 +79,7 @@
-define(PH_REASON, <<"${reason}">>).

-define(PH_ENDPOINT_NAME, <<"${endpoint_name}">>).
-define(PH_RETAIN, <<"${retain}">>).

%% sync change these place holder with binary def.
-define(PH_S_ACTION, "${action}").
Expand Down Expand Up @@ -113,5 +114,6 @@
-define(PH_S_NODE, "${node}").
-define(PH_S_REASON, "${reason}").
-define(PH_S_ENDPOINT_NAME, "${endpoint_name}").
-define(PH_S_RETAIN, "${retain}").

-endif.
28 changes: 14 additions & 14 deletions apps/emqx/src/emqx_access_control.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ authenticate(Credential) ->
%% @doc Check Authorization
-spec authorize(emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic()) ->
allow | deny.
authorize(ClientInfo, PubSub, <<"$delayed/", Data/binary>> = RawTopic) ->
authorize(ClientInfo, Action, <<"$delayed/", Data/binary>> = RawTopic) ->
case binary:split(Data, <<"/">>) of
[_, Topic] ->
authorize(ClientInfo, PubSub, Topic);
authorize(ClientInfo, Action, Topic);
_ ->
?SLOG(warning, #{
msg => "invalid_delayed_topic_format",
Expand All @@ -90,39 +90,39 @@ authorize(ClientInfo, PubSub, <<"$delayed/", Data/binary>> = RawTopic) ->
inc_authz_metrics(deny),
deny
end;
authorize(ClientInfo, PubSub, Topic) ->
authorize(ClientInfo, Action, Topic) ->
Result =
case emqx_authz_cache:is_enabled() of
true -> check_authorization_cache(ClientInfo, PubSub, Topic);
false -> do_authorize(ClientInfo, PubSub, Topic)
true -> check_authorization_cache(ClientInfo, Action, Topic);
false -> do_authorize(ClientInfo, Action, Topic)
end,
inc_authz_metrics(Result),
Result.

check_authorization_cache(ClientInfo, PubSub, Topic) ->
case emqx_authz_cache:get_authz_cache(PubSub, Topic) of
check_authorization_cache(ClientInfo, Action, Topic) ->
case emqx_authz_cache:get_authz_cache(Action, Topic) of
not_found ->
AuthzResult = do_authorize(ClientInfo, PubSub, Topic),
emqx_authz_cache:put_authz_cache(PubSub, Topic, AuthzResult),
AuthzResult = do_authorize(ClientInfo, Action, Topic),
emqx_authz_cache:put_authz_cache(Action, Topic, AuthzResult),
AuthzResult;
AuthzResult ->
emqx:run_hook(
'client.check_authz_complete',
[ClientInfo, PubSub, Topic, AuthzResult, cache]
[ClientInfo, Action, Topic, AuthzResult, cache]
),
inc_authz_metrics(cache_hit),
AuthzResult
end.

do_authorize(ClientInfo, PubSub, Topic) ->
do_authorize(ClientInfo, Action, Topic) ->
NoMatch = emqx:get_config([authorization, no_match], allow),
Default = #{result => NoMatch, from => default},
case run_hooks('client.authorize', [ClientInfo, PubSub, Topic], Default) of
case run_hooks('client.authorize', [ClientInfo, Action, Topic], Default) of
AuthzResult = #{result := Result} when Result == allow; Result == deny ->
From = maps:get(from, AuthzResult, unknown),
emqx:run_hook(
'client.check_authz_complete',
[ClientInfo, PubSub, Topic, Result, From]
[ClientInfo, Action, Topic, Result, From]
),
Result;
Other ->
Expand All @@ -133,7 +133,7 @@ do_authorize(ClientInfo, PubSub, Topic) ->
}),
emqx:run_hook(
'client.check_authz_complete',
[ClientInfo, PubSub, Topic, deny, unknown_return_format]
[ClientInfo, Action, Topic, deny, unknown_return_format]
),
deny
end.
Expand Down
5 changes: 2 additions & 3 deletions apps/emqx/src/emqx_authz_cache.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

-module(emqx_authz_cache).

-include("emqx.hrl").
-include("emqx_access_control.hrl").

-export([
list_authz_cache/0,
Expand Down Expand Up @@ -159,8 +159,7 @@ dump_authz_cache() ->
map_authz_cache(Fun) ->
[
Fun(R)
|| R = {{SubPub, _T}, _Authz} <- erlang:get(),
SubPub =:= publish orelse SubPub =:= subscribe
|| R = {{?authz_action, _T}, _Authz} <- erlang:get()
].
foreach_authz_cache(Fun) ->
_ = map_authz_cache(Fun),
Expand Down
47 changes: 34 additions & 13 deletions apps/emqx/src/emqx_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
-include("emqx.hrl").
-include("emqx_channel.hrl").
-include("emqx_mqtt.hrl").
-include("emqx_access_control.hrl").
-include("logger.hrl").
-include("types.hrl").

Expand Down Expand Up @@ -491,7 +492,7 @@ handle_in(
ok ->
TopicFilters0 = parse_topic_filters(TopicFilters),
TopicFilters1 = enrich_subopts_subid(Properties, TopicFilters0),
TupleTopicFilters0 = check_sub_authzs(TopicFilters1, Channel),
TupleTopicFilters0 = check_sub_authzs(SubPkt, TopicFilters1, Channel),
HasAuthzDeny = lists:any(
fun({_TopicFilter, ReasonCode}) ->
ReasonCode =:= ?RC_NOT_AUTHORIZED
Expand Down Expand Up @@ -1838,14 +1839,34 @@ check_pub_alias(
check_pub_alias(_Packet, _Channel) ->
ok.

%%--------------------------------------------------------------------
%% Authorization action

authz_action(#mqtt_packet{
header = #mqtt_packet_header{qos = QoS, retain = Retain}, variable = #mqtt_packet_publish{}
}) ->
?AUTHZ_PUBLISH(QoS, Retain);
authz_action(#mqtt_packet{
header = #mqtt_packet_header{qos = QoS}, variable = #mqtt_packet_subscribe{}
}) ->
?AUTHZ_SUBSCRIBE(QoS);
%% Will message
authz_action(#message{qos = QoS, flags = #{retain := Retain}}) ->
?AUTHZ_PUBLISH(QoS, Retain);
authz_action(#message{qos = QoS}) ->
?AUTHZ_PUBLISH(QoS).

%%--------------------------------------------------------------------
%% Check Pub Authorization

check_pub_authz(
#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}},
#mqtt_packet{
variable = #mqtt_packet_publish{topic_name = Topic}
} = Packet,
#channel{clientinfo = ClientInfo}
) ->
case emqx_access_control:authorize(ClientInfo, publish, Topic) of
Action = authz_action(Packet),
case emqx_access_control:authorize(ClientInfo, Action, Topic) of
allow -> ok;
deny -> {error, ?RC_NOT_AUTHORIZED}
end.
Expand All @@ -1868,24 +1889,23 @@ check_pub_caps(
%%--------------------------------------------------------------------
%% Check Sub Authorization

%% TODO: not only check topic filter. Qos chould be checked too.
%% Not implemented yet:
%% MQTT-3.1.1 [MQTT-3.8.4-6] and MQTT-5.0 [MQTT-3.8.4-7]
check_sub_authzs(TopicFilters, Channel) ->
check_sub_authzs(TopicFilters, Channel, []).
check_sub_authzs(Packet, TopicFilters, Channel) ->
Action = authz_action(Packet),
check_sub_authzs(Action, TopicFilters, Channel, []).

check_sub_authzs(
Action,
[TopicFilter = {Topic, _} | More],
Channel = #channel{clientinfo = ClientInfo},
Acc
) ->
case emqx_access_control:authorize(ClientInfo, subscribe, Topic) of
case emqx_access_control:authorize(ClientInfo, Action, Topic) of
allow ->
check_sub_authzs(More, Channel, [{TopicFilter, ?RC_SUCCESS} | Acc]);
check_sub_authzs(Action, More, Channel, [{TopicFilter, ?RC_SUCCESS} | Acc]);
deny ->
check_sub_authzs(More, Channel, [{TopicFilter, ?RC_NOT_AUTHORIZED} | Acc])
check_sub_authzs(Action, More, Channel, [{TopicFilter, ?RC_NOT_AUTHORIZED} | Acc])
end;
check_sub_authzs([], _Channel, Acc) ->
check_sub_authzs(_Action, [], _Channel, Acc) ->
lists:reverse(Acc).

%%--------------------------------------------------------------------
Expand Down Expand Up @@ -2149,7 +2169,8 @@ publish_will_msg(
ClientInfo = #{mountpoint := MountPoint},
Msg = #message{topic = Topic}
) ->
PublishingDisallowed = emqx_access_control:authorize(ClientInfo, publish, Topic) =/= allow,
Action = authz_action(Msg),
PublishingDisallowed = emqx_access_control:authorize(ClientInfo, Action, Topic) =/= allow,
ClientBanned = emqx_banned:check(ClientInfo),
case PublishingDisallowed orelse ClientBanned of
true ->
Expand Down
8 changes: 7 additions & 1 deletion apps/emqx/src/emqx_types.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
-export_type([
zone/0,
pubsub/0,
pubsub_action/0,
subid/0
]).

Expand Down Expand Up @@ -127,7 +128,12 @@
| exactly_once.

-type zone() :: atom().
-type pubsub() :: publish | subscribe.
-type pubsub_action() :: publish | subscribe.

-type pubsub() ::
#{action_type := subscribe, qos := qos()}
| #{action_type := publish, qos := qos(), retain := boolean()}.

-type subid() :: binary() | atom().

-type group() :: binary() | undefined.
Expand Down
17 changes: 8 additions & 9 deletions apps/emqx/test/emqx_access_control_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
-compile(export_all).
-compile(nowarn_export_all).

-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_hooks.hrl").
-include_lib("emqx/include/emqx_access_control.hrl").
-include_lib("eunit/include/eunit.hrl").

all() -> emqx_common_test_helpers:all(?MODULE).
Expand All @@ -44,8 +44,7 @@ t_authenticate(_) ->
?assertMatch({ok, _}, emqx_access_control:authenticate(clientinfo())).

t_authorize(_) ->
Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>),
?assertEqual(allow, emqx_access_control:authorize(clientinfo(), Publish, <<"t">>)).
?assertEqual(allow, emqx_access_control:authorize(clientinfo(), ?AUTHZ_PUBLISH, <<"t">>)).

t_delayed_authorize(_) ->
RawTopic = <<"$delayed/1/foo/2">>,
Expand All @@ -54,11 +53,11 @@ t_delayed_authorize(_) ->

ok = emqx_hooks:put('client.authorize', {?MODULE, authz_stub, [Topic]}, ?HP_AUTHZ),

Publish1 = ?PUBLISH_PACKET(?QOS_0, RawTopic, 1, <<"payload">>),
?assertEqual(allow, emqx_access_control:authorize(clientinfo(), Publish1, RawTopic)),
?assertEqual(allow, emqx_access_control:authorize(clientinfo(), ?AUTHZ_PUBLISH, RawTopic)),

Publish2 = ?PUBLISH_PACKET(?QOS_0, InvalidTopic, 1, <<"payload">>),
?assertEqual(deny, emqx_access_control:authorize(clientinfo(), Publish2, InvalidTopic)),
?assertEqual(
deny, emqx_access_control:authorize(clientinfo(), ?AUTHZ_PUBLISH, InvalidTopic)
),
ok.

t_quick_deny_anonymous(_) ->
Expand Down Expand Up @@ -96,8 +95,8 @@ t_quick_deny_anonymous(_) ->
%% Helper functions
%%--------------------------------------------------------------------

authz_stub(_Client, _PubSub, ValidTopic, _DefaultResult, ValidTopic) -> {stop, #{result => allow}};
authz_stub(_Client, _PubSub, _Topic, _DefaultResult, _ValidTopic) -> {stop, #{result => deny}}.
authz_stub(_Client, _Action, ValidTopic, _DefaultResult, ValidTopic) -> {stop, #{result => allow}};
authz_stub(_Client, _Action, _Topic, _DefaultResult, _ValidTopic) -> {stop, #{result => deny}}.

quick_deny_anonymous_authn(#{username := <<"badname">>}, _AuthResult) ->
{stop, {error, not_authorized}};
Expand Down
2 changes: 0 additions & 2 deletions apps/emqx/test/emqx_authz_cache_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ t_clean_authz_cache(_) ->
ct:sleep(100),
ClientPid =
case emqx_cm:lookup_channels(<<"emqx_c">>) of
[Pid] when is_pid(Pid) ->
Pid;
Pids when is_list(Pids) ->
lists:last(Pids);
_ ->
Expand Down
3 changes: 2 additions & 1 deletion apps/emqx/test/emqx_channel_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -908,7 +908,8 @@ t_check_pub_alias(_) ->
t_check_sub_authzs(_) ->
emqx_config:put_zone_conf(default, [authorization, enable], true),
TopicFilter = {<<"t">>, ?DEFAULT_SUBOPTS},
[{TopicFilter, 0}] = emqx_channel:check_sub_authzs([TopicFilter], channel()).
Subscribe = ?SUBSCRIBE_PACKET(1, [TopicFilter]),
[{TopicFilter, 0}] = emqx_channel:check_sub_authzs(Subscribe, [TopicFilter], channel()).

t_enrich_connack_caps(_) ->
ok = meck:new(emqx_mqtt_caps, [passthrough, no_history]),
Expand Down
21 changes: 20 additions & 1 deletion apps/emqx/test/emqx_proper_types.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

-include_lib("proper/include/proper.hrl").
-include("emqx.hrl").
-include("emqx_access_control.hrl").

%% High level Types
-export([
Expand All @@ -34,7 +35,8 @@
subopts/0,
nodename/0,
normal_topic/0,
normal_topic_filter/0
normal_topic_filter/0,
pubsub/0
]).

%% Basic Types
Expand Down Expand Up @@ -482,6 +484,23 @@ normal_topic_filter() ->
end
).

subscribe_action() ->
?LET(
Qos,
qos(),
?AUTHZ_SUBSCRIBE(Qos)
).

publish_action() ->
?LET(
{Qos, Retain},
{qos(), boolean()},
?AUTHZ_PUBLISH(Qos, Retain)
).

pubsub() ->
oneof([publish_action(), subscribe_action()]).

%%--------------------------------------------------------------------
%% Basic Types
%%--------------------------------------------------------------------
Expand Down

0 comments on commit 12e237c

Please sign in to comment.