Skip to content

Commit

Permalink
feat: integrate OpenTelemetry log handler
Browse files Browse the repository at this point in the history
  • Loading branch information
SergeTupchiy committed Nov 22, 2023
1 parent becaa0f commit d9f95cd
Show file tree
Hide file tree
Showing 13 changed files with 362 additions and 71 deletions.
3 changes: 2 additions & 1 deletion apps/emqx_conf/src/emqx_conf_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@

%% Callback to upgrade config after loaded from config file but before validation.
upgrade_raw_conf(RawConf) ->
emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(RawConf).
RawConf1 = emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(RawConf),
emqx_otel_schema:upgrade_legacy_metrics(RawConf1).

%% root config should not have a namespace
namespace() -> undefined.
Expand Down
4 changes: 2 additions & 2 deletions apps/emqx_machine/src/emqx_machine_boot.erl
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ stop_apps() ->
?SLOG(notice, #{msg => "stopping_emqx_apps"}),
_ = emqx_alarm_handler:unload(),
ok = emqx_conf_app:unset_config_loaded(),
lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())),
%% Mute otel deps application.
_ = emqx_otel:stop_otel(),
lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())).
ok = emqx_otel_app:stop_deps().

%% Those port apps are terminated after the main apps
%% Don't need to stop when reboot.
Expand Down
10 changes: 8 additions & 2 deletions apps/emqx_opentelemetry/src/emqx_opentelemetry.app.src
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
{application, emqx_opentelemetry, [
{description, "OpenTelemetry for EMQX Broker"},
{vsn, "0.1.3"},
{vsn, "0.2.0"},
{registered, []},
{mod, {emqx_otel_app, []}},
{applications, [kernel, stdlib, emqx]},
{applications, [
kernel,
stdlib,
emqx,
%% otel metrics depend on emqx_mgmt_cache
emqx_management
]},
{env, []},
{modules, []},
{licenses, ["Apache 2.0"]},
Expand Down
22 changes: 18 additions & 4 deletions apps/emqx_opentelemetry/src/emqx_otel_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,24 @@ otel_config_schema() ->

otel_config_example() ->
#{
enable => true,
exporter =>
#{
logs => #{
enable => true,
exporter => #{
endpoint => "http://localhost:4317",
ssl_options => #{
enable => false
}
},
level => warning
},
metrics => #{
enable => true,
exporter => #{
endpoint => "http://localhost:4317",
interval => "10s"
interval => "10s",
ssl_options => #{
enable => false
}
}
}
}.
6 changes: 6 additions & 0 deletions apps/emqx_opentelemetry/src/emqx_otel_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@
-behaviour(application).

-export([start/2, stop/1]).
-export([stop_deps/0]).

start(_StartType, _StartArgs) ->
emqx_otel_config:add_handler(),
ok = emqx_otel_config:add_otel_log_handler(),
emqx_otel_sup:start_link().

stop(_State) ->
emqx_otel_config:remove_handler(),
_ = emqx_otel_config:remove_otel_log_handler(),
ok.

stop_deps() ->
emqx_otel_config:stop_all_otel_apps().
114 changes: 108 additions & 6 deletions apps/emqx_opentelemetry/src/emqx_otel_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,16 @@

-define(OPTL, [opentelemetry]).

-define(OTEL_EXPORTER, opentelemetry_exporter).
-define(OTEL_LOG_HANDLER, otel_log_handler).
-define(OTEL_LOG_HANDLER_ID, opentelemetry_handler).

-export([add_handler/0, remove_handler/0]).
-export([post_config_update/5]).
-export([update/1]).
-export([add_otel_log_handler/0, remove_otel_log_handler/0]).
-export([stop_all_otel_apps/0]).
-export([otel_exporter/1]).

update(Config) ->
case
Expand All @@ -45,14 +52,109 @@ remove_handler() ->
ok = emqx_config_handler:remove_handler(?OPTL),
ok.

post_config_update(?OPTL, _Req, Old, Old, _AppEnvs) ->
ok;
post_config_update(?OPTL, _Req, New, _Old, AppEnvs) ->
application:set_env(AppEnvs),
ensure_otel(New);
MetricsRes = ensure_otel_metrics(New),
LogsRes = ensure_otel_logs(New),
_ = maybe_stop_all_otel_apps(New),
case {MetricsRes, LogsRes} of
{ok, ok} -> ok;
Other -> {error, Other}
end;
post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) ->
ok.

ensure_otel(#{enable := true} = Conf) ->
_ = emqx_otel:stop_otel(),
emqx_otel:start_otel(Conf);
ensure_otel(#{enable := false}) ->
emqx_otel:stop_otel().
stop_all_otel_apps() ->
_ = application:stop(opentelemetry),
_ = application:stop(opentelemetry_experimental),
_ = application:stop(opentelemetry_experimental_api),
_ = application:stop(opentelemetry_exporter),
ok.

add_otel_log_handler() ->
ensure_otel_logs(emqx:get_config(?OPTL)).

remove_otel_log_handler() ->
remove_handler_if_present(?OTEL_LOG_HANDLER_ID).

otel_exporter(ExporterConf) ->
#{
endpoint := Endpoint,
protocol := Proto,
ssl_options := SSLOpts
} = ExporterConf,
{?OTEL_EXPORTER, #{
endpoint => Endpoint,
protocol => Proto,
ssl_options => ssl_opts(Endpoint, SSLOpts)
}}.

