Permalink
Browse files

simplification, now basically working

  • Loading branch information...
1 parent 6e659cb commit 7062eb745a98fdb5bc70cd0360340d8d02c864e3 @astro committed Aug 10, 2009
Showing with 233 additions and 79 deletions.
  1. +24 −2 src/collectd.erl
  2. +19 −27 src/collectd_pkt.erl
  3. +60 −11 src/collectd_server.erl
  4. +10 −4 src/collectd_sup.erl
  5. +120 −35 src/collectd_values.erl
View
@@ -2,14 +2,36 @@
-behaviour(application).
+-export([add_server/1, add_server/2, add_server/3,
+ set_gauge/3, inc_counter/3, set_counter/3]).
+
%% Application callbacks
-export([start/2, stop/1]).
+add_server(Interval) ->
+ %% ff18::efc0:4a42
+ add_server(Interval, {65304,0,0,0,0,0,61376,19010}).
+
+add_server(Interval, Host) ->
+ add_server(Interval, Host, 25826).
+
+add_server(Interval, Host, Port) ->
+ collectd_sup:add_server(Interval, Host, Port).
+
+set_gauge(Type, TypeInstance, Values) ->
+ collectd_sup:cast_all({set_gauge, Type, TypeInstance, Values}).
+
+inc_counter(Type, TypeInstance, Values) ->
+ collectd_sup:cast_all({inc_counter, Type, TypeInstance, Values}).
+
+set_counter(Type, TypeInstance, Values) ->
+ collectd_sup:cast_all({set_counter, Type, TypeInstance, Values}).
+
%%====================================================================
%% Application callbacks
%%====================================================================
-start(_Type, StartArgs) ->
- case collectd_sup:start_link(StartArgs) of
+start(_Type, _StartArgs) ->
+ case collectd_sup:start_link() of
{ok, Pid} ->
{ok, Pid};
Error ->
View
@@ -1,34 +1,24 @@
-module(collectd_pkt).
--export([pack/4]).
+-export([pack/4,
+ pack_plugin/1, pack_plugin_instance/1,
+ pack_type/1, pack_type_instance/1,
+ pack_values/1]).
pack(Hostname, Time, Interval, Parts) ->
list_to_binary(
[pack_hostname(Hostname),
pack_time(Time),
pack_interval(Interval),
- lists:map(
- fun({Plugin, PluginInstances}) ->
- [pack_plugin(Plugin),
- lists:map(
- fun({PluginInstance, Types}) ->
- [pack_plugin_instance(PluginInstance),
- lists:map(
- fun({Type, TypeInstances}) ->
- [pack_type(Type),
- lists:map(
- fun({TypeInstance, Values}) ->
- pack_values(Values)
- end, TypeInstances)]
- end, Types)]
- end, PluginInstances)]
- end, Parts)]).
+ Parts]).
pack_part(Type, Part) ->
PartSize = size(Part) + 4,
- <<Type:16/big, PartSize:16/big, Part>>.
+ <<Type:16/big, PartSize:16/big, Part/binary>>.
+pack_string(S) when is_atom(S) ->
+ pack_string(atom_to_list(S));
pack_string(S) when is_list(S) ->
pack_string(list_to_binary(S));
pack_string(S) ->
@@ -47,29 +37,31 @@ pack_interval(Interval) ->
pack_part(7, pack_integer(Interval)).
pack_plugin(Plugin) ->
- pack_string(Plugin).
+ pack_part(2, pack_string(Plugin)).
pack_plugin_instance(PluginInstance) ->
- pack_string(PluginInstance).
+ pack_part(3, pack_string(PluginInstance)).
pack_type(Type) ->
- pack_string(Type).
+ pack_part(4, pack_string(Type)).
pack_type_instance(TypeInstance) ->
- pack_string_instance(TypeInstance).
+ pack_part(5, pack_string(TypeInstance)).
-define(TYPE_COUNTER, 0).
-define(TYPE_GAUGE, 1).
pack_values(TypesValues) ->
ValuesSize = length(TypesValues),
- Content = list_to_binary([<<TypesValues:16/big>>,
- lists:map(fun({Type, _}) ->
- <<Type:8>>
+ Content = list_to_binary([<<ValuesSize:16/big>>,
+ lists:map(fun({counter, _}) ->
+ <<?TYPE_COUNTER:8>>;
+ ({gauge, _}) ->
+ <<?TYPE_GAUGE:8>>
end, TypesValues),
- lists:map(fun({?TYPE_COUNTER, Value}) ->
+ lists:map(fun({counter, Value}) ->
<<Value:64/big>>;
- ({?TYPE_GAUGE, Value}) ->
+ ({gauge, Value}) ->
htond(Value)
end, TypesValues)
]),
View
@@ -9,7 +9,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--record(state, {sock, host, port}).
+-record(state, {sock, interval, host, port, values}).
%%====================================================================
%% API
@@ -19,7 +19,7 @@
%% Description: Starts the server
%%--------------------------------------------------------------------
start_link(Interval, Host, Port) ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [Interval, Host, Port], []).
+ gen_server:start_link(?MODULE, [Interval, Host, Port], []).
%%====================================================================
%% gen_server callbacks
@@ -33,13 +33,19 @@ start_link(Interval, Host, Port) ->
%% Description: Initiates the server
%%--------------------------------------------------------------------
init([Interval, Host, Port]) ->
- {ok, Sock} = gen_udp:open(0),
+ AF = case Host of
+ {_, _, _, _} -> inet;
+ {_, _, _, _, _, _, _, _} -> inet6
+ end,
+ {ok, Sock} = gen_udp:open(0, [AF]),
I = self(),
+ Timeout = trunc(Interval * 1000),
spawn_link(fun() ->
- timer(I, Interval)
+ timer(I, Timeout)
end),
- {ok, #state{socket = Sock,
- host = Host, port = Port}, Interval}.
+ {ok, #state{sock = Sock, interval = Interval,
+ host = Host, port = Port,
+ values = collectd_values:new()}, Interval}.
%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
@@ -60,8 +66,30 @@ handle_call(_Request, _From, State) ->
%% {stop, Reason, State}
%% Description: Handling cast messages
%%--------------------------------------------------------------------
-handle_cast(timer, State) ->
- {noreply, State}.
+handle_cast(timer, #state{sock = Sock,
+ host = Host, port = Port,
+ interval = Interval,
+ values = Values} = State) ->
+ io:format("Timer with: ~p~n", [Values]),
+ send_packet(Sock, Host, Port, Interval, Values),
+ Values2 = collectd_values:forget_gauges(Values),
+ {noreply, State#state{values = Values2}};
+
+handle_cast({set_gauge, Type, TypeInstance, Values},
+ #state{values = Values1} = State) ->
+ Values2 = collectd_values:set_gauge(Values1, Type, TypeInstance, Values),
+ {noreply, State#state{values = Values2}};
+
+handle_cast({inc_counter, Type, TypeInstance, Values},
+ #state{values = Values1} = State) ->
+ Values2 = collectd_values:inc_counter(Values1, Type, TypeInstance, Values),
+ {noreply, State#state{values = Values2}};
+
+handle_cast({set_counter, Type, TypeInstance, Values},
+ #state{values = Values1} = State) ->
+ Values2 = collectd_values:set_counter(Values1, Type, TypeInstance, Values),
+ {noreply, State#state{values = Values2}}.
+
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
@@ -93,9 +121,30 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%--------------------------------------------------------------------
-timer(Pid, Interval) ->
+send_packet(Sock, Host, Port, Interval, Values) ->
+ {MS, S, _} = erlang:now(),
+ Time = MS * 1000000 + S,
+ [Name, Hostname | _] = string:tokens(atom_to_list(node()), "@"),
+ io:format("values: ~p~n", [collectd_values:to_list(Values)]),
+ Parts = [collectd_pkt:pack_plugin("erlang"),
+ collectd_pkt:pack_plugin_instance(Name)
+ | lists:map(fun({type, Type}) ->
+ collectd_pkt:pack_type(Type);
+ ({type_instance, TypeInstance}) ->
+ collectd_pkt:pack_type_instance(TypeInstance);
+ ({values, ValuesType, Values1}) ->
+ Values2 = [{ValuesType, Value}
+ || Value <- Values1],
+ collectd_pkt:pack_values(Values2)
+ end, collectd_values:to_list(Values))],
+ Pkt = collectd_pkt:pack(Hostname, Time, Interval, Parts),
+ io:format("send(~p, ~p, ~p, ~p)~n",[Sock, Host, Port, Pkt]),
+ ok = gen_udp:send(Sock, Host, Port, Pkt).
+
+
+timer(Pid, Timeout) ->
gen_server:cast(Pid, timer),
receive
- after Interval * 1000000 ->
- timer(Pid, Interval)
+ after Timeout ->
+ timer(Pid, Timeout)
end.
View
@@ -3,7 +3,7 @@
-behaviour(supervisor).
%% API
--export([start_link/0]).
+-export([start_link/0, add_server/3, cast_all/1]).
%% Supervisor callbacks
-export([init/1]).
@@ -18,18 +18,24 @@ start_link() ->
add_server(Interval, Host, Port) when is_list(Host) ->
case inet:getaddrs(Host, inet6) of
- {ok, Addr} -> add_server(Addr, Port);
+ {ok, Addr} -> add_server(Interval, Addr, Port);
{error, _} -> case inet:getaddrs(Host, inet) of
- {ok, Addr} -> add_server(Addr, Port);
+ {ok, Addr} -> add_server(Interval, Addr, Port);
{error, Reason} -> {error, Reason}
end
end;
add_server(Interval, Host, Port) ->
- supervisor:start_child({local, ?SERVER},
+ supervisor:start_child(?SERVER,
{{server, Host, Port},
{collectd_server, start_link, [Interval, Host, Port]},
permanent, 1000000, worker, [collectd_server]}).
+cast_all(Msg) ->
+ Children = supervisor:which_children(?SERVER),
+ lists:foreach(fun({_Id, Pid, _Type, _Modules}) ->
+ gen_server:cast(Pid, Msg)
+ end, Children).
+
%%====================================================================
%% Supervisor callbacks
%%====================================================================
Oops, something went wrong.

0 comments on commit 7062eb7

Please sign in to comment.