From 196d81cd19549c19d8962a21c84fb12a8fd26aad Mon Sep 17 00:00:00 2001 From: Bob Dionne Date: Sun, 19 Aug 2012 11:38:28 -0400 Subject: [PATCH 1/3] Allow the use of checkpoint to be optional A new parameter use_checkpoints which defaults to be true can be either used in the replicator section of ini files and/or used in the replication command or doc added to the replicator db. When this option is set to false no checkpoints are taken. It should be used with caution and only for small dbs. BugzID:14327 This is cherry-picked from the BigCouch branch. The additioanl code_change/3 case was removed as Apache CouchDB currently doesn't support hot code swapping. --- src/couch_replicator/src/couch_replicator.erl | 9 +++++++-- src/couch_replicator/src/couch_replicator_utils.erl | 6 +++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl index 1ce2cf545ed..67971e9ffdf 100644 --- a/src/couch_replicator/src/couch_replicator.erl +++ b/src/couch_replicator/src/couch_replicator.erl @@ -68,7 +68,8 @@ target_db_compaction_notifier = nil, source_monitor = nil, target_monitor = nil, - source_seq = nil + source_seq = nil, + use_checkpoints = true }). @@ -563,7 +564,8 @@ init_state(Rep) -> start_db_compaction_notifier(Target, self()), source_monitor = db_monitor(Source), target_monitor = db_monitor(Target), - source_seq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ) + source_seq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ), + use_checkpoints = get_value(use_checkpoints, Options, true) }, State#rep_state{timer = start_timer(State)}. @@ -672,6 +674,9 @@ changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) -> checkpoint_interval(_State) -> 5000. +do_checkpoint(#rep_state{use_checkpoints=false} = State) -> + NewState = State#rep_state{checkpoint_history = {[{<<"use_checkpoints">>, false}]} }, + {ok, NewState}; do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) -> SourceCurSeq = source_cur_seq(State), NewState = State#rep_state{source_seq = SourceCurSeq}, diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl index d7778db871b..1646c691d3a 100644 --- a/src/couch_replicator/src/couch_replicator_utils.erl +++ b/src/couch_replicator/src/couch_replicator_utils.erl @@ -228,6 +228,7 @@ make_options(Props) -> DefConns = couch_config:get("replicator", "http_connections", "20"), DefTimeout = couch_config:get("replicator", "connection_timeout", "30000"), DefRetries = couch_config:get("replicator", "retries_per_request", "10"), + UseCheckpoints = couch_config:get("replicator", "use_checkpoints", "true"), {ok, DefSocketOptions} = couch_util:parse_term( couch_config:get("replicator", "socket_options", "[{keepalive, true}, {nodelay, false}]")), @@ -237,7 +238,8 @@ make_options(Props) -> {http_connections, list_to_integer(DefConns)}, {socket_options, DefSocketOptions}, {worker_batch_size, list_to_integer(DefBatchSize)}, - {worker_processes, list_to_integer(DefWorkers)} + {worker_processes, list_to_integer(DefWorkers)}, + {use_checkpoints, list_to_existing_atom(UseCheckpoints)} ])). @@ -279,6 +281,8 @@ convert_options([{<<"socket_options">>, V} | R]) -> [{socket_options, SocketOptions} | convert_options(R)]; convert_options([{<<"since_seq">>, V} | R]) -> [{since_seq, V} | convert_options(R)]; +convert_options([{<<"use_checkpoints">>, V} | R]) -> + [{use_checkpoints, V} | convert_options(R)]; convert_options([_ | R]) -> % skip unknown option convert_options(R). From f22de9ed2ef347fedc9092a6569db77875cbb70f Mon Sep 17 00:00:00 2001 From: Volker Mische Date: Tue, 16 Jul 2013 00:26:03 +0200 Subject: [PATCH 2/3] Tests for replicator use_checkpoints option --- src/couch_replicator/Makefile.am | 3 +- .../test/07-use-checkpoints.t | 256 ++++++++++++++++++ 2 files changed, 258 insertions(+), 1 deletion(-) create mode 100755 src/couch_replicator/test/07-use-checkpoints.t diff --git a/src/couch_replicator/Makefile.am b/src/couch_replicator/Makefile.am index 71d37028077..2dcd47dfdbf 100644 --- a/src/couch_replicator/Makefile.am +++ b/src/couch_replicator/Makefile.am @@ -41,7 +41,8 @@ test_files = \ test/03-replication-compact.t \ test/04-replication-large-atts.t \ test/05-replication-many-leaves.t \ - test/06-doc-missing-stubs.t + test/06-doc-missing-stubs.t \ + test/07-use-checkpoints.t compiled_files = \ ebin/couch_replicator_api_wrap.beam \ diff --git a/src/couch_replicator/test/07-use-checkpoints.t b/src/couch_replicator/test/07-use-checkpoints.t new file mode 100755 index 00000000000..cefc1a7e367 --- /dev/null +++ b/src/couch_replicator/test/07-use-checkpoints.t @@ -0,0 +1,256 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +% Verify that compacting databases that are being used as the source or +% target of a replication doesn't affect the replication and that the +% replication doesn't hold their reference counters forever. + +-define(b2l(B), binary_to_list(B)). + +-record(user_ctx, { + name = null, + roles = [], + handler +}). + +-record(doc, { + id = <<"">>, + revs = {0, []}, + body = {[]}, + atts = [], + deleted = false, + meta = [] +}). + +-record(db, { + main_pid = nil, + update_pid = nil, + compactor_pid = nil, + instance_start_time, % number of microsecs since jan 1 1970 as a binary string + fd, + updater_fd, + fd_ref_counter, + header = nil, + committed_update_seq, + fulldocinfo_by_id_btree, + docinfo_by_seq_btree, + local_docs_btree, + update_seq, + name, + filepath, + validate_doc_funs = [], + security = [], + security_ptr = nil, + user_ctx = #user_ctx{}, + waiting_delayed_commit = nil, + revs_limit = 1000, + fsync_options = [], + options = [], + compression, + before_doc_update, + after_doc_read +}). + +-record(rep, { + id, + source, + target, + options, + user_ctx, + doc_id +}). + + +source_db_name() -> <<"couch_test_rep_db_a">>. +target_db_name() -> <<"couch_test_rep_db_b">>. + + +main(_) -> + test_util:init_code_path(), + + etap:plan(16), + case (catch test()) of + ok -> + etap:end_tests(); + Other -> + etap:diag(io_lib:format("Test died abnormally: ~p", [Other])), + etap:bail(Other) + end, + ok. + + +test() -> + couch_server_sup:start_link(test_util:config_files()), + ibrowse:start(), + + test_use_checkpoints(false), + test_use_checkpoints(true), + + couch_server_sup:stop(), + ok. + + +test_use_checkpoints(UseCheckpoints) -> + Pairs = [ + {source_db_name(), target_db_name()}, + {{remote, source_db_name()}, target_db_name()}, + {source_db_name(), {remote, target_db_name()}}, + {{remote, source_db_name()}, {remote, (target_db_name())}} + ], + + ListenerFun = case UseCheckpoints of + false -> + fun({finished, _, {CheckpointHistory}}) -> + etap:is(CheckpointHistory, + [{<<"use_checkpoints">>,false}], + "No checkpoints found"); + (_) -> + ok + end; + true -> + fun({finished, _, {CheckpointHistory}}) -> + SessionId = lists:keyfind( + <<"session_id">>, 1, CheckpointHistory), + etap:isnt(SessionId, false, "There's a checkpoint"); + (_) -> + ok + end + end, + {ok, Listener} = couch_replicator_notifier:start_link(ListenerFun), + + lists:foreach( + fun({Source, Target}) -> + {ok, SourceDb} = create_db(source_db_name()), + etap:diag("Populating source database"), + populate_db(SourceDb, 100), + ok = couch_db:close(SourceDb), + + etap:diag("Creating target database"), + {ok, TargetDb} = create_db(target_db_name()), + ok = couch_db:close(TargetDb), + + etap:diag("Setup replicator notifier listener"), + + etap:diag("Triggering replication"), + replicate(Source, Target, UseCheckpoints), + + etap:diag("Replication finished, comparing source and target databases"), + compare_dbs(SourceDb, TargetDb), + + etap:diag("Deleting databases"), + delete_db(TargetDb), + delete_db(SourceDb), + + ok = timer:sleep(1000) + end, + Pairs), + + couch_replicator_notifier:stop(Listener). + + +populate_db(Db, DocCount) -> + Docs = lists:foldl( + fun(DocIdCounter, Acc) -> + Id = iolist_to_binary(["doc", integer_to_list(DocIdCounter)]), + Value = iolist_to_binary(["val", integer_to_list(DocIdCounter)]), + Doc = #doc{ + id = Id, + body = {[ {<<"value">>, Value} ]} + }, + [Doc | Acc] + end, + [], lists:seq(1, DocCount)), + {ok, _} = couch_db:update_docs(Db, Docs, []). + + +compare_dbs(#db{name = SourceName}, #db{name = TargetName}) -> + {ok, SourceDb} = couch_db:open_int(SourceName, []), + {ok, TargetDb} = couch_db:open_int(TargetName, []), + Fun = fun(FullDocInfo, _, Acc) -> + {ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo), + {Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]), + DocId = couch_util:get_value(<<"_id">>, Props), + DocTarget = case couch_db:open_doc(TargetDb, DocId) of + {ok, DocT} -> + DocT; + Error -> + etap:bail("Error opening document '" ++ ?b2l(DocId) ++ + "' from target: " ++ couch_util:to_list(Error)) + end, + DocTargetJson = couch_doc:to_json_obj(DocTarget, [attachments]), + case DocTargetJson of + DocJson -> + ok; + _ -> + etap:bail("Content from document '" ++ ?b2l(DocId) ++ + "' differs in target database") + end, + {ok, Acc} + end, + {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []), + etap:diag("Target database has the same documents as the source database"), + ok = couch_db:close(SourceDb), + ok = couch_db:close(TargetDb). + + +db_url(DbName) -> + iolist_to_binary([ + "http://", couch_config:get("httpd", "bind_address", "127.0.0.1"), + ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)), + "/", DbName + ]). + + +create_db(DbName) -> + {ok, Db} = couch_db:create( + DbName, + [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}, overwrite]), + couch_db:close(Db), + {ok, Db}. + + +delete_db(#db{name = DbName, main_pid = Pid}) -> + ok = couch_server:delete( + DbName, [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}]), + MonRef = erlang:monitor(process, Pid), + receive + {'DOWN', MonRef, process, Pid, _Reason} -> + ok + after 30000 -> + etap:bail("Timeout deleting database") + end. + + +replicate({remote, Db}, Target, UseCheckpoints) -> + replicate(db_url(Db), Target, UseCheckpoints); + +replicate(Source, {remote, Db}, UseCheckpoints) -> + replicate(Source, db_url(Db), UseCheckpoints); + +replicate(Source, Target, UseCheckpoints) -> + RepObject = {[ + {<<"source">>, Source}, + {<<"target">>, Target}, + {<<"use_checkpoints">>, UseCheckpoints} + ]}, + {ok, Rep} = couch_replicator_utils:parse_rep_doc( + RepObject, #user_ctx{roles = [<<"_admin">>]}), + {ok, Pid} = couch_replicator:async_replicate(Rep), + MonRef = erlang:monitor(process, Pid), + receive + {'DOWN', MonRef, process, Pid, Reason} -> + etap:is(Reason, normal, "Replication finished successfully") + after 300000 -> + etap:bail("Timeout waiting for replication to finish") + end. From d50af89faa913c3fd55af7b54f6af63500c70de2 Mon Sep 17 00:00:00 2001 From: Volker Mische Date: Tue, 16 Jul 2013 00:34:31 +0200 Subject: [PATCH 3/3] Add documentation for use_checkpoints replicator option --- share/doc/src/json-structure.rst | 3 +++ share/doc/src/replicator.rst | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/share/doc/src/json-structure.rst b/share/doc/src/json-structure.rst index c4089b9eea9..67b4173e928 100644 --- a/share/doc/src/json-structure.rst +++ b/share/doc/src/json-structure.rst @@ -271,6 +271,9 @@ Replication Settings | | filter function; value should be a document | | | containing parameters as members | +--------------------------------+---------------------------------------------+ +| use_checkpoints (optional) | Whether to use replication checkpoints | +| | or not | ++--------------------------------+---------------------------------------------+ .. _replication-status: diff --git a/share/doc/src/replicator.rst b/share/doc/src/replicator.rst index 1a40c65fb2f..b7d8d3e6ef8 100644 --- a/share/doc/src/replicator.rst +++ b/share/doc/src/replicator.rst @@ -19,7 +19,8 @@ A database where you ``PUT``/``POST`` documents to trigger replications and you ``DELETE`` to cancel ongoing replications. These documents have exactly the same content as the JSON objects we used to ``POST`` to ``_replicate`` (fields ``source``, ``target``, ``create_target``, -``continuous``, ``doc_ids``, ``filter``, ``query_params``. +``continuous``, ``doc_ids``, ``filter``, ``query_params``, +``use_checkpoints``). Replication documents can have a user defined ``_id``. Design documents (and ``_local`` documents) added to the replicator database are ignored.