Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Commit

Permalink
base stat framework + prometheus aggregations
Browse files Browse the repository at this point in the history
  • Loading branch information
deadtrickster committed Feb 21, 2018
1 parent d5cf736 commit 4009246
Show file tree
Hide file tree
Showing 15 changed files with 363 additions and 5 deletions.
6 changes: 6 additions & 0 deletions .dir-locals.el
@@ -0,0 +1,6 @@
((nil . ((indent-tabs-mode . nil)
(fill-column . 80)))
(erlang-mode . ((flycheck-erlang-include-path . ("../include"))
(flycheck-erlang-library-path . ("../_build/default/lib/opencensus"
"../_build/default/lib/opencensus/ebin"))
(erlang-indent-level . 4))))
4 changes: 3 additions & 1 deletion elvis.config
Expand Up @@ -10,8 +10,10 @@
{elvis_style, dont_repeat_yourself, #{min_complexity => 20}},
{elvis_style, line_length, #{limit => 120}},
{elvis_style, state_record_and_type, disable},
{elvis_style, function_naming_convention, #{regex => "^_{0,2}([a-z][a-z0-9]*_?)*_{0,2}$"}},
%% sequential reporter calls other reporters dynamically
{elvis_style, invalid_dynamic_call, #{ignore => [oc_sequential_reporter]}}],
%% stat view proxies measures
{elvis_style, invalid_dynamic_call, #{ignore => [oc_sequential_reporter, oc_stat_view]}}],
ruleset => erl_files
},
#{dirs => ["test"],
Expand Down
5 changes: 5 additions & 0 deletions include/opencensus.hrl
Expand Up @@ -105,3 +105,8 @@
message :: unicode:unicode_binary()
}).

-type measure_name() :: atom() | binary() | string().

-type view_name() :: atom() | binary() | string().
-type view_description() :: binary() | string().
-type aggregation() :: atom().
3 changes: 2 additions & 1 deletion rebar.config
Expand Up @@ -4,7 +4,8 @@
warn_untyped_record]}.

{deps, [{wts, "~> 0.3"},
{ctx, "~> 0.4.1"}]}.
{ctx, "~> 0.4.1"},
{prometheus, "~> 4.0.0"}]}.

