Skip to content
Permalink
Browse files
Implement basic functionalities
  • Loading branch information
sile committed Oct 23, 2017
1 parent 7af1fea commit 7d392c5c7b11ae7fe496cac987ec5dd56f6f1cfc
Showing 9 changed files with 670 additions and 9 deletions.
@@ -1,10 +1,10 @@
{"1.1.0",
[{<<"local">>,{pkg,<<"local">>,<<"0.2.0">>},0},
{<<"passage">>,{pkg,<<"passage">>,<<"0.1.0">>},0},
{<<"thrift_protocol">>,{pkg,<<"thrift_protocol">>,<<"0.1.1">>},0}]}.
{<<"passage">>,{pkg,<<"passage">>,<<"0.1.1">>},0},
{<<"thrift_protocol">>,{pkg,<<"thrift_protocol">>,<<"0.1.2">>},0}]}.
[
{pkg_hash,[
{<<"local">>, <<"91466ED42F5E3509E097C4F57ED58C03362D64D537EF8B4AF0CA90774F7DBAD3">>},
{<<"passage">>, <<"15B763CBC49B0CF885CADE1C19B6C108549BFCFFDE43BA7EF844A5735A3FE57E">>},
{<<"thrift_protocol">>, <<"0D7A949A12EFEF1B5765C13F6095B8D82895EC06EDFF07E1D9DE97E8D64AB68C">>}]}
{<<"passage">>, <<"E45F0D581D078343C61E3B72D540DB882D78AA0560B3B5E00BE4EB24E28EE47D">>},
{<<"thrift_protocol">>, <<"A3741E439567F6D7D232ACFF42AFF9ED317A1A32BF5F7EB1CC89F90DDEA0361B">>}]}
].
@@ -0,0 +1,9 @@
%% https://github.com/uber/jaeger-client-go/tree/v2.9.0/constants.go

