Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Use Erlang file I/O from dedicated procs rather than NIFs

Bitcask previously used raw file I/O to read/write files. However, since
raw file I/O uses a non-optimized selective receive to wait for a reply
back from the efile driver, this approach had numerous problems when
Bitcask was used within processes with many incoming messages (such as how
Bitcask is used in Riak).

In commit 79d5eb3, NIFs were introduced
to solve this problem. The file I/O NIFs would block the Erlang scheduler,
but solve the issue encountered with selective receive. Unfortunately,
using blocking NIFs is much worse than originally thought. Thus, NIFs are
not the right solution to this problem.

This commit changes Bitcask to once again use Erlang's built-in file I/O,
but now wraps each open file in a separate gen_server that interacts with
the raw port. The original process now waits on a gen_server reply which
uses an optimized selective receive, while the file process handles the
unoptimized selective receive from the port driver. In our usage, the file
process only has a single request outstanding, and therefore does not run
into the selective receive issue.
  • Loading branch information...
commit f20c14374b7f8d4cc1ab852dcaf22ff245d39aa4 1 parent f706bad
Joseph Blomstedt jtuple authored
Showing with 158 additions and 22 deletions.
  1. +136 −0 src/bitcask_file.erl
  2. +22 −22 src/bitcask_fileops.erl
