
Comparing changes

  • 3 commits
  • 4 files changed
  • 0 commit comments
  • 1 contributor
Commits on Sep 23, 2011
Bob Dionne whitespace 0b00ac4
Commits on Sep 30, 2011
Bob Dionne Introduce snapshots to improve compaction
A snapshot is just a db_header that contains a unique id, a pointer to
its own position in the file, and a pointer to the previous snapshot
header. The snapshots thus form a linked list. When compaction runs, the
list is traversed to find the oldest snapshot, and compaction is
performed in chunks, one snapshot at a time, walking back up the list.
This allows docs that have not been edited to be copied only once. If
compaction crashes or is restarted to top off, it begins at the next
snapshot after the update_seq.

BugzID:12639
41298f6
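
As a rough illustration of the flow this introduces (not part of the patch; the database name and doc batches below are placeholders, and the functions used are the ones exported in the couch_db.erl hunk further down): a caller can snapshot between batches of writes, and the compactor then copies each snapshot's chunk of the by-seq tree only once, oldest snapshot first.

    %% Hypothetical caller sketch -- FirstBatch/SecondBatch are placeholder lists of #doc{} records.
    {ok, Db} = couch_db:open_int(<<"demo_db">>, []),
    {ok, _} = couch_db:update_docs(Db, FirstBatch),
    ok = couch_db:take_snapshot(Db),   %% appends a header; snapshot_prev links it to the prior snapshot (nil if none)
    {ok, _} = couch_db:update_docs(Db, SecondBatch),
    ok = couch_db:take_snapshot(Db),
    ok = couch_db:start_compact(Db).   %% compaction walks the snapshot chain oldest-first, in chunks
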
Bob Dionne Fix off-by-one error
The off-by-one allowed the first doc to be copied over on its own, since TotalCopied begins at 0
6008831
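
For context on the guard being fixed (the batching test in the couch_db_updater.erl hunk below): the accumulator starts at 0, so the old test, TotalCopied rem 1000 =:= 0, fired on the very first doc and flushed a batch of one; the corrected (TotalCopied + 1) rem 1000 form waits until a full batch has accumulated. A quick shell check of the arithmetic:

    1> 0 rem 1000 =:= 0.            %% old guard on the first doc: flushes immediately
    true
    2> (0 + 1) rem 1000 =:= 0.      %% corrected guard on the first doc
    false
    3> (999 + 1) rem 1000 =:= 0.    %% corrected guard on the 1000th doc
    true
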
Showing with 141 additions and 53 deletions.
  1. +4 −1 apps/couch/include/couch_db.hrl
  2. +13 −9 apps/couch/src/couch_db.erl
  3. +109 −41 apps/couch/src/couch_db_updater.erl
  4. +15 −2 apps/couch/src/couch_file.erl
apps/couch/include/couch_db.hrl (5 changed lines)
@@ -133,7 +133,10 @@
purge_seq = 0,
purged_docs = nil,
security_ptr = nil,
- revs_limit = 1000
+ revs_limit = 1000,
+ snapshot_id = 0,
+ snapshot_curr = nil,
+ snapshot_prev = nil
}).
-record(db,
apps/couch/src/couch_db.erl (22 changed lines)
@@ -12,7 +12,8 @@
-module(couch_db).
--export([open/2,open_int/2,close/1,create/2,start_compact/1,get_db_info/1,get_design_docs/1]).
+-export([open/2,open_int/2,close/1,create/2,start_compact/1,
+ take_snapshot/1, get_db_info/1,get_design_docs/1]).
-export([open_ref_counted/2,is_idle/1,monitor/1,count_changes_since/2]).
-export([update_doc/3,update_doc/4,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
-export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]).
@@ -128,6 +129,9 @@ start_compact(#db{main_pid=Pid}) ->
{ok, _} = gen_server:call(Pid, start_compact),
ok.
+take_snapshot(#db{main_pid=Pid}) ->
+ gen_server:call(Pid, take_snapshot).
+
delete_doc(Db, Id, Revisions) ->
DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
{ok, [Result]} = update_docs(Db, DeletedDocs, []),
@@ -154,7 +158,7 @@ apply_open_options({ok, Doc},Options) ->
apply_open_options2(Doc,Options);
apply_open_options(Else,_Options) ->
Else.
-
+
apply_open_options2(Doc,[]) ->
{ok, Doc};
apply_open_options2(#doc{atts=Atts,revs=Revs}=Doc,
@@ -615,11 +619,11 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
{ok, {Start, Path}} ->
% our unflushed doc is a leaf node. Go back on the path
% to find the previous rev that's on disk.
-
+
LoadPrevRevFun = fun() ->
make_first_doc_on_disk(Db,Id,Start-1, tl(Path))
end,
-
+
case couch_doc:has_stubs(Doc) of
true ->
DiskDoc = LoadPrevRevFun(),
@@ -629,7 +633,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
Doc2 = Doc,
GetDiskDocFun = LoadPrevRevFun
end,
-
+
case validate_doc_update(Db, Doc2, GetDiskDocFun) of
ok ->
{[Doc2 | AccValidated], AccErrors2};
@@ -724,7 +728,7 @@ update_docs(Db, Docs, Options, interactive_edit) ->
{[Doc | DocsAcc], NonRepDocsAcc}
end
end, {[], []}, Docs),
-
+
DocBuckets = group_alike_docs(Docs2),
case (Db#db.validate_doc_funs /= []) orelse
@@ -764,9 +768,9 @@ update_docs(Db, Docs, Options, interactive_edit) ->
check_dup_atts(Doc)), Db#db.fd)
|| Doc <- B] || B <- DocBuckets2],
{DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
-
+
{ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),
-
+
ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures),
{ok, lists:map(
fun(#doc{id=Id,revs={Pos, RevIds}}) ->
@@ -852,7 +856,7 @@ set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts}=Doc) ->
(Att) ->
Att#att{revpos=RevPos+1}
end, Atts)}.
-
+
doc_flush_atts(Doc, Fd) ->
Doc#doc{atts=[flush_att(Fd, Att) || Att <- Doc#doc.atts]}.
apps/couch/src/couch_db_updater.erl (150 changed lines)
@@ -17,6 +17,9 @@
-export([btree_by_seq_split/1,btree_by_seq_join/2,btree_by_seq_reduce/2]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
+% temporary export for debugging
+-export([all_snapshots/1]).
+
-include("couch_db.hrl").
@@ -61,6 +64,14 @@ handle_call(start_compact, _From, Db) ->
{noreply, NewDb} = handle_cast(start_compact, Db),
{reply, {ok, NewDb#db.compactor_pid}, NewDb};
+handle_call(take_snapshot, _From, #db{fd=Fd, header=Header}=Db) ->
+ {ok, CurrentPos} = couch_file:bytes(Fd),
+ NewHeader = Header#db_header{snapshot_id=Header#db_header.snapshot_id + 1,
+ snapshot_curr=CurrentPos,
+ snapshot_prev=Header#db_header.snapshot_curr},
+ couch_file:write_header(Fd,NewHeader),
+ {reply, ok, Db#db{header=NewHeader}};
+
handle_call(get_db, _From, Db) ->
{reply, {ok, Db}, Db};
handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
@@ -881,58 +892,89 @@ copy_docs(Db, #db{fd=DestFd}=NewDb, MixedInfos, Retry) ->
copy_compact(Db, NewDb0, Retry) ->
FsyncOptions = [Op || Op <- NewDb0#db.fsync_options, Op == before_header],
- NewDb = NewDb0#db{fsync_options=FsyncOptions},
- TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
- EnumBySeqFun =
- fun(DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) ->
- case DocInfo of
- #full_doc_info{update_seq=Seq} ->
- ok;
- #doc_info{high_seq=Seq} ->
- ok
- end,
- couch_task_status:update("Copied ~p of ~p changes (~p%)",
- [TotalCopied, TotalChanges, (TotalCopied*100) div TotalChanges]),
- if TotalCopied rem 1000 =:= 0 ->
- NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
- if TotalCopied rem 10000 =:= 0 ->
- NewDb3 = commit_data(NewDb2#db{update_seq=Seq}),
- {ok, {NewDb3, [], TotalCopied + 1}};
- true ->
- {ok, {NewDb2#db{update_seq=Seq}, [], TotalCopied + 1}}
- end;
- true ->
- {ok, {AccNewDb, [DocInfo | AccUncopied], TotalCopied + 1}}
- end
+ NewDb1 = NewDb0#db{fsync_options=FsyncOptions},
+ TotalChanges = couch_db:count_changes_since(Db, NewDb1#db.update_seq),
+
+ Headers = compute_headers(Db, NewDb1#db.update_seq),
+
+ NewDb2 =
+ lists:foldl(fun(Ss, NewDb) ->
+ DbSs = init_db(Db#db.name, Db#db.filepath, Db#db.fd, Ss),
+ copy_compact_snapshot(DbSs, NewDb, Retry, TotalChanges)
+ end, NewDb1, Headers),
+ if NewDb2#db.security /= Db#db.security ->
+ {ok, Ptr} = couch_file:append_term(NewDb2#db.fd, Db#db.security),
+ NewDb3 = NewDb2#db{security=Db#db.security, security_ptr=Ptr};
+ true ->
+ NewDb3 = NewDb2
end,
+ commit_data(NewDb3#db{update_seq=Db#db.update_seq}).
+copy_compact_snapshot(DbSs, NewDb, Retry, TotalChanges) ->
+ EnumBySeqFun =
+ fun(DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) ->
+ case DocInfo of
+ #full_doc_info{update_seq=Seq} ->
+ ok;
+ #doc_info{high_seq=Seq} ->
+ ok
+ end,
+ couch_task_status:update(
+ "Copied ~p of ~p changes (~p%)",
+ [TotalCopied, TotalChanges, (TotalCopied*100) div TotalChanges]),
+ if (TotalCopied + 1) rem 1000 =:= 0 ->
+ NewDb2 =
+ copy_docs(DbSs, AccNewDb,
+ lists:reverse([DocInfo | AccUncopied]), Retry),
+ if (TotalCopied + 1) rem 10000 =:= 0 ->
+ NewDb3 = commit_data(NewDb2#db{update_seq=Seq}),
+ {ok, {NewDb3, [], TotalCopied + 1}};
+ true ->
+ {ok, {NewDb2#db{update_seq=Seq}, [], TotalCopied + 1}}
+ end;
+ true ->
+ {ok, {AccNewDb, [DocInfo | AccUncopied], TotalCopied + 1}}
+ end
+ end,
couch_task_status:set_update_frequency(500),
-
- {ok, _, {NewDb2, Uncopied, TotalChanges}} =
- couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
- {NewDb, [], 0},
- [{start_key, NewDb#db.update_seq + 1}]),
+ {ok, _, {NewDb4, Uncopied, _TotalChanges}} =
+ couch_btree:foldl(DbSs#db.seq_tree, EnumBySeqFun,
+ {NewDb, [], 0},
+ [{start_key, NewDb#db.update_seq + 1},
+ {end_key, DbSs#db.update_seq}]),
couch_task_status:update("Flushing"),
- NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
+ NewDb5 = copy_docs(DbSs, NewDb4, lists:reverse(Uncopied), Retry),
- % copy misc header values
- if NewDb3#db.security /= Db#db.security ->
- {ok, Ptr} = couch_file:append_term(NewDb3#db.fd, Db#db.security),
- NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
- true ->
- NewDb4 = NewDb3
- end,
+ %% snapshot is taken in compact file for each snapshot in the original.
+ %% the current header may not be a snapshot
+ NewDb6 = commit_data(NewDb5#db{update_seq=DbSs#db.update_seq}),
+ if (DbSs#db.header#db_header.snapshot_id > 0) ->
+ snap(NewDb6);
+ true ->
+ NewDb6
+ end.
+
+snap(#db{fd=Fd, header=Header}=Db) ->
+ {ok, CurrentPos} = couch_file:bytes(Fd),
+ NewHeader = Header#db_header{snapshot_id=Header#db_header.snapshot_id + 1,
+ snapshot_curr=CurrentPos,
+ snapshot_prev=Header#db_header.snapshot_curr},
+ couch_file:write_header(Fd,NewHeader),
+ Db#db{header=NewHeader}.
- commit_data(NewDb4#db{update_seq=Db#db.update_seq}).
-start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=PurgeSeq}}=Db) ->
+start_copy_compact(#db{name=Name,
+ filepath=Filepath,
+ header=#db_header{purge_seq=PurgeSeq}}=Db) ->
CompactFile = Filepath ++ ".compact",
?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
case couch_file:open(CompactFile) of
{ok, Fd} ->
- couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>),
+ couch_task_status:add_task(<<"Database Compaction">>,
+ <<Name/binary, " retry">>,
+ <<"Starting">>),
Retry = true,
case couch_file:read_header(Fd) of
{ok, Header} ->
@@ -941,7 +983,9 @@ start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=P
ok = couch_file:write_header(Fd, Header=#db_header{})
end;
{error, enoent} ->
- couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>),
+ couch_task_status:add_task(<<"Database Compaction">>,
+ Name,
+ <<"Starting">>),
{ok, Fd} = couch_file:open(CompactFile, [create]),
Retry = false,
ok = couch_file:write_header(Fd, Header=#db_header{})
@@ -950,7 +994,8 @@ start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=P
NewDb2 = if PurgeSeq > 0 ->
{ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
{ok, Pointer} = couch_file:append_term(Fd, PurgedIdsRevs),
- NewDb#db{header=Header#db_header{purge_seq=PurgeSeq, purged_docs=Pointer}};
+ NewDb#db{header=Header#db_header{purge_seq=PurgeSeq,
+ purged_docs=Pointer}};
true ->
NewDb
end,
@@ -960,3 +1005,26 @@ start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=P
close_db(NewDb3),
gen_server:cast(Db#db.main_pid, {compact_done, CompactFile}).
+compute_headers(Db, UpdateSeq) ->
+ io:format("only want snapshots after ~p ~n",[UpdateSeq]),
+ [Ss || #db_header{update_seq=Seq}=Ss <- all_snapshots(Db),
+ Seq > UpdateSeq].
+
+all_snapshots(#db{fd=Fd, header=Header}) ->
+ if Header#db_header.snapshot_curr == nil ->
+ [Header];
+ true ->
+ {ok, Ss} =
+ couch_file:read_header_at(Fd, Header#db_header.snapshot_curr),
+ previous_snapshots(Fd, Ss, [Ss, Header])
+ end.
+
+previous_snapshots(Fd, Ss, Acc) ->
+ case Ss#db_header.snapshot_prev of
+ nil ->
+ Acc;
+ PrevPos ->
+ {ok, NextSs} = couch_file:read_header_at(Fd, PrevPos),
+ previous_snapshots(Fd, NextSs, [NextSs | Acc])
+ end.
+
apps/couch/src/couch_file.erl (17 changed lines)
@@ -25,7 +25,8 @@
-export([open/1, open/2, close/1, bytes/1, sync/1, append_binary/2,old_pread/3]).
-export([append_term/2, pread_term/2, pread_iolist/2, write_header/2]).
--export([pread_binary/2, read_header/1, truncate/2, upgrade_old_header/2]).
+-export([pread_binary/2, read_header/1, read_header_at/2,
+ truncate/2, upgrade_old_header/2]).
-export([append_term_md5/2,append_binary_md5/2]).
-export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
-export([delete/2,delete/3,init_delete_dir/1]).
@@ -223,6 +224,14 @@ read_header(Fd) ->
Else
end.
+read_header_at(Fd, Pos) ->
+ case gen_server:call(Fd, {find_header_at, Pos}, infinity) of
+ {ok, Bin} ->
+ {ok, binary_to_term(Bin)};
+ Else ->
+ Else
+ end.
+
write_header(Fd, Data) ->
Bin = term_to_binary(Data),
Md5 = couch_util:md5(Bin),
@@ -377,7 +386,11 @@ handle_call({upgrade_old_header, Prefix}, _From, #file{fd=Fd}=File) ->
handle_call(find_header, _From, #file{fd=Fd, eof=Pos}=File) ->
- {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
+ {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File};
+
+handle_call({find_header_at, Pos}, _From, #file{fd=Fd}=File) ->
+ Block = Pos div ?SIZE_BLOCK + 1,
+ {reply, load_header(Fd, Block), File}.
% 09 UPGRADE CODE
-define(HEADER_SIZE, 2048). % size of each segment of the doubly written header
