Improve internal replicator configuration #25

Closed
wants to merge 1 commit into
from
Jump to file or symbol
Failed to load files and symbols.
+108 −35
Split
View
@@ -1,59 +1,125 @@
-module(mem3_rep).
--export([go/2, changes_enumerator/3, make_local_id/2]).
+-export([go/2, go/3, changes_enumerator/3, make_local_id/2]).
-include("mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
-define(CTX, #user_ctx{roles = [<<"_admin">>]}).
--record(acc, {revcount = 0, infos = [], seq, localid, source, target}).
-
-go(DbName, Node) when is_binary(DbName), is_atom(Node) ->
- go(#shard{name=DbName, node=node()}, #shard{name=DbName, node=Node});
-
-go(#shard{} = Source, #shard{} = Target) ->
- LocalId = make_local_id(Source, Target),
+-record(acc, {
+ batch_size,
+ batch_count,
+ revcount = 0,
+ infos = [],
+ seq,
+ localid,
+ source,
+ target,
+ filter
+}).
+
+go(Source, Target) ->
+ go(Source, Target, []).
+
+go(DbName, Node, Opts) when is_binary(DbName), is_atom(Node) ->
+ go(#shard{name=DbName, node=node()}, #shard{name=DbName, node=Node}, Opts);
+
+go(#shard{} = Source, #shard{} = Target, Opts) ->
+ BatchSize = case proplists:get_value(batch_size, Opts) of
+ BS when is_integer(BS), BS > 0 -> BS;
+ _ -> 100
+ end,
+ BatchCount = case proplists:get_value(batch_count, Opts) of
+ all -> all;
+ BC when is_integer(BC), BC > 0 -> BC;
+ _ -> 1
+ end,
+ Filter = proplists:get_value(filter, Opts),
+ LocalId = make_local_id(Source, Target, Filter),
+ Acc = #acc{
+ batch_size = BatchSize,
+ batch_count = BatchCount,
+ localid = LocalId,
+ source = Source,
+ target = Target,
+ filter = Filter
+ },
+ go(Acc).
+
+go(#acc{source=Source, batch_count=BC}=Acc) ->
case couch_db:open(Source#shard.name, [{user_ctx,?CTX}]) of
{ok, Db} ->
- try
- go(Db, Target, LocalId)
+ Resp = try
+ repl(Db, Acc)
catch error:{not_found, no_db_file} ->
{error, missing_target}
after
couch_db:close(Db)
+ end,
+ case Resp of
+ {ok, P} when P > 0, BC == all ->
+ go(Acc);
+ {ok, P} when P > 0, BC > 1 ->
+ go(Acc#acc{batch_count=BC-1});
+ Else ->
+ Else
end;
{not_found, no_db_file} ->
{error, missing_source}
end.
-go(#db{name = DbName, seq_tree = Bt} = Db, #shard{} = Target, LocalId) ->
+repl(#db{name=DbName, seq_tree=Bt}=Db, #acc{localid=LocalId}=Acc0) ->
erlang:put(io_priority, {internal_repl, DbName}),
- Seq = calculate_start_seq(Db, Target, LocalId),
- Acc0 = #acc{source=Db, target=Target, seq=Seq, localid=LocalId},
+ Seq = calculate_start_seq(Db, Acc0#acc.target, LocalId),
+ Acc1 = Acc0#acc{source=Db, seq=Seq},
Fun = fun ?MODULE:changes_enumerator/3,
- {ok, _, AccOut} = couch_btree:fold(Bt, Fun, Acc0, [{start_key, Seq + 1}]),
- {ok, #acc{seq = LastSeq}} = replicate_batch(AccOut),
- case couch_db:count_changes_since(Db, LastSeq) of
- 0 ->
- ok;
- N ->
- exit({pending_changes, N})
- end.
+ {ok, _, Acc2} = couch_btree:fold(Bt, Fun, Acc1, [{start_key, Seq + 1}]),
+ {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
+ {ok, couch_db:count_changes_since(Db, LastSeq)}.
-make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}) ->
+make_local_id(#shard{}=Source, #shard{}=Target) ->
+ make_local_id(Source, Target, undefined).
+
+make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}, Filter) ->
S = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(SourceNode))),
T = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(TargetNode))),
- <<"_local/shard-sync-", S/binary, "-", T/binary>>.
-
-changes_enumerator(FullDocInfo, _, #acc{revcount = C} = Acc) when C >= 99 ->
- #doc_info{high_seq = Seq} = couch_doc:to_doc_info(FullDocInfo),
- {stop, Acc#acc{seq = Seq, infos = [FullDocInfo | Acc#acc.infos]}};
-
-changes_enumerator(FullDocInfo, _, #acc{revcount = C, infos = Infos} = Acc) ->
- #doc_info{high_seq = Seq, revs = Revs} = couch_doc:to_doc_info(FullDocInfo),
- Count = C + length(Revs),
- {ok, Acc#acc{seq = Seq, revcount = Count, infos = [FullDocInfo | Infos]}}.
+ F = case is_function(Filter) of
+ true ->
+ {new_uniq, Hash} = erlang:fun_info(Filter, new_uniq),
+ B = couch_util:encodeBase64Url(Hash),
+ <<"-", B/binary>>;
+ false ->
+ <<>>
+ end,
+ <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.
+
+changes_enumerator(FDI, _, #acc{revcount=C, infos=Infos}=Acc0) ->
+ #doc_info{
+ high_seq=Seq,
+ revs=Revs
+ } = couch_doc:to_doc_info(FDI),
+ {Count, NewInfos} = case filter_doc(Acc0#acc.filter, FDI) of
+ keep -> {C + length(Revs), [FDI | Infos]};
+ discard -> {C, Infos}
+ end,
+ Acc1 = Acc0#acc{
+ seq=Seq,
+ revcount=Count,
+ infos=NewInfos
+ },
+ Go = if Count < Acc1#acc.batch_size -> ok; true -> stop end,
+ {Go, Acc1}.
+
+filter_doc(Filter, FullDocInfo) when is_function(Filter) ->
+ try Filter(FullDocInfo) of
+ discard -> discard;
+ _ -> keep
+ catch _:_ ->
+ keep
+ end;
+filter_doc(_, _) ->
+ keep.
replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
case find_missing_revs(Acc) of
@@ -107,7 +173,7 @@ rexi_call(Node, MFA) ->
{Ref, Error} ->
erlang:error(Error);
{rexi_DOWN, Mon, _, Reason} ->
- erlang:error({rexi_DOWN, Reason})
+ erlang:error({rexi_DOWN, {Node, Reason}})
after 600000 ->
erlang:error(timeout)
end
View
@@ -179,8 +179,15 @@ handle_replication_exit(State, Pid) ->
end,
{noreply, NewState}.
-start_push_replication(#job{name=Name, node=Node}) ->
- spawn_link(mem3_rep, go, [Name, Node]).
+start_push_replication(#job{name=Name, node=Node, pid=From}) ->
+ spawn_link(fun() ->
+ case mem3_rep:go(Name, Node) of
+ {ok, Pending} when Pending > 0 ->
+ exit({pending_changes, Pending});
+ _ ->
+ ok
+ end
+ end).
add_to_queue(State, #job{name=DbName, node=Node} = Job) ->
#state{dict=D, waiting=Waiting} = State,