Skip to content
Browse files

Simplify segment iteration and use snapshots between tree updates

Simplify segment iteration by using LevelDB iterators directly, rather
than using the existing fold operation. This allows the iteration process
to use a single iterator and re-seek it as necessary, rather than throw
exceptions and re-issue new folds.

Add snapshot support, which is currently implemented by constructing a
new iterator only when update_tree/1 is called, and using this iterator
for all segment operations. This change allows new keys to be inserted
into the hashtree while a concurrent hash exchange is ongoing. Thus,
inserting keys and performing comparisons are entirely isolated.
  • Loading branch information...
1 parent d04031e commit 08386dda5139a93357e88f0008cb41b8cbaef9f7 @jtuple jtuple committed May 7, 2012
Showing with 52 additions and 37 deletions.
  1. +52 −37 src/hashtree.erl
View
89 src/hashtree.erl
@@ -20,6 +20,7 @@
tree,
ref,
path,
+ itr,
dirty_segments}).
%%%===================================================================
@@ -66,9 +67,10 @@ insert(Key, ObjHash, State) ->
update_tree(State=#state{dirty_segments=Dirty}) ->
%% Segments = gb_sets:to_list(Dirty),
Segments = bitarray_to_list(Dirty),
- State2 = update_tree(Segments, State),
- %% State2#state{dirty_segments=gb_sets:new()}.
- State2#state{dirty_segments=bitarray_new(?NUM_SEGMENTS)}.
+ State2 = snapshot(State),
+ State3 = update_tree(Segments, State2),
+ %% State3#state{dirty_segments=gb_sets:new()}.
+ State3#state{dirty_segments=bitarray_new(?NUM_SEGMENTS)}.
update_tree([], State) ->
State;
@@ -199,53 +201,47 @@ decode(Bin) ->
encode_bucket(TreeId, Level, Bucket) ->
<<TreeId:160/integer,$b,Level:64/integer,Bucket:64/integer>>.
-%% select_segment(Ref, Segment) ->
-%% try
-%% eleveldb:fold(Ref,
-%% fun({K, V}, Acc) ->
-%% {Seg, _} = decode(K),
-%% (Seg == Segment) orelse throw({break, Acc}),
-%% [{K, V} | Acc]
-%% end,
-%% [],
-%% [{first_key, encode(Segment,<<>>)}])
-%% catch
-%% {break, AccFinal} ->
-%% AccFinal
-%% end.
-
hashes(State, Segments) ->
multi_select_segment(State, Segments, fun hash/1).
key_hashes(State, Segment) ->
multi_select_segment(State, [Segment], fun(X) -> X end).
-multi_select_segment(State, Segments, F) ->
- multi_select_segment(State, Segments, F, []).
+snapshot(State) ->
+ %% Abuse eleveldb iterators as snapshots: close the iterator held in the state (any failure of the close call is discarded by `catch'), then open a new iterator on the store and keep it in the state
+ catch eleveldb:iterator_close(State#state.itr),
+ {ok, Itr} = eleveldb:iterator(State#state.ref, []),
+ State#state{itr=Itr}.
-multi_select_segment(State=#state{id=Id, ref=Ref}, Segments, F, Acc) ->
+multi_select_segment(#state{id=Id, itr=Itr}, Segments, F) ->
[First | Rest] = Segments,
- try
- {_, _, LastSegment, _, _, LastAcc, FA} =
- eleveldb:fold(Ref, fun iterate/2, {Ref, Id, First, Rest, F, [], Acc},
- [{first_key, encode(Id, First, <<>>)}]),
- [{LastSegment, F(LastAcc)} | FA]
- catch
- {break, [], FinalAcc} ->
- FinalAcc;
- {break, Remaining, FinalAcc} ->
- multi_select_segment(State, Remaining, F, FinalAcc)
- end.
-
-iterate({K, V}, {Ref, Id, Segment, Segments, F, Acc, FinalAcc}) ->
+ Acc0 = {Itr, Id, First, Rest, F, [], []},
+ Seek = encode(Id, First, <<>>),
+ {_, _, LastSegment, _, _, LastAcc, FA} =
+ iterate(eleveldb:iterator_move(Itr, Seek), Acc0),
+ [{LastSegment, F(LastAcc)} | FA].
+
+iterate({error, invalid_iterator}, AllAcc) ->
+ AllAcc;
+iterate({ok, K, V}, AllAcc={Itr, Id, Segment, Segments, F, Acc, FinalAcc}) ->
{SegId, Seg, _} = decode(K),
case {SegId, Seg, Segments} of
{Id, Segment, _} ->
- {Ref, Id, Segment, Segments, F, [{K,V} | Acc], FinalAcc};
+ %% Still reading existing segment
+ Acc2 = {Itr, Id, Segment, Segments, F, [{K,V} | Acc], FinalAcc},
+ iterate(eleveldb:iterator_move(Itr, next), Acc2);
{Id, _, [Seg|Remaining]} ->
- {Ref, Id, Seg, Remaining, F, [{K,V}], [{Segment, F(Acc)} | FinalAcc]};
+ %% Pointing at next segment we are interested in
+ Acc2 = {Itr, Id, Seg, Remaining, F, [{K,V}], [{Segment, F(Acc)} | FinalAcc]},
+ iterate(eleveldb:iterator_move(Itr, next), Acc2);
+ {Id, _, [NextSeg|Remaining]} ->
+ %% Pointing at uninteresting segment, seek to next interesting one
+ Acc2 = {Itr, Id, NextSeg, Remaining, F, [], [{Segment, F(Acc)} | FinalAcc]},
+ Seek = encode(Id, NextSeg, <<>>),
+ iterate(eleveldb:iterator_move(Itr, Seek), Acc2);
_ ->
- throw({break, Segments, [{Segment, F(Acc)} | FinalAcc]})
+ %% Done with traversal
+ AllAcc
end.
compare(Level, Bucket, Tree, Remote, KeyAcc) when Level == Tree#state.levels+1 ->
@@ -494,6 +490,25 @@ peval(L) ->
L3.
%%%===================================================================
+%%% EUnit
+%%%===================================================================
+
+%% Verify that `update_tree/1' generates a snapshot of the underlying
+%% LevelDB store that is used by `compare', therefore isolating the
+%% compare from newer/concurrent insertions into the tree.
+snapshot_test() ->
+ A0 = insert(<<"10">>, <<"42">>, new()),
+ B0 = insert(<<"10">>, <<"52">>, new()),
+ A1 = update_tree(A0),
+ B1 = update_tree(B0),
+ B2 = insert(<<"10">>, <<"42">>, B1),
+ KeyDiff = local_compare(A1, B1),
+ destroy(A1),
+ destroy(B2),
+ ?assertEqual([{different, <<"10">>}], KeyDiff),
+ ok.
+
+%%%===================================================================
%%% EQC
%%%===================================================================

0 comments on commit 08386dd

Please sign in to comment.
Something went wrong with that request. Please try again.