Skip to content

Commit

Permalink
Merge pull request #13 from Licenser/timeout_in_fold
Browse files Browse the repository at this point in the history
Timeout in fold. Thanks, @Licenser!
  • Loading branch information
krestenkrab committed Nov 15, 2013
2 parents 88a763f + 3e1d44c commit 930f1c1
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 5 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
TARGET= hanoidb

REBAR= rebar
REBAR= ./rebar
DIALYZER= dialyzer


Expand Down
Binary file added rebar
Binary file not shown.
4 changes: 2 additions & 2 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
{xref_checks, [undefined_function_calls]}.

{deps, [ {sext, ".*", {git, "git://github.com/esl/sext", {branch, "master"}}}
, {lager, ".*", {git, "git://github.com/basho/lager", {branch, "master"}}}
, {snappy, "1.0.*", {git, "git://github.com/fdmanana/snappy-erlang-nif.git", {branch, "master"}}}
, {lager, ".*", {git, "git://github.com/basho/lager", {tag, "2.0.0"}}}
, {snappy, ".*", {git, "https://github.com/ahf/snappy-erlang-nif.git", {branch, "r16-support"}}}
, {plain_fsm, "1.1.*", {git, "git://github.com/gburd/plain_fsm", {branch, "master"}}}
% , {basho_bench, ".*", {git, "git://github.com/basho/basho_bench", {branch, "master"}}}
% , {triq, ".*", {git, "git://github.com/krestenkrab/triq", {branch, "master"}}}
Expand Down
51 changes: 49 additions & 2 deletions src/hanoidb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,18 @@
%% PUBLIC API

-type hanoidb() :: pid().

-type key_range() :: #key_range{}.

-type range_spec() :: key_range()
| [{from_key, binary()}
| {from_inclusive, true|false}
| from_inclusive
| {to_key, binary() | undefined}
| {to_inclusive, true|false}
| to_inclusive
| {limit, pos_integer() | undefined}].

-type config_option() :: {compress, none | gzip | snappy | lz4}
| {page_size, pos_integer()}
| {read_buffer_size, pos_integer()}
Expand Down Expand Up @@ -180,7 +191,7 @@ transact(Ref, TransactionSpec) ->
fold(Ref,Fun,Acc0) ->
fold_range(Ref,Fun,Acc0,#key_range{from_key= <<>>, to_key=undefined}).

-spec fold_range(hanoidb(),kv_fold_fun(),any(),key_range()) -> any().
-spec fold_range(hanoidb(),kv_fold_fun(),any(), range_spec()) -> any().
fold_range(Ref,Fun,Acc0,#key_range{limit=Limit}=Range) ->
RangeType =
if Limit < 10 -> blocking_range;
Expand All @@ -192,7 +203,10 @@ fold_range(Ref,Fun,Acc0,#key_range{limit=Limit}=Range) ->
ok = gen_server:call(Ref, {RangeType, FoldWorkerPID, Range}, infinity),
Result = receive_fold_range(MRef, FoldWorkerPID, Fun, Acc0, Limit),
?log("fold_range done: self:~p, result=~p~n", [self(), Result]),
Result.
Result;

fold_range(Ref, Fun, Acc0, Range) ->
fold_range(Ref, Fun, Acc0, range_proplist_to_record(Range, #key_range{})).

receive_fold_range(MRef,PID,_,Acc0, 0) ->
erlang:exit(PID, shutdown),
Expand Down Expand Up @@ -236,6 +250,9 @@ receive_fold_range(MRef,PID,Fun,Acc0, Limit) ->
{'DOWN', MRef, _, _PID, Reason} ->
?log("> fold worker ~p DOWN reason:~p~n", [_PID, Reason]),
error({fold_worker_died, Reason})
after 500 ->
erlang:exit(PID, shutdown),
error(fold_worker_timeout)
end.

decr(undefined) ->
Expand Down Expand Up @@ -465,3 +482,33 @@ get_opt(Key, Opts, Default) ->
Value ->
Value
end.


-spec range_proplist_to_record(list(), key_range()) -> key_range().

range_proplist_to_record([], Record) ->
Record;

range_proplist_to_record([{from_key, Key} | L], Record) ->
range_proplist_to_record(L, Record#key_range{from_key = Key});

range_proplist_to_record([{from_inclusive, Key} | L], Record) ->
range_proplist_to_record(L, Record#key_range{from_inclusive = Key});

range_proplist_to_record([from_inclusive | L], Record) ->
range_proplist_to_record(L, Record#key_range{from_inclusive = true});

range_proplist_to_record([{to_key, Key} | L], Record) ->
range_proplist_to_record(L, Record#key_range{to_key = Key});

range_proplist_to_record([{to_inclusive, Key} | L], Record) ->
range_proplist_to_record(L, Record#key_range{to_inclusive = Key});

range_proplist_to_record([to_inclusive | L], Record) ->
range_proplist_to_record(L, Record#key_range{to_inclusive = true});

range_proplist_to_record([{limit, Key} | L], Record) ->
range_proplist_to_record(L, Record#key_range{limit = Key});

range_proplist_to_record([_ | L], Record) ->
range_proplist_to_record(L, Record).

0 comments on commit 930f1c1

Please sign in to comment.