Permalink
Browse files

Merge branch 'jdb-avoid-file-nifs'

Conflicts:
	test/bitcask_timeshift.erl
  • Loading branch information...
2 parents a3d7023 + 9621aaf commit 523140ad743be787460e370236f0fed88a53f67e @jtuple jtuple committed Dec 3, 2012
Showing with 230 additions and 29 deletions.
  1. +7 −1 c_src/bitcask_nifs.c
  2. +172 −0 src/bitcask_file.erl
  3. +22 −22 src/bitcask_fileops.erl
  4. +4 −1 src/bitcask_merge_delete.erl
  5. +24 −4 src/bitcask_nifs.erl
  6. +1 −1 test/pulse/bitcask_eqc.erl
View
@@ -1647,7 +1647,13 @@ static void msg_pending_awaken(ErlNifEnv* env, bitcask_keydir* keydir,
{
enif_clear_env(msg_env);
#ifdef PULSE
- PULSE_SEND(env, &keydir->pending_awaken[idx], msg_env, msg);
+ /* Using PULSE_SEND here sometimes deadlocks the Bitcask PULSE test.
+ Reverting to using enif_send for now.
+ TODO: Check if PULSE_SEND is really necessary and investigate/fix
+ deadlock in the future
+ */
+ /* PULSE_SEND(env, &keydir->pending_awaken[idx], msg_env, msg); */
+ enif_send(env, &keydir->pending_awaken[idx], msg_env, msg);
#else
enif_send(env, &keydir->pending_awaken[idx], msg_env, msg);
#endif
View
@@ -0,0 +1,172 @@
+%% gen_server wrapper around a single file opened with file:open/2.
+%% Every call to file_open/2 starts one server process; that process
+%% holds the file descriptor and performs the file operations requested
+%% through the file_* functions below.
+-module(bitcask_file).
+%% NOTE(review): export_all exports every function in this module,
+%% including the internal helpers; the "API" section below has no
+%% explicit export list.
+-compile(export_all).
+-behaviour(gen_server).
+
+%% API
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%% Server state. Both fields are unset until a file_open request succeeds.
+%%   fd    - descriptor returned by file:open/2
+%%   owner - pid passed in the file_open request; the server monitors it
+-record(state, {fd :: file:fd(),
+ owner :: pid()}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+%% Start a dedicated (unlinked) server process and ask it to open Filename
+%% with the given bitcask option list, registering the calling process as
+%% the owner of the file.
+%% Returns {ok, ServerPid} when the server answers ok; any other reply from
+%% the server is handed back to the caller unchanged.
+file_open(Filename, Opts) ->
+    {ok, Server} = gen_server:start(?MODULE, [], []),
+    OpenRequest = {file_open, self(), Filename, Opts},
+    case gen_server:call(Server, OpenRequest, infinity) of
+        ok ->
+            {ok, Server};
+        NotOk ->
+            NotOk
+    end.
+
+%% Ask the server to close its file; the server stops after replying.
+file_close(Pid) ->
+ file_request(Pid, file_close).
+
+%% Ask the server to call file:sync/1 on its file and return the result.
+file_sync(Pid) ->
+ file_request(Pid, file_sync).
+
+%% Positional read: the server replies with file:pread(Fd, Offset, Size).
+file_pread(Pid, Offset, Size) ->
+ file_request(Pid, {file_pread, Offset, Size}).
+
+%% Positional write: the server replies with file:pwrite(Fd, Offset, Bytes).
+file_pwrite(Pid, Offset, Bytes) ->
+ file_request(Pid, {file_pwrite, Offset, Bytes}).
+
+%% Read at the current file position: the server replies with
+%% file:read(Fd, Size).
+file_read(Pid, Size) ->
+ file_request(Pid, {file_read, Size}).
+
+%% Write at the current file position: the server replies with
+%% file:write(Fd, Bytes).
+file_write(Pid, Bytes) ->
+ file_request(Pid, {file_write, Bytes}).
+
+%% Move the file position to the beginning of the file
+%% (file:position(Fd, bof) on the server); replies ok.
+file_seekbof(Pid) ->
+ file_request(Pid, file_seekbof).
+
+%%%===================================================================
+%%% API helper functions
+%%%===================================================================
+
+%% Send Request to the file server Pid and return the server's reply.
+%%
+%% Pid is validated first with check_pid/1:
+%%   - not a pid          -> {error, badarg}
+%%   - pid no longer alive -> {error, einval}
+%% Otherwise the request is made with an infinite timeout.
+file_request(Pid, Request) ->
+    case check_pid(Pid) of
+        ok ->
+            try
+                gen_server:call(Pid, Request, infinity)
+            catch
+                %% The liveness check above is only a snapshot: the server
+                %% replies to file_close *before* it terminates, so it can
+                %% still look alive here and then be gone (or going away
+                %% normally) by the time the call is made. Report that the
+                %% same way as any other access to a closed file instead of
+                %% crashing the caller.
+                exit:{noproc, _} ->
+                    {error, einval};
+                exit:{normal, _} ->
+                    {error, einval}
+            end;
+        Error ->
+            Error
+    end.
+
+%% Validate a file handle before sending it a request.
+%%   ok              - Pid is a pid and the process is alive
+%%   {error, einval} - Pid is a pid but the process is dead
+%%                     (same result as the `file' module for a closed FD)
+%%   {error, badarg} - the argument is not a pid at all
+%%                     (same result as the `file' module for a wrong arg)
+check_pid(Pid) when is_pid(Pid) ->
+    case is_process_alive(Pid) of
+        true ->
+            ok;
+        false ->
+            {error, einval}
+    end;
+check_pid(_NotAPid) ->
+    {error, badarg}.
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+%% gen_server init callback: start with an empty state. No file is opened
+%% here; fd and owner are filled in when a file_open request is handled.
+init([]) ->
+ {ok, #state{}}.
+
+%% gen_server call handler.
+%%
+%% {file_open, Owner, Filename, Opts}
+%%     Monitors Owner, translates the bitcask options `readonly' and
+%%     `create' into file:open/2 modes (always raw, binary, read_ahead) and
+%%     opens the file. `o_sync' is not supported and only produces a
+%%     warning. Replies ok on success. On failure the error from
+%%     file:open/2 is sent as the reply and the server stops.
+%% file_close
+%%     Closes the file, replies ok and stops the server.
+%% file_sync | {file_pread, Offset, Size} | {file_pwrite, Offset, Bytes} |
+%% {file_read, Size} | {file_write, Bytes}
+%%     Reply with the result of the corresponding `file' module call.
+%% file_seekbof
+%%     Positions the file at its beginning and replies ok.
+%%
+%% Every request except file_open must come from the owner (check_owner/2).
+%% Any unrecognised request is answered with ok.
+handle_call({file_open, Owner, Filename, Opts}, _From, State) ->
+    monitor(process, Owner),
+    IsCreate = proplists:get_bool(create, Opts),
+    IsReadOnly = proplists:get_bool(readonly, Opts),
+    Mode = case {IsReadOnly, IsCreate} of
+               {true, _} ->
+                   [read, raw, binary, read_ahead];
+               {_, false} ->
+                   [read, write, raw, binary, read_ahead];
+               {_, true} ->
+                   [read, write, exclusive, raw, binary, read_ahead]
+           end,
+    [warn("Bitcask file option '~p' not supported~n", [Opt])
+     || Opt <- [o_sync],
+        proplists:get_bool(Opt, Opts)],
+    case file:open(Filename, Mode) of
+        {ok, Fd} ->
+            State2 = State#state{fd=Fd, owner=Owner},
+            {reply, ok, State2};
+        Error ->
+            %% file_open/2 never hands this pid to the caller on failure,
+            %% so nobody could ever close it. Reply with the error and
+            %% stop instead of lingering with no file descriptor.
+            {stop, normal, Error, State}
+    end;
+handle_call(file_close, From, State=#state{fd=Fd}) ->
+    check_owner(From, State),
+    ok = file:close(Fd),
+    {stop, normal, ok, State};
+handle_call(file_sync, From, State=#state{fd=Fd}) ->
+    check_owner(From, State),
+    Reply = file:sync(Fd),
+    {reply, Reply, State};
+handle_call({file_pread, Offset, Size}, From, State=#state{fd=Fd}) ->
+    check_owner(From, State),
+    Reply = file:pread(Fd, Offset, Size),
+    {reply, Reply, State};
+handle_call({file_pwrite, Offset, Bytes}, From, State=#state{fd=Fd}) ->
+    check_owner(From, State),
+    Reply = file:pwrite(Fd, Offset, Bytes),
+    {reply, Reply, State};
+handle_call({file_read, Size}, From, State=#state{fd=Fd}) ->
+    check_owner(From, State),
+    Reply = file:read(Fd, Size),
+    {reply, Reply, State};
+handle_call({file_write, Bytes}, From, State=#state{fd=Fd}) ->
+    check_owner(From, State),
+    Reply = file:write(Fd, Bytes),
+    {reply, Reply, State};
+handle_call(file_seekbof, From, State=#state{fd=Fd}) ->
+    check_owner(From, State),
+    {ok, _} = file:position(Fd, bof),
+    {reply, ok, State};
+
+handle_call(_Request, _From, State) ->
+    {reply, ok, State}.
+
+%% gen_server cast handler: this server defines no casts, so every cast
+%% is ignored and the state is left unchanged.
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%% gen_server info handler.
+%%
+%% The only monitor this server sets is on the owner (in the file_open
+%% request), so a 'DOWN' message means the owner has gone away: close the
+%% file, if one was opened, and stop normally. All other messages are
+%% ignored.
+handle_info({'DOWN', _Ref, _, _Pid, _Status}, State=#state{fd=undefined}) ->
+    %% Owner has stopped but no file was ever opened (the open request
+    %% failed), so there is nothing to close; just shut down.
+    {stop, normal, State};
+handle_info({'DOWN', _Ref, _, _Pid, _Status}, State=#state{fd=Fd}) ->
+    %% Owner has stopped, close file and shutdown
+    ok = file:close(Fd),
+    {stop, normal, State};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%% gen_server terminate callback: performs no cleanup of its own. The file
+%% is closed explicitly in the file_close and 'DOWN' handlers before the
+%% server stops.
+terminate(_Reason, _State) ->
+ ok.
+
+%% gen_server code_change callback: the state needs no conversion, so it
+%% is returned unchanged.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+%% Assert that a request was sent by the process that opened the file.
+%%
+%% The first argument is the gen_server `From' tuple of the request; its
+%% first element is the calling pid. Returns ok when that pid is the
+%% recorded owner, otherwise raises the error owner_invariant_failed.
+%%
+%% error/1 is used rather than throw/1 because gen_server treats a term
+%% thrown from a callback as that callback's return value, which would be
+%% reported as a bad return value instead of the real reason.
+check_owner({Owner, _Tag}, #state{owner=Owner}) ->
+    ok;
+check_owner({_NotOwner, _Tag}, #state{}) ->
+    erlang:error(owner_invariant_failed).
+
+%% Emit a warning built from an io:format-style format string and argument
+%% list. If the lager module can be located the message goes through
+%% lager:warning/2; otherwise it is printed with io:format/2.
+warn(Fmt, Args) ->
+    LagerAvailable = code:which(lager) =/= non_existing,
+    case LagerAvailable of
+        true ->
+            lager:warning(Fmt, Args);
+        false ->
+            io:format(Fmt, Args)
+    end.
View
@@ -69,7 +69,7 @@ create_file(DirName, Opts) ->
%% Called with fully-qualified filename.
-spec open_file(Filename :: string()) -> {ok, #filestate{}} | {error, any()}.
open_file(Filename) ->
- case bitcask_nifs:file_open(Filename, [readonly]) of
+ case bitcask_file:file_open(Filename, [readonly]) of
{ok, FD} ->
{ok, #filestate{mode = read_only,
filename = Filename, tstamp = file_tstamp(Filename),
@@ -84,7 +84,7 @@ close(fresh) -> ok;
close(undefined) -> ok;
close(State = #filestate{ fd = FD }) ->
close_hintfile(State),
- bitcask_nifs:file_close(FD),
+ bitcask_file:file_close(FD),
ok.
%% @doc Close a file for writing, but leave it open for reads.
@@ -93,7 +93,7 @@ close_for_writing(fresh) -> ok;
close_for_writing(undefined) -> ok;
close_for_writing(State = #filestate{ mode = read_write, fd = Fd }) ->
S2 = close_hintfile(State),
- bitcask_nifs:file_sync(Fd),
+ bitcask_file:file_sync(Fd),
S2#filestate { mode = read_only }.
close_hintfile(State = #filestate { hintfd = undefined }) ->
@@ -104,9 +104,9 @@ close_hintfile(State = #filestate { hintfd = HintFd, hintcrc = HintCRC }) ->
%% an older version of bitcask will just reject the record at the end of the
%% hintfile and otherwise work normally.
Iolist = hintfile_entry(<<>>, 0, {?MAXOFFSET, HintCRC}),
- ok = bitcask_nifs:file_write(HintFd, Iolist),
- bitcask_nifs:file_sync(HintFd),
- bitcask_nifs:file_close(HintFd),
+ ok = bitcask_file:file_write(HintFd, Iolist),
+ bitcask_file:file_sync(HintFd),
+ bitcask_file:file_close(HintFd),
State#filestate { hintfd = undefined, hintcrc = 0 }.
@@ -151,11 +151,11 @@ write(Filestate=#filestate{fd = FD, hintfd = HintFD,
<<ValueSz:?VALSIZEFIELD>>, Key, Value],
Bytes = [<<(erlang:crc32(Bytes0)):?CRCSIZEFIELD>> | Bytes0],
%% Store the full entry in the data file
- ok = bitcask_nifs:file_pwrite(FD, Offset, Bytes),
+ ok = bitcask_file:file_pwrite(FD, Offset, Bytes),
%% Create and store the corresponding hint entry
TotalSz = KeySz + ValueSz + ?HEADER_SIZE,
Iolist = hintfile_entry(Key, Tstamp, {Offset, TotalSz}),
- ok = bitcask_nifs:file_write(HintFD, Iolist),
+ ok = bitcask_file:file_write(HintFD, Iolist),
%% Record our final offset
TotalSz = iolist_size(Bytes),
HintCRC = erlang:crc32(HintCRC0, Iolist), % compute crc of hint
@@ -176,7 +176,7 @@ read(Filename, Offset, Size) when is_list(Filename) ->
{error, Reason}
end;
read(#filestate { fd = FD }, Offset, Size) ->
- case bitcask_nifs:file_pread(FD, Offset, Size) of
+ case bitcask_file:file_pread(FD, Offset, Size) of
{ok, <<Crc32:?CRCSIZEFIELD/unsigned, Bytes/binary>>} ->
%% Verify the CRC of the data
case erlang:crc32(Bytes) of
@@ -198,8 +198,8 @@ read(#filestate { fd = FD }, Offset, Size) ->
%% @doc Call the OS's fsync(2) system call on the cask and hint files.
-spec sync(#filestate{}) -> ok.
sync(#filestate { mode = read_write, fd = Fd, hintfd = HintFd }) ->
- ok = bitcask_nifs:file_sync(Fd),
- ok = bitcask_nifs:file_sync(HintFd).
+ ok = bitcask_file:file_sync(Fd),
+ ok = bitcask_file:file_sync(HintFd).
-spec fold(fresh | #filestate{},
fun((binary(), binary(), integer(),
@@ -210,8 +210,8 @@ fold(fresh, _Fun, Acc) -> Acc;
fold(#filestate { fd=Fd, filename=Filename, tstamp=FTStamp }, Fun, Acc) ->
%% TODO: Add some sort of check that this is a read-only file
%% TODO: Need to position+read?!
- ok = bitcask_nifs:file_seekbof(Fd),
- case bitcask_nifs:file_read(Fd, ?HEADER_SIZE) of
+ ok = bitcask_file:file_seekbof(Fd),
+ case bitcask_file:file_read(Fd, ?HEADER_SIZE) of
{ok, <<_Crc:?CRCSIZEFIELD, _Tstamp:?TSTAMPFIELD, _KeySz:?KEYSIZEFIELD,
_ValueSz:?VALSIZEFIELD>> = H} ->
fold_loop(Fd, Filename, FTStamp, H, 0, Fun, Acc, 0);
@@ -335,7 +335,7 @@ fold_loop(Fd, Filename, FTStamp, Header, Offset, Fun, Acc0, CrcSkipCount) ->
ValueSz:?VALSIZEFIELD>> = Header,
<<_:4/binary, HeaderMinusCRC/binary>> = Header,
TotalSz = KeySz + ValueSz + ?HEADER_SIZE,
- case bitcask_nifs:file_read(Fd, TotalSz) of
+ case bitcask_file:file_read(Fd, TotalSz) of
{ok, <<Key:KeySz/bytes, Value:ValueSz/bytes, Rest/binary>>} ->
CrcMatch = erlang:crc32([HeaderMinusCRC, Key, Value]) =:= Crc32,
Acc = case CrcMatch of
@@ -378,15 +378,15 @@ fold_loop(Fd, Filename, FTStamp, Header, Offset, Fun, Acc0, CrcSkipCount) ->
end.
fold_keys_loop(Fd, Offset, Fun, Acc0) ->
- case bitcask_nifs:file_pread(Fd, Offset, ?HEADER_SIZE) of
+ case bitcask_file:file_pread(Fd, Offset, ?HEADER_SIZE) of
{ok, Header} when erlang:size(Header) =:= ?HEADER_SIZE ->
<<_Crc32:?CRCSIZEFIELD, Tstamp:?TSTAMPFIELD, KeySz:?KEYSIZEFIELD,
ValueSz:?VALSIZEFIELD>> = Header,
TotalSz = KeySz + ValueSz + ?HEADER_SIZE,
PosInfo = {Offset, TotalSz},
%% NOTE: We are intentionally *not* reading the value blob,
%% so we cannot check the checksum.
- case bitcask_nifs:file_pread(Fd, Offset + ?HEADER_SIZE, KeySz) of
+ case bitcask_file:file_pread(Fd, Offset + ?HEADER_SIZE, KeySz) of
{ok, Key} when erlang:size(Key) =:= KeySz ->
Acc = Fun(Key, Tstamp, PosInfo, Acc0),
fold_keys_loop(Fd, Offset + TotalSz, Fun, Acc);
@@ -410,12 +410,12 @@ fold_keys_loop(Fd, Offset, Fun, Acc0) ->
fold_hintfile(State, Fun, Acc) ->
HintFile = hintfile_name(State),
- case bitcask_nifs:file_open(HintFile, [readonly]) of
+ case bitcask_file:file_open(HintFile, [readonly]) of
{ok, HintFd} ->
try
{ok, DataI} = file:read_file_info(State#filestate.filename),
DataSize = DataI#file_info.size,
- case bitcask_nifs:file_read(HintFd, ?HINT_RECORD_SZ) of
+ case bitcask_file:file_read(HintFd, ?HINT_RECORD_SZ) of
{ok, <<H:?HINT_RECORD_SZ/bytes>>} ->
fold_hintfile_loop(DataSize, HintFile,
HintFd, 0, H, Fun, Acc);
@@ -432,7 +432,7 @@ fold_hintfile(State, Fun, Acc) ->
{error, {fold_hintfile, Reason}}
end
after
- bitcask_nifs:file_close(HintFd)
+ bitcask_file:file_close(HintFd)
end;
{error, Reason} ->
{error, {fold_hintfile, Reason}}
@@ -455,7 +455,7 @@ fold_hintfile_loop(DataSize, HintFile, Fd, HintCRC0,
<<Tstamp:?TSTAMPFIELD, KeySz:?KEYSIZEFIELD,
TotalSz:?TOTALSIZEFIELD, Offset:?OFFSETFIELD>> = HintRecord,
ReadSz = KeySz + ?HINT_RECORD_SZ,
- case bitcask_nifs:file_read(Fd, ReadSz) of
+ case bitcask_file:file_read(Fd, ReadSz) of
{ok, <<Key:KeySz/bytes, Rest/binary>>} when
Offset + TotalSz =< DataSize ->
PosInfo = {Offset, TotalSz},
@@ -503,9 +503,9 @@ create_file_loop(DirName, Opts, Tstamp) ->
Opts
end,
- case bitcask_nifs:file_open(Filename, FinalOpts) of
+ case bitcask_file:file_open(Filename, FinalOpts) of
{ok, FD} ->
- {ok, HintFD} = bitcask_nifs:file_open(hintfile_name(Filename), FinalOpts),
+ {ok, HintFD} = bitcask_file:file_open(hintfile_name(Filename), FinalOpts),
{ok, #filestate{mode = read_write,
filename = Filename,
tstamp = file_tstamp(Filename),
@@ -137,7 +137,10 @@ delete_files(Files) ->
-ifdef(TEST).
-multiple_merges_during_fold_test() ->
+%% EUnit test generator: runs multiple_merges_during_fold_test_body/0
+%% under an explicit EUnit timeout of 10 seconds.
+multiple_merges_during_fold_test_() ->
+ {timeout, 10, fun multiple_merges_during_fold_test_body/0}.
+
+multiple_merges_during_fold_test_body() ->
Dir = "/tmp/bc.multiple-merges-fold",
B = bitcask:open(Dir, [read_write, {max_file_size, 50}]),
PutSome = fun() ->
View
@@ -245,11 +245,11 @@ keydir_fold(Ref, Fun, Acc0, MaxAge, MaxPuts) ->
keydir_frozen(Ref, FrozenFun, MaxAge, MaxPuts) ->
case keydir_itr(Ref, MaxAge, MaxPuts) of
out_of_date ->
- receive
- ready -> % fold no matter what on second attempt
+ case keydir_wait_ready() of
+ ok ->
keydir_frozen(Ref, FrozenFun, -1, -1);
- error ->
- {error, shutdown}
+ Else ->
+ Else
end;
ok ->
try
@@ -278,6 +278,26 @@ keydir_wait_pending(Ref) ->
ok
end.
+-ifdef(PULSE).
+%% Block until a `ready' or `error' message arrives in the caller's mailbox.
+%% Returns ok for `ready' and {error, shutdown} for `error'.
+%% PULSE build: the receive times out every 1000 ms and then simply waits
+%% again, so the overall wait is still unbounded.
+%% NOTE(review): presumably the periodic timeout keeps a PULSE-scheduled
+%% test from blocking forever in one receive - confirm.
+keydir_wait_ready() ->
+ receive
+ ready -> % fold no matter what on second attempt
+ ok;
+ error ->
+ {error, shutdown}
+ after 1000 ->
+ keydir_wait_ready()
+ end.
+-else.
+%% Block until a `ready' or `error' message arrives in the caller's mailbox.
+%% Returns ok for `ready' and {error, shutdown} for `error'.
+%% Non-PULSE build: a single receive with no timeout.
+keydir_wait_ready() ->
+ receive
+ ready -> % fold no matter what on second attempt
+ ok;
+ error ->
+ {error, shutdown}
+ end.
+-endif.
+
keydir_info(_Ref) ->
erlang:nif_error({error, not_loaded}).
Oops, something went wrong.

0 comments on commit 523140a

Please sign in to comment.