Permalink
Browse files

Add first version of collector logic

  • Loading branch information...
1 parent 6b10f8b commit f0324354611d1c4b9816f34df03b3561f14df428 @asaaki committed Apr 1, 2012
Showing with 126 additions and 10 deletions.
  1. +1 −1 run_emitter
  2. +4 −0 src/bucket.erl
  3. +89 −0 src/collector.erl
  4. +8 −0 src/kollekt.hrl
  5. +1 −0 src/kollekt_app.erl
  6. +1 −0 src/kollekt_sup.erl
  7. +5 −8 src/test_emitter.erl
  8. +17 −1 src/utils.erl
View
@@ -1,2 +1,2 @@
#!/bin/bash
-erl +K true -noshell -noinput -pa ./ebin -s test_emitter go -extra localhost 2323
+erl +K true +S 1 -noshell -noinput -pa ./ebin -s test_emitter go -extra localhost 2323
View
@@ -29,10 +29,13 @@ loop(BucketId, BucketStore, StartedAt, Opts) ->
OverSized = length(NewBucketStore) >= Opts#bucket_opts.maxitems,
case {OverAged, OverSized} of
{true, false} ->
+ collector:add(BucketId, NewBucketStore),
bucket_broker:remove(BucketId, maxlife);
{false, true} ->
+ collector:add(BucketId, NewBucketStore),
bucket_broker:remove(BucketId, maxitems);
{true, true} ->
+ collector:add(BucketId, NewBucketStore),
bucket_broker:remove(BucketId, maxitems); % we take the maxitems as first prio reason
{_,_} ->
loop(BucketId, NewBucketStore, StartedAt, Opts)
@@ -55,6 +58,7 @@ loop(BucketId, BucketStore, StartedAt, Opts) ->
true ->
timeout
end,
+ collector:add(BucketId, BucketStore),
bucket_broker:remove(BucketId, Reason)
end.
View
@@ -0,0 +1,89 @@
% collector service
%
% Buffers finished bucket data in an ETS table and periodically
% dumps the whole table to disk ("xdump.tmp").
%
% needs: -
% used_by: bucket.erl

-module(collector).
-behaviour(gen_server).
-include("kollekt.hrl").

-export([
  start/1, stop/0,
  add/2, flush_db/0,
  init/1, handle_call/3, handle_cast/2, handle_info/2,
  terminate/2, code_change/3
  ]).

% flush interval in milliseconds (10 minutes)
-define(FLUSH_INTERVAL, 600000).

% start the collector as a locally registered gen_server; _Args is ignored
start(_Args) ->
  gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

% synchronous stop of the server
stop() -> gen_server:call(?MODULE, stop).

init(_Args) ->
  % schedule the first periodic flush; erlang:send_after/3 keeps the timer
  % inside this process instead of a separate, unlinked looper process
  erlang:send_after(?FLUSH_INTERVAL, self(), flush),
  {ok, ets:new(?MODULE, [set])}.

% store (or merge into) the collected data of a bucket session
add(BucketId, BucketData) ->
  gen_server:call(?MODULE, {add_bucket, BucketId, BucketData}).

% force an immediate dump of the current table to disk
flush_db() ->
  gen_server:call(?MODULE, {flush, toDisk}).

handle_call({add_bucket, BucketId, BucketData}, _From, Table) ->
  % also if a bucket session was destroyed before we will add new values
  % if we find an existing bucket session in the collector table
  utils:upsert_list(Table, BucketId, BucketData),
  {reply, ok, Table};

handle_call({flush, toDisk}, _From, Table) ->
  dump_to_disk(Table),
  {reply, ok, Table};

handle_call(stop, _From, Table) ->
  % previously unhandled: stop/0 fell through to the catch-all clause and
  % the server kept running; now it terminates normally
  {stop, normal, ok, Table};

handle_call(_Request, _From, Table) ->
  {reply, ok, Table}.

handle_cast(_Msg, State) ->
  {noreply, State}.

% periodic flush tick scheduled in init/1 (and re-armed here)
handle_info(flush, Table) ->
  dump_to_disk(Table),
  erlang:send_after(?FLUSH_INTERVAL, self(), flush),
  {noreply, Table};

handle_info(_Info, State) ->
  {noreply, State}.

terminate(_Reason, _State) ->
  ok.

code_change(_OldVsn, State, _Extra) ->
  {ok, State}.

