Skip to content
This repository has been archived by the owner on Oct 22, 2021. It is now read-only.

Commit

Permalink
abstract ets-backing of index into separate module.
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert Newson committed Apr 7, 2011
1 parent 1c07b81 commit 4ff6ead
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 43 deletions.
88 changes: 45 additions & 43 deletions src/monic_file.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
%% gen_server API
-export([init/1, terminate/2, code_change/3,handle_call/3, handle_cast/2, handle_info/2]).

-define(INDEX_MODULE, monic_index_ets).

-record(state, {
tid,
index,
index_fd=nil,
last_write=nil,
main_fd=nil,
Expand Down Expand Up @@ -153,7 +155,7 @@ handle_call({write, Ref, {Bin, Next}}, _From, #state{main_fd=Fd, next_header=Hea
case file:datasync(Fd) of
ok ->
monic_utils:write_term(State#state.index_fd, Header),
ets:insert(State#state.tid, Header),
?INDEX_MODULE:insert(State#state.index, Header),
{reply, ok, finish_write(Pos + Written + Size + FooterSize, State)};
Else ->
{reply, Else, abandon_write(State)}
Expand All @@ -170,8 +172,8 @@ handle_call({write, Ref, {Bin, Next}}, _From, #state{main_fd=Fd, next_header=Hea
handle_call({write, _Ref, _StreamBody}, _From, State) ->
{reply, {error, not_writing}, State};

handle_call({read, Key, Cookie}, _From, #state{main_fd=Fd, tid=Tid}=State) ->
case info_int(Tid, Key, Cookie) of
handle_call({read, Key, Cookie}, _From, #state{main_fd=Fd, index=Index}=State) ->
case info_int(Index, Key, Cookie) of
{ok, {Location, Size, _LastModified}} ->
{ok, HeaderSize, _} = monic_utils:pread_term(Fd, Location),
Self = self(),
Expand All @@ -182,15 +184,15 @@ handle_call({read, Key, Cookie}, _From, #state{main_fd=Fd, tid=Tid}=State) ->
handle_call({read_hunk, Location, Size}, _From, #state{main_fd=Fd}=State) ->
{reply, file:pread(Fd, Location, Size), State};

handle_call({delete, Key, Cookie}, _From, #state{index_fd=IndexFd,main_fd=MainFd,reset_pos=Pos,tid=Tid}=State) ->
case info_int(Tid, Key, Cookie) of
handle_call({delete, Key, Cookie}, _From, #state{index_fd=IndexFd,main_fd=MainFd,reset_pos=Pos,index=Index}=State) ->
case info_int(Index, Key, Cookie) of
{ok, _} ->
case monic_utils:write_term(MainFd, #header{key=Key,deleted=true}) of
{ok, HeaderSize} ->
case file:datasync(MainFd) of
ok ->
monic_utils:write_term(IndexFd, #header{key=Key,cookie=Cookie,deleted=true}),
true = ets:delete(Tid, Key),
true = ?INDEX_MODULE:delete(Index, Key),
{reply, ok, State#state{reset_pos=Pos+HeaderSize}};
Else ->
{reply, Else, abandon_write(State)}
Expand All @@ -202,8 +204,8 @@ handle_call({delete, Key, Cookie}, _From, #state{index_fd=IndexFd,main_fd=MainFd
{reply, Else, abandon_write(State)}
end;

handle_call({info, Key, Cookie}, _From, #state{tid=Tid}=State) ->
{reply, info_int(Tid, Key, Cookie), State};
handle_call({info, Key, Cookie}, _From, #state{index=Index}=State) ->
{reply, info_int(Index, Key, Cookie), State};

handle_call(close, _From, State) ->
{stop, normal, ok, cleanup(State)};
Expand Down Expand Up @@ -238,10 +240,10 @@ code_change(_OldVsn, State, _Extra) ->

%% private functions

load_index(Tid, Path) ->
load_index(Index, Path) ->
case file:open(Path ++ ".idx", [binary, raw, read, append]) of
{ok, Fd} ->
case load_index_items(Tid, Fd) of
case load_index_items(Index, Fd) of
{ok, LastLoc} ->
{ok, Fd, LastLoc};
Else ->
Expand All @@ -251,27 +253,27 @@ load_index(Tid, Path) ->
Else
end.

load_index_items(Tid, Fd) ->
load_index_items(Tid, Fd, 0, 0).
load_index_items(Index, Fd) ->
load_index_items(Index, Fd, 0, 0).

load_index_items(Tid, Fd, IndexLocation, Eof) ->
load_index_items(Index, Fd, IndexLocation, Eof) ->
case monic_utils:pread_term(Fd, IndexLocation) of
{ok, IndexSize, #header{deleted=true,key=Key}} ->
ets:delete(Tid, Key),
load_index_items(Tid, Fd, IndexLocation + IndexSize, Eof);
?INDEX_MODULE:delete(Index, Key),
load_index_items(Index, Fd, IndexLocation + IndexSize, Eof);
{ok, IndexSize, #header{deleted=false,location=Location}=Header} ->
ets:insert(Tid, Header),
load_index_items(Tid, Fd, IndexLocation + IndexSize, Location);
?INDEX_MODULE:insert(Index, Header),
load_index_items(Index, Fd, IndexLocation + IndexSize, Location);
eof ->
{ok, Eof};
Else ->
Else
end.

load_main(Tid, Path, IndexFd, Eof) ->
load_main(Index, Path, IndexFd, Eof) ->
case file:open(Path, [binary, raw, read, append]) of
{ok, MainFd} ->
case load_main_items(Tid, MainFd, IndexFd, Eof) of
case load_main_items(Index, MainFd, IndexFd, Eof) of
{ok, Eof1} ->
{ok, MainFd, Eof1};
Else ->
Expand All @@ -281,19 +283,19 @@ load_main(Tid, Path, IndexFd, Eof) ->
Else
end.

load_main_items(Tid, MainFd, IndexFd, Location) ->
load_main_items(Index, MainFd, IndexFd, Location) ->
case monic_utils:pread_term(MainFd, Location) of
{ok, HeaderSize, #header{deleted=true,key=Key}=Header} ->
ets:delete(Tid, Key),
?INDEX_MODULE:delete(Index, Key),
monic_utils:write_term(IndexFd, Header),
load_main_items(Tid, MainFd, IndexFd, Location + HeaderSize);
load_main_items(Index, MainFd, IndexFd, Location + HeaderSize);
{ok, HeaderSize, #header{deleted=false,size=Size}=Header} ->
Header1 = Header#header{location=Location},
ets:insert(Tid, Header1),
?INDEX_MODULE:insert(Index, Header1),
monic_utils:write_term(IndexFd, Header1),
case monic_utils:pread_term(MainFd, Location + HeaderSize + Size) of
{ok, FooterSize, _} ->
load_main_items(Tid, MainFd, IndexFd, Location + HeaderSize + Size + FooterSize);
load_main_items(Index, MainFd, IndexFd, Location + HeaderSize + Size + FooterSize);
eof ->
truncate(MainFd, Location),
{ok, Location}
Expand Down Expand Up @@ -378,42 +380,42 @@ stream_out(Pid, Location, Remaining) ->
{<<>>, done}
end.

info_int(Tid, Key, Cookie) ->
case ets:lookup(Tid, Key) of
info_int(Index, Key, Cookie) ->
case ?INDEX_MODULE:lookup(Index, Key) of
[#header{cookie=Cookie,location=Location,size=Size,last_modified=LastModified}] ->
{ok, {Location, Size, LastModified}};
_ ->
{error, not_found}
end.

cleanup(#state{tid=Tid,index_fd=IndexFd,main_fd=MainFd}=State) ->
cleanup(#state{index=Index,index_fd=IndexFd,main_fd=MainFd}=State) ->
close_int(IndexFd),
close_int(MainFd),
close_ets(Tid),
State#state{tid=nil,index_fd=nil,main_fd=nil}.
close_index(Index),
State#state{index=nil,index_fd=nil,main_fd=nil}.

close_int(nil) ->
ok;
close_int(Fd) ->
file:close(Fd).

close_ets(nil) ->
close_index(nil) ->
ok;
close_ets(Tid) ->
ets:delete(Tid).
close_index(Index) ->
?INDEX_MODULE:delete(Index).

init_int(Path) ->
Tid = ets:new(index, [{keypos, #header.key}, set, private]),
case load_index(Tid, Path) of
Index = ?INDEX_MODULE:new(),
case load_index(Index, Path) of
{ok, IndexFd, LastLoc} ->
case load_main(Tid, Path, IndexFd, LastLoc) of
case load_main(Index, Path, IndexFd, LastLoc) of
{ok, MainFd, Eof} ->
{ok, #state{
index_fd=IndexFd,
main_fd=MainFd,
path=Path,
reset_pos=Eof,
tid=Tid
index=Index
}};
Else ->
Else
Expand All @@ -422,10 +424,10 @@ init_int(Path) ->
Else
end.

compact_int(#state{index_fd=IndexFd, main_fd=MainFd, path=Path, tid=Tid}) ->
compact_int(#state{index_fd=IndexFd, main_fd=MainFd, path=Path, index=Index}) ->
case file:open(Path ++ ".compact", [binary, raw, append]) of
{ok, CompactFd} ->
case compact_int(IndexFd, MainFd, CompactFd, Tid, 0) of
case compact_int(IndexFd, MainFd, CompactFd, Index, 0) of
ok ->
case file:datasync(CompactFd) of
ok ->
Expand All @@ -441,10 +443,10 @@ compact_int(#state{index_fd=IndexFd, main_fd=MainFd, path=Path, tid=Tid}) ->
Else
end.

compact_int(IndexFd, MainFd, CompactFd, Tid, IndexLocation) ->
compact_int(IndexFd, MainFd, CompactFd, Index, IndexLocation) ->
case monic_utils:pread_term(IndexFd, IndexLocation) of
{ok, IndexHeaderSize, #header{key=Key,location=OldLocation,size=Size}=Header} ->
case ets:lookup(Tid, Key) of
case ?INDEX_MODULE:lookup(Index, Key) of
[_] ->
case monic_utils:write_term(CompactFd, Header#header{location=nil}) of
{ok, CompactHeaderSize} ->
Expand All @@ -454,7 +456,7 @@ compact_int(IndexFd, MainFd, CompactFd, Tid, IndexLocation) ->
{ok, _, #footer{}=Footer} ->
case monic_utils:write_term(CompactFd, Footer) of
{ok, _} ->
compact_int(IndexFd, MainFd, CompactFd, Tid,
compact_int(IndexFd, MainFd, CompactFd, Index,
IndexLocation + IndexHeaderSize);
Else ->
Else
Expand All @@ -469,7 +471,7 @@ compact_int(IndexFd, MainFd, CompactFd, Tid, IndexLocation) ->
Else
end;
[] ->
compact_int(IndexFd, MainFd, CompactFd, Tid, IndexLocation + IndexHeaderSize)
compact_int(IndexFd, MainFd, CompactFd, Index, IndexLocation + IndexHeaderSize)
end;
eof ->
ok;
Expand Down
38 changes: 38 additions & 0 deletions src/monic_index_ets.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
%% Copyright 2011 Cloudant
%%
%% Licensed under the Apache License, Version 2.0 (the "License"); you may not
%% use this file except in compliance with the License. You may obtain a copy of
%% the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
%% License for the specific language governing permissions and limitations under
%% the License.

%% ETS-backed index. other implementations to follow.

-module(monic_index_ets).
-include("monic.hrl").

%% public API

-export([new/0, delete/1]).
-export([insert/2, delete/2, lookup/2]).

new() ->
ets:new(index, [{keypos, #header.key}, set, private]).

delete(Tid) ->
ets:delete(Tid).

insert(Tid, #header{}=Header) ->
ets:insert(Tid, Header).

delete(Tid, Key) ->
ets:delete(Tid, Key).

lookup(Tid, Key) ->
ets:lookup(Tid, Key).

0 comments on commit 4ff6ead

Please sign in to comment.