{plugins, [{rebar_erl_vsn, "0.1.3"}]}.
{provider_hooks, [{pre, [{compile, {default, erl_vsn}}]}]}.
Expand Down
2 changes: 2 additions & 0 deletions rebar.lock
@@ -1,10 +1,12 @@
{"1.1.0",
[{<<"ctx">>,{pkg,<<"ctx">>,<<"0.4.1">>},0},
{<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.0.0">>},0},
{<<"rfc3339">>,{pkg,<<"rfc3339">>,<<"0.9.0">>},1},
{<<"wts">>,{pkg,<<"wts">>,<<"0.3.0">>},0}]}.
[
{pkg_hash,[
{<<"ctx">>, <<"E4297DD25CCDE992BC7DE298F514BEACD0A44FAA9126A1F2567306D94C519A13">>},
{<<"prometheus">>, <<"261253C4BD0BD71175F185553B684F456188A3572ACFA8221A8C10FDA52AFF81">>},
{<<"rfc3339">>, <<"2075653DC9407541C84B1E15F8BDA2ABE95FB17C9694025E079583F2D19C1060">>},
{<<"wts">>, <<"5CDF22C775CB1EBAE24C326A5DB6074D753C42F4BD12A9AA47CC62D3E2C71AD1">>}]}
].
17 changes: 17 additions & 0 deletions src/oc_stat.erl
@@ -0,0 +1,17 @@
-module(oc_stat).

-export([record/3,
export/0]).

-include("opencensus.hrl").

-spec record(measure_name(), ctx:t(), number()) -> number().
record(MeasureName, Ctx, Value) ->
Tags = oc_tags:from_ctx(Ctx),
[oc_stat_view:add_sample(View, Tags, Value)
|| View <- oc_stat_view:measure_views(MeasureName),
oc_stat_view:subscribed(View)],
Value.

export() ->
[oc_stat_view:export(View) || View <- oc_stat_view:subscribed()].
18 changes: 18 additions & 0 deletions src/oc_stat_count_aggregation.erl
@@ -0,0 +1,18 @@
-module(oc_stat_count_aggregation).

-export([init/4,
add_sample/4,
export/2]).

init(Name, Description, {CTags, Keys}, Options) ->
prometheus_counter:declare([{name, Name},
{help, Description},
{labels, Keys},
{constant_labels, CTags}]),
Options.

add_sample(Name, Tags, Value, _Options) ->
prometheus_counter:inc(Name, Tags, Value).

export(_Name, _Options) ->
erlang:error(not_supported).
21 changes: 21 additions & 0 deletions src/oc_stat_distribution_aggregation.erl
@@ -0,0 +1,21 @@
-module(oc_stat_distribution_aggregation).

-export([init/4,
add_sample/4,
export/2]).

init(Name, Description, {CTags, Keys}, Options) ->
Buckets = prometheus_buckets:new(proplists:get_value(buckets, Options, default)),
prometheus_histogram:declare([{name, Name},
{help, Description},
{labels, Keys},
{constant_labels, CTags},
{buckets, Buckets}]),
Buckets.

add_sample(Name, Tags, Value, Buckets) ->
Position = prometheus_buckets:position(Buckets, Value),
prometheus_histogram:pobserve(default, Name, Tags, Buckets, Position, Value).

export(_Name, _Options) ->
erlang:error(not_supported).
18 changes: 18 additions & 0 deletions src/oc_stat_latest_aggregation.erl
@@ -0,0 +1,18 @@
-module(oc_stat_latest_aggregation).

-export([init/4,
add_sample/4,
export/2]).

init(Name, Description, {CTags, Keys}, Options) ->
prometheus_gauge:declare([{name, Name},
{help, Description},
{labels, Keys},
{constant_labels, CTags}]),
Options.

add_sample(Name, Tags, Value, _Options) ->
prometheus_gauge:set(Name, Tags, Value).

export(_Name, _Options) ->
erlang:error(not_supported).
18 changes: 18 additions & 0 deletions src/oc_stat_sum_aggregation.erl
@@ -0,0 +1,18 @@
-module(oc_stat_sum_aggregation).

-export([init/4,
add_sample/4,
export/2]).

init(Name, Description, {CTags, Keys}, Options) ->
prometheus_summary:declare([{name, Name},
{help, Description},
{labels, Keys},
{constant_labels, CTags}]),
Options.

add_sample(Name, Tags, Value, _Options) ->
prometheus_summary:observe(Name, Tags, Value).

export(_Name, _Options) ->
erlang:error(not_supported).
139 changes: 139 additions & 0 deletions src/oc_stat_view.erl
@@ -0,0 +1,139 @@
-module(oc_stat_view).

-export([register/5,
deregister/1,
subscribe/1,
subscribe/5,
batch_subscribe/1,
subscribed/1,
unsubscribe/1,
registered/1]).

-export([measure_views/1,
subscribed/0,
add_sample/3,
export/1]).

-export(['__init_backend__'/0]).

-include("opencensus.hrl").

-define(NAME_POS, 2).
-define(SUBSCRIBED_POS, 3).

-spec register(view_name(), view_description(), oc_tags:tags(),
measure_name(), aggregation()) -> ok.
register(Name, Description, Tags, Measure, Aggregation) ->
%% TODO: check Measure exists?
register(Name, Description, Tags, Measure, Aggregation, false).

-spec deregister(view_name()) -> ok.
deregister(Name) ->
ets:delete(?MODULE, Name),
ok.

-spec subscribe(map() | view_name()) -> ok.
subscribe(#{name := Name, description := Description, tags := Tags,
measure := Measure, aggregation := Aggregation}) ->
subscribe(Name, Description, Tags, Measure, Aggregation);
subscribe(Name) ->
ets:update_element(?MODULE, Name, {?SUBSCRIBED_POS, true}),
ok.

-spec subscribe(view_name(), view_description(), oc_tags:tags(),
measure_name(), aggregation()) -> ok.
subscribe(Name, Description, Tags, Measure, Aggregation) ->
%% TODO: check Measure exists?
register(Name, Description, Tags, Measure, Aggregation, true).

%% @doc
%% Subscribe many `Views' at once.
%% @end
-spec batch_subscribe(list(view_name() | map())) -> ok.
batch_subscribe(Views) when is_list(Views) ->
[ok = subscribe(View) || View <- Views],
ok.

-spec register(view_name(), view_description(), oc_tags:tags(),
measure_name(), aggregation(), boolean()) -> ok | no_return().
register(Name, Description, Tags, Measure, Aggregation, Subscribed) ->
NAggregation = normalize_aggregation(Aggregation),
NTags = normalize_tags(Tags),
case ets:match_object(?MODULE, {Measure, Name, '_', '_', '_', '_'}) of
[{Measure, Name, '_', Description, NTags, NAggregation}] ->
ets:update_element(?MODULE, Name, {?SUBSCRIBED_POS, Subscribed});
[_] ->
erlang:error({already_exists, Name});
_ ->
{AggregationModule, AggregationOptions} = NAggregation,
{CTags, Keys} = NTags,
%% TODO: transaction?
%% HACK: Keys reversed because tag_value reverses
NAggregationOptions = AggregationModule:init(Name, Description,
{CTags, lists:reverse(Keys)}, AggregationOptions),
ets:insert(?MODULE, {Measure, Name, Subscribed, Description, NTags,
{AggregationModule, NAggregationOptions}})
end,
ok.

-spec unsubscribe(view_name()) -> ok.
unsubscribe(Name) ->
ets:update_element(?MODULE, Name, {?SUBSCRIBED_POS, false}),
ok.

%% @doc
%% Checks whether a view `Name' is registered.
%% @end
-spec registered(view_name()) -> boolean().
registered(Name) ->
ets:lookup_element(?MODULE, Name, ?NAME_POS) =/= [].

