Skip to content

Commit

Permalink
Regulate all kinds of running workers up to the number of schedulers
Browse files Browse the repository at this point in the history
  • Loading branch information
aronisstav authored and proxyles committed May 21, 2012
1 parent 4e1ed3a commit 720b65d
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 117 deletions.
3 changes: 1 addition & 2 deletions lib/dialyzer/src/dialyzer_callgraph.erl
Expand Up @@ -265,8 +265,7 @@ module_postorder(#callgraph{digraph = DG}) ->
digraph_confirm_vertices(sets:to_list(Nodes), MDG),
Foreach = fun({M1,M2}) -> digraph:add_edge(MDG, M1, M2) end,
lists:foreach(Foreach, sets:to_list(Edges)),
PostOrder = digraph_utils:topsort(MDG),
{PostOrder, {'d', MDG}}.
{digraph_utils:topsort(MDG), {'d', MDG}}.

edge_fold({{M1,_,_},{M2,_,_}}, Set) ->
case M1 =/= M2 of
Expand Down
208 changes: 100 additions & 108 deletions lib/dialyzer/src/dialyzer_coordinator.erl
Expand Up @@ -32,18 +32,18 @@
-export([wait_activation/0, job_done/3]).

%%% Exports for the typesig and dataflow analysis workers
-export([sccs_to_pids/1]).
-export([sccs_to_pids/1, request_activation/1]).

%%% Exports for the compilation workers
-export([get_next_label/2]).

-export_type([coordinator/0, mode/0, init_data/0]).
-export_type([coordinator/0, mode/0, init_data/0, result/0]).

%%--------------------------------------------------------------------

-define(MAP, dialyzer_coordinator_map).

-type coordinator() :: pid(). %%opaque
-type coordinator() :: {pid(), pid()}. %%opaque

-type scc() :: [mfa_or_funlbl()].
-type mode() :: 'typesig' | 'dataflow' | 'compile' | 'warnings'.
Expand Down Expand Up @@ -73,12 +73,12 @@
-type job_result() :: dialyzer_analysis_callgraph:one_file_result() |
typesig_result() | dataflow_result() | warnings_result().

-record(state, {active = 0 :: integer(),
result :: result(),
next_label = 0 :: integer(),
tickets = 0 :: integer(),
queue = queue:new() :: queue(),
init_data :: init_data()
-record(state, {mode :: mode(),
active = 0 :: integer(),
result :: result(),
next_label = 0 :: integer(),
init_data :: init_data(),
regulator :: pid()
}).

-include("dialyzer.hrl").
Expand All @@ -96,127 +96,79 @@

parallel_job(Mode, Jobs, InitData) ->
State = spawn_jobs(Mode, Jobs, InitData),
collect_result(Mode, State).

spawn_jobs(Mode, Jobs, InitData) when
Mode =:= 'typesig'; Mode =:= 'dataflow' ->
Coordinator = self(),
?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]),
collect_result(State).

spawn_jobs(Mode, Jobs, InitData) ->
Collector = self(),
Regulator = spawn_regulator(),
Coordinator = {Collector, Regulator},
TypesigOrDataflow = (Mode =:= 'typesig') orelse (Mode =:= 'dataflow'),
case TypesigOrDataflow of
true ->
?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]);
false -> ok
end,
Fold =
fun(Job, Count) ->
Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator),
true = ets:insert(?MAP, {Job, Pid}),
case TypesigOrDataflow of
true -> true = ets:insert(?MAP, {Job, Pid});
false -> request_activation(Regulator, Pid)
end,
Count + 1
end,
JobCount = lists:foldl(Fold, 0, Jobs),
Unit =
case Mode of
'typesig' -> "SCCs";
'dataflow' -> "modules"
_ -> "modules"
end,
dialyzer_timing:send_size_info(JobCount, Unit),
#state{active = JobCount, result = [], init_data = InitData};
spawn_jobs(Mode, Jobs, InitData) when
Mode =:= 'compile'; Mode =:= 'warnings' ->
Coordinator = self(),
Fold =
fun(Job, {InTickets, InQueue, Count}) ->
Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator),
NewCount = Count + 1,
case InTickets of
0 -> {InTickets, queue:in(Pid, InQueue), NewCount};
N -> activate_pid(Pid), {N-1, InQueue, NewCount}
end
end,
CPUs = erlang:system_info(logical_processors_available),
InitTickets = 4*CPUs,
{Tickets, Queue, JobCount} =
lists:foldl(Fold, {InitTickets, queue:new(), 0}, Jobs),
dialyzer_timing:send_size_info(JobCount, "modules"),
InitResult =
case Mode of
'warnings' -> [];
'compile' -> dialyzer_analysis_callgraph:compile_init_result()
'compile' -> dialyzer_analysis_callgraph:compile_init_result();
_ -> []
end,
#state{active = JobCount, result = InitResult, next_label = 0,
tickets = Tickets, queue = Queue, init_data = InitData}.

