Skip to content
This repository has been archived by the owner on Oct 22, 2021. It is now read-only.

Commit

Permalink
keep index fd open and update it as we update items
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert Newson committed Mar 8, 2011
1 parent 571cfd5 commit 39ed764
Showing 1 changed file with 52 additions and 32 deletions.
84 changes: 52 additions & 32 deletions src/monic_file_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

-record(state, {
tid,
fd=nil,
index_fd=nil,
main_fd=nil,
eof
}).

Expand All @@ -45,57 +46,52 @@ close(Pid) ->

init(Path) ->
Tid = ets:new(index, []),
{ok, LastLocation} = load_index(Tid, Path),
case file:open(Path, [read, write, append, raw, binary]) of
{ok, Fd} ->
{ok, LastLocation1} = load_main_items(Tid, Fd, LastLocation),
{ok, #state{fd=Fd,tid=Tid,eof=LastLocation1}};
Error ->
{stop, Error}
case load_index(Tid, Path) of
{ok, IndexFd, Hint} ->
case load_main(Tid, Path, Hint) of
{ok, MainFd, Eof} ->
{ok, #state{index_fd=IndexFd,main_fd=MainFd,eof=Eof}};
Else ->
{stop, Else}
end;
Else ->
{stop, Else}
end.

handle_call({write, Key, Cookie, Size, Fun}, _From, #state{tid=Tid,fd=Fd,eof=Eof}=State) ->
case update_item(Tid, Fd, Eof, Key, Cookie, Size, Fun) of
handle_call({write, Key, Cookie, Size, Fun}, _From, #state{main_fd=Fd,eof=Eof}=State) ->
case update_item(Key, Cookie, Size, Fun, State) of
{ok, Eof1} ->
{reply, ok, State#state{eof=Eof1}};
Else ->
file:truncate(Fd, Eof),
{reply, Else, State}
end;
handle_call(close, _From, #state{fd=nil}=State) ->
{stop, normal, ok, State};
handle_call(close, _From, #state{fd=Fd}=State) ->
{stop, normal, file:close(Fd), State#state{fd=nil}}.
handle_call(close, _From, #state{index_fd=IndexFd,main_fd=MainFd}=State) ->
{stop, normal, close_files([IndexFd, MainFd]), State#state{index_fd=nil,main_fd=nil}}.

handle_cast(_Msg, State) ->
{noreply, State}.

handle_info(_Info, State) ->
{noreply, State}.

terminate(_Reason, #state{fd=nil}) ->
ok;
terminate(_Reason, #state{fd=Fd}) ->
file:close(Fd).
terminate(_Reason, #state{index_fd=IndexFd,main_fd=MainFd}) ->
close_files([IndexFd, MainFd]).

code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%% private functions

load_index(Tid, Path) ->
case file:open(Path ++ ".idx", [binary, raw, read]) of
case file:open(Path ++ ".idx", [binary, raw, read, write, append]) of
{ok, Fd} ->
Res = load_index_items(Tid, Fd, 0),
file:close(Fd),
Res;
{error, enoent} ->
{ok, 0};
{ok, Fd, load_index_items(Tid, Fd, 0)};
Else ->
Else
end.

load_index_items(Tid, Fd, LastLocation) ->
load_index_items(Tid, Fd, Hint) ->
case monic_utils:read_index(Fd) of
{ok, #index{key=Key,location=Location,size=Size,version=Version,flags= <<Deleted:1,_:15>>}} ->
case Deleted of
Expand All @@ -104,7 +100,15 @@ load_index_items(Tid, Fd, LastLocation) ->
end,
load_index_items(Tid, Fd, Location);
eof ->
{ok, LastLocation};
{ok, Hint};
Else ->
Else
end.

load_main(Tid, Path, Hint) ->
case file:open(Path, [binary, raw, read, write]) of
{ok, Fd} ->
{ok, Fd, load_main_items(Tid, Fd, Hint)};
Else ->
Else
end.
Expand All @@ -123,22 +127,30 @@ load_main_items(Tid, Fd, Location) ->
Else
end.

update_item(Tid, Fd, Location, Key, Cookie, Size, Fun) ->
update_item(Key, Cookie, Size, Fun, #state{tid=Tid,index_fd=IndexFd,main_fd=MainFd,eof=Location}) ->
Version = case ets:lookup(Tid, Key) of
[] -> 1;
[65535] -> 1;
[V] -> V + 1
end,
Header = #header{key=Key,cookie=Cookie,size=Size,version=Version, flags=0},
case monic_utils:pwrite_header(Fd, Location, Header) of
case monic_utils:pwrite_header(MainFd, Location, Header) of
ok ->
case copy_in(Fd, Fun, Location + ?HEADER_SIZE, Size) of
case copy_in(MainFd, Fun, Location + ?HEADER_SIZE, Size) of
{ok, Sha} ->
Footer = #footer{sha=Sha},
case monic_utils:pwrite_footer(Fd, Location + ?HEADER_SIZE + Size, Footer) of
case monic_utils:pwrite_footer(MainFd, Location + ?HEADER_SIZE + Size, Footer) of
ok ->
ets:insert(Tid, {Key, Location, Size, Version}),
{ok, Location + ?HEADER_SIZE + Size + ?FOOTER_SIZE};
case file:datasync(MainFd) of
ok ->
monic_utils:write_index(IndexFd,
#index{key=Key,location=Location,size=Size,version=Version,flags=0}
),
ets:insert(Tid, {Key, Location, Size, Version}),
{ok, Location + ?HEADER_SIZE + Size + ?FOOTER_SIZE};
Else ->
Else
end;
Else ->
Else
end;
Expand Down Expand Up @@ -175,3 +187,11 @@ copy_in(Fd, Fun, Location, Remaining, Sha) ->
Else
end.

close_files([]) ->
ok;
close_files([nil|T]) ->
close_files(T);
close_files([Fd|T]) ->
file:close(Fd),
close(T).

0 comments on commit 39ed764

Please sign in to comment.