From bbef4eda56de8610c5f0c174f71bcedb5dd0470f Mon Sep 17 00:00:00 2001 From: Robert Newson Date: Sat, 12 Mar 2011 17:51:32 +0000 Subject: [PATCH] restore underflow check, remove ttl. --- src/monic_file.erl | 62 +++++++++++++++++++--------------------------- 1 file changed, 25 insertions(+), 37 deletions(-) diff --git a/src/monic_file.erl b/src/monic_file.erl index de91b86..cc8cf17 100644 --- a/src/monic_file.erl +++ b/src/monic_file.erl @@ -22,17 +22,13 @@ %% gen_server API -export([init/1, terminate/2, code_change/3,handle_call/3, handle_cast/2, handle_info/2]). --define(TTL, 2). - -record(state, { - ttl=?TTL, tid, index_fd=nil, main_fd=nil, next_index, next_key, reset_pos, - tref=nil, write_pos, writer=nil }). @@ -82,13 +78,11 @@ init(Path) -> {ok, IndexFd, Hints} -> case load_main(Tid, Path, Hints) of {ok, MainFd, {NextKey, NextLocation}} -> - {ok, TRef} = timer:apply_interval(10000, gen_server, cast, [self(), ttl]), {ok, #state{ index_fd=IndexFd, main_fd=MainFd, next_key=NextKey, reset_pos=NextLocation, - tref=TRef, write_pos=NextLocation, tid=Tid }}; @@ -109,9 +103,9 @@ handle_call({start_writing, Size}, _From, Index = #index{cookie=Cookie, key=Key, flags=Flags, location=Pos, size=Size, version=Version}, case monic_utils:pwrite_header(MainFd, Pos, Header) of ok -> - {reply, {ok, Ref}, reset_ttl(State#state{next_index=Index, write_pos=Pos + ?HEADER_SIZE, writer=Ref})}; + {reply, {ok, Ref}, State#state{next_index=Index, write_pos=Pos + ?HEADER_SIZE, writer=Ref}}; Else -> - {reply, Else, reset_ttl(abandon_write(State))} + {reply, Else, abandon_write(State)} end; handle_call({start_writing, _Size}, _From, State) -> {reply, {error, already_writing}, State}; @@ -122,20 +116,26 @@ handle_call({write, Ref, {Bin, Next}}, _From, #state{main_fd=Fd, write_pos=Pos, Pos1 = Pos + iolist_size(Bin), case Next of done -> - case file:datasync(Fd) of - ok -> - Index = State#state.next_index, - monic_utils:write_index(State#state.index_fd, Index), - ets:insert(State#state.tid, {Index#index.key, Index#index.cookie, - Index#index.location, Index#index.size, Index#index.version}), - {reply, {ok, {Index#index.key, Index#index.cookie}}, - reset_ttl(State#state{next_index=nil, next_key=State#state.next_key+1, - reset_pos=Pos1, write_pos=Pos1, writer=nil})}; - Else -> - {reply, Else, abandon_write(State)} + Len = Pos1 - State#state.reset_pos, + case Len < (State#state.next_index)#index.size of + true -> + {reply, {error, underflow}, abandon_write(State)}; + false -> + case file:datasync(Fd) of + ok -> + Index = State#state.next_index, + monic_utils:write_index(State#state.index_fd, Index), + ets:insert(State#state.tid, {Index#index.key, Index#index.cookie, + Index#index.location, Index#index.size, Index#index.version}), + {reply, {ok, {Index#index.key, Index#index.cookie}}, + State#state{next_index=nil, next_key=State#state.next_key+1, + reset_pos=Pos1, write_pos=Pos1, writer=nil}}; + Else -> + {reply, Else, abandon_write(State)} + end end; Next -> - {reply, {continue, Next}, reset_ttl(State#state{write_pos=Pos1})} + {reply, {continue, Next}, State#state{write_pos=Pos1}} end; {Else, _} -> {reply, Else, abandon_write(State)} @@ -147,23 +147,19 @@ handle_call({read, Key, Cookie}, _From, #state{tid=Tid}=State) -> case info_int(Tid, Key, Cookie) of {ok, {Location, Size, _Version}} -> Self = self(), - {reply, {ok, fun() -> stream_out(Self, Location + ?HEADER_SIZE, Size) end}, reset_ttl(State)}; + {reply, {ok, fun() -> stream_out(Self, Location + ?HEADER_SIZE, Size) end}, State}; Else -> {reply, Else, State} end; handle_call({read_hunk, Location, Size}, _From, #state{main_fd=Fd}=State) -> - {reply, file:pread(Fd, Location, Size), reset_ttl(State)}; + {reply, file:pread(Fd, Location, Size), State}; handle_call({info, Key, Cookie}, _From, #state{tid=Tid}=State) -> - {reply, info_int(Tid, Key, Cookie), reset_ttl(State)}; + {reply, info_int(Tid, Key, Cookie), State}; handle_call(close, _From, State) -> {stop, normal, ok, cleanup(State)}. -handle_cast(ttl, #state{ttl=0}=State) -> - {stop, normal, cleanup(State)}; -handle_cast(ttl, #state{ttl=Ttl}=State) -> - {noreply, State#state{ttl=Ttl-1}}; handle_cast(_Msg, State) -> {noreply, State}. @@ -277,12 +273,11 @@ info_int(Tid, Key, Cookie) -> {error, not_found} end. -cleanup(#state{tid=Tid,tref=TRef,index_fd=IndexFd,main_fd=MainFd}=State) -> +cleanup(#state{tid=Tid,index_fd=IndexFd,main_fd=MainFd}=State) -> close_int(IndexFd), close_int(MainFd), close_ets(Tid), - cancel_timer(TRef), - State#state{tid=nil,tref=nil,index_fd=nil,main_fd=nil}. + State#state{tid=nil,index_fd=nil,main_fd=nil}. close_int(nil) -> ok; @@ -294,10 +289,3 @@ close_ets(nil) -> close_ets(Tid) -> ets:delete(Tid). -cancel_timer(nil) -> - ok; -cancel_timer(TRef) -> - timer:cancel(TRef). - -reset_ttl(State) -> - State#state{ttl=?TTL}.