Skip to content
Permalink
Browse files
Add HTTP reporter
The reporter can be configured using the following:

```
% Define http client callback
Client = fun(Url, Method, Headers, Body, ReporterOptions) ->
    ibrowse:send_req(Url, Headers, Method, Body, [])
end.

[
    {protocol, http},
    {endpoint, "http://127.0.0.1:14268"},
    {http_client, HttpClient}
]
```
  • Loading branch information
iilyak committed Jan 21, 2020
1 parent 076192b commit c4edcbf558e939e3279dc88fc7beb528befa2be0
Showing 7 changed files with 387 additions and 121 deletions.
@@ -45,10 +45,28 @@ Browses the tracing result:
$ firefox http://localhost:16686/
```

Selecting reporter
------------------

By default 'compact' jaeger.thrift over UDP reporter is used. However it is
possible to select different reporter. Bellow is a configuration matrics for
available options:

| protocol | thrift_protocol | jaeger port | description |
|----------|-----------------|-------------|----------------------------------|
| udp | compact | 6831 | accept jaeger.thrift over compact thrift protocol (default) |
| udp | binary | 6832 | accept jaeger.thrift over binary thrift protocol |
| http | N/A | 14268 | accept jaeger.thrift directly from clients |

The HTTP version is beneficial if you don't have jaeger agents deployed or your
spans are greater than max udp packet size (65Kb).
Otherwise it is better to use default.

References
-----------

- [OpenTracing](http://opentracing.io/)
- [Jaeger](https://uber.github.io/jaeger/)
- [jaeger-client-go/README.md](https://github.com/jaegertracing/jaeger-client-go/blob/v2.9.0/README.md)
- [Jaeger Client Library](https://github.com/jaegertracing/jaeger/blob/master/docs/client_libraries.md)
- [Jaeger default ports](https://www.jaegertracing.io/docs/1.8/getting-started/)
@@ -33,7 +33,7 @@
Tracer :: passage:tracer_id(),
Sampler :: passage_sampler:sampler().
start_tracer(Tracer, Sampler) ->
start_tracer(Tracer, Sampler, []).
start_tracer(Tracer, Sampler, [{protocol, udp}]).

%% @doc Starts a tracer for Jaeger.
%%
@@ -29,7 +29,6 @@
-module(jaeger_passage_reporter).

-behaviour(passage_reporter).
-behaviour(gen_server).

-include("constants.hrl").

@@ -44,36 +43,11 @@
-export_type([reporter_id/0]).
-export_type([start_option/0, start_options/0]).

%%------------------------------------------------------------------------------
%% Application Internal API
%%------------------------------------------------------------------------------
-export([start_link/2]).

%%------------------------------------------------------------------------------
%% 'passage_reporter' Callback API
%%------------------------------------------------------------------------------
-export([report/2]).

%%------------------------------------------------------------------------------
%% 'gen_server' Callback API
%%------------------------------------------------------------------------------
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

%%------------------------------------------------------------------------------
%% Macros & Records
%%------------------------------------------------------------------------------
-define(STATE, ?MODULE).

-record(?STATE,
{
socket :: gen_udp:socket(),
thrift_format :: thrift_protocol:format(),
agent_host :: inet:hostname(),
agent_port :: inet:port_number(),
default_service_name :: atom(),
process_tags :: passage:tags()
}).

%%------------------------------------------------------------------------------
%% Exported Types
%%------------------------------------------------------------------------------
@@ -83,28 +57,28 @@
-type start_options() :: [start_option()].
%% Options for {@link start/2}.

-type start_option() :: {thrift_format, thrift_protocol:format()}
| {agent_host, inet:hostname()}
| {agent_port, inet:port_number()}
-type start_option() :: jaeger_passage_reporter_udp:start_option()
| jaeger_passage_reporter_http:start_option()
| {protocol, udp | http}
| {default_service_name, atom()}
| {process_tags, passage:tags()}.

%% Common reporter options
%% <ul>
%% <li><b>protocol</b>: Communication protocol used to connect to jaeger. The value is used to select reporter module. Possible values are: `udp' | `http'. The default value is `udp'.</li>
%% <li><b>default_service_name</b>: The default service name. If a reporting span has `location.application' tag, the value is used as the service name instead of this. The default value is `ReporterId'.</li>
%% <li><b>process_tags</b>: The tags of the reporting process. The default value is `#{}'.</li>
%% </ul>
%% UDP reporter specific options
%% <ul>
%% <li><b>thrift_format</b>: The format for encoding thrift messages. The default value is `compact'.</li>
%% <li><b>agent_host</b>: The hostname of the jaeger agent. The default value is `"127.0.0.1"'.</li>
%% <li><b>agent_port</b>: The port of the jaeger agent. The default values for the thrift format `compact' and `binary' are `6831' and `6832' respectively.</li>
%% <li><b>default_service_name</b>: The default service name. If a reporting span has `location.application' tag, the value is used as the service name instead of this. The default value is `ReporterId'.</li>
%% <li><b>process_tags</b>: The tags of the reporting process. The default value is `#{}'.</li>
%% </ul>

%%------------------------------------------------------------------------------
%% Application Internal Functions
%%------------------------------------------------------------------------------
%% @private
-spec start_link(reporter_id(), start_options()) -> {ok, pid()} | {error, Reason} when
Reason :: {already_started, pid()} | term().
start_link(ReporterId, Options) ->
Name = jaeger_passage_local_ns:reporter_name(ReporterId),
gen_server:start_link(Name, ?MODULE, {ReporterId, Options}, []).
%% HTTP reporter specific options
%% <ul>
%% <li><b>endpoint</b>: The jaeger endpoint URL for sending thrift messages. The default value is `http://127.0.0.1:14268'.</li>
%% </ul>

