Skip to content

Commit

Permalink
refactor(replayq): Ensure brutal_purge safety
Browse files Browse the repository at this point in the history
This commit replaces all long-lived anonymous functions with M:F/A
calls. So that duing upgrade, the old beam can be purged as soon as new
beam is loaded. i.e. brutal_purge should not cause process kill.
  • Loading branch information
zmstone committed Nov 19, 2020
1 parent cb58b8f commit a490eb9
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 7 deletions.
4 changes: 2 additions & 2 deletions src/replayq.appup.src
@@ -1,5 +1,5 @@
%% -*-: erlang -*-
{<<".*+">>,
[ {<<".*+">>, [ {load_module, replayq} ]} ],
[ {<<".*+">>, [ {load_module, replayq} ]} ]
[ {<<".*+">>, [ {load_module, replayq, brutal_purge, soft_purge, []} ]} ],
[ {<<".*+">>, [ {load_module, replayq, brutal_purge, soft_purge, []} ]} ]
}.
14 changes: 9 additions & 5 deletions src/replayq.erl
Expand Up @@ -4,7 +4,9 @@
-export([append/2, pop/2, ack/2, ack_sync/2, peek/1]).
-export([count/1, bytes/1, is_empty/1]).

-export([committer_loop/2]).

%% internal exports for beam reload
-export([committer_loop/2, default_sizer/1, default_marshaller/1]).

-export_type([config/0, q/0, ack_ref/0, sizer/0, marshaller/0]).

Expand Down Expand Up @@ -65,8 +67,6 @@
-define(FIRST_SEGNO, 1).
-define(NEXT_SEGNO(N), (N + 1)).
-define(STOP, stop).
-define(DEFAULT_SIZER, fun(I) when is_binary(I) -> erlang:size(I) end).
-define(DEFAULT_MARSHALLER, fun(I) when is_binary(I) -> I end).
-define(MEM_ONLY_ITEM(Bytes, Item), {Bytes, Item}).
-define(DISK_CP_ITEM(Id, Bytes, Item), {Id, Bytes, Item}).

Expand Down Expand Up @@ -577,11 +577,15 @@ make_dummy_segment(Segno) ->
#{fd => ?NO_FD, segno => Segno, bytes => 0, count => 0}.

get_sizer(C) ->
maps:get(sizer, C, ?DEFAULT_SIZER).
maps:get(sizer, C, fun ?MODULE:default_sizer/1).

get_marshaller(C) ->
maps:get(marshaller, C, ?DEFAULT_MARSHALLER).
maps:get(marshaller, C, fun ?MODULE:default_marshaller/1).

is_offload_mode(mem_only) -> false;
is_offload_mode(Config) when is_map(Config) ->
maps:get(offload, Config, false).

default_sizer(I) when is_binary(I) -> erlang:size(I).

default_marshaller(I) when is_binary(I) -> I.

0 comments on commit a490eb9

Please sign in to comment.