Skip to content

Commit

Permalink
Merge pull request #5885 from savonarola/fix-acl-schema
Browse files Browse the repository at this point in the history
fix(mnesia_acl): introduce optimized schema and migration process
  • Loading branch information
savonarola committed Oct 27, 2021
2 parents 67b543f + 6d48bbf commit 49c7eae
Show file tree
Hide file tree
Showing 20 changed files with 1,105 additions and 277 deletions.
14 changes: 14 additions & 0 deletions .ci/acl_migration_test/build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/bin/bash

set -xe

cd "$EMQX_PATH"

rm -rf _build _upgrade_base

mkdir _upgrade_base
pushd _upgrade_base
wget "https://s3-us-west-2.amazonaws.com/packages.emqx/emqx-ce/v${EMQX_BASE}/emqx-ubuntu20.04-${EMQX_BASE}-amd64.zip"
popd

make emqx-zip
15 changes: 15 additions & 0 deletions .ci/acl_migration_test/prepare.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/bin/bash

set -xe

mkdir -p "$TEST_PATH"
cd "$TEST_PATH"

cp ../"$EMQX_PATH"/_upgrade_base/*.zip ./
unzip ./*.zip

cp ../"$EMQX_PATH"/_packages/emqx/*.zip ./emqx/releases/

git clone --depth 1 https://github.com/terry-xiaoyu/one_more_emqx.git

./one_more_emqx/one_more_emqx.sh emqx2
17 changes: 17 additions & 0 deletions .ci/acl_migration_test/suite.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/bash

set -xe

export EMQX_PATH="$1"
export EMQX_BASE="$2"

export TEST_PATH="emqx_test"

./build.sh

VERSION=$("$EMQX_PATH"/pkg-vsn.sh)
export VERSION

./prepare.sh

./test.sh
121 changes: 121 additions & 0 deletions .ci/acl_migration_test/test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
#!/bin/bash

set -e

EMQX_ENDPOINT="http://localhost:8081/api/v4/acl"
EMQX2_ENDPOINT="http://localhost:8917/api/v4/acl"

function run() {
emqx="$1"
shift

echo "[$emqx]" "$@"

pushd "$TEST_PATH/$emqx"
"$@"
popd
}

function post_rule() {
endpoint="$1"
rule="$2"
echo -n "->($endpoint) "
curl -s -u admin:public -X POST "$endpoint" -d "$rule"
echo
}

function verify_clientid_rule() {
endpoint="$1"
id="$2"
echo -n "<-($endpoint) "
curl -s -u admin:public "$endpoint/clientid/$id" | grep "$id" || (echo "verify rule for client $id failed" && return 1)
}

# Run nodes

run emqx ./bin/emqx start
run emqx2 ./bin/emqx start

run emqx ./bin/emqx_ctl plugins load emqx_auth_mnesia
run emqx2 ./bin/emqx_ctl plugins load emqx_auth_mnesia

run emqx2 ./bin/emqx_ctl cluster join 'emqx@127.0.0.1'

# Add ACL rule to unupgraded EMQX nodes

post_rule "$EMQX_ENDPOINT" '{"clientid": "CLIENT1_A","topic": "t", "action": "pub", "access": "allow"}'
post_rule "$EMQX2_ENDPOINT" '{"clientid": "CLIENT1_B","topic": "t", "action": "pub", "access": "allow"}'

# Upgrade emqx2 node

run emqx2 ./bin/emqx install "$VERSION"
sleep 60

# Verify upgrade blocked

run emqx2 ./bin/emqx eval 'emqx_acl_mnesia_migrator:is_old_table_migrated().' | grep false || (echo "emqx2 shouldn't have migrated" && exit 1)

# Verify old rules on both nodes

verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT1_A'
verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT1_A'

verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT1_B'
verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT1_B'

# Add ACL on OLD and NEW node, verify on all nodes

post_rule "$EMQX_ENDPOINT" '{"clientid": "CLIENT2_A","topic": "t", "action": "pub", "access": "allow"}'
post_rule "$EMQX2_ENDPOINT" '{"clientid": "CLIENT2_B","topic": "t", "action": "pub", "access": "allow"}'

verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT2_A'
verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT2_A'

verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT2_B'
verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT2_B'

# Upgrade emqx node

run emqx ./bin/emqx install "$VERSION"

# Wait for upgrade

sleep 60

# Verify if upgrade occured

run emqx ./bin/emqx eval 'emqx_acl_mnesia_migrator:is_old_table_migrated().' | grep true || (echo "emqx should have migrated" && exit 1)
run emqx2 ./bin/emqx eval 'emqx_acl_mnesia_migrator:is_old_table_migrated().' | grep true || (echo "emqx2 should have migrated" && exit 1)

# Verify rules are kept

verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT1_A'
verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT1_A'

verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT1_B'
verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT1_B'

verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT2_A'
verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT2_A'

verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT2_B'
verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT2_B'

# Add ACL on OLD and NEW node, verify on all nodes

post_rule "$EMQX_ENDPOINT" '{"clientid": "CLIENT3_A","topic": "t", "action": "pub", "access": "allow"}'
post_rule "$EMQX2_ENDPOINT" '{"clientid": "CLIENT3_B","topic": "t", "action": "pub", "access": "allow"}'

verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT3_A'
verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT3_A'

verify_clientid_rule "$EMQX_ENDPOINT" 'CLIENT3_B'
verify_clientid_rule "$EMQX2_ENDPOINT" 'CLIENT3_B'

# Stop nodes

run emqx ./bin/emqx stop
run emqx2 ./bin/emqx stop

echo "Success!"

22 changes: 22 additions & 0 deletions .github/workflows/run_acl_migration_tests.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
name: ACL fix & migration integration tests

on: workflow_dispatch

jobs:
test:
runs-on: ubuntu-20.04
container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04
strategy:
fail-fast: true
env:
BASE_VERSION: "4.3.0"
steps:
- uses: actions/checkout@v2
with:
path: emqx
- name: Prepare scripts
run: |
cp ./emqx/.ci/acl_migration_test/*.sh ./
- name: Run tests
run: |
./suite.sh emqx "$BASE_VERSION"
40 changes: 33 additions & 7 deletions apps/emqx_auth_mnesia/include/emqx_auth_mnesia.hrl
Original file line number Diff line number Diff line change
@@ -1,21 +1,47 @@
-define(APP, emqx_auth_mnesia).

-type(login():: {clientid, binary()}
-type(login() :: {clientid, binary()}
| {username, binary()}).

-type(acl_target() :: login() | all).

-type(acl_target_type() :: clientid | username | all).

-type(access():: allow | deny).
-type(action():: pub | sub).
-type(legacy_action():: action() | pubsub).
-type(created_at():: integer()).

-record(emqx_user, {
login :: login(),
password :: binary(),
created_at :: integer()
created_at :: created_at()
}).

-record(emqx_acl, {
filter:: {login() | all, emqx_topic:topic()},
action :: pub | sub | pubsub,
access :: allow | deny,
created_at :: integer()
-define(ACL_TABLE, emqx_acl).

-define(MIGRATION_MARK_KEY, emqx_acl2_migration_started).

-record(?ACL_TABLE, {
filter :: {acl_target(), emqx_topic:topic()} | ?MIGRATION_MARK_KEY,
action :: legacy_action(),
access :: access(),
created_at :: created_at()
}).

-define(MIGRATION_MARK_RECORD, #?ACL_TABLE{filter = ?MIGRATION_MARK_KEY, action = pub, access = deny, created_at = 0}).

-type(rule() :: {access(), action(), emqx_topic:topic(), created_at()}).

-define(ACL_TABLE2, emqx_acl2).

-record(?ACL_TABLE2, {
who :: acl_target(),
rules :: [ rule() ]
}).

-type(acl_record() :: {acl_target(), emqx_topic:topic(), action(), access(), created_at()}).

-record(auth_metrics, {
success = 'client.auth.success',
failure = 'client.auth.failure',
Expand Down
25 changes: 8 additions & 17 deletions apps/emqx_auth_mnesia/src/emqx_acl_mnesia.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,16 @@

-include("emqx_auth_mnesia.hrl").

-include_lib("stdlib/include/ms_transform.hrl").

-define(TABLE, emqx_acl).

%% ACL Callbacks
-export([ init/0
, register_metrics/0
, check_acl/5
, description/0
]).
]).

init() ->
ok = ekka_mnesia:create_table(emqx_acl, [
{type, bag},
{disc_copies, [node()]},
{attributes, record_info(fields, emqx_acl)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}]),
ok = ekka_mnesia:copy_table(emqx_acl, disc_copies).
ok = emqx_acl_mnesia_db:create_table(),
ok = emqx_acl_mnesia_db:create_table2().

-spec(register_metrics() -> ok).
register_metrics() ->
Expand All @@ -46,12 +38,12 @@ check_acl(ClientInfo = #{ clientid := Clientid }, PubSub, Topic, _NoMatchAction,

Acls = case Username of
undefined ->
emqx_acl_mnesia_cli:lookup_acl({clientid, Clientid}) ++
emqx_acl_mnesia_cli:lookup_acl(all);
emqx_acl_mnesia_db:lookup_acl({clientid, Clientid}) ++
emqx_acl_mnesia_db:lookup_acl(all);
_ ->
emqx_acl_mnesia_cli:lookup_acl({clientid, Clientid}) ++
emqx_acl_mnesia_cli:lookup_acl({username, Username}) ++
emqx_acl_mnesia_cli:lookup_acl(all)
emqx_acl_mnesia_db:lookup_acl({clientid, Clientid}) ++
emqx_acl_mnesia_db:lookup_acl({username, Username}) ++
emqx_acl_mnesia_db:lookup_acl(all)
end,

case match(ClientInfo, PubSub, Topic, Acls) of
Expand Down Expand Up @@ -83,7 +75,6 @@ match(ClientInfo, PubSub, Topic, [ {_, ACLTopic, Action, Access, _} | Acls]) ->
match_topic(ClientInfo, Topic, ACLTopic) when is_binary(Topic) ->
emqx_topic:match(Topic, feed_var(ClientInfo, ACLTopic)).

match_actions(_, pubsub) -> true;
match_actions(subscribe, sub) -> true;
match_actions(publish, pub) -> true;
match_actions(_, _) -> false.
Expand Down
34 changes: 16 additions & 18 deletions apps/emqx_auth_mnesia/src/emqx_acl_mnesia_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

-module(emqx_acl_mnesia_api).

-include("emqx_auth_mnesia.hrl").

-include_lib("stdlib/include/ms_transform.hrl").

-import(proplists, [ get_value/2
Expand Down Expand Up @@ -99,26 +97,22 @@
]).

list_clientid(_Bindings, Params) ->
MatchSpec = ets:fun2ms(
fun({emqx_acl, {{clientid, Clientid}, Topic}, Action, Access, CreatedAt}) -> {{clientid,Clientid}, Topic, Action,Access, CreatedAt} end),
return({ok, emqx_auth_mnesia_api:paginate(emqx_acl, MatchSpec, Params, fun emqx_acl_mnesia_cli:comparing/2, fun format/1)}).
Table = emqx_acl_mnesia_db:login_acl_table(clientid),
return({ok, emqx_auth_mnesia_api:paginate_qh(Table, count(Table), Params, fun emqx_acl_mnesia_db:comparing/2, fun format/1)}).

list_username(_Bindings, Params) ->
MatchSpec = ets:fun2ms(
fun({emqx_acl, {{username, Username}, Topic}, Action, Access, CreatedAt}) -> {{username, Username}, Topic, Action,Access, CreatedAt} end),
return({ok, emqx_auth_mnesia_api:paginate(emqx_acl, MatchSpec, Params, fun emqx_acl_mnesia_cli:comparing/2, fun format/1)}).
Table = emqx_acl_mnesia_db:login_acl_table(username),
return({ok, emqx_auth_mnesia_api:paginate_qh(Table, count(Table), Params, fun emqx_acl_mnesia_db:comparing/2, fun format/1)}).

list_all(_Bindings, Params) ->
MatchSpec = ets:fun2ms(
fun({emqx_acl, {all, Topic}, Action, Access, CreatedAt}) -> {all, Topic, Action,Access, CreatedAt}end
),
return({ok, emqx_auth_mnesia_api:paginate(emqx_acl, MatchSpec, Params, fun emqx_acl_mnesia_cli:comparing/2, fun format/1)}).
Table = emqx_acl_mnesia_db:login_acl_table(all),
return({ok, emqx_auth_mnesia_api:paginate_qh(Table, count(Table), Params, fun emqx_acl_mnesia_db:comparing/2, fun format/1)}).


lookup(#{clientid := Clientid}, _Params) ->
return({ok, format(emqx_acl_mnesia_cli:lookup_acl({clientid, urldecode(Clientid)}))});
return({ok, format(emqx_acl_mnesia_db:lookup_acl({clientid, urldecode(Clientid)}))});
lookup(#{username := Username}, _Params) ->
return({ok, format(emqx_acl_mnesia_cli:lookup_acl({username, urldecode(Username)}))}).
return({ok, format(emqx_acl_mnesia_db:lookup_acl({username, urldecode(Username)}))}).

add(_Bindings, Params) ->
[ P | _] = Params,
Expand Down Expand Up @@ -152,7 +146,7 @@ do_add(Params) ->
Access = get_value(<<"access">>, Params),
Re = case validate([login, topic, action, access], [Login, Topic, Action, Access]) of
ok ->
emqx_acl_mnesia_cli:add_acl(Login, Topic, erlang:binary_to_atom(Action, utf8), erlang:binary_to_atom(Access, utf8));
emqx_acl_mnesia_db:add_acl(Login, Topic, erlang:binary_to_atom(Action, utf8), erlang:binary_to_atom(Access, utf8));
Err -> Err
end,
maps:merge(#{topic => Topic,
Expand All @@ -165,15 +159,19 @@ do_add(Params) ->
end).

delete(#{clientid := Clientid, topic := Topic}, _) ->
return(emqx_acl_mnesia_cli:remove_acl({clientid, urldecode(Clientid)}, urldecode(Topic)));
return(emqx_acl_mnesia_db:remove_acl({clientid, urldecode(Clientid)}, urldecode(Topic)));
delete(#{username := Username, topic := Topic}, _) ->
return(emqx_acl_mnesia_cli:remove_acl({username, urldecode(Username)}, urldecode(Topic)));
return(emqx_acl_mnesia_db:remove_acl({username, urldecode(Username)}, urldecode(Topic)));
delete(#{topic := Topic}, _) ->
return(emqx_acl_mnesia_cli:remove_acl(all, urldecode(Topic))).
return(emqx_acl_mnesia_db:remove_acl(all, urldecode(Topic))).

%%------------------------------------------------------------------------------
%% Interval Funcs
%%------------------------------------------------------------------------------

count(QH) ->
qlc:fold(fun(_, Count) -> Count + 1 end, 0, QH).

format({{clientid, Clientid}, Topic, Action, Access, _CreatedAt}) ->
#{clientid => Clientid, topic => Topic, action => Action, access => Access};
format({{username, Username}, Topic, Action, Access, _CreatedAt}) ->
Expand Down

0 comments on commit 49c7eae

Please sign in to comment.