collect_result(Mode, State) ->
case Mode of
'compile' -> compile_loop(State);
'typesig' -> not_fixpoint_loop(State);
'dataflow' -> not_fixpoint_loop(State);
'warnings' -> warnings_loop(State)
end.
#state{mode = Mode, active = JobCount, result = InitResult, next_label = 0,
init_data = InitData, regulator = Regulator}.

compile_loop(#state{active = Active, result = Result,
next_label = NextLabel, tickets = Tickets,
queue = Queue, init_data = InitData} = State) ->
collect_result(#state{mode = Mode, active = Active, result = Result,
next_label = NextLabel, init_data = InitData,
regulator = Regulator} = State) ->
receive
{next_label_request, Estimation, Pid} ->
Pid ! {next_label_reply, NextLabel},
compile_loop(State#state{next_label = NextLabel + Estimation});
collect_result(State#state{next_label = NextLabel + Estimation});
{done, Job, Data} ->
NewResult =
dialyzer_analysis_callgraph:add_to_result(Job, Data, Result, InitData),
NewResult = update_result(Mode, InitData, Job, Data, Result),
case Active of
1 ->
{NewResult, NextLabel};
_ ->
NewActive = Active - 1,
{NewQueue, NewTickets} = manage_waiting(Queue, Tickets),
NewState =
State#state{result = NewResult, active = NewActive,
queue = NewQueue, tickets = NewTickets},
compile_loop(NewState)
kill_regulator(Regulator),
case Mode of
'compile' ->
{NewResult, NextLabel};
X when X =:= 'typesig'; X =:= 'dataflow' ->
ets:delete(?MAP),
NewResult;
'warnings' ->
NewResult
end;
N ->
collect_result(State#state{result = NewResult, active = N - 1})
end
end.

not_fixpoint_loop(#state{active = Active, result = Result,
init_data = InitData} = State) ->
receive
{done, _Job, Data} ->
FinalData = dialyzer_succ_typings:lookup_names(Data, InitData),
NewResult = FinalData ++ Result,
case Active of
1 ->
ets:delete(?MAP),
NewResult;
_ ->
NewActive = Active - 1,
NewState = State#state{active = NewActive, result = NewResult},
not_fixpoint_loop(NewState)
end
end.

warnings_loop(#state{active = Active, result = Result, tickets = Tickets,
queue = Queue} = State) ->
receive
{done, _Job, Data} ->
NewResult = Data ++ Result,
case Active of
1 -> NewResult;
_ ->
NewActive = Active - 1,
{NewQueue, NewTickets} = manage_waiting(Queue, Tickets),
NewState =
State#state{result = NewResult, active = NewActive,
queue = NewQueue, tickets = NewTickets},
warnings_loop(NewState)
end
update_result(Mode, InitData, Job, Data, Result) ->
case Mode of
'compile' ->
dialyzer_analysis_callgraph:add_to_result(Job, Data, Result,
InitData);
X when X =:= 'typesig'; X =:= 'dataflow' ->
dialyzer_succ_typings:lookup_names(Data, InitData) ++ Result;
'warnings' ->
Data ++ Result
end.

