Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: add new plugin for metrics #53

Merged
merged 11 commits into from
Apr 29, 2024
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-module(vmq_events_sidecar_schema).
-module(vmq_schema_translate).

-export([
translate_sampling/2
Expand Down
44 changes: 19 additions & 25 deletions apps/vmq_enhanced_auth/src/vmq_enhanced_auth.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
init/0,
load_from_file/1,
load_from_list/1,
check/5,
check/4,
set_acl_version_metrics/1
]).

Expand Down Expand Up @@ -110,12 +110,7 @@ auth_on_subscribe(RegView, User, SubscriberId, [{Topic, Qos} | Rest], Modifiers)
D ->
next;
true ->
QosValue =
case Qos of
N when is_integer(N) -> N;
{N, _} when is_integer(N) -> N
end,
case check(read, Topic, User, SubscriberId, QosValue) of
case check(read, Topic, User, SubscriberId) of
{true, MatchedAcl} ->
auth_on_subscribe(
User,
Expand All @@ -134,13 +129,13 @@ auth_on_subscribe(RegView, User, SubscriberId, [{Topic, Qos} | Rest], Modifiers)
end
end.

auth_on_publish(User, SubscriberId, Qos, Topic, _, _) ->
auth_on_publish(User, SubscriberId, _Qos, Topic, _, _) ->
D = is_acl_auth_disabled(),
if
D ->
next;
true ->
case check(write, Topic, User, SubscriberId, Qos) of
case check(write, Topic, User, SubscriberId) of
{true, MatchedAcl} ->
{ok, [{matched_acl, MatchedAcl}]};
false ->
Expand All @@ -151,7 +146,7 @@ auth_on_publish(User, SubscriberId, Qos, Topic, _, _) ->
auth_on_subscribe_m5(_, _, []) ->
ok;
auth_on_subscribe_m5(User, SubscriberId, [{Topic, _Qos} | Rest]) ->
case check(read, Topic, User, SubscriberId, _Qos) of
case check(read, Topic, User, SubscriberId) of
{true, _} ->
auth_on_subscribe(User, SubscriberId, Rest);
false ->
Expand Down Expand Up @@ -321,56 +316,56 @@ parse_acl_line({F, eof}, _User) ->
F(F, close),
ok.

check(Type, [Word | _] = Topic, User, SubscriberId, Qos) when is_binary(Word) ->
case check_all_acl(Type, Topic, Qos) of
check(Type, [Word | _] = Topic, User, SubscriberId) when is_binary(Word) ->
case check_all_acl(Type, Topic) of
{true, MatchedAcl} ->
{true, MatchedAcl};
false when User == all -> false;
false ->
case check_user_acl(Type, User, Topic, Qos) of
case check_user_acl(Type, User, Topic) of
{true, MatchedAcl} ->
{true, MatchedAcl};
false ->
case check_pattern_acl(Type, Topic, User, SubscriberId, Qos) of
case check_pattern_acl(Type, Topic, User, SubscriberId) of
{true, MatchedAcl} ->
{true, MatchedAcl};
false ->
check_token_acl(Type, Topic, User, SubscriberId, Qos)
check_token_acl(Type, Topic, User, SubscriberId)
end
end
end.

check_all_acl(Type, TIn, Qos) ->
check_all_acl(Type, TIn) ->
Tbl = t(Type, all),
iterate_until_true(Tbl, fun(T) ->
match(TIn, T, Tbl, Type, T, Qos)
match(TIn, T, Tbl, T)
end).

check_user_acl(Type, User, TIn, Qos) ->
check_user_acl(Type, User, TIn) ->
Tbl = t(Type, User),
iterate_until_true(
ets:match(Tbl, {{User, '$1'}, '_', '_'}),
fun([T]) ->
Key = {User, T},
match(TIn, T, Tbl, Type, Key, Qos)
match(TIn, T, Tbl, Key)
end
).

check_pattern_acl(Type, TIn, User, SubscriberId, Qos) ->
check_pattern_acl(Type, TIn, User, SubscriberId) ->
Tbl = t(Type, pattern),
iterate_until_true(Tbl, fun(P) ->
T = topic(User, SubscriberId, P),
match(TIn, T, Tbl, Type, P, Qos)
match(TIn, T, Tbl, P)
end).

check_token_acl(Type, TIn, User, SubscriberId, Qos) ->
check_token_acl(Type, TIn, User, SubscriberId) ->
Tbl = t(Type, token),
iterate_until_true(Tbl, fun(P) ->
T = topic(User, SubscriberId, P),
match(TIn, T, Tbl, Type, P, Qos)
match(TIn, T, Tbl, P)
end).

match(TIn, T, Tbl, Type, Key, Qos) ->
match(TIn, T, Tbl, Key) ->
case match(TIn, T) of
true ->
case ets:lookup(Tbl, Key) of
Expand All @@ -380,7 +375,6 @@ match(TIn, T, Tbl, Type, Key, Qos) ->
},
{true, MatchedAcl};
[{_, _, Label}] ->
vmq_metrics:incr_matched_topic(Label, Type, Qos),
MatchedAcl = #matched_acl{
name = Label, pattern = iolist_to_binary(vmq_topic:unword(T))
},
Expand Down
4 changes: 2 additions & 2 deletions apps/vmq_events_sidecar/priv/vmq_events_sidecar.schema
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@