%%------------------------------------------------------------------------------
%% Exported Functions
@@ -113,7 +87,7 @@ start_link(ReporterId, Options) ->
-spec start(reporter_id()) -> {ok, passage_reporter:reporter()} | {error, Reason} when
Reason :: {already_started, pid()} | term().
start(ReporterId) ->
start(ReporterId, []).
start(ReporterId, [{protocol, udp}]).

%% @doc Starts a reporter process.
-spec start(reporter_id(), start_options()) -> {ok, Reporter} | {error, Reason} when
@@ -123,8 +97,13 @@ start(ReporterId, Options) ->
Args = [ReporterId, Options],
is_atom(ReporterId) orelse error(badarg, Args),
is_list(Options) orelse error(badarg, Args),

case jaeger_passage_reporter_sup:start_child(ReporterId, Options) of
ReporterModule = case proplists:get_value(protocol, Options, udp) of
udp -> jaeger_passage_reporter_udp;
http -> jaeger_passage_reporter_http;
_ -> error(badarg, Args)
end,
ReporterOptions = proplists:delete(protocol, Options),
case jaeger_passage_reporter_sup:start_child(ReporterId, ReporterModule, ReporterOptions) of
{error, Reason} -> {error, Reason};
{ok, _Pid} -> {ok, passage_reporter:new(?MODULE, ReporterId)}
end.
@@ -158,74 +137,3 @@ get_id(Reporter) ->
report(ReporterId, Span) ->
Server = jaeger_passage_local_ns:reporter_name(ReporterId),
gen_server:cast(Server, {report, Span}).

%%------------------------------------------------------------------------------
%% 'gen_server' Callback Functions
%%------------------------------------------------------------------------------
%% @private
init({ReporterId, Options}) ->
Format = proplists:get_value(thrift_format, Options, compact),
DefaultPort =
case Format of
compact -> 6831;
binary -> 6832
end,
AgentHost = proplists:get_value(agent_host, Options, "127.0.0.1"),
AgentPort = proplists:get_value(agent_port, Options, DefaultPort),
DefaultServiceName = proplists:get_value(default_service_name, Options, ReporterId),
Tags0 = proplists:get_value(process_tags, Options, #{}),

{ok, Hostname} = inet:gethostname(),
{ok, Version} = application:get_key(vsn),
Tags1 =
maps:merge(
Tags0,
#{
?JAEGER_CLIENT_VERSION_TAG_KEY => list_to_binary(["jaeger_passage-", Version]),
?TRACER_HOSTNAME_TAG_KEY => list_to_binary(Hostname),
'erlang.node' => node()
}),
{ok, Socket} = gen_udp:open(0),
State =
#?STATE{
socket = Socket,
thrift_format = Format,
agent_host = AgentHost,
agent_port = AgentPort,
default_service_name = DefaultServiceName,
process_tags = Tags1
},
{ok, State}.

%% @private
handle_call(_Request, _From, State) ->
{noreply, State}.

%% @private
handle_cast({report, Span}, State) ->
handle_report(Span, State);
handle_cast(_Request, State) ->
{noreply, State}.

%% @private
handle_info(_Info, State) ->
{noreply, State}.

%% @private
terminate(_Reason, _State) ->
ok.

%% @private
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%%------------------------------------------------------------------------------
%% Internal Functions
%%------------------------------------------------------------------------------
-spec handle_report(passage_span:span(), #?STATE{}) -> {noreply, #?STATE{}}.
handle_report(Span, State = #?STATE{default_service_name = DefaultName, process_tags = Tags}) ->
Name = maps:get('location.application', passage_span:get_tags(Span), DefaultName),
Message = jaeger_passage_thrift:make_emit_batch_message(Name, Tags, [Span]),
Encoded = thrift_protocol:encode_message(Message, State#?STATE.thrift_format),
ok = gen_udp:send(State#?STATE.socket, State#?STATE.agent_host, State#?STATE.agent_port, Encoded),
{noreply, State}.

0 comments on commit c4edcbf

Please sign in to comment.