Skip to content

Commit

Permalink
Make buffer implementation pluggable
Browse files Browse the repository at this point in the history
See samples/pobox_queue_buf.erl for an example pobox_buf implementation.
  • Loading branch information
edescourtis committed May 4, 2018
1 parent 4d6da08 commit ba313af
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 14 deletions.
14 changes: 9 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@ The FSM can be illustrated as crappy ASCII as:

## Types of buffer

Currently, there are three types of buffers supported: queues and stacks,
and `keep_old` queues.
Currently, there are three types of built-in buffers supported: queues
and stacks, and `keep_old` queues. You can also provide your own
buffer implementation using the `pobox_buf` behaviour. See
`samples/pobox_queue_buf.erl` for an example implementation.

Queues will keep messages in order, and drop oldest messages to make
place for new ones. If you have a buffer of size 3 and receive messages
Expand Down Expand Up @@ -149,8 +151,9 @@ Where:
This also means that processes that terminate normally won't kill the
POBox.
- `MaxSize` is the maximum number of messages in a buffer.
- `BufferType` can be either `queue` or `stack` and specifies which type
is going to be used.
- `BufferType` can be either `queue`, `stack` or `keep_old` and specifies
which type is going to be used. You can also provide your buffer module
using `{mod, Module}`.
- `InitialState` can be either `passive` or `notify`. The default value
is set to `notify`. Having the buffer passive is desirable when you
start it during an asynchronous `init` and do not want to receive
Expand Down Expand Up @@ -313,7 +316,7 @@ This is more a wishlist than a roadmap, in no particular order:
- Provide default filter functions in a new module

## Changelog

- 1.0.5: added `pobox_buf` behaviour to add custom buffer implementations
- 1.0.4: move to gen\_statem implementation to avoid OTP 21 compile errors and OTP 20 warnings
- 1.0.3: fix typespecs to generate fewer errors
- 1.0.2: explicitly specify `registered` to be `[]` for
Expand All @@ -333,3 +336,4 @@ This is more a wishlist than a roadmap, in no particular order:
- Fred Hebert / @ferd: library generalization and current implementation
- Geoff Cant / @archaelus: design, original implementation
- Jean-Samuel Bédard / @jsbed: adaptation to gen\_statem behaviour
- Eric des Courtis / @edescourtis: added `pobox_buf` behaviour
35 changes: 27 additions & 8 deletions src/pobox.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
-compile({no_auto_import,[size/1]}).

