This repository has been archived by the owner on May 8, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
bucket.erl
76 lines (65 loc) · 2.04 KB
/
bucket.erl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
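% Bucket process: buffers incoming data items and hands them to the collector
% once the bucket is full, too old, or has been idle for the configured timeout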
%
% needs: bucket_broker.erl
% used_by: bucket_broker.erl
-module(bucket).
-export([new/1, new/2]).
-include("kollekt.hrl").
% Starts a bucket for BucketId with a default-initialised #bucket_opts{}
% record. Delegates to new/2 and returns its result.
new(BucketId) ->
    new(BucketId, #bucket_opts{}).
% Spawns a new, unlinked process that runs init/2 with the given bucket id
% and options, and returns the pid of that process.
new(BucketId, Opts) ->
    Start = fun() -> init(BucketId, Opts) end,
    spawn(Start).
% Entry point of the spawned bucket process: starts with an empty store,
% records the start time in whole seconds and enters the receive loop.
%
% BucketId - identifier passed on to collector and bucket_broker by loop/4
% Opts     - #bucket_opts{} record read by loop/4
init(BucketId, Opts) ->
    % Take the start time from the same helper checkAge/2 uses, so both sides
    % of the age comparison are produced by one piece of code.
    StartedAt = currentTime(),
    loop(BucketId, [], StartedAt, Opts).
% Main receive loop of a bucket process.
%
% BucketId    - identifier handed to collector:add/2 and bucket_broker:remove/2
% BucketStore - items received so far, newest first
% StartedAt   - start time in seconds, as recorded by init/2
% Opts        - #bucket_opts{} record; maxitems, maxlife and timeout are read
%
% On {data, Data} the item is prepended to the store; if the store has then
% reached maxitems or the bucket has reached maxlife, the bucket is flushed
% and the loop ends, otherwise the loop continues with the grown store.
% Any other message is printed and ignored.
% If no message arrives within Opts#bucket_opts.timeout, the bucket is
% flushed with the reason maxitems, maxlife or (if neither limit is hit)
% timeout.
loop(BucketId, BucketStore, StartedAt, Opts) ->
    receive
        {data, Data} ->
            NewBucketStore = [Data | BucketStore],
            case flush_reason(NewBucketStore, StartedAt, Opts) of
                none ->
                    loop(BucketId, NewBucketStore, StartedAt, Opts);
                Reason ->
                    flush(BucketId, NewBucketStore, Reason)
            end;
        Any ->
            % debug
            io:format("Bucket :: No receiver matched! (with: ~p) [pid:~p]~n", [Any,self()]),
            loop(BucketId, BucketStore, StartedAt, Opts)
    after Opts#bucket_opts.timeout ->
        Reason =
            case flush_reason(BucketStore, StartedAt, Opts) of
                none -> timeout;
                Limit -> Limit
            end,
        flush(BucketId, BucketStore, Reason)
    end.

% Decides whether the bucket has hit one of its limits.
% Returns maxitems, maxlife, or none; maxitems takes priority when both
% limits are reached.
flush_reason(BucketStore, StartedAt, Opts) ->
    OverSized = length(BucketStore) >= Opts#bucket_opts.maxitems,
    OverAged = checkAge(StartedAt, Opts#bucket_opts.maxlife),
    if
        OverSized -> maxitems;
        OverAged -> maxlife;
        true -> none
    end.

% Hands the collected items to the collector, then asks the broker to remove
% this bucket, passing along the reason. Returns the result of the remove call.
flush(BucketId, BucketStore, Reason) ->
    collector:add(BucketId, BucketStore),
    bucket_broker:remove(BucketId, Reason).
% helper
% Returns true when the time elapsed since StartedAt (as measured by
% currentTime/0) is at least MaxLife, false otherwise.
checkAge(StartedAt, MaxLife) ->
    currentTime() - StartedAt >= MaxLife.
% Returns the current wall-clock time as whole seconds since the Unix epoch.
currentTime() ->
    % os:timestamp/0 returns the same {MegaSecs, Secs, MicroSecs} shape as the
    % deprecated erlang:now/0, so the result stays in epoch seconds.
    {MegaSecs, Secs, _MicroSecs} = os:timestamp(),
    MegaSecs * 1000000 + Secs.