Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
Update whitespace and exports formatting
  • Loading branch information
davisp authored and rnewson committed Jul 23, 2014
1 parent 88a6491 commit 27e315be18d1dc0584a08a5baac06370cfeca912
Showing 1 changed file with 31 additions and 1 deletion.
@@ -12,13 +12,25 @@

-module(mem3_rep).

-export([go/2, go/3, changes_enumerator/3, make_local_id/2]).

-export([
go/2,
go/3,
make_local_id/2
]).

-export([
changes_enumerator/3
]).


-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").


-define(CTX, #user_ctx{roles = [<<"_admin">>]}).


-record(acc, {
batch_size,
batch_count,
@@ -32,12 +44,15 @@
db
}).


go(Source, Target) ->
go(Source, Target, []).


go(DbName, Node, Opts) when is_binary(DbName), is_atom(Node) ->
go(#shard{name=DbName, node=node()}, #shard{name=DbName, node=Node}, Opts);


go(#shard{} = Source, #shard{} = Target, Opts) ->
mem3_sync_security:maybe_sync(Source, Target),
BatchSize = case proplists:get_value(batch_size, Opts) of
@@ -61,6 +76,7 @@ go(#shard{} = Source, #shard{} = Target, Opts) ->
},
go(Acc).


go(#acc{source=Source, batch_count=BC}=Acc0) ->
case couch_db:open(Source#shard.name, [{user_ctx,?CTX}]) of
{ok, Db} ->
@@ -84,6 +100,7 @@ go(#acc{source=Source, batch_count=BC}=Acc0) ->
{error, missing_source}
end.


repl(#db{name=DbName, seq_tree=Bt}=Db, #acc{localid=LocalId}=Acc0) ->
erlang:put(io_priority, {internal_repl, DbName}),
Seq = calculate_start_seq(Db, Acc0#acc.target, LocalId),
@@ -93,9 +110,11 @@ repl(#db{name=DbName, seq_tree=Bt}=Db, #acc{localid=LocalId}=Acc0) ->
{ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
{ok, couch_db:count_changes_since(Db, LastSeq)}.


make_local_id(#shard{}=Source, #shard{}=Target) ->
make_local_id(Source, Target, undefined).


make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}, Filter) ->
S = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(SourceNode))),
T = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(TargetNode))),
@@ -109,6 +128,7 @@ make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}, Filter) ->
end,
<<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.


changes_enumerator(#doc_info{id=DocId}, Reds, #acc{db=Db}=Acc) ->
{ok, FDI} = couch_db:get_full_doc_info(Db, DocId),
changes_enumerator(FDI, Reds, Acc);
@@ -130,6 +150,7 @@ changes_enumerator(#full_doc_info{}=FDI, _,
Go = if Count < Acc1#acc.batch_size -> ok; true -> stop end,
{Go, Acc1}.


filter_doc(Filter, FullDocInfo) when is_function(Filter) ->
try Filter(FullDocInfo) of
discard -> discard;
@@ -140,6 +161,7 @@ filter_doc(Filter, FullDocInfo) when is_function(Filter) ->
filter_doc(_, _) ->
keep.


replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
case find_missing_revs(Acc) of
[] ->
@@ -150,6 +172,7 @@ replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
update_locals(Acc),
{ok, Acc#acc{revcount=0, infos=[]}}.


find_missing_revs(Acc) ->
#acc{target = #shard{node=Node, name=Name}, infos = Infos} = Acc,
IdsRevs = lists:map(fun(FDI) ->
@@ -159,18 +182,21 @@ find_missing_revs(Acc) ->
Options = [{io_priority, {internal_repl, Name}}, {user_ctx, ?CTX}],
rexi_call(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs, Options]}).


open_docs(#acc{source=Source, infos=Infos}, Missing) ->
lists:flatmap(fun({Id, Revs, _}) ->
FDI = lists:keyfind(Id, #full_doc_info.id, Infos),
open_doc_revs(Source, FDI, Revs)
end, Missing).


save_on_target(Node, Name, Docs) ->
Options = [replicated_changes, full_commit, {user_ctx, ?CTX},
{io_priority, {internal_repl, Name}}],
rexi_call(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
ok.


update_locals(Acc) ->
#acc{seq=Seq, source=Db, target=Target, localid=Id} = Acc,
#shard{name=Name, node=Node} = Target,
@@ -183,6 +209,7 @@ update_locals(Acc) ->
Options = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
rexi_call(Node, {fabric_rpc, update_docs, [Name, [Doc], Options]}).


rexi_call(Node, MFA) ->
Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
Ref = rexi:cast(Node, self(), MFA, [sync]),
@@ -200,6 +227,7 @@ rexi_call(Node, MFA) ->
rexi_monitor:stop(Mon)
end.


calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
case couch_db:open_doc(Db, LocalId, [ejson_body]) of
{ok, #doc{body = {SProps}}} ->
@@ -216,12 +244,14 @@ calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
0
end.


open_doc_revs(Db, #full_doc_info{id=Id, rev_tree=RevTree}, Revs) ->
{FoundRevs, _} = couch_key_tree:get_key_leafs(RevTree, Revs),
lists:map(fun({#leaf{deleted=IsDel, ptr=SummaryPtr}, FoundRevPath}) ->
couch_db:make_doc(Db, Id, IsDel, SummaryPtr, FoundRevPath)
end, FoundRevs).


iso8601_timestamp() ->
{_,_,Micro} = Now = os:timestamp(),
{{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now),

0 comments on commit 27e315b

Please sign in to comment.