A Disk Queue for Log Replay in Erlang
- Queue items are written to segment files on disk to survive restart.
- Items are popped from the queue in batches, bounded by a size or count limit.
- An ack/2 API is provided to record the reader position within a segment.
- The max_total_bytes config option limits the total size of replayq logs.
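As a rough illustration of the last point, the fragment below passes max_total_bytes alongside the other options. It assumes the option lives in the same config map given to replayq:open/1 as dir and seg_bytes; the directory and both byte values are placeholders chosen for illustration, not recommended settings.

```erlang
%% Assumption: max_total_bytes is set in the same map as dir and seg_bytes.
%% All values here are illustrative placeholders.
Q = replayq:open(#{dir => "/tmp/replayq-test",
                   seg_bytes => 10000000,
                   max_total_bytes => 100000000}).
```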
Q0 = replayq:open(#{mem_only => true}),
Q1 = replayq:append(Q0, [Binary1, Binary2]),
{Q2, AckRef, [Binary1]} = replayq:pop(Q1, #{bytes_limit => 1}),
ok = replayq:ack(Q2, AckRef).
Q0 = replayq:open(#{dir => "/tmp/replayq-test", seg_bytes => 10000000}),
Q1 = replayq:append(Q0, [Binary1, Binary2]),
{Q2, AckRef, [Binary1]} = replayq:pop(Q1, #{count_limit => 1}),
ok = replayq:ack(Q2, AckRef).
Q0 = replayq:open(#{dir => "/tmp/replayq-test",
                    seg_bytes => 10000000,
                    sizer => fun({K, V}) -> size(K) + size(V) end,
                    marshaller => fun({K, V}) -> term_to_binary({K, V});
                                     (Bin) -> binary_to_term(Bin)
                                  end
                   }),
Q1 = replayq:append(Q0, [{<<"k1">>, <<"v1">>}, {<<"k2">>, <<"v2">>}]),
{Q2, AckRef, [{<<"k1">>, <<"v1">>}]} = replayq:pop(Q1, #{count_limit => 1}),
ok = replayq:ack(Q2, AckRef).
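The marshaller in the example above is a single fun used in both directions: the {K, V} clause encodes an item for disk, and the other clause decodes a binary read back from disk. The sketch below exercises that round trip without replayq. The module name marshaller_sketch, the roundtrip/1 and item_size/1 helpers, and the is_binary guard are additions made here for illustration only.

```erlang
-module(marshaller_sketch).
-export([roundtrip/1, item_size/1]).

%% Same shape as the marshaller in the example: a 2-tuple is encoded,
%% a binary is decoded. The is_binary guard is an addition for clarity.
marshaller() ->
    fun({K, V}) -> term_to_binary({K, V});
       (Bin) when is_binary(Bin) -> binary_to_term(Bin)
    end.

%% Same shape as the sizer in the example: size of key plus size of value.
item_size({K, V}) -> size(K) + size(V).

%% Encode an item, check that the result is a binary, then decode it again.
roundtrip(Item) ->
    M = marshaller(),
    Bin = M(Item),
    true = is_binary(Bin),
    M(Bin).
```

Because the two directions are told apart only by the shape of the argument, this style of marshaller presumably suits queues whose items are never bare binaries.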
In this example, we want each batch to contain only identical items. We stop adding items to the batch as soon as we encounter an item that differs from the first item in the batch.
Q0 = replayq:open(#{mem_only => true}),
Q1 = replayq:append(Q0,
                    [
                     <<"type1">>,
                     <<"type1">>,
                     <<"type2">>,
                     <<"type2">>,
                     <<"type2">>,
                     <<"type3">>
                    ]),
StopBeforeFunc =
    fun(Item, #{current_type := none}) ->
            %% First item of the batch: remember its type
            #{current_type => Item};
       (Item, #{current_type := Item}) ->
            %% Item and current_type are the same
            #{current_type => Item};
       (_Item, #{current_type := _OtherType}) ->
            %% Return true to stop collecting items before the current item
            true
    end,
StopBeforeInitialState = #{current_type => none},
StopBefore = {StopBeforeFunc, StopBeforeInitialState},
%% We will stop because the stop_before function returns true
{Q2, AckRef1, [<<"type1">>, <<"type1">>]} =
    replayq:pop(Q1, #{stop_before => StopBefore, count_limit => 10}),
ok = replayq:ack(Q2, AckRef1),
%% We will stop because of the count_limit
{Q3, AckRef2, [<<"type2">>]} =
    replayq:pop(Q2, #{stop_before => StopBefore, count_limit => 1}),
ok = replayq:ack(Q3, AckRef2),
%% We will stop because the stop_before function returns true
{Q4, AckRef3, [<<"type2">>, <<"type2">>]} =
    replayq:pop(Q3, #{stop_before => StopBefore, count_limit => 10}),
ok = replayq:ack(Q4, AckRef3),
%% We will stop because the queue gets empty
{Q5, AckRef4, [<<"type3">>]} =
    replayq:pop(Q4, #{stop_before => StopBefore, count_limit => 10}),
ok = replayq:ack(Q5, AckRef4).
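To make the callback contract in the example above easier to follow, here is a minimal stand-alone sketch that applies a {Fun, InitialState} pair to a plain list. It is not replayq's implementation: the module stop_before_sketch and its take/3 function are hypothetical, and the sketch only mirrors what the example shows, namely that the fun returns either true (stop before the current item) or a new state, and that a count limit can end the batch first.

```erlang
-module(stop_before_sketch).
-export([take/3]).

%% take(Items, CountLimit, {Fun, State0}) -> {Batch, Remaining}
%% Collects items from the head of the list until the list is empty,
%% CountLimit items have been taken, or Fun returns true.
take(Items, CountLimit, {Fun, State0}) ->
    take(Items, CountLimit, Fun, State0, []).

take([], _N, _Fun, _State, Acc) ->
    %% Nothing left to take
    {lists:reverse(Acc), []};
take(Items, 0, _Fun, _State, Acc) ->
    %% Count limit reached
    {lists:reverse(Acc), Items};
take([Item | Rest] = Items, N, Fun, State, Acc) ->
    case Fun(Item, State) of
        true ->
            %% Stop before Item: it stays in the remaining list
            {lists:reverse(Acc), Items};
        NewState ->
            take(Rest, N - 1, Fun, NewState, [Item | Acc])
    end.
```

Each call starts again from the initial state, which matches the example: the same StopBefore tuple is reused for every pop.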
In offload mode, the disk queue is only used to offload queue tail segments.
Add offload => true to Config for replayq:open/1.
Using offload => {true, volatile} in the Config for replayq:open/1 makes replayq unconditionally clear any previous segments and the commit file in the given work directory. It also does not dump in-memory data back to disk when the queue is closed.
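The two offload settings described above might be passed as in the fragment below. The directory and seg_bytes value are reused from the earlier examples as placeholders, and combining offload with those particular options is an assumption made for illustration.

```erlang
%% Offload mode: the disk is used only for queue tail segments.
QOffload = replayq:open(#{dir => "/tmp/replayq-test",
                          seg_bytes => 10000000,
                          offload => true}).

%% Volatile offload mode: previous segments and the commit file in dir
%% are cleared on open, and in-memory data is not written back on close.
QVolatile = replayq:open(#{dir => "/tmp/replayq-test",
                           seg_bytes => 10000000,
                           offload => {true, volatile}}).
```

The two calls are alternatives for the same directory, not steps to run together.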