136 src/bitcask_file.erl
View
@@ -0,0 +1,136 @@
+-module(bitcask_file).
+-compile(export_all).
+-behaviour(gen_server).
+
+%% API
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state, {fd}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+file_open(Filename, Opts) ->
+ {ok, Pid} = gen_server:start(?MODULE, [], []),
+ case gen_server:call(Pid, {file_open, Filename, Opts}, infinity) of
+ ok ->
+ {ok, Pid};
+ Error ->
+ Error
+ end.
+
+file_close(Pid) ->
+ file_request(Pid, file_close).
+
+file_sync(Pid) ->
+ file_request(Pid, file_sync).
+
+file_pread(Pid, Offset, Size) ->
+ file_request(Pid, {file_pread, Offset, Size}).
+
+file_pwrite(Pid, Offset, Bytes) ->
+ file_request(Pid, {file_pwrite, Offset, Bytes}).
+
+file_read(Pid, Size) ->
+ file_request(Pid, {file_read, Size}).
+
+file_write(Pid, Bytes) ->
+ file_request(Pid, {file_write, Bytes}).
+
+file_seekbof(Pid) ->
+ file_request(Pid, file_seekbof).
+
+%%%===================================================================
+%%% API helper functions
+%%%===================================================================
+
+file_request(Pid, Request) ->
+ case check_pid(Pid) of
+ ok ->
+ gen_server:call(Pid, Request, infinity);
+ Error ->
+ Error
+ end.
+
+check_pid(Pid) ->
+ IsPid = is_pid(Pid),
+ IsAlive = IsPid andalso is_process_alive(Pid),
+ case {IsAlive, IsPid} of
+ {true, _} ->
+ ok;
+ {false, true} ->
+ %% Same result as `file' module when accessing closed FD
+ {error, einval};
+ _ ->
+ %% Same result as `file' module when providing wrong arg
+ {error, badarg}
+ end.
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+init([]) ->
+ {ok, #state{}}.
+
+handle_call({file_open, Filename, Opts}, _From, State) ->
+ Mode = case proplists:get_bool(readonly, Opts) of
+ true ->
+ [read, raw, binary, read_ahead];
+ false ->
+ [read, write, raw, binary, read_ahead]
+ end,
+ %% [lager:warning("Option ~p ignored", [Opt]) || Opt <- [create, o_sync],
+ %% proplists:get_bool(Opt, Opts)],
+ case file:open(Filename, Mode) of
+ {ok, Fd} ->
+ State2 = State#state{fd=Fd},
+ {reply, ok, State2};
+ Error ->
+ {reply, Error, State}
+ end;
+handle_call(file_close, _From, State=#state{fd=Fd}) ->
+ ok = file:close(Fd),
+ {stop, normal, ok, State};
+handle_call(file_sync, _From, State=#state{fd=Fd}) ->
+ Reply = file:sync(Fd),
+ {reply, Reply, State};
+handle_call({file_pread, Offset, Size}, _From, State=#state{fd=Fd}) ->
+ Reply = file:pread(Fd, Offset, Size),
+ {reply, Reply, State};
+handle_call({file_pwrite, Offset, Bytes}, _From, State=#state{fd=Fd}) ->
+ Reply = file:pwrite(Fd, Offset, Bytes),
+ {reply, Reply, State};
+handle_call({file_read, Size}, _From, State=#state{fd=Fd}) ->
+ Reply = file:read(Fd, Size),
+ {reply, Reply, State};
+handle_call({file_write, Bytes}, _From, State=#state{fd=Fd}) ->
+ Reply = file:write(Fd, Bytes),
+ {reply, Reply, State};
+handle_call(file_seekbof, _From, State=#state{fd=Fd}) ->
+ {ok, _} = file:position(Fd, bof),
+ {reply, ok, State};
+
+handle_call(_Request, _From, State) ->
+ Reply = ok,
+ {reply, Reply, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
44 src/bitcask_fileops.erl
View
@@ -69,7 +69,7 @@ create_file(DirName, Opts) ->
%% Called with fully-qualified filename.
-spec open_file(Filename :: string()) -> {ok, #filestate{}} | {error, any()}.
open_file(Filename) ->
- case bitcask_nifs:file_open(Filename, [readonly]) of
+ case bitcask_file:file_open(Filename, [readonly]) of
{ok, FD} ->
{ok, #filestate{mode = read_only,
filename = Filename, tstamp = file_tstamp(Filename),
@@ -84,7 +84,7 @@ close(fresh) -> ok;
close(undefined) -> ok;
close(State = #filestate{ fd = FD }) ->
close_hintfile(State),
- bitcask_nifs:file_close(FD),
+ bitcask_file:file_close(FD),
ok.
%% @doc Close a file for writing, but leave it open for reads.
@@ -93,7 +93,7 @@ close_for_writing(fresh) -> ok;
close_for_writing(undefined) -> ok;
close_for_writing(State = #filestate{ mode = read_write, fd = Fd }) ->
S2 = close_hintfile(State),
- bitcask_nifs:file_sync(Fd),
+ bitcask_file:file_sync(Fd),
S2#filestate { mode = read_only }.
close_hintfile(State = #filestate { hintfd = undefined }) ->
@@ -104,9 +104,9 @@ close_hintfile(State = #filestate { hintfd = HintFd, hintcrc = HintCRC }) ->
%% an older version of bitcask will just reject the record at the end of the
%% hintfile and otherwise work normally.
Iolist = hintfile_entry(<<>>, 0, {?MAXOFFSET, HintCRC}),
- ok = bitcask_nifs:file_write(HintFd, Iolist),
- bitcask_nifs:file_sync(HintFd),
- bitcask_nifs:file_close(HintFd),
+ ok = bitcask_file:file_write(HintFd, Iolist),
+ bitcask_file:file_sync(HintFd),
+ bitcask_file:file_close(HintFd),
State#filestate { hintfd = undefined, hintcrc = 0 }.
@@ -151,11 +151,11 @@ write(Filestate=#filestate{fd = FD, hintfd = HintFD,
<<ValueSz:?VALSIZEFIELD>>, Key, Value],
Bytes = [<<(erlang:crc32(Bytes0)):?CRCSIZEFIELD>> | Bytes0],
%% Store the full entry in the data file
- ok = bitcask_nifs:file_pwrite(FD, Offset, Bytes),
+ ok = bitcask_file:file_pwrite(FD, Offset, Bytes),
%% Create and store the corresponding hint entry
TotalSz = KeySz + ValueSz + ?HEADER_SIZE,
Iolist = hintfile_entry(Key, Tstamp, {Offset, TotalSz}),
- ok = bitcask_nifs:file_write(HintFD, Iolist),
+ ok = bitcask_file:file_write(HintFD, Iolist),
%% Record our final offset
TotalSz = iolist_size(Bytes),
HintCRC = erlang:crc32(HintCRC0, Iolist), % compute crc of hint
@@ -176,7 +176,7 @@ read(Filename, Offset, Size) when is_list(Filename) ->
{error, Reason}
end;
read(#filestate { fd = FD }, Offset, Size) ->
- case bitcask_nifs:file_pread(FD, Offset, Size) of
+ case bitcask_file:file_pread(FD, Offset, Size) of
{ok, <<Crc32:?CRCSIZEFIELD/unsigned, Bytes/binary>>} ->
%% Verify the CRC of the data
case erlang:crc32(Bytes) of
@@ -198,8 +198,8 @@ read(#filestate { fd = FD }, Offset, Size) ->
%% @doc Call the OS's fsync(2) system call on the cask and hint files.
-spec sync(#filestate{}) -> ok.
sync(#filestate { mode = read_write, fd = Fd, hintfd = HintFd }) ->
- ok = bitcask_nifs:file_sync(Fd),
- ok = bitcask_nifs:file_sync(HintFd).
+ ok = bitcask_file:file_sync(Fd),
+ ok = bitcask_file:file_sync(HintFd).
-spec fold(fresh | #filestate{},
fun((binary(), binary(), integer(),
@@ -210,8 +210,8 @@ fold(fresh, _Fun, Acc) -> Acc;
fold(#filestate { fd=Fd, filename=Filename, tstamp=FTStamp }, Fun, Acc) ->
%% TODO: Add some sort of check that this is a read-only file
%% TODO: Need to position+read?!
- ok = bitcask_nifs:file_seekbof(Fd),
- case bitcask_nifs:file_read(Fd, ?HEADER_SIZE) of
+ ok = bitcask_file:file_seekbof(Fd),
+ case bitcask_file:file_read(Fd, ?HEADER_SIZE) of
{ok, <<_Crc:?CRCSIZEFIELD, _Tstamp:?TSTAMPFIELD, _KeySz:?KEYSIZEFIELD,
_ValueSz:?VALSIZEFIELD>> = H} ->
fold_loop(Fd, Filename, FTStamp, H, 0, Fun, Acc, 0);
@@ -335,7 +335,7 @@ fold_loop(Fd, Filename, FTStamp, Header, Offset, Fun, Acc0, CrcSkipCount) ->
ValueSz:?VALSIZEFIELD>> = Header,
<<_:4/binary, HeaderMinusCRC/binary>> = Header,
TotalSz = KeySz + ValueSz + ?HEADER_SIZE,
- case bitcask_nifs:file_read(Fd, TotalSz) of
+ case bitcask_file:file_read(Fd, TotalSz) of
{ok, <<Key:KeySz/bytes, Value:ValueSz/bytes, Rest/binary>>} ->
CrcMatch = erlang:crc32([HeaderMinusCRC, Key, Value]) =:= Crc32,
Acc = case CrcMatch of
@@ -378,7 +378,7 @@ fold_loop(Fd, Filename, FTStamp, Header, Offset, Fun, Acc0, CrcSkipCount) ->
end.
fold_keys_loop(Fd, Offset, Fun, Acc0) ->
- case bitcask_nifs:file_pread(Fd, Offset, ?HEADER_SIZE) of
+ case bitcask_file:file_pread(Fd, Offset, ?HEADER_SIZE) of
{ok, Header} when erlang:size(Header) =:= ?HEADER_SIZE ->
<<_Crc32:?CRCSIZEFIELD, Tstamp:?TSTAMPFIELD, KeySz:?KEYSIZEFIELD,
ValueSz:?VALSIZEFIELD>> = Header,
@@ -386,7 +386,7 @@ fold_keys_loop(Fd, Offset, Fun, Acc0) ->
PosInfo = {Offset, TotalSz},
%% NOTE: We are intentionally *not* reading the value blob,
%% so we cannot check the checksum.
- case bitcask_nifs:file_pread(Fd, Offset + ?HEADER_SIZE, KeySz) of
+ case bitcask_file:file_pread(Fd, Offset + ?HEADER_SIZE, KeySz) of
{ok, Key} when erlang:size(Key) =:= KeySz ->
Acc = Fun(Key, Tstamp, PosInfo, Acc0),
fold_keys_loop(Fd, Offset + TotalSz, Fun, Acc);
@@ -410,12 +410,12 @@ fold_keys_loop(Fd, Offset, Fun, Acc0) ->
fold_hintfile(State, Fun, Acc) ->
HintFile = hintfile_name(State),
- case bitcask_nifs:file_open(HintFile, [readonly]) of
+ case bitcask_file:file_open(HintFile, [readonly]) of
{ok, HintFd} ->
try
{ok, DataI} = file:read_file_info(State#filestate.filename),
DataSize = DataI#file_info.size,
- case bitcask_nifs:file_read(HintFd, ?HINT_RECORD_SZ) of
+ case bitcask_file:file_read(HintFd, ?HINT_RECORD_SZ) of
{ok, <<H:?HINT_RECORD_SZ/bytes>>} ->
fold_hintfile_loop(DataSize, HintFile,
HintFd, 0, H, Fun, Acc);
@@ -432,7 +432,7 @@ fold_hintfile(State, Fun, Acc) ->
{error, {fold_hintfile, Reason}}
end
after
- bitcask_nifs:file_close(HintFd)
+ bitcask_file:file_close(HintFd)
end;
{error, Reason} ->
{error, {fold_hintfile, Reason}}
@@ -455,7 +455,7 @@ fold_hintfile_loop(DataSize, HintFile, Fd, HintCRC0,
<<Tstamp:?TSTAMPFIELD, KeySz:?KEYSIZEFIELD,
TotalSz:?TOTALSIZEFIELD, Offset:?OFFSETFIELD>> = HintRecord,
ReadSz = KeySz + ?HINT_RECORD_SZ,
- case bitcask_nifs:file_read(Fd, ReadSz) of
+ case bitcask_file:file_read(Fd, ReadSz) of
{ok, <<Key:KeySz/bytes, Rest/binary>>} when
Offset + TotalSz =< DataSize ->
PosInfo = {Offset, TotalSz},
@@ -503,9 +503,9 @@ create_file_loop(DirName, Opts, Tstamp) ->
Opts
end,
- case bitcask_nifs:file_open(Filename, FinalOpts) of
+ case bitcask_file:file_open(Filename, FinalOpts) of
{ok, FD} ->
- {ok, HintFD} = bitcask_nifs:file_open(hintfile_name(Filename), FinalOpts),
+ {ok, HintFD} = bitcask_file:file_open(hintfile_name(Filename), FinalOpts),
{ok, #filestate{mode = read_write,
filename = Filename,
tstamp = file_tstamp(Filename),
Please sign in to comment.
Something went wrong with that request. Please try again.