
Introduce snapshots to improve compaction

A snapshot is just a db_header that carries a unique id, a pointer to
its own location in the file, and a pointer to the previous snapshot's
header. The snapshots thus form a linked list. When compaction runs,
the list is traversed to find the oldest snapshot, and compaction is
performed in chunks, one snapshot at a time, walking back up the list.
This allows docs that have not been edited to be copied only once. If
compaction crashes or is restarted to top off, it resumes at the first
snapshot after the update_seq.

BugzID:12639
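
For orientation, a minimal sketch (not part of the commit) of that chain
walk; it mirrors the all_snapshots/1 and previous_snapshots/3 helpers
added to couch_db_updater.erl below:

%% Follow snapshot_curr once, then snapshot_prev repeatedly, to
%% collect every snapshot header, oldest first.
chain(_Fd, #db_header{snapshot_curr = nil} = Header) ->
    [Header];
chain(Fd, #db_header{snapshot_curr = Pos} = Header) ->
    {ok, Snap} = couch_file:read_header_at(Fd, Pos),
    walk_prev(Fd, Snap, [Snap, Header]).

walk_prev(_Fd, #db_header{snapshot_prev = nil}, Acc) ->
    Acc;
walk_prev(Fd, #db_header{snapshot_prev = Pos}, Acc) ->
    {ok, Prev} = couch_file:read_header_at(Fd, Pos),
    walk_prev(Fd, Prev, [Prev | Acc]).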
1 parent 0b00ac4 · commit 41298f64602878e1b2709ee3e0393a61c380a6a7 · Bob Dionne committed Sep 28, 2011
Showing with 133 additions and 45 deletions.
  1. +4 −1 apps/couch/include/couch_db.hrl
  2. +5 −1 apps/couch/src/couch_db.erl
  3. +109 −41 apps/couch/src/couch_db_updater.erl
  4. +15 −2 apps/couch/src/couch_file.erl
@@ -133,7 +133,10 @@
purge_seq = 0,
purged_docs = nil,
security_ptr = nil,
- revs_limit = 1000
+ revs_limit = 1000,
+ snapshot_id = 0,
+ snapshot_curr = nil,
+ snapshot_prev = nil
}).
-record(db,
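
Of the three new fields: snapshot_id is a counter bumped on every
snapshot, snapshot_curr is the file position of the most recently
written snapshot header (nil until the first snapshot is taken), and
snapshot_prev is the position of the snapshot before that, which is
what links the headers into the list described in the commit message.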
@@ -12,7 +12,8 @@
-module(couch_db).
--export([open/2,open_int/2,close/1,create/2,start_compact/1,get_db_info/1,get_design_docs/1]).
+-export([open/2,open_int/2,close/1,create/2,start_compact/1,
+ take_snapshot/1, get_db_info/1,get_design_docs/1]).
-export([open_ref_counted/2,is_idle/1,monitor/1,count_changes_since/2]).
-export([update_doc/3,update_doc/4,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
-export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]).
@@ -128,6 +129,9 @@ start_compact(#db{main_pid=Pid}) ->
{ok, _} = gen_server:call(Pid, start_compact),
ok.
+take_snapshot(#db{main_pid=Pid}) ->
+ gen_server:call(Pid, take_snapshot).
+
delete_doc(Db, Id, Revisions) ->
DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
{ok, [Result]} = update_docs(Db, DeletedDocs, []),
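
A hedged usage sketch (the database name is hypothetical): take a
snapshot, then start compaction, which copies the file one snapshot at
a time:

{ok, Db} = couch_db:open_int(<<"mydb">>, []),
ok = couch_db:take_snapshot(Db),
ok = couch_db:start_compact(Db).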
@@ -17,6 +17,9 @@
-export([btree_by_seq_split/1,btree_by_seq_join/2,btree_by_seq_reduce/2]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
+% temporary export for debugging
+-export([all_snapshots/1]).
+
-include("couch_db.hrl").
@@ -61,6 +64,14 @@ handle_call(start_compact, _From, Db) ->
{noreply, NewDb} = handle_cast(start_compact, Db),
{reply, {ok, NewDb#db.compactor_pid}, NewDb};
+handle_call(take_snapshot, _From, #db{fd=Fd, header=Header}=Db) ->
+ {ok, CurrentPos} = couch_file:bytes(Fd),
+ NewHeader = Header#db_header{snapshot_id=Header#db_header.snapshot_id + 1,
+ snapshot_curr=CurrentPos,
+ snapshot_prev=Header#db_header.snapshot_curr},
+ couch_file:write_header(Fd,NewHeader),
+ {reply, ok, Db#db{header=NewHeader}};
+
handle_call(get_db, _From, Db) ->
{reply, {ok, Db}, Db};
handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
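
Note that CurrentPos is the file size before write_header/2 appends the
new header, so snapshot_curr points just ahead of where the header
actually lands; the find_header_at clause added to couch_file.erl below
compensates by reading from the next ?SIZE_BLOCK boundary (Pos div
?SIZE_BLOCK + 1).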
@@ -881,58 +892,89 @@ copy_docs(Db, #db{fd=DestFd}=NewDb, MixedInfos, Retry) ->
copy_compact(Db, NewDb0, Retry) ->
FsyncOptions = [Op || Op <- NewDb0#db.fsync_options, Op == before_header],
- NewDb = NewDb0#db{fsync_options=FsyncOptions},
- TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
- EnumBySeqFun =
- fun(DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) ->
- case DocInfo of
- #full_doc_info{update_seq=Seq} ->
- ok;
- #doc_info{high_seq=Seq} ->
- ok
- end,
- couch_task_status:update("Copied ~p of ~p changes (~p%)",
- [TotalCopied, TotalChanges, (TotalCopied*100) div TotalChanges]),
- if TotalCopied rem 1000 =:= 0 ->
- NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
- if TotalCopied rem 10000 =:= 0 ->
- NewDb3 = commit_data(NewDb2#db{update_seq=Seq}),
- {ok, {NewDb3, [], TotalCopied + 1}};
- true ->
- {ok, {NewDb2#db{update_seq=Seq}, [], TotalCopied + 1}}
- end;
- true ->
- {ok, {AccNewDb, [DocInfo | AccUncopied], TotalCopied + 1}}
- end
+ NewDb1 = NewDb0#db{fsync_options=FsyncOptions},
+ TotalChanges = couch_db:count_changes_since(Db, NewDb1#db.update_seq),
+
+ Headers = compute_headers(Db, NewDb1#db.update_seq),
+
+ NewDb2 =
+ lists:foldl(fun(Ss, NewDb) ->
+ DbSs = init_db(Db#db.name, Db#db.filepath, Db#db.fd, Ss),
+ copy_compact_snapshot(DbSs, NewDb, Retry, TotalChanges)
+ end, NewDb1, Headers),
+ if NewDb2#db.security /= Db#db.security ->
+ {ok, Ptr} = couch_file:append_term(NewDb2#db.fd, Db#db.security),
+ NewDb3 = NewDb2#db{security=Db#db.security, security_ptr=Ptr};
+ true ->
+ NewDb3 = NewDb2
end,
+ commit_data(NewDb3#db{update_seq=Db#db.update_seq}).
+copy_compact_snapshot(DbSs, NewDb, Retry, TotalChanges) ->
+ EnumBySeqFun =
+ fun(DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) ->
+ case DocInfo of
+ #full_doc_info{update_seq=Seq} ->
+ ok;
+ #doc_info{high_seq=Seq} ->
+ ok
+ end,
+ couch_task_status:update(
+ "Copied ~p of ~p changes (~p%)",
+ [TotalCopied, TotalChanges, (TotalCopied*100) div TotalChanges]),
+ if TotalCopied rem 1000 =:= 0 ->
+ NewDb2 =
+ copy_docs(DbSs, AccNewDb,
+ lists:reverse([DocInfo | AccUncopied]), Retry),
+ if TotalCopied rem 10000 =:= 0 ->
+ NewDb3 = commit_data(NewDb2#db{update_seq=Seq}),
+ {ok, {NewDb3, [], TotalCopied + 1}};
+ true ->
+ {ok, {NewDb2#db{update_seq=Seq}, [], TotalCopied + 1}}
+ end;
+ true ->
+ {ok, {AccNewDb, [DocInfo | AccUncopied], TotalCopied + 1}}
+ end
+ end,
couch_task_status:set_update_frequency(500),
-
- {ok, _, {NewDb2, Uncopied, TotalChanges}} =
- couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
- {NewDb, [], 0},
- [{start_key, NewDb#db.update_seq + 1}]),
+ {ok, _, {NewDb4, Uncopied, _TotalChanges}} =
+ couch_btree:foldl(DbSs#db.seq_tree, EnumBySeqFun,
+ {NewDb, [], 0},
+ [{start_key, NewDb#db.update_seq + 1},
+ {end_key, DbSs#db.update_seq}]),
couch_task_status:update("Flushing"),
- NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
+ NewDb5 = copy_docs(DbSs, NewDb4, lists:reverse(Uncopied), Retry),
- % copy misc header values
- if NewDb3#db.security /= Db#db.security ->
- {ok, Ptr} = couch_file:append_term(NewDb3#db.fd, Db#db.security),
- NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
- true ->
- NewDb4 = NewDb3
- end,
+ %% snapshot is taken in compact file for each snapshot in the original.
+ %% the current header may not be a snapshot
+ NewDb6 = commit_data(NewDb5#db{update_seq=DbSs#db.update_seq}),
+ if (DbSs#db.header#db_header.snapshot_id > 0) ->
+ snap(NewDb6);
+ true ->
+ NewDb6
+ end.
+
+snap(#db{fd=Fd, header=Header}=Db) ->
+ {ok, CurrentPos} = couch_file:bytes(Fd),
+ NewHeader = Header#db_header{snapshot_id=Header#db_header.snapshot_id + 1,
+ snapshot_curr=CurrentPos,
+ snapshot_prev=Header#db_header.snapshot_curr},
+ couch_file:write_header(Fd,NewHeader),
+ Db#db{header=NewHeader}.
- commit_data(NewDb4#db{update_seq=Db#db.update_seq}).
-start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=PurgeSeq}}=Db) ->
+start_copy_compact(#db{name=Name,
+ filepath=Filepath,
+ header=#db_header{purge_seq=PurgeSeq}}=Db) ->
CompactFile = Filepath ++ ".compact",
?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
case couch_file:open(CompactFile) of
{ok, Fd} ->
- couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>),
+ couch_task_status:add_task(<<"Database Compaction">>,
+ <<Name/binary, " retry">>,
+ <<"Starting">>),
Retry = true,
case couch_file:read_header(Fd) of
{ok, Header} ->
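
Concretely, suppose snapshots were taken at update_seqs 100 and 250 and
an interrupted compaction left the .compact file at update_seq 180:
compute_headers/2 (below) returns the snapshot at 250 plus the live
header, the fold copies seqs 181..250 against the 250 snapshot and
mirrors that snapshot into the .compact file via snap/1, then copies
the remainder against the live header, so the work already done up to
180 is never repeated.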
@@ -941,7 +983,9 @@ start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=P
ok = couch_file:write_header(Fd, Header=#db_header{})
end;
{error, enoent} ->
- couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>),
+ couch_task_status:add_task(<<"Database Compaction">>,
+ Name,
+ <<"Starting">>),
{ok, Fd} = couch_file:open(CompactFile, [create]),
Retry = false,
ok = couch_file:write_header(Fd, Header=#db_header{})
@@ -950,7 +994,8 @@ start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=P
NewDb2 = if PurgeSeq > 0 ->
{ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
{ok, Pointer} = couch_file:append_term(Fd, PurgedIdsRevs),
- NewDb#db{header=Header#db_header{purge_seq=PurgeSeq, purged_docs=Pointer}};
+ NewDb#db{header=Header#db_header{purge_seq=PurgeSeq,
+ purged_docs=Pointer}};
true ->
NewDb
end,
@@ -960,3 +1005,26 @@ start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=P
close_db(NewDb3),
gen_server:cast(Db#db.main_pid, {compact_done, CompactFile}).
+compute_headers(Db, UpdateSeq) ->
+ io:format("only want snapshots after ~p ~n",[UpdateSeq]),
+ [Ss || #db_header{update_seq=Seq}=Ss <- all_snapshots(Db),
+ Seq > UpdateSeq].
+
+all_snapshots(#db{fd=Fd, header=Header}) ->
+ if Header#db_header.snapshot_curr == nil ->
+ [Header];
+ true ->
+ {ok, Ss} =
+ couch_file:read_header_at(Fd, Header#db_header.snapshot_curr),
+ previous_snapshots(Fd, Ss, [Ss, Header])
+ end.
+
+previous_snapshots(Fd, Ss, Acc) ->
+ case Ss#db_header.snapshot_prev of
+ nil ->
+ Acc;
+ PrevPos ->
+ {ok, NextSs} = couch_file:read_header_at(Fd, PrevPos),
+ previous_snapshots(Fd, NextSs, [NextSs | Acc])
+ end.
+
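
A hedged sketch of inspecting the chain with the temporarily exported
helper (database name hypothetical):

{ok, Db} = couch_db:open_int(<<"mydb">>, []),
Snapshots = couch_db_updater:all_snapshots(Db).
%% Snapshots is oldest first; the final element is the live header,
%% which may not itself be a snapshot.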
@@ -25,7 +25,8 @@
-export([open/1, open/2, close/1, bytes/1, sync/1, append_binary/2,old_pread/3]).
-export([append_term/2, pread_term/2, pread_iolist/2, write_header/2]).
--export([pread_binary/2, read_header/1, truncate/2, upgrade_old_header/2]).
+-export([pread_binary/2, read_header/1, read_header_at/2,
+ truncate/2, upgrade_old_header/2]).
-export([append_term_md5/2,append_binary_md5/2]).
-export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
-export([delete/2,delete/3,init_delete_dir/1]).
@@ -223,6 +224,14 @@ read_header(Fd) ->
Else
end.
+read_header_at(Fd, Pos) ->
+ case gen_server:call(Fd, {find_header_at, Pos}, infinity) of
+ {ok, Bin} ->
+ {ok, binary_to_term(Bin)};
+ Else ->
+ Else
+ end.
+
write_header(Fd, Data) ->
Bin = term_to_binary(Data),
Md5 = couch_util:md5(Bin),
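
A hedged usage sketch, assuming Fd is an open couch_file and PrevPos was
read out of a snapshot header's snapshot_prev field (the #db_header{}
record comes from couch_db.hrl):

{ok, PrevHeader} = couch_file:read_header_at(Fd, PrevPos),
PrevHeader#db_header.snapshot_id.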
@@ -377,7 +386,11 @@ handle_call({upgrade_old_header, Prefix}, _From, #file{fd=Fd}=File) ->
handle_call(find_header, _From, #file{fd=Fd, eof=Pos}=File) ->
- {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
+ {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File};
+
+handle_call({find_header_at, Pos}, _From, #file{fd=Fd}=File) ->
+ Block = Pos div ?SIZE_BLOCK + 1,
+ {reply, load_header(Fd, Block), File}.
% 09 UPGRADE CODE
-define(HEADER_SIZE, 2048). % size of each segment of the doubly written header
