
MB-7656 Header write/read functions now return header position

Change-Id: Ie3da2a0a9ed4f20bade5b8c4be24aee4e11965c4
Reviewed-on: http://review.couchbase.org/28816
Reviewed-by: Volker Mische <volker.mische@gmail.com>
Tested-by: Filipe David Borba Manana <fdmanana@gmail.com>
1 parent 7845758 · commit 79038773b2ed4beaa1e1bf2e889fcff4141ff49e · fdmanana committed on Sep 5, 2013
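
In short: couch_file:write_header/2 and write_header_bin/2 now return {ok, Pos} instead of ok, and couch_file:read_header/1 and read_header_bin/1 return {ok, Header, Pos} instead of {ok, Header}, where Pos is the byte offset at which the header starts in the file (other returns such as no_valid_header are unchanged). A minimal usage sketch, mirroring the etap tests further down; the file name is illustrative:

    %% The position reported by a header write is the same one reported
    %% by the subsequent read of that header.
    {ok, Fd} = couch_file:open("/tmp/header_pos_demo.couch", [create]),
    {ok, Pos} = couch_file:write_header(Fd, hello),
    ok = couch_file:flush(Fd),
    {ok, hello, Pos} = couch_file:read_header(Fd),
    ok = couch_file:close(Fd).
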
@@ -726,7 +726,7 @@ handle_call({compact_done, Result}, {Pid, _}, #state{compactor_pid = Pid} = Stat
% Compactor might have received a group snapshot from an updater.
NewGroup = fix_updater_group(NewGroup0, Group),
HeaderBin = couch_set_view_util:group_to_header_bin(NewGroup),
- ok = couch_file:write_header_bin(NewGroup#set_view_group.fd, HeaderBin),
+ {ok, _Pos} = couch_file:write_header_bin(NewGroup#set_view_group.fd, HeaderBin),
if is_pid(UpdaterPid) ->
?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, compact group"
" up to date - restarting updater",
@@ -1455,7 +1455,7 @@ prepare_group({RootDir, SetName, Group0}, ForceReset)->
{ok, reset_file(Fd, Group)};
true ->
case (catch couch_file:read_header_bin(Fd)) of
- {ok, HeaderBin} ->
+ {ok, HeaderBin, _Pos} ->
HeaderSig = couch_set_view_util:header_bin_sig(HeaderBin);
_ ->
HeaderSig = <<>>,
@@ -1753,7 +1753,7 @@ reset_file(Fd, #set_view_group{views = Views, index_header = Header} = Group) ->
},
EmptyGroup = Group#set_view_group{index_header = EmptyHeader},
EmptyHeaderBin = couch_set_view_util:group_to_header_bin(EmptyGroup),
- ok = couch_file:write_header_bin(Fd, EmptyHeaderBin),
+ {ok, _Pos} = couch_file:write_header_bin(Fd, EmptyHeaderBin),
init_group(Fd, reset_group(EmptyGroup), EmptyHeader).
@@ -1803,7 +1803,7 @@ commit_header(Group) ->
-spec commit_header(#set_view_group{}, boolean()) -> 'ok'.
commit_header(Group, Fsync) ->
HeaderBin = couch_set_view_util:group_to_header_bin(Group),
- ok = couch_file:write_header_bin(Group#set_view_group.fd, HeaderBin),
+ {ok, _Pos} = couch_file:write_header_bin(Group#set_view_group.fd, HeaderBin),
case Fsync of
true ->
ok = couch_file:sync(Group#set_view_group.fd);
@@ -2908,7 +2908,7 @@ process_partial_update(State, NewGroup0) ->
}
end,
HeaderBin = couch_set_view_util:group_to_header_bin(NewState#state.group),
- ok = couch_file:write_header_bin(Fd, HeaderBin),
+ {ok, _Pos} = couch_file:write_header_bin(Fd, HeaderBin),
Listeners2 = notify_update_listeners(NewState, Listeners, NewState#state.group),
ok = couch_file:flush(Fd),
NewState#state{update_listeners = Listeners2}.
@@ -24,7 +24,7 @@ check_db_file(Filename) when is_list(Filename) or is_binary(Filename)->
fatal_error("Couldn't open file ~s:~p", [Filename, Error])
end,
Header = try
- {ok, NewHeaderBin} = couch_file:read_header_bin(Fd),
+ {ok, NewHeaderBin, _Pos} = couch_file:read_header_bin(Fd),
couch_db_updater:header_bin_to_db_header(NewHeaderBin)
catch
Type0:Error0 ->
@@ -28,19 +28,19 @@ init({MainPid, DbName, Filepath, Fd, Options}) ->
true ->
% create a new header and writes it to the file
Header = #db_header{},
- ok = couch_file:write_header_bin(Fd, db_header_to_header_bin(Header)),
+ {ok, _Pos} = couch_file:write_header_bin(Fd, db_header_to_header_bin(Header)),
% delete any old compaction files that might be hanging around
RootDir = couch_config:get("couchdb", "database_dir", "."),
couch_file:delete(RootDir, Filepath ++ ".compact");
false ->
case couch_file:read_header_bin(Fd) of
- {ok, BinHeader} ->
+ {ok, BinHeader, _Pos} ->
Header = header_bin_to_db_header(BinHeader),
ok;
no_valid_header ->
% create a new header and writes it to the file
Header = #db_header{},
- ok = couch_file:write_header_bin(Fd,
+ {ok, _Pos} = couch_file:write_header_bin(Fd,
db_header_to_header_bin(Header)),
% delete any old compaction files that might be hanging around
file2:delete(Filepath ++ ".compact")
@@ -194,7 +194,7 @@ handle_call({compact_done, _Path}, _From, #db{compactor_info = nil} = Db) ->
handle_call({compact_done, CompactFilepath}, _From, Db) ->
#db{filepath = Filepath, fd = OldFd} = Db,
{ok, NewFd} = couch_file:open(CompactFilepath),
- {ok, NewHeaderBin} = couch_file:read_header_bin(NewFd),
+ {ok, NewHeaderBin, _Pos} = couch_file:read_header_bin(NewFd),
NewHeader = header_bin_to_db_header(NewHeaderBin),
#db{update_seq=NewSeq} = NewDb =
init_db(Db#db.name, Filepath, NewFd, NewHeader, Db#db.options),
@@ -643,7 +643,7 @@ commit_data(Db, _) ->
_ -> ok
end,
- ok = couch_file:write_header_bin(Fd, db_header_to_header_bin(Header)),
+ {ok, _Pos} = couch_file:write_header_bin(Fd, db_header_to_header_bin(Header)),
ok = couch_file:flush(Fd),
case lists:member(after_header, FsyncOptions) of
@@ -827,7 +827,7 @@ make_target_db(Db, CompactFile) ->
case couch_file:open(CompactFile) of
{ok, Fd} ->
case couch_file:read_header_bin(Fd) of
- {ok, NewHeaderBin} ->
+ {ok, NewHeaderBin, _Pos} ->
Header = header_bin_to_db_header(NewHeaderBin),
{ok, fd_to_db(Db, CompactFile, Header, Fd)};
no_valid_header ->
@@ -837,7 +837,7 @@ make_target_db(Db, CompactFile) ->
{ok, Fd} = couch_file:open(CompactFile, [create]),
Header=#db_header{},
HeaderBin = db_header_to_header_bin(Header),
- ok = couch_file:write_header_bin(Fd, HeaderBin),
+ {ok, _Pos} = couch_file:write_header_bin(Fd, HeaderBin),
{ok, fd_to_db(Db, CompactFile, Header, Fd)}
end.
@@ -253,8 +253,8 @@ init_delete_dir(RootDir) ->
read_header(Fd) ->
case gen_server:call(Fd, find_header, infinity) of
- {ok, Bin} ->
- {ok, binary_to_term(Bin)};
+ {ok, Bin, Pos} ->
+ {ok, binary_to_term(Bin), Pos};
Else ->
Else
end.
@@ -286,7 +286,7 @@ write_header_bin(Fd, Bin) ->
Crc32 = erlang:crc32(Bin),
% now we assemble the final header binary and write to disk
FinalBin = <<Crc32:32, Bin/binary>>,
- ok = gen_server:call(Fd, {write_header, FinalBin}, infinity).
+ {ok, _Pos} = gen_server:call(Fd, {write_header, FinalBin}, infinity).
% server functions
@@ -382,16 +382,16 @@ handle_call({append_bin, Bin}, From, #file{writer = W, eof = Pos} = File) ->
{noreply, File#file{eof = Pos + Size}};
handle_call({write_header, Bin}, From, #file{writer = W, eof = Pos} = File) ->
- gen_server:reply(From, ok),
W ! {header, Bin},
Pos2 = case Pos rem ?SIZE_BLOCK of
0 ->
- Pos + 5;
+ Pos;
BlockOffset ->
- Pos + 5 + (?SIZE_BLOCK - BlockOffset)
+ Pos + (?SIZE_BLOCK - BlockOffset)
end,
+ gen_server:reply(From, {ok, Pos2}),
File2 = File#file{
- eof = Pos2 + calculate_total_read_len(5, byte_size(Bin))
+ eof = Pos2 + 5 + calculate_total_read_len(5, byte_size(Bin))
},
{noreply, File2};
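
The reply is now deferred until Pos2 has been computed: Pos2 is the current eof rounded up to the next block boundary, which is where the writer places the header, and the new eof adds the 5 bytes of header framing that the old code folded into Pos2. A standalone sketch of that rounding rule, assuming ?SIZE_BLOCK is 4096 (an assumption, but consistent with the 0, 4096 and 8192 expectations in the tests below):

    %% Round the current end-of-file up to the block boundary where the
    %% next header will start (sketch; 4096 stands in for ?SIZE_BLOCK).
    header_pos(Eof) when Eof rem 4096 =:= 0 -> Eof;
    header_pos(Eof) -> Eof + (4096 - (Eof rem 4096)).
    %% header_pos(0) =:= 0, header_pos(4096) =:= 4096, header_pos(5000) =:= 8192.
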
@@ -491,7 +491,7 @@ find_header(_Fd, -1) ->
find_header(Fd, Block) ->
case (catch load_header(Fd, Block)) of
{ok, Bin} ->
- {ok, Bin};
+ {ok, Bin, Block * ?SIZE_BLOCK};
_Error ->
find_header(Fd, Block -1)
end.
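
On the read side, find_header/2 walks block boundaries backwards from the end of the file, so the position it reports is the block index times ?SIZE_BLOCK; because write_header rounds the eof up to the same boundaries, reads and writes report identical offsets. A trivial sketch of that correspondence (again with 4096 standing in for ?SIZE_BLOCK):

    %% A header position is always a multiple of the block size, so the
    %% block index can be recovered from it by integer division.
    pos_of_block(Block) -> Block * 4096.
    block_of_pos(Pos) when Pos rem 4096 =:= 0 -> Pos div 4096.
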
@@ -294,7 +294,7 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
if CommittedSeq >= Group#group.current_seq ->
% save the header
Header = {Group#group.sig, get_index_header_data(Group)},
- ok = couch_file:write_header(Group#group.fd, Header),
+ {ok, _Pos} = couch_file:write_header(Group#group.fd, Header),
{noreply, State#group_state{waiting_commit=false}};
true ->
% We can't commit the header because the database seq that's fully
@@ -418,7 +418,7 @@ prepare_group({RootDir, DbName, #group{sig=Sig}=Group}, ForceReset)->
{ok, Db, reset_file(Db, Fd, DDocDbName, Group)};
true ->
case (catch couch_file:read_header(Fd)) of
- {ok, {Sig, HeaderInfo}} ->
+ {ok, {Sig, HeaderInfo}, _Pos} ->
% sigs match!
{ok, Db, init_group(Db, Fd, Group, HeaderInfo)};
_ ->
@@ -639,7 +639,7 @@ reset_group(#group{views=Views}=Group) ->
reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
?LOG_DEBUG("Resetting group index \"~s\" in db ~s", [Name, DbName]),
ok = couch_file:truncate(Fd, 0),
- ok = couch_file:write_header(Fd, {Sig, nil}),
+ {ok, _Pos} = couch_file:write_header(Fd, {Sig, nil}),
init_group(Db, Fd, reset_group(Group), nil).
delete_index_file(RootDir, DbName, GroupSig) ->
@@ -86,9 +86,9 @@ test() ->
etap:is({ok, BigBin}, couch_file:pread_binary(Fd, BigBinPos),
"Reading a large term from a written representation succeeds."),
- ok = couch_file:write_header(Fd, hello),
+ {ok, HeaderPos} = couch_file:write_header(Fd, hello),
ok = couch_file:flush(Fd),
- etap:is({ok, hello}, couch_file:read_header(Fd),
+ etap:is({ok, hello, HeaderPos}, couch_file:read_header(Fd),
"Reading a header succeeds."),
{ok, BigBinPos2, _} = couch_file:append_binary(Fd, BigBin),
@@ -39,31 +39,31 @@ test() ->
etap:is({ok, 0}, couch_file:bytes(Fd),
"File should be initialized to contain zero bytes."),
- etap:is(ok, couch_file:write_header(Fd, {<<"some_data">>, 32}),
+ etap:is({ok, 0}, couch_file:write_header(Fd, {<<"some_data">>, 32}),
"Writing a header succeeds."),
ok = couch_file:flush(Fd),
{ok, Size1} = couch_file:bytes(Fd),
etap:is_greater(Size1, 0,
"Writing a header allocates space in the file."),
- etap:is({ok, {<<"some_data">>, 32}}, couch_file:read_header(Fd),
+ etap:is({ok, {<<"some_data">>, 32}, 0}, couch_file:read_header(Fd),
"Reading the header returns what we wrote."),
- etap:is(ok, couch_file:write_header(Fd, [foo, <<"more">>]),
+ etap:is({ok, 4096}, couch_file:write_header(Fd, [foo, <<"more">>]),
"Writing a second header succeeds."),
{ok, Size2} = couch_file:bytes(Fd),
etap:is_greater(Size2, Size1,
"Writing a second header allocates more space."),
ok = couch_file:flush(Fd),
- etap:is({ok, [foo, <<"more">>]}, couch_file:read_header(Fd),
+ etap:is({ok, [foo, <<"more">>], 4096}, couch_file:read_header(Fd),
"Reading the second header does not return the first header."),
% Delete the second header.
ok = couch_file:truncate(Fd, Size1),
- etap:is({ok, {<<"some_data">>, 32}}, couch_file:read_header(Fd),
+ etap:is({ok, {<<"some_data">>, 32}, 0}, couch_file:read_header(Fd),
"Reading the header after a truncation returns a previous header."),
couch_file:write_header(Fd, [foo, <<"more">>]),
@@ -74,7 +74,7 @@ test() ->
ok = couch_file:flush(Fd),
etap:is(
couch_file:read_header(Fd),
- {ok, erlang:make_tuple(5000, <<"CouchDB">>)},
+ {ok, erlang:make_tuple(5000, <<"CouchDB">>), 8192},
"Headers larger than the block size can be saved (COUCHDB-1319)"
),
@@ -134,12 +134,12 @@ check_header_recovery(CheckFun) ->
{ok, _} = write_random_data(Fd),
ExpectHeader = {some_atom, <<"a binary">>, 756},
- ok = couch_file:write_header(Fd, ExpectHeader),
+ {ok, ValidHeaderPos} = couch_file:write_header(Fd, ExpectHeader),
{ok, HeaderPos} = write_random_data(Fd),
- ok = couch_file:write_header(Fd, {2342, <<"corruption! greed!">>}),
+ {ok, _} = couch_file:write_header(Fd, {2342, <<"corruption! greed!">>}),
- CheckFun(Fd, RawFd, {ok, ExpectHeader}, HeaderPos),
+ CheckFun(Fd, RawFd, {ok, ExpectHeader, ValidHeaderPos}, HeaderPos),
ok = file:close(RawFd),
ok = couch_file:close(Fd),
@@ -119,27 +119,29 @@ test() ->
Header = Db2#db.header,
Header2 = Header#db_header{update_seq = 0},
- HeaderBin = couch_db_updater:db_header_to_header_bin(Header2),
- etap:is(couch_file:write_header_bin(Fd, HeaderBin), ok,
- "Should write new header outside of couchdb"),
- couch_file:flush(Fd),
+
% calculate where new header goes
case FileLen rem ?SIZE_BLOCK of
0 ->
NewHeaderPos = FileLen + ?SIZE_BLOCK;
BlockOffset ->
NewHeaderPos = FileLen + (?SIZE_BLOCK - BlockOffset)
end,
+
+ HeaderBin = couch_db_updater:db_header_to_header_bin(Header2),
+ etap:is(couch_file:write_header_bin(Fd, HeaderBin), {ok, NewHeaderPos},
+ "Should write new header outside of couchdb"),
+ couch_file:flush(Fd),
{ok, FileLen2} = couch_file:bytes(Fd),
etap:is(couch_db:update_header_pos(Db2, 1, NewHeaderPos), update_behind_couchdb,
"Should be ahead couchdb"),
Header3 = Header#db_header{update_seq = Header#db_header.update_seq + 1},
HeaderBin3 = couch_db_updater:db_header_to_header_bin(Header3),
- etap:is(couch_file:write_header_bin(Fd, HeaderBin3), ok,
+ NewHeaderPos2 = NewHeaderPos + ?SIZE_BLOCK,
+ etap:is(couch_file:write_header_bin(Fd, HeaderBin3), {ok, NewHeaderPos2},
"Should write new header outside of couchdb"),
couch_file:flush(Fd),
- NewHeaderPos2 = NewHeaderPos + ?SIZE_BLOCK,
etap:is(couch_db:update_header_pos(Db2, 1, NewHeaderPos2), ok,
"Should accept new header"),
couch_db:close(Db2),
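
This last test exercises the intended consumer of the new return value: an external writer appends a header through couch_file and then tells couchdb where that header lives. A hedged usage sketch of that flow, following the test above (the second argument to couch_db:update_header_pos/3 is passed as 1 only because that is what the test does; Fd, HeaderBin and Db are assumed to be set up as in the test):

    %% Write a header out-of-band and hand its on-disk position to couchdb.
    {ok, Pos} = couch_file:write_header_bin(Fd, HeaderBin),
    ok = couch_file:flush(Fd),
    ok = couch_db:update_header_pos(Db, 1, Pos).
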
