Skip to content

Commit 03bc164

Browse files
vmxrnewson
authored andcommitted
Tests for replicator use_checkpoints option
1 parent 7c23a6e commit 03bc164

File tree

2 files changed

+258
-1
lines changed

2 files changed

+258
-1
lines changed

src/couch_replicator/Makefile.am

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ test_files = \
4141
test/03-replication-compact.t \
4242
test/04-replication-large-atts.t \
4343
test/05-replication-many-leaves.t \
44-
test/06-doc-missing-stubs.t
44+
test/06-doc-missing-stubs.t \
45+
test/07-use-checkpoints.t
4546

4647
compiled_files = \
4748
ebin/couch_replicator_api_wrap.beam \
Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
#!/usr/bin/env escript
2+
%% -*- erlang -*-
3+
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
4+
% use this file except in compliance with the License. You may obtain a copy of
5+
% the License at
6+
%
7+
% http://www.apache.org/licenses/LICENSE-2.0
8+
%
9+
% Unless required by applicable law or agreed to in writing, software
10+
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11+
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12+
% License for the specific language governing permissions and limitations under
13+
% the License.
14+
15+
% Verify that compacting databases that are being used as the source or
16+
% target of a replication doesn't affect the replication and that the
17+
% replication doesn't hold their reference counters forever.
18+
19+
-define(b2l(B), binary_to_list(B)).
20+
21+
-record(user_ctx, {
22+
name = null,
23+
roles = [],
24+
handler
25+
}).
26+
27+
-record(doc, {
28+
id = <<"">>,
29+
revs = {0, []},
30+
body = {[]},
31+
atts = [],
32+
deleted = false,
33+
meta = []
34+
}).
35+
36+
-record(db, {
37+
main_pid = nil,
38+
update_pid = nil,
39+
compactor_pid = nil,
40+
instance_start_time, % number of microsecs since jan 1 1970 as a binary string
41+
fd,
42+
updater_fd,
43+
fd_ref_counter,
44+
header = nil,
45+
committed_update_seq,
46+
fulldocinfo_by_id_btree,
47+
docinfo_by_seq_btree,
48+
local_docs_btree,
49+
update_seq,
50+
name,
51+
filepath,
52+
validate_doc_funs = [],
53+
security = [],
54+
security_ptr = nil,
55+
user_ctx = #user_ctx{},
56+
waiting_delayed_commit = nil,
57+
revs_limit = 1000,
58+
fsync_options = [],
59+
options = [],
60+
compression,
61+
before_doc_update,
62+
after_doc_read
63+
}).
64+
65+
-record(rep, {
66+
id,
67+
source,
68+
target,
69+
options,
70+
user_ctx,
71+
doc_id
72+
}).
73+
74+
75+
source_db_name() -> <<"couch_test_rep_db_a">>.
76+
target_db_name() -> <<"couch_test_rep_db_b">>.
77+
78+
79+
main(_) ->
80+
test_util:init_code_path(),
81+
82+
etap:plan(16),
83+
case (catch test()) of
84+
ok ->
85+
etap:end_tests();
86+
Other ->
87+
etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
88+
etap:bail(Other)
89+
end,
90+
ok.
91+
92+
93+
test() ->
94+
couch_server_sup:start_link(test_util:config_files()),
95+
ibrowse:start(),
96+
97+
test_use_checkpoints(false),
98+
test_use_checkpoints(true),
99+
100+
couch_server_sup:stop(),
101+
ok.
102+
103+
104+
test_use_checkpoints(UseCheckpoints) ->
105+
Pairs = [
106+
{source_db_name(), target_db_name()},
107+
{{remote, source_db_name()}, target_db_name()},
108+
{source_db_name(), {remote, target_db_name()}},
109+
{{remote, source_db_name()}, {remote, (target_db_name())}}
110+
],
111+
112+
ListenerFun = case UseCheckpoints of
113+
false ->
114+
fun({finished, _, {CheckpointHistory}}) ->
115+
etap:is(CheckpointHistory,
116+
[{<<"use_checkpoints">>,false}],
117+
"No checkpoints found");
118+
(_) ->
119+
ok
120+
end;
121+
true ->
122+
fun({finished, _, {CheckpointHistory}}) ->
123+
SessionId = lists:keyfind(
124+
<<"session_id">>, 1, CheckpointHistory),
125+
etap:isnt(SessionId, false, "There's a checkpoint");
126+
(_) ->
127+
ok
128+
end
129+
end,
130+
{ok, Listener} = couch_replicator_notifier:start_link(ListenerFun),
131+
132+
lists:foreach(
133+
fun({Source, Target}) ->
134+
{ok, SourceDb} = create_db(source_db_name()),
135+
etap:diag("Populating source database"),
136+
populate_db(SourceDb, 100),
137+
ok = couch_db:close(SourceDb),
138+
139+
etap:diag("Creating target database"),
140+
{ok, TargetDb} = create_db(target_db_name()),
141+
ok = couch_db:close(TargetDb),
142+
143+
etap:diag("Setup replicator notifier listener"),
144+
145+
etap:diag("Triggering replication"),
146+
replicate(Source, Target, UseCheckpoints),
147+
148+
etap:diag("Replication finished, comparing source and target databases"),
149+
compare_dbs(SourceDb, TargetDb),
150+
151+
etap:diag("Deleting databases"),
152+
delete_db(TargetDb),
153+
delete_db(SourceDb),
154+
155+
ok = timer:sleep(1000)
156+
end,
157+
Pairs),
158+
159+
couch_replicator_notifier:stop(Listener).
160+
161+
162+
populate_db(Db, DocCount) ->
163+
Docs = lists:foldl(
164+
fun(DocIdCounter, Acc) ->
165+
Id = iolist_to_binary(["doc", integer_to_list(DocIdCounter)]),
166+
Value = iolist_to_binary(["val", integer_to_list(DocIdCounter)]),
167+
Doc = #doc{
168+
id = Id,
169+
body = {[ {<<"value">>, Value} ]}
170+
},
171+
[Doc | Acc]
172+
end,
173+
[], lists:seq(1, DocCount)),
174+
{ok, _} = couch_db:update_docs(Db, Docs, []).
175+
176+
177+
compare_dbs(#db{name = SourceName}, #db{name = TargetName}) ->
178+
{ok, SourceDb} = couch_db:open_int(SourceName, []),
179+
{ok, TargetDb} = couch_db:open_int(TargetName, []),
180+
Fun = fun(FullDocInfo, _, Acc) ->
181+
{ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo),
182+
{Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]),
183+
DocId = couch_util:get_value(<<"_id">>, Props),
184+
DocTarget = case couch_db:open_doc(TargetDb, DocId) of
185+
{ok, DocT} ->
186+
DocT;
187+
Error ->
188+
etap:bail("Error opening document '" ++ ?b2l(DocId) ++
189+
"' from target: " ++ couch_util:to_list(Error))
190+
end,
191+
DocTargetJson = couch_doc:to_json_obj(DocTarget, [attachments]),
192+
case DocTargetJson of
193+
DocJson ->
194+
ok;
195+
_ ->
196+
etap:bail("Content from document '" ++ ?b2l(DocId) ++
197+
"' differs in target database")
198+
end,
199+
{ok, Acc}
200+
end,
201+
{ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []),
202+
etap:diag("Target database has the same documents as the source database"),
203+
ok = couch_db:close(SourceDb),
204+
ok = couch_db:close(TargetDb).
205+
206+
207+
db_url(DbName) ->
208+
iolist_to_binary([
209+
"http://", couch_config:get("httpd", "bind_address", "127.0.0.1"),
210+
":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
211+
"/", DbName
212+
]).
213+
214+
215+
create_db(DbName) ->
216+
{ok, Db} = couch_db:create(
217+
DbName,
218+
[{user_ctx, #user_ctx{roles = [<<"_admin">>]}}, overwrite]),
219+
couch_db:close(Db),
220+
{ok, Db}.
221+
222+
223+
delete_db(#db{name = DbName, main_pid = Pid}) ->
224+
ok = couch_server:delete(
225+
DbName, [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}]),
226+
MonRef = erlang:monitor(process, Pid),
227+
receive
228+
{'DOWN', MonRef, process, Pid, _Reason} ->
229+
ok
230+
after 30000 ->
231+
etap:bail("Timeout deleting database")
232+
end.
233+
234+
235+
replicate({remote, Db}, Target, UseCheckpoints) ->
236+
replicate(db_url(Db), Target, UseCheckpoints);
237+
238+
replicate(Source, {remote, Db}, UseCheckpoints) ->
239+
replicate(Source, db_url(Db), UseCheckpoints);
240+
241+
replicate(Source, Target, UseCheckpoints) ->
242+
RepObject = {[
243+
{<<"source">>, Source},
244+
{<<"target">>, Target},
245+
{<<"use_checkpoints">>, UseCheckpoints}
246+
]},
247+
{ok, Rep} = couch_replicator_utils:parse_rep_doc(
248+
RepObject, #user_ctx{roles = [<<"_admin">>]}),
249+
{ok, Pid} = couch_replicator:async_replicate(Rep),
250+
MonRef = erlang:monitor(process, Pid),
251+
receive
252+
{'DOWN', MonRef, process, Pid, Reason} ->
253+
etap:is(Reason, normal, "Replication finished successfully")
254+
after 300000 ->
255+
etap:bail("Timeout waiting for replication to finish")
256+
end.

0 commit comments

Comments
 (0)