Skip to content
This repository has been archived by the owner on Oct 22, 2021. It is now read-only.

Commit

Permalink
use variable length headers and footers, allow client to specify key.
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert Newson committed Mar 13, 2011
1 parent a7a745d commit bdb22a4
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 116 deletions.
11 changes: 4 additions & 7 deletions include/monic.hrl
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
-define(BUFFER_SIZE, 64*1024).
-define(INDEX_SIZE, 32).
-define(HEADER_SIZE, 28).
-define(FOOTER_SIZE, 24).

-record(index, {
key,
cookie,
location,
size,
version, %% last_modified,
flags
last_modified,
deleted=false
}).

-record(header, {
key,
cookie,
size,
version, %% last_modified,
flags
last_modified,
deleted=false
}).

-record(footer, {
Expand Down
107 changes: 54 additions & 53 deletions src/monic_file.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
-include("monic.hrl").

%% public API
-export([open/1, open_new/1, close/1, add/3, info/3, read/3]).
-export([open/1, open_new/1, close/1, add/4, info/3, read/3]).

%% gen_server API
-export([init/1, terminate/2, code_change/3,handle_call/3, handle_cast/2, handle_info/2]).
Expand All @@ -26,9 +26,8 @@
data_start_pos,
tid,
index_fd=nil,
main_fd=nil,
main_fd=nil,
next_index,
next_key,
reset_pos,
write_pos,
writer=nil
Expand All @@ -52,8 +51,8 @@ open_new(Path) ->
close(Pid) ->
gen_server:call(Pid, close, infinity).

add(Pid, Size, StreamBody) ->
case gen_server:call(Pid, {start_writing, Size}) of
add(Pid, Key, Size, StreamBody) when is_binary(Key) andalso is_integer(Size) ->
case gen_server:call(Pid, {start_writing, Key, Size}) of
{ok, Ref} ->
stream_in(Pid, Ref, StreamBody);
Else ->
Expand All @@ -76,15 +75,14 @@ read(Pid, Key, Cookie) ->
init(Path) ->
Tid = ets:new(index, [set, private]),
case load_index(Tid, Path) of
{ok, IndexFd, Hints} ->
case load_main(Tid, Path, Hints) of
{ok, MainFd, {NextKey, NextLocation}} ->
{ok, IndexFd, LastLoc} ->
case load_main(Tid, Path, LastLoc) of
{ok, MainFd, Eof} ->
{ok, #state{
index_fd=IndexFd,
main_fd=MainFd,
next_key=NextKey,
reset_pos=NextLocation,
write_pos=NextLocation,
reset_pos=Eof,
write_pos=Eof,
tid=Tid
}};
Else ->
Expand All @@ -94,18 +92,17 @@ init(Path) ->
{stop, Else}
end.

handle_call({start_writing, Size}, _From,
#state{main_fd=MainFd, next_key=Key, write_pos=Pos, writer=nil}=State) ->
handle_call({start_writing, Key, Size}, _From,
#state{main_fd=MainFd, write_pos=Pos, writer=nil}=State) ->
Ref = make_ref(),
Cookie = monic_utils:new_cookie(),
Flags = <<0:16>>,
Version = 1,
Header = #header{cookie=Cookie, flags=Flags, key=Key, size=Size, version=Version},
Index = #index{cookie=Cookie, key=Key, flags=Flags, location=Pos, size=Size, version=Version},
case monic_utils:pwrite_header(MainFd, Pos, Header) of
ok ->
{reply, {ok, Ref}, State#state{data_start_pos=Pos+?HEADER_SIZE,
next_index=Index, write_pos=Pos + ?HEADER_SIZE, writer=Ref}};
LastModified = now(),
Header = #header{cookie=Cookie, key=Key, size=Size, last_modified=LastModified},
Index = #index{cookie=Cookie, key=Key, location=Pos, size=Size, last_modified=LastModified},
case monic_utils:pwrite_term(MainFd, Pos, Header) of
{ok, HeaderSize} ->
{reply, {ok, Ref}, State#state{data_start_pos=Pos + HeaderSize,
next_index=Index, write_pos=Pos + HeaderSize, writer=Ref}};
Else ->
{reply, Else, abandon_write(State)}
end;
Expand All @@ -128,12 +125,13 @@ handle_call({write, Ref, {Bin, Next}}, _From, #state{main_fd=Fd, write_pos=Pos,
{done, ok} ->
case file:datasync(Fd) of
ok ->
monic_utils:write_index(State#state.index_fd, Index),
{ok, IndexPos} = file:position(Fd, cur), %% TODO track this in state.
monic_utils:pwrite_term(State#state.index_fd, IndexPos, Index),
ets:insert(State#state.tid, {Index#index.key, Index#index.cookie,
Index#index.location, Index#index.size, Index#index.version}),
{reply, {ok, {Index#index.key, Index#index.cookie}},
State#state{next_index=nil, next_key=State#state.next_key+1,
reset_pos=Pos + Size, write_pos=Pos + Size, writer=nil}};
Index#index.location, Index#index.size, Index#index.last_modified}),
{reply, {ok, Index#index.cookie},
State#state{next_index=nil, reset_pos=Pos + Size,
write_pos=Pos + Size, writer=nil}};
Else ->
{reply, Else, abandon_write(State)}
end;
Expand All @@ -145,11 +143,12 @@ handle_call({write, Ref, {Bin, Next}}, _From, #state{main_fd=Fd, write_pos=Pos,
handle_call({write, _Ref, _StreamBody}, _From, State) ->
{reply, {error, not_writing}, State};

handle_call({read, Key, Cookie}, _From, #state{tid=Tid}=State) ->
handle_call({read, Key, Cookie}, _From, #state{main_fd=Fd, tid=Tid}=State) ->
case info_int(Tid, Key, Cookie) of
{ok, {Location, Size, _Version}} ->
{ok, {Location, Size, _LastModified}} ->
{ok, HeaderSize, _} = monic_utils:pread_term(Fd, Location),
Self = self(),
{reply, {ok, fun() -> stream_out(Self, Location + ?HEADER_SIZE, Size) end}, State};
{reply, {ok, fun() -> stream_out(Self, Location + HeaderSize, Size) end}, State};
Else ->
{reply, Else, State}
end;
Expand Down Expand Up @@ -177,11 +176,11 @@ code_change(_OldVsn, State, _Extra) ->
%% private functions

load_index(Tid, Path) ->
case file:open(Path ++ ".idx", [binary, raw, read, write, append]) of
case file:open(Path ++ ".idx", [binary, raw, read, write]) of
{ok, Fd} ->
case load_index_items(Tid, Fd) of
{ok, Hints} ->
{ok, Fd, Hints};
{ok, LastLoc} ->
{ok, Fd, LastLoc};
Else ->
Else
end;
Expand All @@ -190,45 +189,47 @@ load_index(Tid, Path) ->
end.

load_index_items(Tid, Fd) ->
load_index_items(Tid, Fd, {0, 0}).
load_index_items(Tid, Fd, 0, 0).

load_index_items(Tid, Fd, Hints) ->
case monic_utils:read_index(Fd) of
{ok, #index{key=Key,cookie=Cookie,location=Location,size=Size,version=Version,flags= <<Deleted:1,_:15>>}} ->
load_index_items(Tid, Fd, IndexLocation, LastLoc) ->
case monic_utils:pread_term(Fd, IndexLocation) of
{ok, IndexSize, #index{key=Key,cookie=Cookie,location=Location,size=Size,
last_modified=LastModified,deleted=Deleted}} ->
case Deleted of
0 -> ets:insert(Tid, {Key, Cookie, Location, Size, Version});
1 -> ets:delete(Tid, Key)
false -> ets:insert(Tid, {Key, Cookie, Location, Size, LastModified});
true -> ets:delete(Tid, Key)
end,
load_index_items(Tid, Fd, {Key + 1, Location + Size + ?HEADER_SIZE + ?FOOTER_SIZE});
load_index_items(Tid, Fd, IndexLocation + IndexSize, Location);
eof ->
{ok, Hints};
{ok, LastLoc};
Else ->
Else
end.

load_main(Tid, Path, Hints) ->
load_main(Tid, Path, LastLoc) ->
case file:open(Path, [binary, raw, read, write]) of
{ok, Fd} ->
case load_main_items(Tid, Fd, Hints) of
{ok, Hints} ->
{ok, Fd, Hints};
case load_main_items(Tid, Fd, LastLoc) of
{ok, Eof} ->
{ok, Fd, Eof};
Else ->
Else
end;
Else ->
Else
end.

load_main_items(Tid, Fd, {_, Location}=Hints) ->
case monic_utils:pread_header(Fd, Location) of
{ok, #header{key=Key,cookie=Cookie,size=Size,version=Version,flags= <<Deleted:1,_:15>>}} ->
load_main_items(Tid, Fd, Location) ->
case monic_utils:pread_term(Fd, Location) of
{ok, HeaderSize, #header{key=Key,cookie=Cookie,size=Size,last_modified=LastModified,deleted=Deleted}} ->
case Deleted of
0 -> ets:insert(Tid, {Key, Cookie, Location, Size, Version});
1 -> ets:delete(Tid, Key)
false -> ets:insert(Tid, {Key, Cookie, Location, Size, LastModified});
true -> ets:delete(Tid, Key)
end,
load_main_items(Tid, Fd, {Key + 1, Location + Size + ?HEADER_SIZE + ?FOOTER_SIZE});
{ok, FooterSize, _} = monic_utils:pread_term(Fd, Location + HeaderSize + Size),
load_main_items(Tid, Fd, Location + HeaderSize + Size + FooterSize);
eof ->
{ok, Hints};
{ok, Location};
Else ->
Else
end.
Expand All @@ -244,8 +245,8 @@ abandon_write(#state{main_fd=Fd, reset_pos=Pos}=State) ->

stream_in(Pid, Ref, StreamBody) ->
case gen_server:call(Pid, {write, Ref, StreamBody}, infinity) of
{ok, Key, Cookie} ->
{ok, Key, Cookie};
{ok, Cookie} ->
{ok, Cookie};
{continue, Next} ->
stream_in(Pid, Ref, Next());
Else ->
Expand Down
67 changes: 22 additions & 45 deletions src/monic_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@

-module(monic_utils).
-export([path/2, exists/2, open/2]).
-export([pread_header/2, pwrite_header/3]).
-export([pread_footer/2, pwrite_footer/3]).
-export([read_index/1, write_index/2]).
-export([pwrite_term/3, pread_term/2]).
-export([new_cookie/0]).

-include("monic.hrl").

-define(ITEM_HEADER_MAGIC, 16#0f0f0f0f).
-define(ITEM_FOOTER_MAGIC, 16#7f7f7f7f).

-define(MAX_TERM, (1 bsl 16)).

path(ReqData, Context) ->
Root = proplists:get_value(root, Context, "tmp"),
File = wrq:path_info(file, ReqData),
Expand All @@ -41,50 +41,27 @@ open(ReqData, Context) ->
exists(ReqData, Context) ->
filelib:is_file(path(ReqData, Context)).

pwrite_header(Fd, Location, #header{key=Key,cookie=Cookie,size=Size,
version=Version,flags=Flags}) ->
Bin = <<?ITEM_HEADER_MAGIC:32/integer, Key:64/integer, Cookie:32/integer,
Size:64/integer, Version:16/integer, Flags:16/bitstring>>,
file:pwrite(Fd, Location, Bin).

pread_header(Fd, Location) ->
case file:pread(Fd, Location, ?HEADER_SIZE) of
{ok, <<?ITEM_HEADER_MAGIC:32/integer, Key:64/integer, Cookie:32/integer,
Size:64/integer, Version:16/integer, Flags:16/bitstring>>} ->
{ok, #header{key=Key,cookie=Cookie,size=Size,version=Version,flags=Flags}};
{ok, _} ->
{error, invalid_header};
Else ->
Else
end.

pwrite_footer(Fd, Location, #footer{sha=Sha}) ->
Bin = <<?ITEM_FOOTER_MAGIC:32/integer, Sha:20/binary>>,
file:pwrite(Fd, Location, Bin).

pread_footer(Fd, Location) ->
case file:pread(Fd, Location, ?FOOTER_SIZE) of
{ok, <<?ITEM_FOOTER_MAGIC:32/integer, Sha:20/binary>>} ->
{ok, #footer{sha=Sha}};
{ok, _} ->
{error, invalid_footer};
Else ->
Else
-spec pwrite_term(term(), integer(), term()) -> {ok, integer()} | {error, term()}.
pwrite_term(Fd, Location, Term) ->
Bin = term_to_binary(Term),
Size = iolist_size(Bin),
case Size =< ?MAX_TERM of
true ->
case file:pwrite(Fd, Location, <<Size:16/integer, Bin/binary>>) of
ok ->
{ok, Size + 2};
Else ->
Else
end;
false ->
{error, term_too_long}
end.

write_index(Fd, #index{cookie=Cookie,key=Key,location=Location,size=Size,version=Version,flags=Flags}) ->
Bin = <<Key:64/integer, Cookie:32/integer, Location:64/integer, Size:64/integer,
Version:16/integer, Flags:16/bitstring>>,
file:write(Fd, Bin).

read_index(Fd) ->
case file:read(Fd, ?INDEX_SIZE) of
{ok, <<Key:64/integer, Cookie:32/integer, Location:64/integer, Size:64/integer,
Version:16/integer, Flags:16/bitstring>>} ->
{ok, #index{cookie=Cookie,key=Key,location=Location,size=Size,
version=Version,flags=Flags}};
{ok, _} ->
{error, invalid_index};
-spec pread_term(term(), integer()) -> {ok, integer(), binary()} | eof | {error, term()}.
pread_term(Fd, Location) ->
case file:pread(Fd, Location, ?MAX_TERM) of
{ok, <<Size:16/integer, Bin/binary>>} ->
{ok, Size + 2, binary_to_term(<<Bin:Size/binary>>)};
Else ->
Else
end.
Expand Down
21 changes: 10 additions & 11 deletions test/monic_file_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,28 @@ add_single_hunk(Pid) ->
{"add an item in one hunk",
fun() ->
StreamBody = {<<"123">>, done},
Result = monic_file:add(Pid, 3, StreamBody),
?assertMatch({ok, _}, Result),
{ok, {Key, Cookie}} = Result,
?assertMatch({ok, StreamBody}, monic_file:read(Pid, Key, Cookie))
Result = monic_file:add(Pid, <<"foo">>, 3, StreamBody),
{ok, Cookie} = Result,
?assertMatch({ok, StreamBody}, monic_file:read(Pid, <<"foo">>, Cookie))
end}.

add_multi_hunk(Pid) ->
{"add an item in multiple hunks",
fun() ->
StreamBody = {<<"123">>, fun() -> {<<"456">>, done} end},
?assertMatch({ok, _}, monic_file:add(Pid, 6, StreamBody))
?assertMatch({ok, _}, monic_file:add(Pid, <<"foo">>, 6, StreamBody))
end}.

add_multi_items(Pid) ->
[?_assertMatch({ok, _}, monic_file:add(Pid, 3, {<<"123">>, done})),
?_assertMatch({ok, _}, monic_file:add(Pid, 3, {<<"456">>, done})),
?_assertMatch({ok, _}, monic_file:add(Pid, 3, {<<"789">>, done})),
?_assertMatch({ok, _}, monic_file:add(Pid, 3, {<<"abc">>, done}))].
[?_assertMatch({ok, _}, monic_file:add(Pid, <<"foo">>, 3, {<<"123">>, done})),
?_assertMatch({ok, _}, monic_file:add(Pid, <<"bar">>, 3, {<<"456">>, done})),
?_assertMatch({ok, _}, monic_file:add(Pid, <<"baz">>, 3, {<<"789">>, done})),
?_assertMatch({ok, _}, monic_file:add(Pid, <<"foobar">>, 3, {<<"abc">>, done}))].

overflow(Pid) ->
Res = monic_file:add(Pid, 3, {<<"1234">>, done}),
Res = monic_file:add(Pid, <<"foo">>, 3, {<<"1234">>, done}),
?_assertEqual({error, overflow}, Res).

underflow(Pid) ->
Res = monic_file:add(Pid, 3, {<<"12">>, done}),
Res = monic_file:add(Pid, <<"foo">>, 3, {<<"12">>, done}),
?_assertEqual({error, underflow}, Res).

0 comments on commit bdb22a4

Please sign in to comment.