measure_views(Measure) ->
ets:lookup(?MODULE, Measure).

subscribed({_Measure, _Name, Subscribed, _Description, _Tags, _Aggregation}) ->
Subscribed.

subscribed() ->
ets:match_object(?MODULE, {'_', '_', true, '_', '_', '_'}).

add_sample({_Measure, Name, _Subscribed, _Description, ViewTags, Aggregation}, ContextTags, Value) ->
{_, Keys} = ViewTags,
TagValues = tag_values(ContextTags, Keys),
{AggregationModule, AggregationOptions} = Aggregation,
AggregationModule:add_sample(Name, TagValues, Value, AggregationOptions).

export({_Measure, Name, _, Description, ViewTags, Aggregation}) ->
{AggregationModule, AggregationOptions} = Aggregation,
{STags, Keys} = ViewTags,
#{name => Name,
description => Description,
aggregation => Aggregation,
tags => {STags, lists:reverse(Keys)},
rows => AggregationModule:export(Name, AggregationOptions)}.

normalize_aggregation({Module, Options}) ->
{Module, Options};
normalize_aggregation(Module) when is_atom(Module) ->
{Module, []}.

normalize_tags([]) ->
{#{}, []};
normalize_tags(Tags) ->
normalize_tags(Tags, {#{}, []}).

normalize_tags([], {Map, List}) ->
{Map, lists:reverse(List)};
normalize_tags([First|Rest], {Map, List}) when is_map(First) ->
normalize_tags(Rest, {maps:merge(Map, First), List});
normalize_tags([First|Rest], {Map, List}) when is_atom(First) ->
normalize_tags(Rest, {Map, [First | List]}).

tag_values(Tags, Keys) ->
lists:foldl(fun(Key, Acc) ->
[maps:get(Key, Tags, undefined) | Acc]
end, [], Keys).

'__init_backend__'() ->
?MODULE = ets:new(?MODULE, [bag, named_table, public, {read_concurrency, true}]),
ok.
7 changes: 5 additions & 2 deletions src/oc_tags.erl
Expand Up @@ -11,11 +11,12 @@
format_error/1]).

-export_types([key/0,
value/0]).
value/0,
tags/0]).

-include("opencensus.hrl").

-type key() :: unicode:latin1_charlist().
-type key() :: atom() | unicode:latin1_charlist().
-type value() :: unicode:latin1_charlist().
-type tags() :: #{key() => value()}.

Expand Down Expand Up @@ -55,6 +56,8 @@ put(Key, Value, Tags) ->
{error, {?MODULE, invalid_tag}}
end.

verify_key(Key) when is_atom(Key) ->
verify_key(atom_to_list(Key));
verify_key(Key) ->
KeyLength = erlang:length(Key),
KeyLength > 0 andalso KeyLength < 256 andalso
Expand Down
3 changes: 2 additions & 1 deletion src/opencensus.app.src
Expand Up @@ -7,7 +7,8 @@
[kernel,
stdlib,
wts,
ctx
ctx,
prometheus
]},
{modules, []},
{env, [{send_interval_ms, 500}]},
Expand Down
6 changes: 6 additions & 0 deletions src/opencensus_sup.erl
Expand Up @@ -32,6 +32,12 @@ start_link() ->
init([]) ->
ok = oc_sampler:init(application:get_env(opencensus, sampler, {oc_sampler_always, []})),

ok = oc_stat_view:'__init_backend__'(),
ok = oc_stat_exporter:'__init_backend__'(),

StatConf = application:get_env(opencensus, stat, []),
oc_stat_view:batch_subscribe(proplists:get_value(views, StatConf, [])),

Reporter = #{id => oc_reporter,
start => {oc_reporter, start_link, []},
restart => permanent,
Expand Down

0 comments on commit 4009246

Please sign in to comment.