Permalink
Browse files

start

  • Loading branch information...
0 parents commit 6e659cbd0328308a35d877b3deddf2c401f161cc @astro committed Jul 26, 2009
Showing with 298 additions and 0 deletions.
  1. +30 −0 src/collectd.erl
  2. +80 −0 src/collectd_pkt.erl
  3. +101 −0 src/collectd_server.erl
  4. +41 −0 src/collectd_sup.erl
  5. +46 −0 src/collectd_values.erl
@@ -0,0 +1,30 @@
+-module(collectd).
+
+-behaviour(application).
+
+%% Application callbacks
+-export([start/2, stop/1]).
+
+%%====================================================================
+%% Application callbacks
+%%====================================================================
+start(_Type, StartArgs) ->
+ case collectd_sup:start_link(StartArgs) of
+ {ok, Pid} ->
+ {ok, Pid};
+ Error ->
+ Error
+ end.
+
+%%--------------------------------------------------------------------
+%% Function: stop(State) -> void()
+%% Description: This function is called whenever an application
+%% has stopped. It is intended to be the opposite of Module:start/2 and
+%% should do any necessary cleaning up. The return value is ignored.
+%%--------------------------------------------------------------------
+stop(_State) ->
+ ok.
+
+%%====================================================================
+%% Internal functions
+%%====================================================================
@@ -0,0 +1,80 @@
+-module(collectd_pkt).
+
+-export([pack/4]).
+
+pack(Hostname, Time, Interval, Parts) ->
+ list_to_binary(
+ [pack_hostname(Hostname),
+ pack_time(Time),
+ pack_interval(Interval),
+ lists:map(
+ fun({Plugin, PluginInstances}) ->
+ [pack_plugin(Plugin),
+ lists:map(
+ fun({PluginInstance, Types}) ->
+ [pack_plugin_instance(PluginInstance),
+ lists:map(
+ fun({Type, TypeInstances}) ->
+ [pack_type(Type),
+ lists:map(
+ fun({TypeInstance, Values}) ->
+ pack_values(Values)
+ end, TypeInstances)]
+ end, Types)]
+ end, PluginInstances)]
+ end, Parts)]).
+
+
+pack_part(Type, Part) ->
+ PartSize = size(Part) + 4,
+ <<Type:16/big, PartSize:16/big, Part>>.
+
+pack_string(S) when is_list(S) ->
+ pack_string(list_to_binary(S));
+pack_string(S) ->
+ <<S/binary, 0>>.
+
+pack_integer(I) ->
+ <<I:64/big>>.
+
+pack_hostname(Hostname) ->
+ pack_part(0, pack_string(Hostname)).
+
+pack_time(Time) ->
+ pack_part(1, pack_integer(Time)).
+
+pack_interval(Interval) ->
+ pack_part(7, pack_integer(Interval)).
+
+pack_plugin(Plugin) ->
+ pack_string(Plugin).
+
+pack_plugin_instance(PluginInstance) ->
+ pack_string(PluginInstance).
+
+pack_type(Type) ->
+ pack_string(Type).
+
+pack_type_instance(TypeInstance) ->
+ pack_string_instance(TypeInstance).
+
+-define(TYPE_COUNTER, 0).
+-define(TYPE_GAUGE, 1).
+
+pack_values(TypesValues) ->
+ ValuesSize = length(TypesValues),
+ Content = list_to_binary([<<TypesValues:16/big>>,
+ lists:map(fun({Type, _}) ->
+ <<Type:8>>
+ end, TypesValues),
+ lists:map(fun({?TYPE_COUNTER, Value}) ->
+ <<Value:64/big>>;
+ ({?TYPE_GAUGE, Value}) ->
+ htond(Value)
+ end, TypesValues)
+ ]),
+ pack_part(6, Content).
+
+htond(Float) ->
+ <<_:16, B/binary>> = term_to_binary(Float, [{minor_version, 1}]),
+ B.
@@ -0,0 +1,101 @@
+-module(collectd_server).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/3]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state, {sock, host, port}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server
+%%--------------------------------------------------------------------
+start_link(Interval, Host, Port) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [Interval, Host, Port], []).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% Description: Initiates the server
+%%--------------------------------------------------------------------
+init([Interval, Host, Port]) ->
+ {ok, Sock} = gen_udp:open(0),
+ I = self(),
+ spawn_link(fun() ->
+ timer(I, Interval)
+ end),
+ {ok, #state{socket = Sock,
+ host = Host, port = Port}, Interval}.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call(_Request, _From, State) ->
+ Reply = ok,
+ {reply, Reply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast(timer, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, #state{sock = Socket}) ->
+ ok = gen_udp:close(Socket).
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+timer(Pid, Interval) ->
+ gen_server:cast(Pid, timer),
+ receive
+ after Interval * 1000000 ->
+ timer(Pid, Interval)
+ end.
@@ -0,0 +1,41 @@
+-module(collectd_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+-define(SERVER, ?MODULE).
+
+%%====================================================================
+%% API functions
+%%====================================================================
+start_link() ->
+ supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+
+add_server(Interval, Host, Port) when is_list(Host) ->
+ case inet:getaddrs(Host, inet6) of
+ {ok, Addr} -> add_server(Addr, Port);
+ {error, _} -> case inet:getaddrs(Host, inet) of
+ {ok, Addr} -> add_server(Addr, Port);
+ {error, Reason} -> {error, Reason}
+ end
+ end;
+add_server(Interval, Host, Port) ->
+ supervisor:start_child({local, ?SERVER},
+ {{server, Host, Port},
+ {collectd_server, start_link, [Interval, Host, Port]},
+ permanent, 1000000, worker, [collectd_server]}).
+
+%%====================================================================
+%% Supervisor callbacks
+%%====================================================================
+init([]) ->
+ {ok,{{one_for_one,1,1}, []}}.
+
+%%====================================================================
+%% Internal functions
+%%====================================================================
@@ -0,0 +1,46 @@
+-module(collectd_values).
+
+-export([new/0, set_gauge/3, inc_counter/3, set_counter/3, make_parts/1]).
+
+new() ->
+ [].
+
+set_val(Values, [], NewValue) ->
+ [NewValue | Values];
+set_val(Values, [K | R], NewValue) ->
+ case lists:keysearch(K, 1, Values) of
+ false ->
+ [set_val1([], R, NewValue) | Values];
+ {true, {K, SubValues1}} ->
+ SubValues2 = set_val(SubValues1, R, NewValue),
+ lists:keyreplace(K, 1, Values, {K, SubValues2})
+ end.
+
+map_vals(Values, F) ->
+ map_vals1(Values, F, []).
+
+map_vals1(Values, F, PPTT) when size(PPTT) == 4 ->
+ F(lists:reverse(PPTT), Values);
+map_vals1(Values, F, PPTT) ->
+ lists:map(fun({K, SubValues}) ->
+ {K, map_vals(SubValues, F, [K | PPTT])}
+ end, Values).
+
+set_gauge(Values, PPTT, Gauges) ->
+ set_val(Values, PPTT, {gauge, Gauges}).
+
+inc_counter(Values, PPTT, Increment) ->
+ NewValues = map_vals(Values, fun(PPTT, {counter, OldValues}) ->
+ {counter,
+ zipwith(fun(Old, New) ->
+ Old + New
+ end, OldValues, NewValues)}
+ end),
+ if
+ NewValues == Values ->
+ set_val(Values, PPTT, Increment);
+ true ->
+ incremented
+ end.
+
+set_counter(Values, PPTT

0 comments on commit 6e659cb

Please sign in to comment.