Skip to content

Commit

Permalink
Implement dbets:match/2 and dbets:select/2
Browse files Browse the repository at this point in the history
  • Loading branch information
krestenkrab committed Mar 24, 2011
1 parent 5ed2c2b commit 9c8c1bc
Showing 1 changed file with 127 additions and 31 deletions.
158 changes: 127 additions & 31 deletions src/dbets.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,49 +20,52 @@
%%@doc
%% DB-ETS is an ETS-lookalike, based on the Erlang Berkeley DB API
%%@end
-export([open/2,insert/2,lookup/2,close/1,close/2]).
-export([open/2,insert/2,lookup/2,close/1,close/2,match/2,select/2]).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.

-record(db, {
env :: edbd_nifs:env(),
store :: edbd_nifs:db(),
env :: ebdb_nifs:env(),
store :: ebdb_nifs:db(),
keypos = 1 :: pos_integer(),
method = hash :: hash|btree,
duplicates = false :: boolean()
}).

process_options(Options, DB, Flags) ->
process_options(Options, DB, Flags, Method) ->
case Options of
[set|R] ->
process_options(R, DB#db{ method = hash, duplicates = false }, Flags);
[ordered_set|R] ->
process_options(R, DB#db{ method = btree, duplicates = false }, Flags);
process_options(R, DB#db{ duplicates = false }, Flags, hash);
[bag|R] ->
process_options(R, DB#db{ method = hash, duplicates = true }, Flags);
process_options(R, DB#db{ duplicates = true }, Flags, hash);
[ordered_set|R] ->
process_options(R, DB#db{ duplicates = false }, Flags, btree);
[ordered_bag|R] ->
process_options(R, DB#db{ method = btree, duplicates = false }, Flags);
process_options(R, DB#db{ duplicates = false }, Flags, btree);
[{keypos,N}|R] ->
process_options(R, DB#db{ keypos=N }, Flags);
process_options(R, DB#db{ keypos=N }, Flags, Method);

%%
%% pass-thru for BerkeleyDB options
%%
[OPT|R] when is_atom(OPT) ->
process_options(R, DB, [OPT | Flags ] );
process_options(R, DB, [OPT | Flags ], Method );
[{OPT,true}|R] when is_atom(OPT) ->
process_options(R, DB, [OPT | Flags ] );
process_options(R, DB, [OPT | Flags ], Method );
[{OPT,false}|R] when is_atom(OPT) ->
process_options(R, DB, [E || E <- Flags, E =/= OPT ] );
process_options(R, DB, [E || E <- Flags, E =/= OPT ], Method );

[] ->
{DB, Flags}
{DB, Flags, Method}
end.

open(Directory, Options) ->
DefaultFlags = [create,init_txn,recover,init_mpool,thread],
{DB,Flags} = process_options(Options, #db{}, DefaultFlags),
{ok, Env} = edbd_nifs:db_open_env(Directory,
{DB,Flags,Method} = process_options(Options, #db{}, DefaultFlags, hash),
{ok, Env} = ebdb_nifs:db_open_env(Directory,
Flags),
{ok, Store} = edbd_nifs:db_open(Env, undefined, "data.db", DB#db.method, [create,thread]),
{ok, Store} = ebdb_nifs:db_open(Env, undefined, "data.db", Method, DB#db.duplicates, [create,thread]),
{ok, DB#db{ env=Env, store=Store }}.

close(#db{}=DB) ->
Expand All @@ -78,13 +81,13 @@ close(#db{}=DB, nosync) ->
insert(#db{ keypos=KeyIndex, store=Store }, Tuple) ->
Key = element(KeyIndex, Tuple),
BinKey = sext:encode(Key),
edbd_nifs:db_put(Store, undefined, BinKey, term_to_binary(Tuple), [auto_commit]).
ebdb_nifs:db_put(Store, undefined, BinKey, term_to_binary(Tuple), []).

lookup(#db{ duplicates=Dups }=DB, Key) ->
BinKey = sext:encode(Key),
case Dups of
false ->
case edbd_nifs:db_get(DB#db.store, undefined, BinKey, []) of
case ebdb_nifs:db_get(DB#db.store, undefined, BinKey, []) of
{ok, BinTuple} ->
[binary_to_term(BinTuple)];
{error, notfound} ->
Expand All @@ -98,34 +101,127 @@ lookup(#db{ duplicates=Dups }=DB, Key) ->
end.

lookup_duplicates(DB,BinKey) ->
{ok, TX} = edbd_nifs:txn_begin(DB#db.env, [read_committed]),
{ok, TX} = ebdb_nifs:txn_begin(DB#db.env, [read_committed]),
try
{ok, Cursor} = edbd_nifs:cursor_open(DB#db.store, TX, []),
try edbd_nifs:cursor_set(Cursor, BinKey) of
{ok, BinTuple} ->
lookup_next_dups(Cursor, [binary_to_term(BinTuple)]);
{ok, Cursor} = ebdb_nifs:cursor_open(DB#db.store, TX, []),
try ebdb_nifs:cursor_get(Cursor, BinKey, [set]) of
{ok, _, BinTuple} ->
lookup_next_dups(Cursor, BinKey, [binary_to_term(BinTuple)]);
{error, notfound} ->
[];
{error, _} = E ->
E
after
ok = edbd_nifs:cursor_close(Cursor)
ok = ebdb_nifs:cursor_close(Cursor)
end
after
%% just abort it, it's a read-only txn anyway
edbd_nifs:txn_abort(TX)
ebdb_nifs:txn_abort(TX)
end.

lookup_next_dups(Cursor, Rest) ->
case edbd_nifs:cursor_next_dup(Cursor) of
{ok, BinTuple} ->
lookup_next_dups(Cursor, [binary_to_term(BinTuple) | Rest]);
lookup_next_dups(Cursor, BinKey, Rest) ->
case ebdb_nifs:cursor_get(Cursor, BinKey, [next_dup]) of
{ok, _, BinTuple} ->
lookup_next_dups(Cursor, BinKey, [binary_to_term(BinTuple) | Rest]);
{error, notfound} ->
Rest;
{error, keyempty} ->
Rest;
{error, _} = E ->
E
end.

%%@doc
%% match/2 is a convenience wrapper around select/2: it builds a
%% single-entry match specification with Pattern as the head, no guards,
%% and '$$' as the body, then delegates to select/2 with it.
%%@end
match(DB, Pattern) ->
    MatchSpec = [{Pattern, [], ['$$']}],
    select(DB, MatchSpec).

%%@doc
%% select/2 runs an ETS-style match specification against the store.
%%
%% Each entry of MatchSpec ({Pattern, Guards, Body}) is compiled on its
%% own with ets:match_spec_compile/1 and evaluated against every stored
%% tuple whose encoded key starts with the prefix derived from the key
%% position of Pattern.  Results are prepended to the accumulator, so the
%% returned list is in reverse traversal order.
%%
%% Because entries are evaluated one after another, a tuple that matches
%% several entries contributes one result per matching entry.
%%@end
select(#db{keypos=KeyIndex}=DB, MatchSpec) ->
    lists:foldl(fun({Pattern,_,_}=MSE, Acc) ->
                        CMS = ets:match_spec_compile([MSE]),
                        KeyPrefix = select_key_prefix(KeyIndex, Pattern),

                        fold(fun(Tuple, Acc0) ->
                                     %% a single-entry spec run on a single
                                     %% tuple yields at most one result
                                     case ets:match_spec_run([Tuple], CMS) of
                                         []    -> Acc0;
                                         [Res] -> [Res|Acc0]
                                     end
                             end,
                             Acc,
                             KeyPrefix,
                             DB)
                end,
                [],
                MatchSpec).

%% Derives the encoded key prefix used to narrow the scan for one
%% match-spec head.  When the head is a tuple that has an element at the
%% key position, the prefix comes from sext:prefix/1 on that element.
%% Whole-object heads such as '_' or '$1' (and tuples too short to have a
%% key element) carry no key information, so the empty prefix is used and
%% every record is scanned; the compiled match spec still decides what
%% actually matches.
select_key_prefix(KeyIndex, Pattern)
  when is_tuple(Pattern), tuple_size(Pattern) >= KeyIndex ->
    sext:prefix(element(KeyIndex, Pattern));
select_key_prefix(_KeyIndex, _Pattern) ->
    <<>>.

%%@doc
%% fold/4 folds Fun over all stored tuples whose encoded key starts with
%% KeyPrefix.
%%
%% Fun is called as Fun(Tuple, AccIn) and must return the new accumulator.
%% The traversal runs inside its own read_committed transaction with a
%% cursor positioned via the set_range flag (presumably the first key
%% greater than or equal to KeyPrefix, as with Berkeley DB's DB_SET_RANGE
%% -- confirm against ebdb_nifs).  When the store allows duplicates, the
%% remaining duplicates of each key are folded as well.
%%
%% Returns the final accumulator.  If no key carries the prefix, the
%% incoming Acc is returned unchanged, so results accumulated by the
%% caller are preserved.  If the initial cursor positioning fails with
%% any other error, that {error, Reason} tuple is returned instead.
%%@end
fold(Fun,Acc,KeyPrefix,#db{}=DB) when is_binary(KeyPrefix) ->
    PrefixLen = byte_size(KeyPrefix),
    {ok, TX} = ebdb_nifs:txn_begin(DB#db.env, [read_committed]),
    try
        {ok, Cursor} = ebdb_nifs:cursor_open(DB#db.store, TX, []),
        try ebdb_nifs:cursor_get(Cursor, KeyPrefix, [set_range]) of
            {ok, <<KeyPrefix:PrefixLen/binary, _/binary>>=BinKey, BinValue} ->
                Acc1 = Fun(binary_to_term(BinValue), Acc),
                Acc2 = case DB#db.duplicates of
                           true ->
                               Dups = lookup_next_dups(Cursor, BinKey, []),
                               lists:foldl(Fun, Acc1, Dups);
                           false ->
                               Acc1
                       end,

                fold_prefix_next(DB, Fun, Cursor, KeyPrefix, PrefixLen, Acc2);

            %% the cursor landed on a key outside the prefix, or on
            %% nothing at all: there is nothing to fold, keep Acc as is
            {ok, _, _} ->
                Acc;
            {error, notfound} ->
                Acc;
            {error, keyempty} ->
                Acc;
            {error, _} = E ->
                E
        after
            ok = ebdb_nifs:cursor_close(Cursor)
        end
    after
        %% just abort it, it's a read-only txn anyway
        ebdb_nifs:txn_abort(TX)
    end.


%% Continues a prefix fold started by fold/4: repeatedly asks the cursor
%% for the next record ([next] flag) and applies Fun to every decoded
%% value whose key still begins with KeyPrefix (PrefixLen is its size in
%% bytes).  When the store allows duplicates, the remaining duplicates of
%% the current key are fetched with lookup_next_dups/3 and folded too.
%%
%% Stops and returns the accumulator at the first key outside the prefix
%% or when the cursor reports notfound/keyempty.  Any other cursor error
%% is returned as {error, Reason}, matching how fold/4 treats an error on
%% the initial positioning.
fold_prefix_next(DB, Fun, Cursor, KeyPrefix, PrefixLen, Acc) ->
    case ebdb_nifs:cursor_get(Cursor, <<>>, [next]) of
        {ok, <<KeyPrefix:PrefixLen/binary, _/binary>>=BinKey, BinValue} ->
            Acc1 = Fun(binary_to_term(BinValue), Acc),
            Acc2 = case DB#db.duplicates of
                       true ->
                           Dups = lookup_next_dups(Cursor, BinKey, []),
                           lists:foldl(Fun, Acc1, Dups);
                       false ->
                           Acc1
                   end,

            fold_prefix_next(DB, Fun, Cursor, KeyPrefix, PrefixLen, Acc2);
        {ok, _, _} ->
            Acc;
        {error, notfound} ->
            Acc;
        {error, keyempty} ->
            Acc;
        {error, _} = E ->
            E
    end.



-ifdef(TEST).

%% ready for testing!


-endif.

0 comments on commit 9c8c1bc

Please sign in to comment.