% internal: write every table entry as "BucketId;V1,V2,...\n";
% the table is not cleared, so each dump is a full snapshot
dump_to_disk(Table) ->
  case ets:tab2list(Table) of
    [] -> ok; % nothing collected yet, skip the write
    DataList ->
      Output = lists:map(fun({BucketId, BucketData}) ->
        [BucketId, ";",
         lists:map(fun(E) -> [E, ","] end, BucketData),
         "\n"]
      end, DataList),
      file:write_file(
        "xdump.tmp",
        Output,
        [write, binary, {encoding, utf8}])
  end.
View
@@ -45,3 +45,11 @@
{buckets, removed, maxlife},
{buckets, removed, maxitems}
]).
+
+% test emitter
+
% ALLOWED CHARS
% character pools for the test emitter's random session ids and payload
% values; stored as tuples so a character can be picked with O(1)
% element/2 access (see utils:random_str/2)
-define(TEST_ALLOWED_SESSION_CHARS,
  list_to_tuple("0123456789abcdef")).
-define(TEST_ALLOWED_CHARS,
  list_to_tuple("abcdefghijklmnopqrstuvwxyz ABCDEFGHIJKLMNOPQRSTUVWXYZ 0123456789")).
View
@@ -6,6 +6,7 @@
start(_StartType, _StartArgs) ->
stats:start([]),
+ collector:start([]),
kueue:start([]),
bucket_broker:start([]),
spawn(fun() -> udp:start(?DEFAULT_PORT) end),
View
@@ -7,6 +7,7 @@
start_link() ->
ChildSpecs = [
?CHILD(stats, worker),
+ ?CHILD(collector, worker),
?CHILD(kueue, worker),
?CHILD(bucket_broker, worker),
?CHILD_W_ARGS(udp, worker, [2323])
View
@@ -19,20 +19,17 @@ init(Host, Port) ->
end,
{ok, Socket} = gen_udp:open(0, [binary]),
io:format("Test Emitter started. Target: ~p:~p~n" ,[UseHost, UsePort]),
- % we spawn 9 emitter processes
- utils:for(9, fun(_N)-> spawner(Socket, UseHost, UsePort) end),
+ % we spawn multiple emitter processes
+ utils:for(3, fun(_N)-> spawner(Socket, UseHost, UsePort) end),
init_loop().
init_loop() -> init_loop().
loop(Socket, Host, Port) ->
- {A1,A2,A3} = now(),
- random:seed(A1, A2, A3),
- SessionSize = 1000000, % max amount of different sessions
- ValueSize = 10000, % max amount of different values
- Session = crypto:md5(list_to_binary(integer_to_list(random:uniform(SessionSize)+1))),
- Value = crypto:md5(list_to_binary(integer_to_list(random:uniform(ValueSize)+1))),
+ {A1,A2,A3} = now(), random:seed(A1, A2, A3),
+ Session = utils:random_str(7, ?TEST_ALLOWED_SESSION_CHARS),
+ Value = utils:random_str(23, ?TEST_ALLOWED_CHARS),
DataList = [Session,?DEFAULT_BUCKET_DATA_DELIMITER,Value],
BinData = list_to_binary(DataList),
View
@@ -1,8 +1,10 @@
-module(utils).
+-include("kollekt.hrl").
-export([
for/2,
upsert_list/3, set_v/3,
- int_set_v/3, int_get_v/2, int_get_kv/2
+ int_set_v/3, int_get_v/2, int_get_kv/2,
+ random_str/2
]).
% for loop - needs a fun/1 (gets loop counter value)!
@@ -57,3 +59,17 @@ int_get_kv(Store, Key) ->
[{K, V}] -> {K, V};
[] -> {Key, 0}
end.
+
% random strings
%
% random_str(Len, Chars) returns a random string of length Len where every
% character is drawn uniformly from the tuple Chars (see the
% TEST_ALLOWED_* macros in kollekt.hrl).
%
% Fix: the previous version called random:seed/1 twice and threaded the
% RETURN value of the second call (documented as the previously stored
% seed, possibly 'undefined') into random:uniform_s/2 as generator state.
% We now seed the process-local generator once and draw from it directly.
random_str(Len, Chars) ->
  {A1, A2, A3} = now(),
  random:seed(A1, A2, A3),
  % lists:seq(1, 0) yields [], so Len = 0 gives the empty string
  [random_char(Chars) || _ <- lists:seq(1, Len)].

% pick one uniformly random element of the chars tuple
random_char(Chars) ->
  element(random:uniform(tuple_size(Chars)), Chars).

0 comments on commit f032435

Please sign in to comment.