-define(JAEGER_CLIENT_VERSION_TAG_KEY, 'jaeger.version').
-define(JAEGER_DEBUG_HEADER, 'jaeger-debug-id').
-define(JAEGER_BAGGAGE_HEADER, 'jaeger-baggage').
-define(TRACER_HOSTNAME_TAG_KEY, 'hostname').
-define(TRACER_IP_TAG_KEY, 'ip').
-define(TRACER_STATE_HEADER_NAME, <<"uber-trace-id">>).
-define(TRACE_BAGGAGE_HEADER(Suffix), <<"uberctx-", Suffix/binary>>).
@@ -6,6 +6,7 @@
{applications,
[kernel,
stdlib,
inets,
local,
passage,
thrift_protocol
@@ -1,12 +1,12 @@
-module(jaeger_passage_local_ns).

-export([child_spec/0]).
-export([tracer_name/1]).
-export([reporter_name/1]).

-spec child_spec() -> supervisor:child_spec().
child_spec() ->
local:name_server_child_spec(?MODULE).

-spec tracer_name(jaegerl:tracer_id()) -> local:otp_name().
tracer_name(TracerId) ->
local:otp_name({?MODULE, {tracer, TracerId}}).
-spec reporter_name(jaeger_passage_reporter:reporter_id()) -> local:otp_name().
reporter_name(TracerId) ->
local:otp_name({?MODULE, {reporeter, TracerId}}).
@@ -0,0 +1,170 @@
-module(jaeger_passage_reporter).

-behaviour(passage_reporter).
-behaviour(gen_server).

-include("constants.hrl").

%%------------------------------------------------------------------------------
%% Exported API
%%------------------------------------------------------------------------------
-export([start/1, start/2]).
-export([stop/1]).
-export([which_reporters/0]).

-export_type([reporter_id/0]).
-export_type([start_option/0, start_options/0]).

%%------------------------------------------------------------------------------
%% Application Internal API
%%------------------------------------------------------------------------------
-export([start_link/2]).

%%------------------------------------------------------------------------------
%% 'passage_reporter' Callback API
%%------------------------------------------------------------------------------
-export([report/2]).

%%------------------------------------------------------------------------------
%% 'gen_server' Callback API
%%------------------------------------------------------------------------------
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

%%------------------------------------------------------------------------------
%% Macros & Records
%%------------------------------------------------------------------------------
-define(STATE, ?MODULE).

-record(?STATE,
{
socket :: gen_udp:socket(),
thrift_format :: thrift_protocol:format(),
agent_host :: inet:hostname(),
agent_port :: inet:port_number(),
service_name :: atom(),
service_tags :: passage:tags()
}).

%%------------------------------------------------------------------------------
%% Exported Types
%%------------------------------------------------------------------------------
-type reporter_id() :: atom().

-type start_options() :: [start_option()].

-type start_option() :: {thrift_format, thrift_protocol:format()}
| {agent_host, inet:hostname()}
| {agent_port, inet:port_number()}
| {service_name, atom()}
| {service_tags, passage:tags()}.

%%------------------------------------------------------------------------------
%% Application Internal Functions
%%------------------------------------------------------------------------------
-spec start_link(reporter_id(), start_options()) -> {ok, pid()} | {error, Reason} when
Reason :: {already_started, pid()} | term().
start_link(ReporterId, Options) ->
Name = jaeger_passage_local_ns:reporter_name(ReporterId),
gen_server:start_link(Name, ?MODULE, {ReporterId, Options}, []).

%%------------------------------------------------------------------------------
%% Exported Functions
%%------------------------------------------------------------------------------
%% @equiv start(ReporterId, [])
-spec start(reporter_id()) -> {ok, passage_reporter:reporter()} | {error, Reason} when
Reason :: {already_started, pid()} | term().
start(ReporterId) ->
start(ReporterId, []).

-spec start(reporter_id(), start_options()) -> {ok, passage_reporter:reporter()} | {error, Reason} when
Reason :: {already_started, pid()} | term().
start(ReporterId, Options) ->
case jaeger_passage_reporter_sup:start_child(ReporterId, Options) of
{error, Reason} -> {error, Reason};
{ok, _Pid} -> {ok, passage_reporter:new(?MODULE, ReporterId)}
end.

-spec stop(reporter_id()) -> ok.
stop(ReporterId) ->
jaeger_passage_reporter_sup:stop_child(ReporterId).

-spec which_reporters() -> [reporter_id()].
which_reporters() ->
jaeger_passage_reporter_sup:which_children().

%%------------------------------------------------------------------------------
%% 'passage_reporter' Callback Functions
%%------------------------------------------------------------------------------
%% @private
report(ReporterId, Span) ->
Server = jaeger_passage_local_ns:reporter_name(ReporterId),
gen_server:cast(Server, {report, Span}).

%%------------------------------------------------------------------------------
%% 'gen_server' Callback Functions
%%------------------------------------------------------------------------------
%% @private
init({ReporterId, Options}) ->
Format = proplists:get_value(thrift_protocol, Options, compact),
DefaultPort =
case Format of
compact -> 6831;
binary -> 6832
end,
AgentHost = proplists:get_value(agent_host, Options, "127.0.0.1"),
AgentPort = proplists:get_value(agent_port, Options, DefaultPort),
ServiceName = proplists:get_value(service_name, Options, ReporterId),
Tags0 = proplists:get_value(service_tags, Options, #{}),

{ok, Hostname} = inet:gethostname(),
{ok, Version} = application:get_key(vsn),
Tags1 =
maps:merge(
Tags0,
#{
?JAEGER_CLIENT_VERSION_TAG_KEY => list_to_binary(["jaeger_passage-", Version]),
?TRACER_HOSTNAME_TAG_KEY => list_to_binary(Hostname)
}),
{ok, Socket} = gen_udp:open(0),
State =
#?STATE{
socket = Socket,
thrift_format = Format,
agent_host = AgentHost,
agent_port = AgentPort,
service_name = ServiceName,
service_tags = Tags1
},
{ok, State}.

%% @private
handle_call(_Request, _From, State) ->
{noreply, State}.

%% @private
handle_cast({report, Span}, State) ->
handle_report(Span, State);
handle_cast(_Request, State) ->
{noreply, State}.

%% @private
handle_info(_Info, State) ->
{noreply, State}.

%% @private
terminate(_Reason, _State) ->
ok.

%% @private
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%%------------------------------------------------------------------------------
%% Internal Functions
%%------------------------------------------------------------------------------
-spec handle_report(passage_span:span(), #?STATE{}) -> {noreply, #?STATE{}}.
handle_report(Span, State = #?STATE{service_name = Name, service_tags = Tags}) ->
Message = jaeger_passage_thrift:make_emit_batch_message(Name, Tags, [Span]),
Encoded = thrift_protocol:encode_message(Message, State#?STATE.thrift_format),
ok = gen_udp:send(State#?STATE.socket, State#?STATE.agent_host, State#?STATE.agent_port, Encoded),
{noreply, State}.
@@ -0,0 +1,39 @@
-module(jaeger_passage_reporter_sup).

-behaviour(supervisor).

-export([start_link/0]).
-export([start_child/2]).
-export([stop_child/1]).
-export([which_children/0]).

-export([init/1]).

-define(SERVER, ?MODULE).

start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).

-spec start_child(jaeger_passage_reporter:reporter_id(), jaeger_passage_reporter:start_options()) ->
{ok, pid()} | {error, Reason} when
Reason :: {already_started, pid()} | term().
start_child(ReporterId, Options) ->
Child = #{
id => ReporterId,
start => {jaeger_passage_reporter, start_link, [ReporterId, Options]},
restart => permanent
},
supervisor:start_child(?MODULE, Child).

-spec stop_child(jaeger_passage_reporter:reporter_id()) -> ok.
stop_child(ReporterId) ->
_ = supervisor:terminate_child(?MODULE, ReporterId),
_ = supervisor:delete_child(?MODULE, ReporterId),
ok.

-spec which_children() -> [jaeger_passage_reporter:reporter_id()].
which_children() ->
[Id || {Id, _, _, _} <- supervisor:which_children(?MODULE)].

init([]) ->
{ok, {#{}, []}}.

0 comments on commit 7d392c5

Please sign in to comment.