Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
Reorder functions into a logical progression
This just moves functions around in the mem3_rep module to give a better
logical progression. Purely stylistic but it should make things easier
to read and find.
  • Loading branch information
davisp authored and rnewson committed Jul 23, 2014
1 parent 27e315b commit ade6ab16709146f3196505e24067e5151d142148
Showing 1 changed file with 48 additions and 48 deletions.
@@ -101,16 +101,6 @@ go(#acc{source=Source, batch_count=BC}=Acc0) ->
end.


repl(#db{name=DbName, seq_tree=Bt}=Db, #acc{localid=LocalId}=Acc0) ->
erlang:put(io_priority, {internal_repl, DbName}),
Seq = calculate_start_seq(Db, Acc0#acc.target, LocalId),
Acc1 = Acc0#acc{source=Db, seq=Seq},
Fun = fun ?MODULE:changes_enumerator/3,
{ok, _, Acc2} = couch_btree:fold(Bt, Fun, Acc1, [{start_key, Seq + 1}]),
{ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
{ok, couch_db:count_changes_since(Db, LastSeq)}.


make_local_id(#shard{}=Source, #shard{}=Target) ->
make_local_id(Source, Target, undefined).

@@ -129,6 +119,33 @@ make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}, Filter) ->
<<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.


repl(#db{name=DbName, seq_tree=Bt}=Db, #acc{localid=LocalId}=Acc0) ->
erlang:put(io_priority, {internal_repl, DbName}),
Seq = calculate_start_seq(Db, Acc0#acc.target, LocalId),
Acc1 = Acc0#acc{source=Db, seq=Seq},
Fun = fun ?MODULE:changes_enumerator/3,
{ok, _, Acc2} = couch_btree:fold(Bt, Fun, Acc1, [{start_key, Seq + 1}]),
{ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
{ok, couch_db:count_changes_since(Db, LastSeq)}.


calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
case couch_db:open_doc(Db, LocalId, [ejson_body]) of
{ok, #doc{body = {SProps}}} ->
Opts = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
try rexi_call(Node, {fabric_rpc, open_doc, [Name, LocalId, Opts]}) of
#doc{body = {TProps}} ->
SourceSeq = couch_util:get_value(<<"seq">>, SProps, 0),
TargetSeq = couch_util:get_value(<<"seq">>, TProps, 0),
erlang:min(SourceSeq, TargetSeq)
catch error:{not_found, _} ->
0
end;
{not_found, _} ->
0
end.


changes_enumerator(#doc_info{id=DocId}, Reds, #acc{db=Db}=Acc) ->
{ok, FDI} = couch_db:get_full_doc_info(Db, DocId),
changes_enumerator(FDI, Reds, Acc);
@@ -151,17 +168,6 @@ changes_enumerator(#full_doc_info{}=FDI, _,
{Go, Acc1}.


filter_doc(Filter, FullDocInfo) when is_function(Filter) ->
try Filter(FullDocInfo) of
discard -> discard;
_ -> keep
catch _:_ ->
keep
end;
filter_doc(_, _) ->
keep.


replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
case find_missing_revs(Acc) of
[] ->
@@ -183,18 +189,25 @@ find_missing_revs(Acc) ->
rexi_call(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs, Options]}).


save_on_target(Node, Name, Docs) ->
Options = [replicated_changes, full_commit, {user_ctx, ?CTX},
{io_priority, {internal_repl, Name}}],
rexi_call(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
ok.


open_docs(#acc{source=Source, infos=Infos}, Missing) ->
lists:flatmap(fun({Id, Revs, _}) ->
FDI = lists:keyfind(Id, #full_doc_info.id, Infos),
open_doc_revs(Source, FDI, Revs)
end, Missing).


save_on_target(Node, Name, Docs) ->
Options = [replicated_changes, full_commit, {user_ctx, ?CTX},
{io_priority, {internal_repl, Name}}],
rexi_call(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
ok.
open_doc_revs(Db, #full_doc_info{id=Id, rev_tree=RevTree}, Revs) ->
{FoundRevs, _} = couch_key_tree:get_key_leafs(RevTree, Revs),
lists:map(fun({#leaf{deleted=IsDel, ptr=SummaryPtr}, FoundRevPath}) ->
couch_db:make_doc(Db, Id, IsDel, SummaryPtr, FoundRevPath)
end, FoundRevs).


update_locals(Acc) ->
@@ -228,28 +241,15 @@ rexi_call(Node, MFA) ->
end.


calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
case couch_db:open_doc(Db, LocalId, [ejson_body]) of
{ok, #doc{body = {SProps}}} ->
Opts = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
try rexi_call(Node, {fabric_rpc, open_doc, [Name, LocalId, Opts]}) of
#doc{body = {TProps}} ->
SourceSeq = couch_util:get_value(<<"seq">>, SProps, 0),
TargetSeq = couch_util:get_value(<<"seq">>, TProps, 0),
erlang:min(SourceSeq, TargetSeq)
catch error:{not_found, _} ->
0
end;
{not_found, _} ->
0
end.


open_doc_revs(Db, #full_doc_info{id=Id, rev_tree=RevTree}, Revs) ->
{FoundRevs, _} = couch_key_tree:get_key_leafs(RevTree, Revs),
lists:map(fun({#leaf{deleted=IsDel, ptr=SummaryPtr}, FoundRevPath}) ->
couch_db:make_doc(Db, Id, IsDel, SummaryPtr, FoundRevPath)
end, FoundRevs).
filter_doc(Filter, FullDocInfo) when is_function(Filter) ->
try Filter(FullDocInfo) of
discard -> discard;
_ -> keep
catch _:_ ->
keep
end;
filter_doc(_, _) ->
keep.


iso8601_timestamp() ->

0 comments on commit ade6ab1

Please sign in to comment.