-ifdef(namespaced_types).
-record(buf, {type = undefined :: undefined | 'stack' | 'queue' | 'keep_old',
-record(buf, {type = undefined :: undefined | 'stack' | 'queue' | 'keep_old' | {mod, module()},
max = undefined :: undefined | max(),
size = 0 :: non_neg_integer(),
drop = 0 :: drop(),
data = undefined :: undefined | queue:queue() | list()}).
-else.
-record(buf, {type = undefined :: undefined | 'stack' | 'queue' | 'keep_old',
-record(buf, {type = undefined :: undefined | 'stack' | 'queue' | 'keep_old' | {mod, module()},
max = undefined :: undefined | max(),
size = 0 :: non_neg_integer(),
drop = 0 :: drop(),
Expand Down Expand Up @@ -75,8 +75,13 @@ start_link(Owner, Size, Type, StateName) when is_pid(Owner);
start_link(Name, Owner, Size, Type) ->
start_link(Name, Owner, Size, Type, notify).

-spec start_link(name(), pid(), max(), stack | queue,
-spec start_link(name(), pid(), max(), stack | queue | keep_old | {mod, module()},
'notify'|'passive') -> {ok, pid()}.
start_link(Name, Owner, Size, Type = {mod, Module}, StateName) when Size > 0,
is_atom(Module),
StateName =:= notify orelse
StateName =:= passive ->
gen_statem:start_link(Name, ?MODULE, {Owner, Size, Type, StateName}, []);
start_link(Name, Owner, Size, Type, StateName) when Size > 0,
Type =:= queue orelse
Type =:= stack orelse
Expand All @@ -85,6 +90,7 @@ start_link(Name, Owner, Size, Type, StateName) when Size > 0,
StateName =:= passive ->
gen_statem:start_link(Name, ?MODULE, {Owner, Size, Type, StateName}, []).


%% @doc Allows to take a given buffer, and make it larger or smaller.
%% A buffer can be made larger without overhead, but it may take
%% more work to make it smaller given there could be a
Expand Down Expand Up @@ -228,10 +234,11 @@ send_notification(S = #state{owner=Owner}) ->
{next_state, passive, S}.

%%% Generic buffer ops
-spec buf_new('queue' | 'stack' | 'keep_old', max()) -> buffer().
-spec buf_new('queue' | 'stack' | 'keep_old' | {mod, module()}, max()) -> buffer().
buf_new(queue, Size) -> #buf{type=queue, max=Size, data=queue:new()};
buf_new(stack, Size) -> #buf{type=stack, max=Size, data=[]};
buf_new(keep_old, Size) -> #buf{type=keep_old, max=Size, data=queue:new()}.
buf_new(keep_old, Size) -> #buf{type=keep_old, max=Size, data=queue:new()};
buf_new(T={mod, Mod}, Size) -> #buf{type=T, max=Size, data=Mod:new()}.

insert(Msg, B=#buf{type=T, max=Size, size=Size, drop=Drop, data=Data}) ->
B#buf{drop=Drop+1, data=push_drop(T, Msg, Size, Data)};
Expand Down Expand Up @@ -274,6 +281,11 @@ filter(T, Data, Fun, State, Msgs, Count, Drop) ->
end.

%% Specific buffer ops
push_drop(T = {mod, Mod}, Msg, Size, Data) ->
case erlang:function_exported(Mod, push_drop, 2) of
true -> Mod:push_drop(Msg, Data);
false -> push(T, Msg, drop(T, Size, Data))
end;
push_drop(keep_old, _Msg, _Size, Data) -> Data;
push_drop(T, Msg, Size, Data) -> push(T, Msg, drop(T, Size, Data)).

Expand All @@ -294,13 +306,20 @@ drop(stack, N, Size, L) ->
drop(keep_old, N, Size, Queue) ->
if Size > N -> element(1, queue:split(N, Queue));
Size =< N -> queue:new()
end.
end;
drop({mod, Mod}, N, Size, Data) ->
if Size > N -> Mod:drop(N, Data);
Size =< N -> Mod:new()
end.

push(queue, Msg, Q) -> queue:in(Msg, Q);
push(stack, Msg, L) -> [Msg|L];
push(keep_old, Msg, Q) -> queue:in(Msg, Q).
push(keep_old, Msg, Q) -> queue:in(Msg, Q);
push({mod, Mod}, Msg, Data) ->
Mod:push(Msg, Data).

pop(queue, Q) -> queue:out(Q);
pop(stack, []) -> {empty, []};
pop(stack, [H|T]) -> {{value,H}, T};
pop(keep_old, Q) -> queue:out(Q).
pop(keep_old, Q) -> queue:out(Q);
pop({mod, Mod}, Data) -> Mod:pop(Data).
18 changes: 18 additions & 0 deletions src/pobox_buf.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
%%%-------------------------------------------------------------------
%% @copyright Eric des Courtis
%% @author Eric des Courtis <eric.descourtis@mitel.com>
%% @doc Generic message buffer behaviour. For more
%% information, see README.txt
%% @end
%%%-------------------------------------------------------------------
-module(pobox_buf).

%% Behaviour API
-callback new() -> Buf :: any().
-callback push(Msg :: any(), Buf :: any()) -> Buf :: any().
-callback pop(Buf :: any()) -> {empty, Buf :: any()} | {{value, Msg :: any()}, Buf :: any()}.
-callback drop(N :: pos_integer(), Buf :: any()) -> Buf ::any().
-callback push_drop(Msg :: any(), Buf :: any()) -> Buf :: any().
-optional_callbacks([push_drop/2]).


20 changes: 20 additions & 0 deletions src/samples/pobox_queue_buf.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-module(pobox_queue_buf).

-behaviour(pobox_buf).
%% API
-export([new/0, push/2, pop/1, drop/2, push_drop/2]).

new() ->
queue:new().

push(Msg, Q) ->
queue:in(Msg, Q).

pop(Q) ->
queue:out(Q).

drop(N, Q) ->
element(2, queue:split(N, Q)).

push_drop(Msg, Q) ->
push(Msg, drop(1, Q)).
15 changes: 14 additions & 1 deletion test/pobox_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
-include_lib("common_test/include/ct.hrl").
-compile(export_all).

all() -> [{group, queue}, {group, stack}, {group, keep_old}].
all() -> [{group, queue}, {group, stack}, {group, keep_old}, {group, pobox_queue_buf}].
groups() ->
%% Nested groups for eternal glory. We use one group to declare all the
%% tests ('all' group), and then nest it in 'stack' and 'queue' groups,
%% which all repeat the 'all' test but with a different implementation.
[{queue, [], [{group, all}]},
{stack, [], [{group, all}]},
{keep_old, [], [{group, all}]},
{pobox_queue_buf, [], [{group, all}]},
{all, [], [notify_to_active, notify_to_overflow, no_api_post,
filter_skip, filter_drop, active_to_notify,
passive_to_notify, passive_to_active, resize,
Expand Down Expand Up @@ -50,6 +51,8 @@ init_per_group(stack, Config) ->
[{type, stack} | Config];
init_per_group(keep_old, Config) ->
[{type, keep_old} | Config];
init_per_group(pobox_queue_buf, Config) ->
[{type, {mod, pobox_queue_buf}} | Config];
init_per_group(_, Config) ->
Config.

Expand Down Expand Up @@ -91,6 +94,8 @@ notify_to_active(Config) ->
case ?config(type, Config) of
queue -> % queues are in order
Sent = Msgs;
{mod, pobox_queue_buf} -> % queues are in order
Sent = Msgs;
stack -> % We don't care for the order
Sent = lists:sort(Msgs);
keep_old -> % in order, no benefit for out-of-order
Expand All @@ -116,6 +121,8 @@ notify_to_overflow(Config) ->
case ?config(type, Config) of
queue -> % queues are in order. We expect to have lost the 1st msgs
Msgs = lists:seq(Size+1, Size*2); % we dropped 1..Size
{mod, pobox_queue_buf} -> % queues are in order. We expect to have lost the 1st msgs
Msgs = lists:seq(Size+1, Size*2); % we dropped 1..Size
stack -> % We don't care for the order. We have all oldest + 1 newest
Kept = lists:sort([Size*2 | lists:seq(1,Size-1)]),
Kept = lists:sort(Msgs);
Expand Down Expand Up @@ -156,6 +163,8 @@ filter_skip(Config) ->
case ?config(type, Config) of
queue ->
[1,2,3] = [Msg1, Msg2, Msg3];
{mod, pobox_queue_buf} ->
[1,2,3] = [Msg1, Msg2, Msg3];
stack ->
[3,2,1] = [Msg1, Msg2, Msg3];
keep_old ->
Expand All @@ -182,6 +191,8 @@ filter_drop(Config) ->
case ?config(type, Config) of
queue ->
[1,4] = MsgList ++ [MsgExtra];
{mod, pobox_queue_buf} ->
[1,4] = MsgList ++ [MsgExtra];
stack ->
[3,4] = MsgList ++ [MsgExtra];
keep_old ->
Expand Down Expand Up @@ -280,6 +291,8 @@ resize(Config) ->
case ?config(type, Config) of
queue ->
Kept = [4,5,6];
{mod, pobox_queue_buf} ->
Kept = [4,5,6];
stack ->
Kept = [3,2,1];
keep_old ->
Expand Down

0 comments on commit ba313af

Please sign in to comment.