Skip to content

Commit

Permalink
Use couchstore compactor
Browse files Browse the repository at this point in the history
Change-Id: Id66cccb3873bb40d2d594f5148f19244e07b5b3d
  • Loading branch information
apage43 committed May 3, 2012
1 parent ab63772 commit c3939f0
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 32 deletions.
2 changes: 2 additions & 0 deletions etc/couchdb/default.ini.tpl.in
Expand Up @@ -28,6 +28,8 @@ max_parallel_indexers = 4
max_parallel_replica_indexers = 2
; BTree Implmentation to use (native or erlang)
btree_implementation = native
consistency_check_precompacted = true
consistency_check_compacted = true

[database_compaction]
; larger buffer sizes can originate smaller files
Expand Down
2 changes: 1 addition & 1 deletion src/couchdb/couch_db_consistency_check.erl
Expand Up @@ -17,7 +17,6 @@
-include("couch_db.hrl").

check_db_file(Filename) when is_list(Filename) or is_binary(Filename)->
?LOG_DEBUG("validation process for db file \"~s\"", [Filename]),
Fd = case couch_file:open(Filename) of
{ok, Fd0} ->
Fd0;
Expand All @@ -41,6 +40,7 @@ check_db_file(Filename) when is_list(Filename) or is_binary(Filename)->
couch_file:close(Fd);
check_db_file(Db) ->
Filename = Db#db.filepath,
?LOG_INFO("Doing consistency check for db file \"~s\"", [Filename]),
% first scan the by_sequence index
Count = couch_db:count_changes_since(Db, 0),
EtsById = ets:new(couch_db_consistency_check_name, [set,private]),
Expand Down
97 changes: 66 additions & 31 deletions src/couchdb/couch_db_updater.erl
Expand Up @@ -742,7 +742,56 @@ initial_copy_compact(#db{docinfo_by_seq_btree=SrcBySeq,
NewDb#db{docinfo_by_seq_btree=NewBySeqBtree,
docinfo_by_id_btree=DestById#btree{root=NewByIdRoot}}.

start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=PurgeSeq}}=Db) ->
fd_to_db(#db{name=Name, header=#db_header{purge_seq=PurgeSeq}}=Db, CompactFile, Header, Fd) ->
NewDb = init_db(Name, CompactFile, Fd, Header, Db#db.options),
NewDb2 =
if PurgeSeq > 0 ->
{ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
{ok, Pointer, _} = couch_file:append_term(Fd, PurgedIdsRevs),
NewDb#db{header=Header#db_header{purge_seq=PurgeSeq, purged_docs=Pointer}};
true ->
NewDb
end,
unlink(Fd),
NewDb2.

make_target_db(Db, CompactFile) ->
case couch_file:open(CompactFile) of
{ok, Fd} ->
case couch_file:read_header_bin(Fd) of
{ok, NewHeaderBin} ->
Header = header_bin_to_db_header(NewHeaderBin),
{ok, fd_to_db(Db, CompactFile, Header, Fd)};
no_valid_header ->
{error, no_valid_header}
end;
{error, enoent} ->
{ok, Fd} = couch_file:open(CompactFile, [create]),
Header=#db_header{},
HeaderBin = db_header_to_header_bin(Header),
ok = couch_file:write_header_bin(Fd, HeaderBin),
{ok, fd_to_db(Db, CompactFile, Header, Fd)}
end.

native_initial_compact(#db{filepath=Filepath}=Db, CompactFile) ->
ok = couch_file:flush(Db#db.fd),
CompactCmd = os:find_executable("couch_compact"),
try
Compactor = open_port({spawn_executable, CompactCmd}, [{args, [Filepath, CompactFile]}, exit_status]),
receive {Compactor, {exit_status, Status}} ->
case Status of
0 ->
make_target_db(Db, CompactFile);
Error ->
{error, {exit_status, Error}}
end
end
catch
T:E ->
{error, {T, E}}
end.

start_copy_compact(#db{name=Name,filepath=Filepath}=Db) ->
CompactFile = Filepath ++ ".compact",
?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
case couch_config:get("couchdb", "consistency_check_precompacted", "false") of
Expand All @@ -751,37 +800,23 @@ start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=P
_ ->
ok
end,
case couch_file:open(CompactFile) of
{ok, Fd} ->
Retry = true,
case couch_file:read_header_bin(Fd) of
{ok, NewHeaderBin} ->
Header = header_bin_to_db_header(NewHeaderBin),
ok;
no_valid_header ->
Header=#db_header{},
HeaderBin = db_header_to_header_bin(Header),
ok = couch_file:write_header_bin(Fd, HeaderBin)
end;
{error, enoent} ->
{ok, Fd} = couch_file:open(CompactFile, [create]),
Retry = false,
Header=#db_header{},
HeaderBin = db_header_to_header_bin(Header),
ok = couch_file:write_header_bin(Fd, HeaderBin)
end,
NewDb = init_db(Name, CompactFile, Fd, Header, Db#db.options),
NewDb2 = if PurgeSeq > 0 ->
{ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
{ok, Pointer, _} = couch_file:append_term(Fd, PurgedIdsRevs),
NewDb#db{header=Header#db_header{purge_seq=PurgeSeq, purged_docs=Pointer}};
true ->
NewDb
% Compact it
NewDb = case file:read_file_info(CompactFile) of
{ok, _} -> % Catch up
{ok, TargetDB} = make_target_db(Db, CompactFile),
copy_compact(Db, TargetDB, true);
{error, enoent} -> % Initial compact
case native_initial_compact(Db, CompactFile) of
{ok, CompactedDb} ->
?LOG_DEBUG("Native initial compact succeeded for \"~s\"", [Name]),
CompactedDb;
{error, _} ->
?LOG_DEBUG("Doing fallback compact for \"~s\"", [Name]),
{ok, TargetDB} = make_target_db(Db, CompactFile),
copy_compact(Db, TargetDB, false)
end
end,
unlink(Fd),

NewDb3 = copy_compact(Db, NewDb2, Retry),
close_db(NewDb3),
close_db(NewDb),
case couch_config:get("couchdb", "consistency_check_compacted", "false") of
"true" ->
couch_db_consistency_check:check_db_file(CompactFile);
Expand Down

0 comments on commit c3939f0

Please sign in to comment.