Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
Add a new mem3_rpc module for replication RPCs
This is intended to make the local/remote code execution contexts a lot
more clear.
  • Loading branch information
davisp authored and rnewson committed Jul 23, 2014
1 parent ade6ab1 commit fcbc821b4f9cd3fa66124860ebce921a8fe428f0
Showing 2 changed files with 79 additions and 26 deletions.
@@ -133,7 +133,7 @@ calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
case couch_db:open_doc(Db, LocalId, [ejson_body]) of
{ok, #doc{body = {SProps}}} ->
Opts = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
try rexi_call(Node, {fabric_rpc, open_doc, [Name, LocalId, Opts]}) of
try mem3_rpc:load_checkpoint(Node, Name, LocalId, Opts) of
#doc{body = {TProps}} ->
SourceSeq = couch_util:get_value(<<"seq">>, SProps, 0),
TargetSeq = couch_util:get_value(<<"seq">>, TProps, 0),
@@ -185,14 +185,19 @@ find_missing_revs(Acc) ->
#doc_info{id=Id, revs=RevInfos} = couch_doc:to_doc_info(FDI),
{Id, [R || #rev_info{rev=R} <- RevInfos]}
end, Infos),
Options = [{io_priority, {internal_repl, Name}}, {user_ctx, ?CTX}],
rexi_call(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs, Options]}).
mem3_rpc:get_missing_revs(Node, Name, IdsRevs, [
{io_priority, {internal_repl, Name}},
{user_ctx, ?CTX}
]).


save_on_target(Node, Name, Docs) ->
Options = [replicated_changes, full_commit, {user_ctx, ?CTX},
{io_priority, {internal_repl, Name}}],
rexi_call(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
mem3_rpc:update_docs(Node, Name, Docs, [
replicated_changes,
full_commit,
{user_ctx, ?CTX},
{io_priority, {internal_repl, Name}}
]),
ok.


@@ -219,26 +224,10 @@ update_locals(Acc) ->
{<<"timestamp">>, list_to_binary(iso8601_timestamp())}
]}},
{ok, _} = couch_db:update_doc(Db, Doc, []),
Options = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
rexi_call(Node, {fabric_rpc, update_docs, [Name, [Doc], Options]}).


rexi_call(Node, MFA) ->
Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
Ref = rexi:cast(Node, self(), MFA, [sync]),
try
receive {Ref, {ok, Reply}} ->
Reply;
{Ref, Error} ->
erlang:error(Error);
{rexi_DOWN, Mon, _, Reason} ->
erlang:error({rexi_DOWN, {Node, Reason}})
after 600000 ->
erlang:error(timeout)
end
after
rexi_monitor:stop(Mon)
end.
mem3_rpc:save_checkpoint(Node, Name, Doc, [
{user_ctx, ?CTX},
{io_priority, {internal_repl, Name}}
]).


filter_doc(Filter, FullDocInfo) when is_function(Filter) ->
@@ -0,0 +1,64 @@
% Copyright 2013 Cloudant
%
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(mem3_rpc).


-export([
get_missing_revs/4,
update_docs/4,
load_checkpoint/4,
save_checkpoint/4
]).


-include("mem3.hrl").
-include_lib("couch/include/couch_db.hrl").


-define(CTX, #user_ctx{roles = [<<"_admin">>]}).


get_missing_revs(Node, DbName, IdsRevs, Options) ->
rexi_call(Node, {fabric_rpc, get_missing_revs, [DbName, IdsRevs, Options]}).


update_docs(Node, DbName, Docs, Options) ->
rexi_call(Node, {fabric_rpc, update_docs, [DbName, Docs, Options]}).


load_checkpoint(Node, DbName, DocId, Opts) ->
rexi_call(Node, {fabric_rpc, open_doc, [DbName, DocId, Opts]}).


save_checkpoint(Node, DbName, Doc, Options) ->
rexi_call(Node, {fabric_rpc, update_docs, [DbName, [Doc], Options]}).


rexi_call(Node, MFA) ->
Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
Ref = rexi:cast(Node, self(), MFA, [sync]),
try
receive {Ref, {ok, Reply}} ->
Reply;
{Ref, Error} ->
erlang:error(Error);
{rexi_DOWN, Mon, _, Reason} ->
erlang:error({rexi_DOWN, {Node, Reason}})
after 600000 ->
erlang:error(timeout)
end
after
rexi_monitor:stop(Mon)
end.

0 comments on commit fcbc821

Please sign in to comment.