Skip to content
This repository has been archived by the owner on Oct 22, 2021. It is now read-only.

Commit

Permalink
restore underflow check, remove ttl.
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert Newson committed Mar 12, 2011
1 parent 699faef commit bbef4ed
Showing 1 changed file with 25 additions and 37 deletions.
62 changes: 25 additions & 37 deletions src/monic_file.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,13 @@
%% gen_server API
-export([init/1, terminate/2, code_change/3,handle_call/3, handle_cast/2, handle_info/2]).

-define(TTL, 2).

-record(state, {
ttl=?TTL,
tid,
index_fd=nil,
main_fd=nil,
next_index,
next_key,
reset_pos,
tref=nil,
write_pos,
writer=nil
}).
Expand Down Expand Up @@ -82,13 +78,11 @@ init(Path) ->
{ok, IndexFd, Hints} ->
case load_main(Tid, Path, Hints) of
{ok, MainFd, {NextKey, NextLocation}} ->
{ok, TRef} = timer:apply_interval(10000, gen_server, cast, [self(), ttl]),
{ok, #state{
index_fd=IndexFd,
main_fd=MainFd,
next_key=NextKey,
reset_pos=NextLocation,
tref=TRef,
write_pos=NextLocation,
tid=Tid
}};
Expand All @@ -109,9 +103,9 @@ handle_call({start_writing, Size}, _From,
Index = #index{cookie=Cookie, key=Key, flags=Flags, location=Pos, size=Size, version=Version},
case monic_utils:pwrite_header(MainFd, Pos, Header) of
ok ->
{reply, {ok, Ref}, reset_ttl(State#state{next_index=Index, write_pos=Pos + ?HEADER_SIZE, writer=Ref})};
{reply, {ok, Ref}, State#state{next_index=Index, write_pos=Pos + ?HEADER_SIZE, writer=Ref}};
Else ->
{reply, Else, reset_ttl(abandon_write(State))}
{reply, Else, abandon_write(State)}
end;
handle_call({start_writing, _Size}, _From, State) ->
{reply, {error, already_writing}, State};
Expand All @@ -122,20 +116,26 @@ handle_call({write, Ref, {Bin, Next}}, _From, #state{main_fd=Fd, write_pos=Pos,
Pos1 = Pos + iolist_size(Bin),
case Next of
done ->
case file:datasync(Fd) of
ok ->
Index = State#state.next_index,
monic_utils:write_index(State#state.index_fd, Index),
ets:insert(State#state.tid, {Index#index.key, Index#index.cookie,
Index#index.location, Index#index.size, Index#index.version}),
{reply, {ok, {Index#index.key, Index#index.cookie}},
reset_ttl(State#state{next_index=nil, next_key=State#state.next_key+1,
reset_pos=Pos1, write_pos=Pos1, writer=nil})};
Else ->
{reply, Else, abandon_write(State)}
Len = Pos1 - State#state.reset_pos,
case Len < (State#state.next_index)#index.size of
true ->
{reply, {error, underflow}, abandon_write(State)};
false ->
case file:datasync(Fd) of
ok ->
Index = State#state.next_index,
monic_utils:write_index(State#state.index_fd, Index),
ets:insert(State#state.tid, {Index#index.key, Index#index.cookie,
Index#index.location, Index#index.size, Index#index.version}),
{reply, {ok, {Index#index.key, Index#index.cookie}},
State#state{next_index=nil, next_key=State#state.next_key+1,
reset_pos=Pos1, write_pos=Pos1, writer=nil}};
Else ->
{reply, Else, abandon_write(State)}
end
end;
Next ->
{reply, {continue, Next}, reset_ttl(State#state{write_pos=Pos1})}
{reply, {continue, Next}, State#state{write_pos=Pos1}}
end;
{Else, _} ->
{reply, Else, abandon_write(State)}
Expand All @@ -147,23 +147,19 @@ handle_call({read, Key, Cookie}, _From, #state{tid=Tid}=State) ->
case info_int(Tid, Key, Cookie) of
{ok, {Location, Size, _Version}} ->
Self = self(),
{reply, {ok, fun() -> stream_out(Self, Location + ?HEADER_SIZE, Size) end}, reset_ttl(State)};
{reply, {ok, fun() -> stream_out(Self, Location + ?HEADER_SIZE, Size) end}, State};
Else ->
{reply, Else, State}
end;
handle_call({read_hunk, Location, Size}, _From, #state{main_fd=Fd}=State) ->
{reply, file:pread(Fd, Location, Size), reset_ttl(State)};
{reply, file:pread(Fd, Location, Size), State};

handle_call({info, Key, Cookie}, _From, #state{tid=Tid}=State) ->
{reply, info_int(Tid, Key, Cookie), reset_ttl(State)};
{reply, info_int(Tid, Key, Cookie), State};

handle_call(close, _From, State) ->
{stop, normal, ok, cleanup(State)}.

handle_cast(ttl, #state{ttl=0}=State) ->
{stop, normal, cleanup(State)};
handle_cast(ttl, #state{ttl=Ttl}=State) ->
{noreply, State#state{ttl=Ttl-1}};
handle_cast(_Msg, State) ->
{noreply, State}.

Expand Down Expand Up @@ -277,12 +273,11 @@ info_int(Tid, Key, Cookie) ->
{error, not_found}
end.

cleanup(#state{tid=Tid,tref=TRef,index_fd=IndexFd,main_fd=MainFd}=State) ->
cleanup(#state{tid=Tid,index_fd=IndexFd,main_fd=MainFd}=State) ->
close_int(IndexFd),
close_int(MainFd),
close_ets(Tid),
cancel_timer(TRef),
State#state{tid=nil,tref=nil,index_fd=nil,main_fd=nil}.
State#state{tid=nil,index_fd=nil,main_fd=nil}.

close_int(nil) ->
ok;
Expand All @@ -294,10 +289,3 @@ close_ets(nil) ->
close_ets(Tid) ->
ets:delete(Tid).

cancel_timer(nil) ->
ok;
cancel_timer(TRef) ->
timer:cancel(TRef).

reset_ttl(State) ->
State#state{ttl=?TTL}.

0 comments on commit bbef4ed

Please sign in to comment.