%% Internal functions

ensure_otel_metrics(#{metrics := #{enable := true} = MetricsConf}) ->
_ = emqx_otel_metrics:stop_otel(),
emqx_otel_metrics:start_otel(MetricsConf);
ensure_otel_metrics(#{metrics := #{enable := false}}) ->
emqx_otel_metrics:stop_otel();
ensure_otel_metrics(_) ->
ok.

ensure_otel_logs(#{logs := #{enable := true} = LogsConf}) ->
ok = remove_handler_if_present(?OTEL_LOG_HANDLER_ID),
ok = ensure_log_apps(),
HandlerConf = tr_handler_conf(LogsConf),
%% NOTE: should primary logger level be updated if it's higher than otel log level?
logger:add_handler(?OTEL_LOG_HANDLER_ID, ?OTEL_LOG_HANDLER, HandlerConf);
ensure_otel_logs(#{logs := #{enable := false}}) ->
remove_handler_if_present(?OTEL_LOG_HANDLER_ID).

remove_handler_if_present(HandlerId) ->
case logger:get_handler_config(HandlerId) of
{ok, _} ->
ok = logger:remove_handler(HandlerId);
_ ->
ok
end.

ensure_log_apps() ->
{ok, _} = application:ensure_all_started(opentelemetry_exporter),
{ok, _} = application:ensure_all_started(opentelemetry_experimental),
ok.

maybe_stop_all_otel_apps(#{metrics := #{enable := false}, logs := #{enable := false}}) ->
stop_all_otel_apps();
maybe_stop_all_otel_apps(_) ->
ok.

tr_handler_conf(Conf) ->
#{
level := Level,
max_queue_size := MaxQueueSize,
exporting_timeout := ExportingTimeout,
scheduled_delay := ScheduledDelay,
exporter := ExporterConf
} = Conf,
#{
level => Level,
config => #{
max_queue_size => MaxQueueSize,
exporting_timeout_ms => ExportingTimeout,
scheduled_delay_ms => ScheduledDelay,
exporter => otel_exporter(ExporterConf)
}
}.

ssl_opts(Endpoint, SSLOpts) ->
case is_ssl(Endpoint) of
true ->
emqx_tls_lib:to_client_opts(SSLOpts#{enable => true});
false ->
[]
end.

is_ssl(<<"https://", _/binary>> = _Endpoint) ->
true;
is_ssl(_Endpoint) ->
false.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_otel).
-module(emqx_otel_metrics).
-include_lib("emqx/include/logger.hrl").

-export([start_otel/1, stop_otel/0]).
Expand All @@ -29,17 +29,19 @@ start_otel(Conf) ->
assert_started(supervisor:start_child(?SUPERVISOR, Spec)).

stop_otel() ->
Res =
case erlang:whereis(?SUPERVISOR) of
undefined ->
ok;
Pid ->
case supervisor:terminate_child(Pid, ?MODULE) of
ok -> supervisor:delete_child(Pid, ?MODULE);
{error, not_found} -> ok;
Error -> Error
end
end,
ok = cleanup(),
case erlang:whereis(?SUPERVISOR) of
undefined ->
ok;
Pid ->
case supervisor:terminate_child(Pid, ?MODULE) of
ok -> supervisor:delete_child(Pid, ?MODULE);
{error, not_found} -> ok;
Error -> Error
end
end.
Res.

start_link(Conf) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, Conf, []).
Expand Down Expand Up @@ -71,32 +73,40 @@ setup(_Conf) ->
ok.

ensure_apps(Conf) ->
#{exporter := #{interval := ExporterInterval}} = Conf,
#{exporter := #{interval := ExporterInterval} = Exporter} = Conf,
{ok, _} = application:ensure_all_started(opentelemetry_exporter),
{ok, _} = application:ensure_all_started(opentelemetry),
_ = application:stop(opentelemetry_experimental),
{ok, _} = application:ensure_all_started(opentelemetry_experimental),
{ok, _} = application:ensure_all_started(opentelemetry_api_experimental),

_ = opentelemetry_experimental:stop_default_metrics(),
ok = application:set_env(
opentelemetry_experimental,
readers,
[
#{
id => emqx_otel_metric_reader,
module => otel_metric_reader,
config => #{
exporter => {opentelemetry_exporter, #{}},
exporter => emqx_otel_config:otel_exporter(Exporter),
export_interval_ms => ExporterInterval
}
}
]
),
{ok, _} = application:ensure_all_started(opentelemetry_experimental),
{ok, _} = application:ensure_all_started(opentelemetry_api_experimental),
{ok, _} = opentelemetry_experimental:start_default_metrics(),
ok.

cleanup() ->
_ = application:stop(opentelemetry),
_ = application:stop(opentelemetry_experimental),
_ = application:stop(opentelemetry_experimental_api),
_ = application:stop(opentelemetry_exporter),
safe_stop_default_metrics().

safe_stop_default_metrics() ->
try
_ = opentelemetry_experimental:stop_default_metrics()
catch
%% noramal scenario, metrics supervisor is not started
exit:{noproc, _} -> ok
end,
ok.

create_metric_views() ->
Expand Down

0 comments on commit d9f95cd

Please sign in to comment.