Refactor mem3_rpc:add_checkpoint/2
This is based on Adam Kocoloski's original add_checkpoint/2 but uses a
body recursive function to avoid the final reverse/filter steps.

BugzId: 21973
davisp authored and rnewson committed Jul 23, 2014
1 parent e147621 commit e64dd0281f1d9b9b22511b3625c1fb1f97a42042
Showing 1 changed file with 71 additions and 62 deletions.
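
For readers skimming the diff below: the commit message's point about body recursion is, roughly, that a tail-recursive pass over a list accumulates its result in reverse and needs a final lists:reverse/1 (plus the extra filter step mentioned above), while a body-recursive pass builds the result in order as the call stack unwinds. A minimal, hypothetical sketch of the two shapes (not code from this repository):

    %% Hypothetical illustration, not part of this commit: the same
    %% predicate filter written tail-recursively (accumulator plus a
    %% final reverse) and body-recursively (list built in order as
    %% the stack unwinds).
    -module(recursion_sketch).
    -export([keep_tail/2, keep_body/2]).

    keep_tail(Pred, List) ->
        keep_tail(Pred, List, []).

    keep_tail(_Pred, [], Acc) ->
        lists:reverse(Acc);                      % extra reverse step
    keep_tail(Pred, [H | T], Acc) ->
        case Pred(H) of
            true -> keep_tail(Pred, T, [H | Acc]);
            false -> keep_tail(Pred, T, Acc)
        end.

    keep_body(_Pred, []) ->
        [];
    keep_body(Pred, [H | T]) ->
        case Pred(H) of
            true -> [H | keep_body(Pred, T)];    % result already in order
            false -> keep_body(Pred, T)
        end.

Both functions return the same list (e.g. keep_body(fun(X) -> X rem 2 =:= 0 end, [1,2,3,4]) gives [2,4]); the refactored rebucket/3 below takes the second shape, so the rebucketed history comes out already ordered by decreasing source_seq.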
@@ -125,68 +125,11 @@ add_checkpoint({Props}, {History}) ->
    % any larger update seq than we're currently recording.
    FilteredHistory = filter_history(SourceSeq, TargetSeq, SourceHistory),

    % Insert the new entry into the history and trim the history
    % to keep an exponentially increasing delta between checkpoints.
    % We do this by defining logical buckets of exponentially
    % increasing size and then keep the smallest and largest values
    % in each bucket. We keep both min and max points so that
    % we don't end up with empty buckets as new points are added.
    %
    % NB: We're guaranteed to keep the newest entry passed to this
    % function because we filter out all larger update sequences
    % which means it is guaranteed to be the smallest value in the
    % first bucket with a delta of 0.
    WithNewEntry = [{Props} | FilteredHistory],

    % Tag each entry with the bucket id
    BucketTagged = lists:map(fun({Entry}) ->
        EntrySourceSeq = couch_util:get_value(<<"source_seq">>, Entry),
        BucketTag = case SourceSeq - EntrySourceSeq of
            0 ->
                0;
            N when N > 0 ->
                % This is int(log2(SourceSeq - EntrySourceSeq))
                trunc(math:log(N) / math:log(2))
        end,
        {BucketTag, SourceSeq - EntrySourceSeq, {Entry}}
    end, WithNewEntry),

    % Find the min/max entries for each bucket
    Buckets = lists:foldl(fun({Bucket, Delta, Entry}, BucketAcc) ->
        {MinEntry, MaxEntry} = case dict:find(Bucket, BucketAcc) of
            {ok, Value} -> Value;
            error -> {nil, nil}
        end,
        NewMin = case MinEntry of
            {MinDelta, _} when Delta < MinDelta ->
                {Delta, Entry};
            nil ->
                {Delta, Entry};
            _ ->
                MinEntry
        end,
        NewMax = case MaxEntry of
            {MaxDelta, _} when Delta > MaxDelta ->
                {Delta, Entry};
            nil ->
                {Delta, Entry};
            _ ->
                MaxEntry
        end,
        dict:store(Bucket, {NewMin, NewMax}, BucketAcc)
    end, dict:new(), BucketTagged),

    % Turn our bucket dict back into a list sorted by increasing
    % deltas (which corresponds to decreasing source_seq values).
    NewSourceHistory = lists:flatmap(fun({_Bucket, {Min, Max}}) ->
        % If there's a single point in a bucket it's both the min
        % and max entry, so we account for that here.
        if Min == Max ->
            [element(2, Min)];
        true ->
            [element(2, Min), element(2, Max)]
        end
    end, lists:sort(dict:to_list(Buckets))),
    % Re-bucket our history based on the most recent source
    % sequence. This is where we drop old checkpoints to
    % maintain the exponential distribution.
    {_, RebucketedHistory} = rebucket(FilteredHistory, SourceSeq, 0),
    NewSourceHistory = [{Props} | RebucketedHistory],

    % Finally update the source node history and we're done.
    NodeRemoved = lists:keydelete(SourceNode, 1, History),
@@ -206,6 +149,72 @@ filter_history(SourceSeqThresh, TargetSeqThresh, History) ->
    lists:filter(TargetFilter, SourceFiltered).


%% @doc This function adjusts our history to maintain a
%% history of checkpoints that follow an exponentially
%% increasing age from the most recent checkpoint.
%%
%% The terms newest and oldest used in these comments
%% refer to the (NewSeq - CurSeq) difference, where smaller
%% values are considered newer.
%%
%% It works by assigning each entry to a bucket and keeping
%% the newest and oldest entry in each bucket. Keeping
%% both the newest and oldest means that we won't end up
%% with empty buckets as checkpoints are promoted to new
%% buckets.
%%
%% The return value of this function is a two-tuple of the
%% form `{BucketId, History}` where BucketId is the id of
%% the bucket for the first entry in History. This is used
%% when recursing to detect the oldest value in a given
%% bucket.
%%
%% This function expects the provided history to be sorted
%% in descending order of source_seq values.
rebucket([], _NewSeq, Bucket) ->
    {Bucket+1, []};
rebucket([{Entry} | RestHistory], NewSeq, Bucket) ->
    CurSeq = couch_util:get_value(<<"source_seq">>, Entry),
    case find_bucket(NewSeq, CurSeq, Bucket) of
        Bucket ->
            % This entry is in an existing bucket which means
            % we will only keep it if it's the oldest value
            % in the bucket. To detect this we rebucket the
            % rest of the list and only include Entry if the
            % rest of the list is in a bigger bucket.
            case rebucket(RestHistory, NewSeq, Bucket) of
                {Bucket, NewHistory} ->
                    % There's another entry in this bucket so we drop the
                    % current entry.
                    {Bucket, NewHistory};
                {NextBucket, NewHistory} when NextBucket > Bucket ->
                    % The rest of the history was rebucketed into a larger
                    % bucket so this is the oldest entry in the current
                    % bucket.
                    {Bucket, [{Entry} | NewHistory]}
            end;
        NextBucket when NextBucket > Bucket ->
            % This entry is the newest in NextBucket so we add it
            % to our history and continue rebucketing.
            {_, NewHistory} = rebucket(RestHistory, NewSeq, NextBucket),
            {NextBucket, [{Entry} | NewHistory]}
    end.


%% @doc Find the bucket id for the given sequence pair.
find_bucket(NewSeq, CurSeq, Bucket) ->
    % The +1 constant in this comparison is a bit subtle. The
    % reason for it is to make sure that the first entry in
    % the history is guaranteed to have a BucketId of 1. This
    % also relies on never having a duplicated update
    % sequence so adding 1 here guarantees a difference >= 2.
    if (NewSeq - CurSeq + 1) > (2 bsl Bucket) ->
        find_bucket(NewSeq, CurSeq, Bucket+1);
    true ->
        Bucket
    end.


rexi_call(Node, MFA) ->
    Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
    Ref = rexi:cast(Node, self(), MFA, [sync]),

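A rough worked example of the bucket spacing (hypothetical sequence numbers, not part of this commit): with find_bucket/3 as defined above, the bucket id grows roughly with the log of a checkpoint's age, so buckets become exponentially wider as entries fall further behind the newest source_seq, and rebucket/3 then keeps only the newest and oldest entry in each bucket.

    %% Hypothetical calls (not from this commit), using the
    %% find_bucket/3 defined above and starting from bucket 0:
    %%
    %%   find_bucket(1000,  999, 0)  %=> 0   (delta 1)
    %%   find_bucket(1000,  997, 0)  %=> 1   (delta 3)
    %%   find_bucket(1000,  995, 0)  %=> 2   (delta 5)
    %%   find_bucket(1000,  900, 0)  %=> 6   (delta 100)
    %%   find_bucket(1000,    0, 0)  %=> 9   (delta 1000)

Note that the checkpoint just taken is never passed through rebucket/3 at all; add_checkpoint/2 prepends {Props} to the rebucketed history afterwards, as shown in the first hunk above.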