Permalink
Browse files

Finished the cleanup and integration of an erlang bloom implementation.

  • Loading branch information...
1 parent 95082bd commit ce8982c79c7658fa26214a8df9fba93aeacdec7a Gregory Burd committed Jul 12, 2012
Showing with 254 additions and 43 deletions.
  1. +0 −2 rebar.config
  2. +223 −0 src/bloom.erl
  3. +3 −2 src/hanoidb.erl
  4. +1 −1 src/hanoidb.hrl
  5. +6 −9 src/hanoidb_nursery.erl
  6. +7 −5 src/hanoidb_reader.erl
  7. +0 −9 src/hanoidb_util.erl
  8. +6 −7 src/hanoidb_writer.erl
  9. +8 −8 test/hanoidb_tests.erl
View
@@ -28,8 +28,6 @@
, {lager, ".*", {git, "git://github.com/basho/lager", {branch, "master"}}}
, {snappy, "1.0.*", {git, "git://github.com/fdmanana/snappy-erlang-nif.git", {branch, "master"}}}
, {plain_fsm, "1.1.*", {git, "git://github.com/gburd/plain_fsm", {branch, "master"}}}
-% , {ebloom, "1.1.*", {git, "git://github.com/basho/ebloom.git", {branch, "master"}}}
- , {bloomerl, ".*", {git, "git://github.com/gburd/bloomerl.git", {branch, "master"}}}
% , {lz4, ".*", {git, "git://github.com/gburd/erlang-lz4.git", {branch, "master"}}}
% , {edown, "0.3.*", {git, "git://github.com/esl/edown.git", {branch, "master"}}}
% , {asciiedoc, "0.1.*", {git, "git://github.com/norton/asciiedoc.git", {branch, "master"}}}
View
@@ -0,0 +1,223 @@
+%% The contents of this file are subject to the Erlang Public License, Version
+%% 1.1, (the "License"); you may not use this file except in compliance with
+%% the License. You should have received a copy of the Erlang Public License
+%% along with this software. If not, it can be retrieved via the world wide web
+%% at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS" basis,
+%% WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
+%% the specific language governing rights and limitations under the License.
+
+%% Based on: Scalable Bloom Filters
+%% Paulo Sérgio Almeida, Carlos Baquero, Nuno Preguiça, David Hutchison
+%% Information Processing Letters
+%% Volume 101, Issue 6, 31 March 2007, Pages 255-261
+%%
+%% Provides scalable bloom filters that can grow indefinitely while ensuring a
+%% desired maximum false positive probability. Also provides standard
+%% partitioned bloom filters with a maximum capacity. Bit arrays are
+%% dimensioned as a power of 2 to enable reusing hash values across filters
+%% through bit operations. Double hashing is used (no need for enhanced double
+%% hashing for partitioned bloom filters).
+
+%% Modified slightly by Justin Sheehy to make it a single file (incorporated
+%% the array-based bitarray internally).
+-module(bloom).
+-author("Paulo Sergio Almeida <psa@di.uminho.pt>").
+
+-export([sbf/1, sbf/2, sbf/3, sbf/4,
+ bloom/1, bloom/2,
+ member/2, add/2,
+ size/1, capacity/1,
+ encode/1, decode/1]).
+-import(math, [log/1, pow/2]).
+
+-ifdef(TEST).
+-ifdef(EQC).
+-include_lib("eqc/include/eqc.hrl").
+-endif.
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
+-define(W, 27).
+
+-record(bloom, {
+ e :: float(), % error probability
+ n :: non_neg_integer(), % maximum number of elements
+ mb :: non_neg_integer(), % 2^mb = m, the size of each slice (bitvector)
+ size :: non_neg_integer(), % number of elements
+ a :: [binary()] % list of bitvectors
+}).
+
+-record(sbf, {
+ e :: float(), % error probability
+ r :: float(), % error probability ratio
+ s :: non_neg_integer(), % log 2 of growth ratio
+ size :: non_neg_integer(), % number of elements
+ b :: [#bloom{}] % list of plain bloom filters
+}).
+
+%% Constructors for (fixed capacity) bloom filters
+%%
+%% N - capacity
+%% E - error probability
+bloom(N) -> bloom(N, 0.001).
+bloom(N, E) when is_number(N), N > 0,
+ is_float(E), E > 0, E < 1,
+ N >= 4/E -> % rule of thumb; due to double hashing
+ bloom(size, N, E);
+bloom(N, E) when is_number(N), N >= 0,
+ is_float(E), E > 0, E < 1 ->
+ bloom(bits, 32, E).
+
+bloom(Mode, N, E) ->
+ K = 1 + trunc(log2(1/E)),
+ P = pow(E, 1 / K),
+
+ Mb =
+ case Mode of
+ size ->
+ 1 + trunc(-log2(1 - pow(1 - P, 1 / N)));
+ bits ->
+ N
+ end,
+ M = 1 bsl Mb,
+ D = trunc(log(1-P) / log(1-1/M)),
+ #bloom{e=E, n=D, mb=Mb, size = 0,
+ a = [bitarray_new(1 bsl Mb) || _ <- lists:seq(1, K)]}.
+
+log2(X) -> log(X) / log(2).
+
+%% Constructors for scalable bloom filters
+%%
+%% N - initial capacity before expanding
+%% E - error probability
+%% S - growth ratio when full (log 2) can be 1, 2 or 3
+%% R - tightening ratio of error probability
+sbf(N) -> sbf(N, 0.001).
+sbf(N, E) -> sbf(N, E, 1).
+sbf(N, E, 1) -> sbf(N, E, 1, 0.85);
+sbf(N, E, 2) -> sbf(N, E, 2, 0.75);
+sbf(N, E, 3) -> sbf(N, E, 3, 0.65).
+sbf(N, E, S, R) when is_number(N), N > 0,
+ is_float(E), E > 0, E < 1,
+ is_integer(S), S > 0, S < 4,
+ is_float(R), R > 0, R < 1,
+ N >= 4/(E*(1-R)) -> % rule of thumb; due to double hashing
+ #sbf{e=E, s=S, r=R, size=0, b=[bloom(N, E*(1-R))]}.
+
+%% Returns number of elements
+%%
+size(#bloom{size=Size}) -> Size;
+size(#sbf{size=Size}) -> Size.
+
+%% Returns capacity
+%%
+capacity(#bloom{n=N}) -> N;
+capacity(#sbf{}) -> infinity.
+
+%% Test for membership
+%%
+member(Elem, #bloom{mb=Mb}=B) ->
+ Hashes = make_hashes(Mb, Elem),
+ hash_member(Hashes, B);
+member(Elem, #sbf{b=[H|_]}=Sbf) ->
+ Hashes = make_hashes(H#bloom.mb, Elem),
+ hash_member(Hashes, Sbf).
+
+hash_member(Hashes, #bloom{mb=Mb, a=A}) ->
+ Mask = 1 bsl Mb -1,
+ {I1, I0} = make_indexes(Mask, Hashes),
+ all_set(Mask, I1, I0, A);
+hash_member(Hashes, #sbf{b=B}) ->
+ lists:any(fun(X) -> hash_member(Hashes, X) end, B).
+
+make_hashes(Mb, E) when Mb =< 16 ->
+ erlang:phash2({E}, 1 bsl 32);
+make_hashes(Mb, E) when Mb =< 32 ->
+ {erlang:phash2({E}, 1 bsl 32), erlang:phash2([E], 1 bsl 32)}.
+
+make_indexes(Mask, {H0, H1}) when Mask > 1 bsl 16 -> masked_pair(Mask, H0, H1);
+make_indexes(Mask, {H0, _}) -> make_indexes(Mask, H0);
+make_indexes(Mask, H0) -> masked_pair(Mask, H0 bsr 16, H0).
+
+masked_pair(Mask, X, Y) -> {X band Mask, Y band Mask}.
+
+all_set(_Mask, _I1, _I, []) -> true;
+all_set(Mask, I1, I, [H|T]) ->
+ case bitarray_get(I, H) of
+ true -> all_set(Mask, I1, (I+I1) band Mask, T);
+ false -> false
+ end.
+
+%% Adds element to set
+%%
+add(Elem, #bloom{mb=Mb} = B) ->
+ Hashes = make_hashes(Mb, Elem),
+ hash_add(Hashes, B);
+add(Elem, #sbf{size=Size, r=R, s=S, b=[H|T]=Bs}=Sbf) ->
+ #bloom{mb=Mb, e=E, n=N, size=HSize} = H,
+ Hashes = make_hashes(Mb, Elem),
+ case hash_member(Hashes, Sbf) of
+ true -> Sbf;
+ false ->
+ case HSize < N of
+ true -> Sbf#sbf{size=Size+1, b=[hash_add(Hashes, H)|T]};
+ false ->
+ B = add(Elem, bloom(bits, Mb + S, E * R)),
+ Sbf#sbf{size=Size+1, b=[B|Bs]}
+ end
+ end.
+
+hash_add(Hashes, #bloom{mb=Mb, a=A, size=Size} = B) ->
+ Mask = 1 bsl Mb -1,
+ {I1, I0} = make_indexes(Mask, Hashes),
+ case all_set(Mask, I1, I0, A) of
+ true -> B;
+ false -> B#bloom{size=Size+1, a=set_bits(Mask, I1, I0, A, [])}
+ end.
+
+set_bits(_Mask, _I1, _I, [], Acc) -> lists:reverse(Acc);
+set_bits(Mask, I1, I, [H|T], Acc) ->
+ set_bits(Mask, I1, (I+I1) band Mask, T, [bitarray_set(I, H) | Acc]).
+
+bitarray_new(N) -> array:new((N-1) div ?W + 1, {default, 0}).
+
+bitarray_set(I, A) ->
+ AI = I div ?W,
+ V = array:get(AI, A),
+ V1 = V bor (1 bsl (I rem ?W)),
+ array:set(AI, V1, A).
+
+bitarray_get(I, A) ->
+ AI = I div ?W,
+ V = array:get(AI, A),
+ V band (1 bsl (I rem ?W)) =/= 0.
+
+encode(Bloom) ->
+ zlib:gzip(term_to_binary(Bloom)).
+
+decode(Bin) ->
+ binary_to_term(zlib:gunzip(Bin)).
+
+%% UNIT TESTS
+
+-ifdef(TEST).
+-ifdef(EQC).
+
+prop_bloom_test_() ->
+ {timeout, 60, fun() -> ?assert(eqc:quickcheck(prop_bloom())) end}.
+
+g_keys() ->
+ non_empty(list(non_empty(binary()))).
+
+prop_bloom() ->
+ ?FORALL(Keys, g_keys(),
+ begin
+ Bloom = ?MODULE:bloom(Keys),
+ F = fun(X) -> member(X, Bloom) end,
+ lists:all(F, Keys)
+ end).
+
+-endif.
+-endif.
View
@@ -72,7 +72,7 @@
%% @doc
%% Create or open a hanoidb store. Argument `Dir' names a
-%%% directory in which to keep the data files. By convention, we
+%% directory in which to keep the data files. By convention, we
%% name hanoidb data directories with extension ".hanoidb".
- spec open(Dir::string()) -> hanoidb().
open(Dir) ->
@@ -395,8 +395,9 @@ handle_call({get, Key}, From, State=#state{ top=Top, nursery=Nursery } ) when is
handle_call(close, _From, State=#state{ nursery=Nursery, top=Top, dir=Dir, max_level=MaxLevel, opt=Config }) ->
try
ok = hanoidb_nursery:finish(Nursery, Top),
+ {ok, Nursery2} = hanoidb_nursery:new(Dir, MaxLevel, Config),
ok = hanoidb_level:close(Top),
- {stop, normal, ok, State#state{ nursery=hanoidb_nursery:new(Dir, MaxLevel, Config)}}
+ {stop, normal, ok, State#state{ nursery=Nursery2 }}
catch
E:R ->
error_logger:info_msg("exception from close ~p:~p~n", [E,R]),
View
@@ -26,7 +26,7 @@
%% smallest levels are 256 entries
-define(TOP_LEVEL, 8).
-define(BTREE_SIZE(Level), (1 bsl (Level))).
--define(FILE_FORMAT, <<"HAN1">>).
+-define(FILE_FORMAT, <<"HAN2">>).
-define(FIRST_BLOCK_POS, byte_size(?FILE_FORMAT)).
-define(TOMBSTONE, 'deleted').
View
@@ -177,18 +177,15 @@ lookup(Key, #nursery{cache=Cache}) ->
%% Finish this nursery (encode it to a btree, and delete the nursery file)
%% @end
-spec finish(Nursery::#nursery{}, TopLevel::pid()) -> ok.
-finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile,
- total_size=_TotalSize, count=Count,
- config=Config, merge_done=DoneMerge
- }, TopLevel) ->
+finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, count=Count,
+ config=Config, merge_done=DoneMerge }, TopLevel) ->
hanoidb_util:ensure_expiry(Config),
- %% first, close the log file (if it is open)
- if LogFile /= undefined ->
- ok = file:close(LogFile);
- true ->
- ok
+ %% First, close the log file (if it is open)
+ case LogFile /= undefined of
+ true -> ok = file:close(LogFile);
+ false -> ok
end,
case Count of
View
@@ -84,11 +84,11 @@ open(Name, Config) ->
Bloom =
case BloomSize of
0 ->
- {ok, <<FilterSize:32/unsigned>>} = file:pread(File, (FileInfo#file_info.size - 16), 4),
- bloom:new(FilterSize, 0.001);
+ {ok, <<NumElems:32/unsigned>>} = file:pread(File, (FileInfo#file_info.size - 16), 4),
+ bloom:bloom(NumElems);
_ ->
{ok, BloomData} = file:pread(File, (FileInfo#file_info.size - 12 - BloomSize), BloomSize),
- hanoidb_util:decode_bloom(BloomData)
+ bloom:decode(BloomData)
end,
%% read in the root node
@@ -256,7 +256,9 @@ lookup_node(File,FromKey,#node{members=Members,level=N},_) ->
first_node(#index{file=File}) ->
case read_node(File, ?FIRST_BLOCK_POS) of
{ok, #node{level=0, members=Members}} ->
- {node, Members}
+ {node, Members};
+ eof->
+ none
end.
next_node(#index{file=File}=_Index) ->
@@ -274,7 +276,7 @@ close(#index{file=File}) ->
lookup(#index{file=File, root=Node, bloom=Bloom}, Key) ->
- case bloom:is_element(Key, Bloom) of
+ case bloom:member(Key, Bloom) of
true ->
case lookup_in_node(File, Node, Key) of
not_found ->
View
@@ -205,15 +205,6 @@ decode_kv_data(<<?TAG_TRANSACT, Rest/binary>>) ->
{ok, TX} = decode_crc_data(Rest, [], []),
TX.
-encode_bloom(Bloom) ->
- case bloom:is_bloom(Bloom) of
- true -> zlib:gzip(term_to_binary(Bloom));
- false -> <<>>
- end.
-
-decode_bloom(Bin) ->
- binary_to_term(zlib:gunzip(Bin)).
-
%% @doc Return number of seconds since 1970
-spec tstamp() -> pos_integer().
tstamp() ->
View
@@ -94,7 +94,7 @@ init([Name, Options]) ->
case do_open(Name, Options, [exclusive]) of
{ok, IdxFile} ->
ok = file:write(IdxFile, ?FILE_FORMAT),
- Bloom = bloom:new(erlang:min(Size, 16#ffffffff), 0.001),
+ Bloom = bloom:bloom(Size),
BlockSize = hanoidb:get_opt(block_size, Options, ?NODE_SIZE),
{ok, #state{ name=Name,
index_file_pos=?FIRST_BLOCK_POS, index_file=IdxFile,
@@ -170,11 +170,11 @@ serialize(#state{ bloom=Bloom, index_file=File, index_file_pos=Position }=State)
exit({bad_position, Position, WrongPosition})
end,
ok = file:close(File),
- erlang:term_to_binary( { State#state{ index_file=closed }, hanoidb_util:encode_bloom(Bloom) } ).
+ erlang:term_to_binary( { State#state{ index_file=closed }, bloom:encode(Bloom) } ).
deserialize(Binary) ->
{State, Bin} = erlang:binary_to_term(Binary),
- Bloom = hanoidb_util:decode_bloom(Bin),
+ Bloom = bloom:decode(Bin),
{ok, IdxFile} = do_open(State#state.name, State#state.opts, []),
State#state{ bloom=Bloom, index_file=IdxFile }.
@@ -193,10 +193,9 @@ archive_nodes(#state{ nodes=[], last_node_pos=LastNodePos, last_node_size=_LastN
undefined ->
%% store contains no entries
ok = file:write(IdxFile, <<0:32/unsigned, 0:16/unsigned>>),
- FilterSize = bloom:filter_size(Bloom),
- {<<FilterSize:32/unsigned>>, 0, ?FIRST_BLOCK_POS};
+ {<<(bloom:size(Bloom)):32/unsigned>>, 0, ?FIRST_BLOCK_POS};
_ ->
- EncodedBloom = hanoidb_util:encode_bloom(Bloom),
+ EncodedBloom = bloom:encode(Bloom),
{EncodedBloom, byte_size(EncodedBloom), LastNodePos}
end,
@@ -240,7 +239,7 @@ append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List,
NewSize = NodeSize + hanoidb_util:estimate_node_size_increment(List, Key, Value),
- NewBloom = bloom:add_element(Key, Bloom),
+ NewBloom = bloom:add(Key, Bloom),
{TC1, VC1} =
case Level of
Oops, something went wrong.

0 comments on commit ce8982c

Please sign in to comment.