{translation, "vmq_events_sidecar.sampler.on_publish",
fun(Conf) ->
vmq_events_sidecar_schema:translate_sampling("on_publish", Conf)
vmq_schema_translate:translate_sampling("on_publish", Conf)
end
}.

Expand All @@ -70,6 +70,6 @@

{translation, "vmq_events_sidecar.sampler.on_deliver",
fun(Conf) ->
vmq_events_sidecar_schema:translate_sampling("on_deliver", Conf)
vmq_schema_translate:translate_sampling("on_deliver", Conf)
end
}.
3 changes: 2 additions & 1 deletion apps/vmq_events_sidecar/src/vmq_events_sidecar_plugin.erl
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,8 @@ on_deliver(
{MP, ClientId, normalise(UserName), QoS, unword(Topic), Payload, IsRetain, MatchedAcl,
Persisted},
ACL
).
),
next.
dhruvjain99 marked this conversation as resolved.
Show resolved Hide resolved

-spec on_delivery_complete(
username(), subscriber_id(), qos(), topic(), payload(), flag(), matched_acl(), flag()
Expand Down
10 changes: 6 additions & 4 deletions apps/vmq_events_sidecar/test/vmq_events_sidecar_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ init_per_suite(Config) ->
application:load(vmq_plugin),
application:ensure_all_started(vmq_plugin),
ok = vmq_plugin_mgr:enable_plugin(vmq_events_sidecar),
ok = vmq_plugin_mgr:enable_plugin(vmq_metrics_plus),
{ok, _} = vmq_metrics:start_link(),

{ok, _} = application:ensure_all_started(shackle),
Expand All @@ -30,6 +31,7 @@ end_per_suite(Config) ->
stop_tcp_server(proplists:get_value(socket, Config, [])),

ok = vmq_plugin_mgr:disable_plugin(vmq_events_sidecar),
ok = vmq_plugin_mgr:disable_plugin(vmq_metrics_plus),
application:stop(vmq_plugin),

application:stop(shackle),
Expand Down Expand Up @@ -84,15 +86,15 @@ on_register_empty_properties_test(_) ->
on_publish_test(_) ->
enable_hook(on_publish),
Self = pid_to_bin(self()),
[ok] = vmq_plugin:all(on_publish,
[ok,ok] = vmq_plugin:all(on_publish,
dhruvjain99 marked this conversation as resolved.
Show resolved Hide resolved
[Self, {?MOUNTPOINT, ?ALLOWED_CLIENT_ID}, 1, ?TOPIC, ?PAYLOAD, false, #matched_acl{name = ?LABEL, pattern = ?PATTERN}]),
ok = exp_response(on_publish_ok),
disable_hook(on_publish).

on_subscribe_test(_) ->
enable_hook(on_subscribe),
Self = pid_to_bin(self()),
[ok] = vmq_plugin:all(on_subscribe,
[ok,ok] = vmq_plugin:all(on_subscribe,
[Self, {?MOUNTPOINT, ?ALLOWED_CLIENT_ID}, [{?TOPIC, 1, #matched_acl{name = ?LABEL, pattern = ?PATTERN}},
{?TOPIC, not_allowed, #matched_acl{}}]]),
ok = exp_response(on_subscribe_ok),
Expand All @@ -117,7 +119,7 @@ on_deliver_test(_) ->
on_delivery_complete_test(_) ->
enable_hook(on_delivery_complete),
Self = pid_to_bin(self()),
[ok] = vmq_plugin:all(on_delivery_complete,[Self, {?MOUNTPOINT, ?ALLOWED_CLIENT_ID}, 1, ?TOPIC, ?PAYLOAD, false, #matched_acl{name = ?LABEL, pattern = ?PATTERN}, true]),
[ok,ok] = vmq_plugin:all(on_delivery_complete,[Self, {?MOUNTPOINT, ?ALLOWED_CLIENT_ID}, 1, ?TOPIC, ?PAYLOAD, false, #matched_acl{name = ?LABEL, pattern = ?PATTERN}, true]),
ok = exp_response(on_delivery_complete_ok),
disable_hook(on_delivery_complete).

Expand Down Expand Up @@ -159,7 +161,7 @@ on_session_expired_test(_) ->
on_message_drop_test(_) ->
enable_hook(on_message_drop),
Self = pid_to_bin(self()),
[ok] = vmq_plugin:all(on_message_drop, [{?MOUNTPOINT, Self}, fun() -> {?TOPIC, 1, ?PAYLOAD, #{}, #matched_acl{name = ?LABEL, pattern = ?PATTERN}} end, binary_to_atom(?MESSAGE_DROP_REASON)]),
[ok,ok] = vmq_plugin:all(on_message_drop, [{?MOUNTPOINT, Self}, fun() -> {?TOPIC, 1, ?PAYLOAD, #{}, #matched_acl{name = ?LABEL, pattern = ?PATTERN}} end, binary_to_atom(?MESSAGE_DROP_REASON)]),
ok = exp_response(on_message_drop_ok),
disable_hook(on_message_drop).

Expand Down
5 changes: 5 additions & 0 deletions apps/vmq_metrics_plus/rebar.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{erl_opts, [
{parse_transform, lager_transform},
warnings_as_errors,
debug_info
]}.
25 changes: 25 additions & 0 deletions apps/vmq_metrics_plus/src/vmq_metrics_plus.app.src
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{application, vmq_metrics_plus, [
{description, "VerneMQ metrics plugin"},
{vsn, git},
{registered, []},
{mod, {vmq_metrics_plus_app, []}},
{applications, [
kernel,
stdlib,
lager
]},
{env, [
{vmq_metrics_mfa, {vmq_metrics_plus, metrics, []}},
{vmq_plugin_hooks, [
{vmq_metrics_plus_plugin, on_subscribe, 3, []},
{vmq_metrics_plus_plugin, on_publish, 7, []},
{vmq_metrics_plus_plugin, on_deliver, 8, []},
{vmq_metrics_plus_plugin, on_delivery_complete, 8, []},
{vmq_metrics_plus_plugin, on_message_drop, 3, []}
]}
]},
{modules, []},

{licenses, ["Apache 2.0"]},
{links, []}
]}.
122 changes: 122 additions & 0 deletions apps/vmq_metrics_plus/src/vmq_metrics_plus.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.

-module(vmq_metrics_plus).

-behaviour(gen_server).

%% API
-export([
start_link/0
]).

%% gen_server callbacks
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).

-export([
metrics/0,
incr_matched_topic/3
]).

-define(SERVER, ?MODULE).
-define(TOPIC_LABEL_TABLE, topic_labels).

-record(state, {}).

%%%===================================================================
%%% API
%%%===================================================================

start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

metrics() -> topic_metrics().

topic_metrics() ->
ets:foldl(
fun({Metric, TotalCount}, Acc) ->
{UniqueId, MetricName, Description, Labels} = topic_metric_name(Metric),
[{counter, Labels, UniqueId, MetricName, Description, TotalCount} | Acc]
end,
[],
?TOPIC_LABEL_TABLE
).

-type metric_type() :: subscribe | publish | deliver | delivery_complete | message_drop.

-spec incr_topic_counter(
Metric :: {topic_matches, metric_type(), Labels :: [{atom(), atom() | list() | binary()}]}
) -> ok.
incr_topic_counter(Metric) ->
try
ets:update_counter(?TOPIC_LABEL_TABLE, Metric, 1)
catch
_:_ ->
try
ets:insert_new(?TOPIC_LABEL_TABLE, {Metric, 0}),
incr_topic_counter(Metric)
catch
_:_ ->
lager:warning("couldn't initialize tables", [])
end
end.

-spec incr_matched_topic(binary() | undefined, metric_type(), integer()) -> ok.
incr_matched_topic(<<>>, _Type, _Qos) ->
ok;
incr_matched_topic(undefined, _Type, _Qos) ->
ok;
incr_matched_topic(Name, Type, Qos) ->
incr_topic_counter({topic_matches, Type, [{acl_matched, Name}, {qos, integer_to_list(Qos)}]}).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

init([]) ->
ets:new(?TOPIC_LABEL_TABLE, [named_table, public, {write_concurrency, true}]),
{ok, #state{}}.

handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.

handle_cast(_Msg, State) ->
{noreply, State}.

handle_info(_Info, State) ->
{noreply, State}.

terminate(_Reason, _State) ->
ok.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%%%===================================================================

topic_metric_name({Metric, SubMetric, Labels}) ->
LMetric = atom_to_list(Metric),
LSubMetric = atom_to_list(SubMetric),
MetricName = list_to_atom(LSubMetric ++ "_" ++ LMetric),
Description = list_to_binary(
"The number of " ++ LSubMetric ++ " packets on ACL matched topics."
),
{[MetricName | Labels], MetricName, Description, Labels}.
18 changes: 18 additions & 0 deletions apps/vmq_metrics_plus/src/vmq_metrics_plus_app.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
%%%-------------------------------------------------------------------
%% @doc vmq_metric_plus public API
%% @end
%%%-------------------------------------------------------------------

-module(vmq_metrics_plus_app).

-behaviour(application).

-export([start/2, stop/1]).

start(_StartType, _StartArgs) ->
vmq_metrics_plus_sup:start_link().

stop(_State) ->
ok.

%% internal functions