
Comparing changes

  • 2 commits
  • 5 files changed
  • 0 commit comments
  • 1 contributor
Commits on Aug 28, 2011
@davisp Implementation of efficient btree copying.
If we have a source of sorted k/v data, it's possible to efficiently
construct a btree by streaming the pairs to disk and building the upper
btree nodes on demand. This process also has the nice property that it
results in zero garbage for the new btree.

The two downsides to this process are that it can't be resumed if
interrupted, and the resulting btree can't be merged into an existing one.

The couch_btree build API looks like this:

    % Some preparation
    {ok, Fd} = couch_file:open("newbtree.dat", [create]),
    {ok, InitBtree} = couch_btree:open(nil, Fd),

    % Build the new tree.
    Init = couch_btree:build_init(InitBtree),
    Final = lists:foldl(fun(I, Acc) ->
        couch_btree:build_add(Acc, {I, I})
    end, Init, lists:seq(1, 1000)),
    {ok, NewBtree} = couch_btree:build_finish(Final).
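
Note that build_add/2 takes the build state as its first argument and
the {Key, Value} pair as its second, and keys must be added in strictly
increasing order (per the btree's comparison function); out-of-order
keys make build_add/2 throw {build_error, invalid_key_order}, as shown
in the couch_btree.erl changes below.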

Original concept from Filipe Manana
a17ed18
@davisp Improve compaction efficiency.
Databases that are updated randomly by doc id cause compaction to be
noticeably less efficient than it could be. This is due to the order in
which the id_tree is written: the random updates cause the append-only
btree to generate a significant amount of waste.

This patch works by writing the id_tree to a temporary file during
compaction and then streaming that btree back into the compaction file
during a final pass.
251bc76
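
A minimal sketch of the final pass, assuming MetaIdTree is the id_tree
buffered in the ".compact.meta" file and DataFd is the open
".compact.data" file (the function name below is illustrative; the real
logic lives in copy_meta_data/1 and merge_btrees/2 in the
couch_db_updater.erl diff):

    % Illustrative only: stream the sorted id_tree from the meta file
    % into the data file. couch_btree:copy/2 folds the source in key
    % order and rebuilds it via build_init/build_add/build_finish, so
    % the copied tree carries no append-only waste.
    stream_id_tree_back(MetaIdTree, DataFd) ->
        {ok, NewIdTree} = couch_btree:copy(MetaIdTree, DataFd),
        NewIdTree.

When a compaction is resumed and a partial id_tree already exists in the
data file, the patch instead merges the meta-file entries into it with
couch_btree:add/2 (see merge_btrees/2) rather than doing a straight copy.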
76 apps/couch/src/couch_btree.erl
@@ -15,6 +15,7 @@
-export([open/2, open/3, query_modify/4, add/2, add_remove/3]).
-export([fold/4, full_reduce/1, final_reduce/2, foldl/3, foldl/4]).
-export([fold_reduce/4, lookup/2, get_state/1, set_options/2]).
+-export([copy/2, build_init/1, build_add/2, build_finish/1]).
-record(btree,
{fd,
@@ -687,3 +688,78 @@ stream_kv_node2(Bt, Reds, PrevKVs, [{K,V} | RestKVs], InRange, Dir, Fun, Acc) ->
{stop, {PrevKVs, Reds}, Acc2}
end
end.
+
+
+copy(Src, Fd) ->
+ Dst = Src#btree{fd=Fd, root=nil},
+ FoldFun = fun(KV, _Red, Acc) ->
+ {ok, build_add(Acc, extract(Src, KV))}
+ end,
+ InitAcc = build_init(Dst),
+ {ok, _, FinalAcc} = foldl(Src, FoldFun, InitAcc),
+ build_finish(FinalAcc).
+
+
+build_init(Bt) ->
+ ChunkSize = couch_config:get("couchdb", "btree_chunk_size", "1279"),
+ ChunkSizeInt = list_to_integer(ChunkSize),
+ {Bt, ChunkSizeInt, nil, [{[], 0}]}.
+
+
+build_add({Bt, ChunkSize, nil, Nodes}, {K, _}=KV) ->
+ % First key insertion
+ NewNodes = build_add(Bt, ChunkSize, kv_node, KV, Nodes),
+ {Bt, ChunkSize, {prev, K}, NewNodes};
+build_add({Bt, ChunkSize, {prev,P}, Nodes}, {K, _}=KV) ->
+ % Each new key must be strictly greater than the
+ % previous key inserted.
+ case less(Bt, P, K) and not less(Bt, K, P) of
+ false -> throw({build_error, invalid_key_order});
+ _ -> ok
+ end,
+ NodeList = build_add(Bt, ChunkSize, kv_node, KV, Nodes),
+ {Bt, ChunkSize, {prev, K}, NodeList}.
+
+
+build_add(_Bt, _ChunkSize, _NodeType, Entry, [{[], 0} | RestNodes]) ->
+ [{[Entry], erlang:external_size(Entry)} | RestNodes];
+build_add(Bt, ChunkSize, kp_node, Entry, []) ->
+ build_add(Bt, ChunkSize, kp_node, Entry, [{[], 0}]);
+build_add(Bt, ChunkSize, NodeType, Entry, [{NodeAcc0, Size0} | RestNodes]) ->
+ NodeAcc = [Entry | NodeAcc0],
+ Size = Size0 + erlang:external_size(Entry),
+ case Size > ChunkSize of
+ true ->
+ Node = lists:reverse(NodeAcc),
+ {LastKey, _} = Entry,
+ {ok, Ptr} = couch_file:append_term(Bt#btree.fd, {NodeType, Node}),
+ Red = reduce_node(Bt, NodeType, Node),
+ KP = {LastKey, {Ptr, Red}},
+ NewNodes = build_add(Bt, ChunkSize, kp_node, KP, RestNodes),
+ [{[], 0} | NewNodes];
+ false ->
+ [{NodeAcc, Size} | RestNodes]
+ end.
+
+
+build_finish({Bt, _ChunkSize, _Prev, NodeList}) ->
+ {ok, Bt#btree{root=build_finish(Bt, kv_node, NodeList)}}.
+
+
+build_finish(_Bt, _NodeType, [{[], _}]) ->
+ nil;
+build_finish(Bt, _NodeType, [{[], _} | RestNodes]) ->
+ build_finish(Bt, kp_node, RestNodes);
+build_finish(Bt, NodeType, [{NodeAcc, _} | RestNodes]) ->
+ Node = lists:reverse(NodeAcc),
+ {ok, Ptr} = couch_file:append_term(Bt#btree.fd, {NodeType, Node}),
+ Red = reduce_node(Bt, NodeType, Node),
+ case RestNodes of
+ [] ->
+ {Ptr, Red};
+ [{KPs, _Size} | RestRestNodes] ->
+ [{LastKey, _} | _Rest] = NodeAcc,
+ KP = {LastKey, {Ptr, Red}},
+ build_finish(Bt, kp_node, [{[KP | KPs], nil} | RestRestNodes])
+ end.
+
7 apps/couch/src/couch_db.erl
@@ -47,10 +47,11 @@ open_db_file(Filepath, Options) ->
{error, enoent} ->
% couldn't find file. is there a compact version? This can happen if
% crashed during the file switch.
- case couch_file:open(Filepath ++ ".compact") of
+ case couch_file:open(Filepath ++ ".compact.data") of
{ok, Fd} ->
- ?LOG_INFO("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]),
- ok = file:rename(Filepath ++ ".compact", Filepath),
+ ?LOG_INFO("Found ~s~s compaction file, using as primary storage.",
+ [Filepath, ".compact.data"]),
+ ok = file:rename(Filepath ++ ".compact.data", Filepath),
ok = couch_file:sync(Filepath),
{ok, Fd};
{error, enoent} ->
236 apps/couch/src/couch_db_updater.erl
@@ -20,6 +20,13 @@
-include("couch_db.hrl").
+-record(comp_hdr, {
+ id=couch_uuids:random(),
+ db_hdr=#db_header{},
+ id_tree_state=nil
+}).
+
+
init({DbName, Filepath, Fd, Options}) ->
case lists:member(create, Options) of
true ->
@@ -28,18 +35,22 @@ init({DbName, Filepath, Fd, Options}) ->
ok = couch_file:write_header(Fd, Header),
% delete any old compaction files that might be hanging around
RootDir = couch_config:get("couchdb", "database_dir", "."),
- couch_file:delete(RootDir, Filepath ++ ".compact");
+ couch_file:delete(RootDir, Filepath ++ ".compact.data"),
+ couch_file:delete(RootDir, Filepath ++ ".compact.meta");
false ->
ok = couch_file:upgrade_old_header(Fd, <<$g, $m, $k, 0>>), % 09 UPGRADE CODE
case couch_file:read_header(Fd) of
- {ok, Header} ->
+ {ok, #db_header{}=Header} ->
ok;
- no_valid_header ->
- % create a new header and writes it to the file
+ _ ->
+ % No valid #db_header found. Reset the file and start
+ % over from scratch.
Header = #db_header{},
+ ok = couch_file:truncate(Fd, 0),
ok = couch_file:write_header(Fd, Header),
% delete any old compaction files that might be hanging around
- file:delete(Filepath ++ ".compact")
+ file:delete(Filepath ++ ".compact.data"),
+ file:delete(Filepath ++ ".compact.meta")
end
end,
@@ -204,6 +215,9 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
RootDir = couch_config:get("couchdb", "database_dir", "."),
couch_file:delete(RootDir, Filepath),
ok = file:rename(CompactFilepath, Filepath),
+ % Delete the old meta compaction file after promoting
+ % the compaction file.
+ couch_file:delete(RootDir, Filepath ++ ".compact.meta"),
close_db(Db),
NewDb3 = refresh_validate_doc_funs(NewDb2),
ok = gen_server:call(couch_server, {db_updated, NewDb3}, infinity),
@@ -896,7 +910,7 @@ copy_compact(Db, NewDb0, Retry) ->
if TotalCopied rem 1000 =:= 0 ->
NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
if TotalCopied rem 10000 =:= 0 ->
- NewDb3 = commit_data(NewDb2#db{update_seq=Seq}),
+ NewDb3 = commit_compaction_data(NewDb2#db{update_seq=Seq}),
{ok, {NewDb3, [], TotalCopied + 1}};
true ->
{ok, {NewDb2#db{update_seq=Seq}, [], TotalCopied + 1}}
@@ -925,38 +939,192 @@ copy_compact(Db, NewDb0, Retry) ->
NewDb4 = NewDb3
end,
- commit_data(NewDb4#db{update_seq=Db#db.update_seq}).
+ NewDb4#db{update_seq=Db#db.update_seq}.
-start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=PurgeSeq}}=Db) ->
- CompactFile = Filepath ++ ".compact",
+
+start_copy_compact(#db{}=Db) ->
+ #db{
+ name=Name,
+ filepath=Filepath,
+ header=#db_header{purge_seq=PurgeSeq}
+ } = Db,
?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
- case couch_file:open(CompactFile) of
- {ok, Fd} ->
- couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>),
- Retry = true,
- case couch_file:read_header(Fd) of
- {ok, Header} ->
- ok;
- no_valid_header ->
- ok = couch_file:write_header(Fd, Header=#db_header{})
- end;
- {error, enoent} ->
- couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>),
- {ok, Fd} = couch_file:open(CompactFile, [create]),
- Retry = false,
- ok = couch_file:write_header(Fd, Header=#db_header{})
- end,
- NewDb = init_db(Name, CompactFile, Fd, Header),
- NewDb2 = if PurgeSeq > 0 ->
+
+ {NewDb0, Retry} = open_compaction_db(Name, Filepath),
+
+ TName = if Retry -> <<Name/binary, " retry">>; true -> Name end,
+ couch_task_status:add_task(<<"Database Compaction">>,TName,<<"Starting">>),
+
+ NewDb1 = if PurgeSeq > 0 ->
{ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
- {ok, Pointer} = couch_file:append_term(Fd, PurgedIdsRevs),
- NewDb#db{header=Header#db_header{purge_seq=PurgeSeq, purged_docs=Pointer}};
+ {ok, Pointer} = couch_file:append_term(NewDb0#db.fd, PurgedIdsRevs),
+ % Grab our piggy-backed header
+ #db{header=#comp_hdr{db_hdr=DbHdr}=CompHdr} = NewDb0,
+ NewCompHdr = CompHdr#comp_hdr{
+ db_hdr=DbHdr#db_header{
+ purge_seq=PurgeSeq,
+ purged_docs=Pointer
+ }
+ },
+ NewDb0#db{header=NewCompHdr};
true ->
- NewDb
+ NewDb0
+ end,
+
+ % This is a bit worrisome. init_db/4 will monitor the data fd
+ % but it doesn't know about the meta fd. For now I'll maintain
+ % that the data fd is the old normal fd and meta fd is special
+ % and hope everything works out for the best.
+ unlink(NewDb1#db.fd),
+
+ NewDb2 = copy_compact(Db, NewDb1, Retry),
+ NewDb3 = copy_meta_data(NewDb2),
+ couch_task_status:update("Finishing."),
+ NewDb4 = final_compaction_commit(NewDb3),
+ close_db(NewDb4),
+
+ gen_server:cast(Db#db.main_pid, {compact_done, NewDb4#db.filepath}).
+
+
+open_compaction_db(DbName, DbFilePath) ->
+ DataFile = DbFilePath ++ ".compact.data",
+ MetaFile = DbFilePath ++ ".compact.meta",
+ {ok, DataFd, DataHdr0} = open_compaction_file(DataFile),
+ {ok, MetaFd, MetaHdr0} = open_compaction_file(MetaFile),
+ {Header, Retry} = case {DataHdr0, MetaHdr0} of
+ {#db_header{}=DH, _} ->
+ % Resuming compaction either due to not caught up or
+ % there was an error after we sent the compact_done
+ % message.
+ %
+ % We reset the id_tree_state here because the meta
+ % file acts as a buffer that we slurp into the data
+ % file at the end of a compaction run.
+ Header0 = #comp_hdr{
+ db_hdr=DH#db_header{id_tree_state=nil},
+ id_tree_state=DH#db_header.id_tree_state
+ },
+ ok = reset_compaction_file(MetaFd, Header0),
+ {Header0, true};
+ {#comp_hdr{}=CHdr, CHdr} ->
+ % Same compaction headers. Resume where we left off.
+ {CHdr, true};
+ _ ->
+ % Unable to resume. Reset and restart.
+ Header0 = #comp_hdr{},
+ ok = reset_compaction_file(DataFd, Header0),
+ ok = reset_compaction_file(MetaFd, Header0),
+ {Header0, false}
+ end,
+
+ DbHdr = Header#comp_hdr.db_hdr,
+ NewDb = init_db(DbName, DataFile, DataFd, DbHdr),
+
+ {ok, IdBtree} = couch_btree:open(DbHdr#db_header.id_tree_state, MetaFd, [
+ {split, fun ?MODULE:btree_by_id_split/1},
+ {join, fun ?MODULE:btree_by_id_join/2},
+ {reduce, fun ?MODULE:btree_by_id_reduce/2}
+ ]),
+
+ CompHeader = Header#comp_hdr{db_hdr=NewDb#db.header},
+ {NewDb#db{header=CompHeader, id_tree=IdBtree}, Retry}.
+
+
+open_compaction_file(FilePath) ->
+ case couch_file:open(FilePath) of
+ {ok, Fd} ->
+ case couch_file:read_header(Fd) of
+ {ok, Header} -> {ok, Fd, Header};
+ no_valid_header -> {ok, Fd, nil}
+ end;
+ {error, enoent} ->
+ {ok, Fd} = couch_file:open(FilePath, [create]),
+ {ok, Fd, nil}
+ end.
+
+
+reset_compaction_file(Fd, Header) ->
+ ok = couch_file:truncate(Fd, 0),
+ ok = couch_file:write_header(Fd, Header).
+
+
+commit_compaction_data(#db{}=Db) ->
+ % Compaction needs to write headers to both the data file
+ % and the meta file so if we need to restart we can pick
+ % back up from where we left off.
+
+ % Extract our piggy-backed header.
+ #db{header=#comp_hdr{db_hdr=DbHdr}=CompHdr} = Db,
+
+ % XXX: Hack to extract the meta compaction fd
+ % from a btree record.
+ MetaFd = element(2, Db#db.id_tree),
+
+ % Fill out the new compaction header
+ case db_to_header(Db, DbHdr) of
+ DbHdr ->
+ Db;
+ NewDbHdr ->
+ NewCompHdr = CompHdr#comp_hdr{db_hdr=NewDbHdr},
+
+ % Commit meta first in case we die.
+ ok = couch_file:sync(MetaFd),
+ ok = couch_file:write_header(MetaFd, NewCompHdr),
+
+ ok = couch_file:sync(Db#db.fd),
+ ok = couch_file:write_header(Db#db.fd, NewCompHdr),
+
+ Db#db{header=NewCompHdr, committed_update_seq=Db#db.update_seq}
+ end.
+
+
+final_compaction_commit(#db{filepath=Filepath}=Db) ->
+ % Extract our piggy-backed header.
+ #db{header=#comp_hdr{db_hdr=DbHdr}} = Db,
+
+ % Make the real db commit.
+ CompDb = commit_data(Db#db{header=DbHdr}),
+
+ % Close out the meta file.
+ MetaFd = element(2, Db#db.id_tree),
+ couch_file:close(MetaFd),
+ RootDir = couch_config:get("couchdb", "database_dir", "."),
+ couch_file:delete(RootDir, Filepath ++ ".compact.meta"),
+
+ % And done.
+ CompDb.
+
+
+copy_meta_data(#db{fd=Fd, header=CompHdr}=Db) ->
+ Src = Db#db.id_tree,
+ IdTreeState = CompHdr#comp_hdr.id_tree_state,
+ {ok, NewBtree} = case IdTreeState of
+ nil ->
+ couch_btree:copy(Src, Fd);
+ _ ->
+ {ok, Dst} = couch_btree:open(IdTreeState, Fd, [
+ {split, fun ?MODULE:btree_by_id_split/1},
+ {join, fun ?MODULE:btree_by_id_join/2},
+ {reduce, fun ?MODULE:btree_by_id_reduce/2}
+ ]),
+ merge_btrees(Src, Dst)
end,
- unlink(Fd),
+ Db#db{id_tree=NewBtree}.
- NewDb3 = copy_compact(Db, NewDb2, Retry),
- close_db(NewDb3),
- gen_server:cast(Db#db.main_pid, {compact_done, CompactFile}).
+merge_btrees(Src, Dst) ->
+ {ok, {NumDocIds, NumDelDocIds, _}} = couch_btree:full_reduce(Src),
+ TotalDocIds = NumDocIds + NumDelDocIds,
+ FoldFun = fun(KV, _Reds, {TreeAcc0, KVs, Copied}) ->
+ couch_task_status:update("Copied ~p of ~p ids (~p%)",
+ [Copied, TotalDocIds, (Copied*100) div TotalDocIds]),
+ case length(KVs) > 1000 of
+ true ->
+ {ok, TreeAcc1} = couch_btree:add(TreeAcc0, [KV | KVs]),
+ {ok, {TreeAcc1, [], Copied+length(KVs)+1}};
+ false ->
+ {ok, {TreeAcc0, [KV | KVs], Copied}}
+ end
+ end,
+ {ok, _, {Tree, KVs, _}} = couch_btree:foldl(Src, FoldFun, {Dst, [], 0}),
+ couch_btree:add(Tree, KVs).
9 apps/couch/src/couch_server.erl
@@ -323,9 +323,12 @@ handle_call({delete, DbName, _Options}, _From, Server) ->
Server#server{dbs_open=Server#server.dbs_open - 1}
end,
- %% Delete any leftover .compact files. If we don't do this a subsequent
- %% request for this DB will try to open the .compact file and use it.
- couch_file:delete(Server#server.root_dir, FullFilepath ++ ".compact"),
+ %% Delete any leftover compaction files. If we don't do this a
+ %% subsequent request for this DB will try to reuse them.
+ DataCompactionFile = FullFilepath ++ ".compact.data",
+ MetaCompactionFile = FullFilepath ++ ".compact.meta",
+ couch_file:delete(Server#server.root_dir, DataCompactionFile),
+ couch_file:delete(Server#server.root_dir, MetaCompactionFile),
case couch_file:delete(Server#server.root_dir, FullFilepath) of
ok ->
88 apps/couch/test/etap/022-btree-copy.t
@@ -0,0 +1,88 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+%%! -pa ./src/couchdb -sasl errlog_type error -boot start_sasl -noshell
+
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+path(FileName) ->
+ test_util:build_file(FileName).
+
+
+main(_) ->
+ test_util:init_code_path(),
+ couch_config:start_link([]),
+ etap:plan(length(counts()) * 2),
+ case (catch test()) of
+ ok ->
+ etap:end_tests();
+ Other ->
+ etap:diag(io_lib:format("Test died abnormally:~n~p", [Other])),
+ timer:sleep(333),
+ etap:bail()
+ end,
+ ok.
+
+
+test() ->
+ lists:foreach(fun(C) -> test_copy(C) end, counts()),
+ ok.
+
+
+counts() ->
+ [
+ 10, 20, 50, 100, 300, 500,
+ 700, 811, 2333, 6666, 9999, 15003,
+ 21477, 38888, 66069, 150123, 420789, 711321
+ ].
+
+
+reduce_fun() ->
+ fun
+ (reduce, KVs) -> length(KVs);
+ (rereduce, Reds) -> lists:sum(Reds)
+ end.
+
+
+test_copy(NumItems) ->
+ {ok, SrcBt0} = open_btree("./apps/couch/test/etap/temp.022.1"),
+ {ok, SrcBt} = load_btree(SrcBt0, NumItems),
+ Opts = [create, overwrite],
+ {ok, Fd} = couch_file:open(path("./apps/couch/test/etap/temp.022.2"), Opts),
+ {ok, DstBt} = couch_btree:copy(SrcBt, Fd),
+ check_same(SrcBt, DstBt).
+
+
+open_btree(Filename) ->
+ {ok, Fd} = couch_file:open(path(Filename), [create, overwrite]),
+ couch_btree:open(nil, Fd, [{reduce, reduce_fun()}]).
+
+
+load_btree(Bt, N) when N < 1000 ->
+ KVs = [{I, I} || I <- lists:seq(1, N)],
+ couch_btree:add(Bt, KVs);
+load_btree(Bt, N) ->
+ C = N - 1000,
+ KVs = [{I+C, I+C} || I <- lists:seq(1, 100)],
+ {ok, Bt1} = couch_btree:add(Bt, KVs),
+ load_btree(Bt1, C).
+
+
+check_same(Src, Dst) ->
+ {_, SrcRed} = couch_btree:get_state(Src),
+ {_, DstRed} = couch_btree:get_state(Dst),
+ etap:is(DstRed, SrcRed, "Same reduction value in copied btree."),
+ {ok, _, SrcKVs} = couch_btree:foldl(Src, fun(KV, A) -> {ok,[KV|A]} end, []),
+ {ok, _, DstKVs} = couch_btree:foldl(Dst, fun(KV, A) -> {ok,[KV|A]} end, []),
+ etap:is(SrcKVs, DstKVs, "Same key value pairs in copied btree.").
+