manage_waiting(Queue, Tickets) ->
{Waiting, NewQueue} = queue:out(Queue),
NewTickets =
case Waiting of
empty -> Tickets + 1;
{value, Pid} ->
activate_pid(Pid),
Tickets
end,
{NewQueue, NewTickets}.

-spec sccs_to_pids([scc() | module()]) ->
{[dialyzer_worker:worker()], [scc() | module()]}.

Expand All @@ -232,14 +184,15 @@ pid_partition(SCC, {Pids, Unknown}) ->

-spec job_done(job(), job_result(), coordinator()) -> ok.

job_done(Job, Result, Coordinator) ->
Coordinator ! {done, Job, Result},
job_done(Job, Result, {Collector, Regulator}) ->
Regulator ! done,
Collector ! {done, Job, Result},
ok.

-spec get_next_label(integer(), coordinator()) -> integer().

get_next_label(EstimatedSize, Coordinator) ->
Coordinator ! {next_label_request, EstimatedSize, self()},
get_next_label(EstimatedSize, {Collector, _Regulator}) ->
Collector ! {next_label_request, EstimatedSize, self()},
receive
{next_label_reply, NextLabel} -> NextLabel
end.
Expand All @@ -251,3 +204,42 @@ wait_activation() ->

activate_pid(Pid) ->
Pid ! activate.

-spec request_activation(coordinator()) -> ok.

request_activation({_Collector, Regulator}) ->
Regulator ! {req, self()},
wait_activation().

request_activation(Regulator, Pid) ->
Regulator ! {req, Pid}.

spawn_regulator() ->
InitTickets = dialyzer_utils:parallelism(),
spawn_link(fun() -> regulator_loop(InitTickets, queue:new()) end).

regulator_loop(Tickets, Queue) ->
receive
{req, Pid} ->
case Tickets of
0 ->
regulator_loop(0, queue:in(Pid, Queue));
N ->
activate_pid(Pid),
regulator_loop(N-1, Queue)
end;
done ->
{Waiting, NewQueue} = queue:out(Queue),
NewTickets =
case Waiting of
empty -> Tickets + 1;
{value, Pid} ->
activate_pid(Pid),
Tickets
end,
regulator_loop(NewTickets, NewQueue);
stop -> ok
end.

kill_regulator(Regulator) ->
Regulator ! stop.
2 changes: 1 addition & 1 deletion lib/dialyzer/src/dialyzer_typesig.erl
Expand Up @@ -1743,7 +1743,7 @@ parallel_split(SCC) ->
case Length > 2*?worth_it of
false -> false;
true ->
case min(erlang:system_info(logical_processors_available), 8) of
case min(dialyzer_utils:parallelism(), 8) of
1 -> false;
CPUs ->
FullShare = Length div CPUs + 1,
Expand Down
12 changes: 11 additions & 1 deletion lib/dialyzer/src/dialyzer_utils.erl
Expand Up @@ -43,7 +43,8 @@
pp_hook/0,
process_record_remote_types/1,
sets_filter/2,
src_compiler_opts/0
src_compiler_opts/0,
parallelism/0
]).

-include("dialyzer.hrl").
Expand Down Expand Up @@ -536,3 +537,12 @@ pp_unit(Unit, Ctxt, Cont) ->
pp_atom(Atom) ->
String = atom_to_list(cerl:atom_val(Atom)),
prettypr:text(String).

%%------------------------------------------------------------------------------

-spec parallelism() -> integer().

parallelism() ->
CPUs = erlang:system_info(logical_processors_available),
Schedulers = erlang:system_info(schedulers),
min(CPUs, Schedulers).

0 comments on commit 720b65d

Please sign in to comment.