Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce receive optimisation to dets calls #6045

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 54 additions & 44 deletions lib/stdlib/src/dets.erl
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@
tab_name/0]).

-compile({inline, [{einval,2},{badarg,2},{undefined,1},
{badarg_exit,2},{lookup_reply,2}]}).
{badarg_exit,2},{lookup_reply,2},
{pidof,1},{resp,2}]}).

-include_lib("kernel/include/file.hrl").

Expand Down Expand Up @@ -1237,15 +1238,24 @@ treq(Tab, R) ->

req(Proc, R) ->
Ref = erlang:monitor(process, Proc),
Proc ! ?DETS_CALL(self(), R),
Proc ! ?DETS_CALL({self(), Ref}, R),
receive
{'DOWN', Ref, process, Proc, _Info} ->
badarg;
{Proc, Reply} ->
{Ref, Reply} ->
erlang:demonitor(Ref, [flush]),
Reply
end.

%% Inlined.
pidof({Pid, _Tag}) ->
Pid.

%% Inlined.
resp({Pid, Tag} = _From, Message) ->
Pid ! {Tag, Message},
ok.

ssmyczynski marked this conversation as resolved.
Show resolved Hide resolved
%% Inlined.
einval({error, {file_error, _, einval}}, A) ->
erlang:error(badarg, A);
Expand Down Expand Up @@ -1398,7 +1408,7 @@ apply_op(Op, From, Head, N) ->
true ->
err({error, incompatible_arguments})
end,
From ! {self(), Res},
resp(From, Res),
ok;
auto_save ->
case Head#head.update_mode of
Expand All @@ -1419,33 +1429,33 @@ apply_op(Op, From, Head, N) ->
{0, Head}
end;
close ->
From ! {self(), fclose(Head)},
resp(From, fclose(Head)),
_NewHead = unlink_fixing_procs(Head),
?PROFILE(ep:done()),
exit(normal);
{close, Pid} ->
%% Used from dets_server when Pid has closed the table,
%% but the table is still opened by some process.
NewHead = remove_fix(Head, Pid, close),
From ! {self(), status(NewHead)},
resp(From, status(NewHead)),
NewHead;
{corrupt, Reason} ->
{H2, Error} = dets_utils:corrupt_reason(Head, Reason),
From ! {self(), Error},
resp(From, Error),
H2;
{delayed_write, WrTime} ->
delayed_write(Head, WrTime);
info ->
{H2, Res} = finfo(Head),
From ! {self(), Res},
resp(From, Res),
H2;
{info, Tag} ->
{H2, Res} = finfo(Head, Tag),
From ! {self(), Res},
resp(From, Res),
H2;
{is_compatible_bchunk_format, Term} ->
Res = test_bchunk_format(Head, Term),
From ! {self(), Res},
resp(From, Res),
ok;
{internal_open, Ref, Args} ->
do_internal_open(Head#head.parent, Head#head.server, From,
Expand All @@ -1462,27 +1472,27 @@ apply_op(Op, From, Head, N) ->
end;
{set_verbose, What} ->
set_verbose(What),
From ! {self(), ok},
resp(From, ok),
ok;
{where, Object} ->
{H2, Res} = where_is_object(Head, Object),
From ! {self(), Res},
resp(From, Res),
H2;
_Message when element(1, Head#head.update_mode) =:= error ->
From ! {self(), status(Head)},
resp(From, status(Head)),
ok;
%% The following messages assume that the status of the table is OK.
{bchunk_init, Tab} ->
{H2, Res} = do_bchunk_init(Head, Tab),
From ! {self(), Res},
resp(From, Res),
H2;
{bchunk, State} ->
{H2, Res} = do_bchunk(Head, State),
From ! {self(), Res},
resp(From, Res),
H2;
delete_all_objects ->
{H2, Res} = fdelete_all_objects(Head),
From ! {self(), Res},
resp(From, Res),
erlang:garbage_collect(),
{0, H2};
{delete_key, _Keys} when Head#head.update_mode =:= dirty ->
Expand All @@ -1492,29 +1502,29 @@ apply_op(Op, From, Head, N) ->
true ->
stream_op(Op, From, [], Head, N);
false ->
From ! {self(), badarg},
resp(From, badarg),
ok
end;
first ->
{H2, Res} = ffirst(Head),
From ! {self(), Res},
resp(From, Res),
H2;
{initialize, InitFun, Format, MinNoSlots} ->
{H2, Res} = finit(Head, InitFun, Format, MinNoSlots),
From ! {self(), Res},
resp(From, Res),
erlang:garbage_collect(),
H2;
{insert, Objs} when Head#head.update_mode =:= dirty ->
case check_objects(Objs, Head#head.keypos) of
true ->
stream_op(Op, From, [], Head, N);
false ->
From ! {self(), badarg},
resp(From, badarg),
ok
end;
{insert_new, Objs} when Head#head.update_mode =:= dirty ->
{H2, Res} = finsert_new(Head, Objs),
From ! {self(), Res},
resp(From, Res),
{N + 1, H2};
{lookup_keys, _Keys} ->
stream_op(Op, From, [], Head, N);
Expand All @@ -1523,48 +1533,48 @@ apply_op(Op, From, Head, N) ->
H2 = case Res of
{cont,_} -> H1;
_ when Safe =:= no_safe-> H1;
_ when Safe =:= safe -> do_safe_fixtable(H1, From, false)
_ when Safe =:= safe -> do_safe_fixtable(H1, pidof(From), false)
end,
From ! {self(), Res},
resp(From, Res),
H2;
{match, MP, Spec, NObjs, Safe} ->
{H2, Res} = fmatch(Head, MP, Spec, NObjs, Safe, From),
From ! {self(), Res},
resp(From, Res),
H2;
{member, _Key} = Op ->
stream_op(Op, From, [], Head, N);
{next, Key} ->
{H2, Res} = fnext(Head, Key),
From ! {self(), Res},
resp(From, Res),
H2;
{match_delete, State} when Head#head.update_mode =:= dirty ->
{H1, Res} = fmatch_delete(Head, State),
H2 = case Res of
{cont,_S,_N} -> H1;
_ -> do_safe_fixtable(H1, From, false)
_ -> do_safe_fixtable(H1, pidof(From), false)
end,
From ! {self(), Res},
resp(From, Res),
{N + 1, H2};
{match_delete_init, MP, Spec} when Head#head.update_mode =:= dirty ->
{H2, Res} = fmatch_delete_init(Head, MP, Spec, From),
From ! {self(), Res},
resp(From, Res),
{N + 1, H2};
{safe_fixtable, Bool} ->
NewHead = do_safe_fixtable(Head, From, Bool),
From ! {self(), ok},
NewHead = do_safe_fixtable(Head, pidof(From), Bool),
resp(From, ok),
NewHead;
{slot, Slot} ->
{H2, Res} = fslot(Head, Slot),
From ! {self(), Res},
resp(From, Res),
H2;
sync ->
{NewHead, Res} = perform_save(Head, true),
From ! {self(), Res},
resp(From, Res),
erlang:garbage_collect(),
{0, NewHead};
{update_counter, Key, Incr} when Head#head.update_mode =:= dirty ->
{NewHead, Res} = do_update_counter(Head, Key, Incr),
From ! {self(), Res},
resp(From, Res),
{N + 1, NewHead};
WriteOp when Head#head.update_mode =:= new_dirty ->
H2 = Head#head{update_mode = dirty},
Expand All @@ -1577,12 +1587,12 @@ apply_op(Op, From, Head, N) ->
H2 = Head#head{update_mode = dirty},
apply_op(WriteOp, From, H2, 0);
{NewHead, Error} when is_record(NewHead, head) ->
From ! {self(), Error},
resp(From, Error),
NewHead
end;
WriteOp when is_tuple(WriteOp), Head#head.access =:= read ->
Reason = {access_mode, Head#head.filename},
From ! {self(), err({error, Reason})},
resp(From, err({error, Reason})),
ok
end.

Expand All @@ -1603,7 +1613,7 @@ bug_found(Name, Op, Bad, Stacktrace, From) ->
end,
if
From =/= self() ->
From ! {self(), {error, {dets_bug, Name, Op, Bad}}},
resp(From, {error, {dets_bug, Name, Op, Bad}}),
ok;
true -> % auto_save | may_grow | {delayed_write, _}
ok
Expand All @@ -1613,10 +1623,10 @@ do_internal_open(Parent, Server, From, Ref, Args) ->
?PROFILE(ep:do()),
case do_open_file(Args, Parent, Server, Ref) of
{ok, Head} ->
From ! {self(), ok},
resp(From, ok),
Head;
Error ->
From ! {self(), Error},
resp(From, Error),
exit(normal)
end.

Expand Down Expand Up @@ -1698,7 +1708,7 @@ stream_end1(Pids, Next, N, C, Head, PwriteList) ->
stream_end2(Pids, Pids, Next, N, C, Head1, PR).

stream_end2([Pid | Pids], Ps, Next, N, C, Head, Reply) ->
Pid ! {self(), Reply},
resp(Pid, Reply),
stream_end2(Pids, Ps, Next, N+1, C, Head, Reply);
stream_end2([], Ps, no_more, N, C, Head, _Reply) ->
penalty(Head, Ps, C),
Expand All @@ -1710,7 +1720,7 @@ penalty(H, _Ps, _C) when H#head.fixed =:= false ->
ok;
penalty(_H, _Ps, [{{lookup,_Pids},_Keys}]) ->
ok;
penalty(#head{fixed = {_,[{Pid,_}]}}, [Pid], _C) ->
penalty(#head{fixed = {_,[{Pid, _}]}}, [{Pid, _Tag} = _From], _C) ->
ok;
penalty(_H, _Ps, _C) ->
timer:sleep(1).
Expand All @@ -1729,9 +1739,9 @@ lookup_replies(P, O, [{P2,O2} | L]) ->

%% If a list of Pid then op was {member, Key}. Inlined.
lookup_reply([P], O) ->
P ! {self(), O =/= []};
resp(P, O =/= []);
lookup_reply(P, O) ->
P ! {self(), O}.
resp(P, O).

%%-----------------------------------------------------------------
%% Callback functions for system messages handling.
Expand Down Expand Up @@ -2253,7 +2263,7 @@ fmatch(Head, MP, Spec, N, Safe, From) ->
{Head1, []} ->
NewHead =
case Safe of
safe -> do_safe_fixtable(Head1, From, true);
safe -> do_safe_fixtable(Head1, pidof(From), true);
no_safe -> Head1
end,
C0 = init_scan(NewHead, N),
Expand Down Expand Up @@ -2370,7 +2380,7 @@ do_fmatch_delete_var_keys(Head, _MP, ?PATTERN_TO_TRUE_MATCH_SPEC('_'), _From)
Reply
end;
do_fmatch_delete_var_keys(Head, MP, _Spec, From) ->
Head1 = do_safe_fixtable(Head, From, true),
Head1 = do_safe_fixtable(Head, pidof(From), true),
{NewHead, []} = write_cache(Head1),
C0 = init_scan(NewHead, default),
{NewHead, {cont, C0#dets_cont{match_program = MP}, 0}}.
Expand Down
32 changes: 30 additions & 2 deletions lib/stdlib/test/dets_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
otp_5487/1, otp_6206/1, otp_6359/1, otp_4738/1, otp_7146/1,
otp_8070/1, otp_8856/1, otp_8898/1, otp_8899/1, otp_8903/1,
otp_8923/1, otp_9282/1, otp_11245/1, otp_11709/1, otp_13229/1,
otp_13260/1, otp_13830/1]).
otp_13260/1, otp_13830/1, receive_optimisation/1]).

-export([dets_dirty_loop/0]).

Expand Down Expand Up @@ -94,7 +94,7 @@ all() ->
insert_new, repair_continuation, otp_5487, otp_6206,
otp_6359, otp_4738, otp_7146, otp_8070, otp_8856, otp_8898,
otp_8899, otp_8903, otp_8923, otp_9282, otp_11245, otp_11709,
otp_13229, otp_13260, otp_13830
otp_13229, otp_13260, otp_13830, receive_optimisation
].

groups() ->
Expand Down Expand Up @@ -3492,6 +3492,34 @@ otp_13830(Config) ->
{ok, Tab} = dets:open_file(Tab, [{file, File}, {version, default}]),
ok = dets:close(Tab).

receive_optimisation(Config) ->
Tab = dets_receive_optimisation_test,
FName = filename(Tab, Config),

% Spam message box
lists:foreach(fun(_) -> self() ! {spam, it} end, lists:seq(1, 1_000_000)),

{ok, _} = dets:open_file(Tab,[{file, FName}]),
ok = dets:insert(Tab,{one, record}),

StartTime = os:system_time(millisecond),

% We expect one thousand of simple lookups to finish in one second
Lookups = 1000,
Timeout = 1000,
Loop = fun Loop(N) when N =< 0 -> ok;
Loop(N) ->
Now = os:system_time(millisecond),
(Now - StartTime > Timeout) andalso throw({timeout_after, Lookups - N}),
[{one, record}] = dets:lookup(Tab, one),
Loop(N-1)
end,

ok = Loop(Lookups),

ok = dets:close(Tab),
ok = file:delete(FName).

%%
%% Parts common to several test cases
%%
Expand Down