Skip to content

Commit

Permalink
New set of hybrid skeletons (hyb_cluster and hyb_map)
Browse files Browse the repository at this point in the history
  • Loading branch information
vjanjic committed May 15, 2015
1 parent 939af75 commit 164abba
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 34 deletions.
25 changes: 25 additions & 0 deletions src/sk_assembler.erl
Expand Up @@ -13,6 +13,7 @@

-export([
make/2
,make_hyb/4
,run/2
]).

Expand All @@ -32,6 +33,12 @@ make(WorkFlow, EndPid) when is_pid(EndPid) ->
MakeFns = [parse(Section) || Section <- WorkFlow],
lists:foldr(fun(MakeFn, Pid) -> MakeFn(Pid) end, EndPid, MakeFns).

-spec make_hyb(workflow(), pid(), pos_integer(), pos_integer()) -> pid().
make_hyb(WorkFlow, EndPid, NCPUWorkers, NGPUWorkers) when is_pid(EndPid) ->
MakeFns = [parse_hyb(Section, NCPUWorkers, NGPUWorkers) || Section <- WorkFlow],
lists:foldr(fun(MakeFn, Pid) -> MakeFn(Pid) end, EndPid, MakeFns).


-spec run(pid() | workflow(), input()) -> pid().
%% @doc Function to produce and start a set of processes according to the
%% given workflow specification and input.
Expand All @@ -43,6 +50,14 @@ run(WorkFlow, Input) when is_list(WorkFlow) ->
AssembledWF = make(WorkFlow, DrainPid),
run(AssembledWF, Input).

parse_hyb(Section, NCPUWorkers, NGPUWorkers) ->
case Section of
{hyb_map, WorkFlowCPU, WorkFlowGPU} ->
parse({hyb_map, WorkFlowCPU, WorkFlowGPU, NCPUWorkers, NGPUWorkers});
Other -> parse(Other)
end.


-spec parse(wf_item()) -> maker_fun().
%% @doc Determines the course of action to be taken according to the type of
%% workflow specified. Constructs and starts specific skeleton instances.
Expand All @@ -62,11 +77,21 @@ parse({map, WorkFlow}) ->
sk_map:make(WorkFlow);
parse({map, WorkFlow, NWorkers}) ->
sk_map:make(WorkFlow, NWorkers);
parse({hyb_map, WorkFlowCPU, WorkFlowGPU}) ->
sk_map:make_hyb(WorkFlowCPU, WorkFlowGPU);
parse({hyb_map, WorkFlowCPU, WorkFlowGPU, NCPUWorkers, NGPUWorkers}) ->
sk_map:make_hyb(WorkFlowCPU, WorkFlowGPU, NCPUWorkers, NGPUWorkers);
parse({cluster, WorkFlow, Decomp, Recomp}) when is_function(Decomp, 1),
is_function(Recomp, 1) ->
sk_cluster:make(WorkFlow, Decomp, Recomp);
parse({hyb_cluster, WorkFlow, Decomp, Recomp, NCPUWorkers, NGPUWorkers}) when
is_function(Decomp, 1), is_function(Recomp, 1) ->
sk_cluster:make_hyb(WorkFlow, Decomp, Recomp, NCPUWorkers, NGPUWorkers);
parse({hyb_cluster, WorkFlow, TimeRatio, NCPUWorkers, NGPUWorkers}) ->
sk_cluster:make_hyb(WorkFlow, TimeRatio, NCPUWorkers, NGPUWorkers);
parse({hyb_cluster, WorkFlow, TimeRatio, StructSizeFun, MakeChunkFun, RecompFun, NCPUWorkers, NGPUWorkers}) ->
sk_cluster:make_hyb(WorkFlow, TimeRatio, StructSizeFun, MakeChunkFun, RecompFun, NCPUWorkers, NGPUWorkers);

% parse({decomp, WorkFlow, Decomp, Recomp}) when is_function(Decomp, 1),
% is_function(Recomp, 1) ->
% sk_decomp:make(WorkFlow, Decomp, Recomp);
Expand Down
99 changes: 98 additions & 1 deletion src/sk_cluster.erl
Expand Up @@ -33,7 +33,10 @@
-module(sk_cluster).

-export([
make/3
make/3,
make_hyb/4,
make_hyb/5,
make_hyb/7
]).

-include("skel.hrl").
Expand All @@ -55,3 +58,97 @@ make(WorkFlow, Decomp, Recomp) ->
WorkerPid = sk_utils:start_worker(WorkFlow, RecompPid),
spawn(sk_cluster_decomp, start, [Decomp, WorkerPid])
end.

ceiling(X) ->
T = erlang:trunc(X),
case (X - T) of
Neg when Neg < 0 -> T;
Pos when Pos > 0 -> T + 1;
_ -> T
end.


mark_tasks([], _NCPUWorkers, _NGPUWorkers) ->
[];
mark_tasks(_Tasks, 0, 0) ->
[];
mark_tasks([Task|Tasks], 0, NGPUWorkers) ->
[{gpu, Task} | mark_tasks(Tasks, 0, NGPUWorkers-1)];
mark_tasks([Task|Tasks], NCPUWorkers, NGPUWorkers) ->
[{cpu, Task} | mark_tasks(Tasks, NCPUWorkers-1, NGPUWorkers)].

hyb_cluster_decomp(Decomp, NCPUWorkers, NGPUWorkers, Input) ->
Tasks = Decomp(Input),
mark_tasks(Tasks, NCPUWorkers, NGPUWorkers).

calculate_ratio(TimeRatio, NTasks, NCPUW, NGPUW) ->
TasksCPU = lists:seq(0, NTasks),
Time = fun(CPUTasks, GPUTasks) ->
max (ceiling(CPUTasks/NCPUW)*TimeRatio, ceiling(GPUTasks/NGPUW))
end,
Ratio = lists:foldl(fun(Elem,Acc) -> FooBar = Time(Elem, NTasks-Elem),
if
(FooBar < element(1,Acc)) or (element(1,Acc) == -1)
-> {FooBar, Elem};
true -> Acc
end end,
{-1,0}, TasksCPU),
{element(2,Ratio), NTasks-element(2,Ratio)}.

calculate_chunk_sizes(NrItems, NrWorkers) ->
ChunkSize = NrItems div NrWorkers,
Remainder = NrItems rem NrWorkers,
ChunkSizes = lists:duplicate(Remainder, {ChunkSize+1}) ++ lists:duplicate(NrWorkers-Remainder, {ChunkSize}),
ChunkSizes.

create_task_list([], [], _MakeChunkFun, _Input) ->
[];
create_task_list([CPUChunk|CPUChunks], GPUChunks, MakeChunkFun, Input) ->
CPUChunkSize = element(1,CPUChunk),
{Work, Rest} = MakeChunkFun(Input, CPUChunkSize),
[ {cpu, Work} | create_task_list(CPUChunks, GPUChunks, MakeChunkFun, Rest) ];
create_task_list([], [GPUChunk|GPUChunks], MakeChunkFun, Input) ->
GPUChunkSize = element(1,GPUChunk),
{Work, Rest} = MakeChunkFun(Input, GPUChunkSize),
[ {gpu, Work} | create_task_list([], GPUChunks, MakeChunkFun, Rest) ].

hyb_cluster_decomp_default(TimeRatio, StructSizeFun, MakeChunkFun, NCPUWorkers, NGPUWorkers, Input) ->
NItems = StructSizeFun(Input),
{CPUItems, GPUItems} = if
(NCPUWorkers>0) and (NGPUWorkers>0) -> calculate_ratio(TimeRatio, NItems, NCPUWorkers, NGPUWorkers);
NGPUWorkers == 0 -> {NItems,0};
NCPUWorkers == 0 -> {0, NItems}
end,
CPUChunkSizes = calculate_chunk_sizes(CPUItems, NCPUWorkers),
GPUChunkSizes = calculate_chunk_sizes(GPUItems, NGPUWorkers),
[create_task_list(CPUChunkSizes, GPUChunkSizes, MakeChunkFun, Input)].

-spec make_hyb(workflow(), decomp_fun(), recomp_fun(), pos_integer(), pos_integer()) -> fun((pid()) -> pid()).
make_hyb(Workflow, Decomp, Recomp, NCPUWorkers, NGPUWorkers) ->
fun(NextPid) ->
RecompPid = spawn(sk_cluster_recomp, start, [Recomp, NextPid]),
WorkerPid = sk_utils:start_worker_hyb(Workflow, RecompPid, NCPUWorkers, NGPUWorkers),
spawn(sk_cluster_decomp, start, [fun (Input) -> hyb_cluster_decomp(Decomp, NCPUWorkers, NGPUWorkers, Input) end,
WorkerPid])
end.

-spec make_hyb(workflow(), float(), fun((any()) -> pos_integer()), fun((any(),pos_integer()) -> pos_integer()),
fun((any())->any()),
pos_integer(), pos_integer()) -> fun((pid()) -> pid()).
make_hyb(Workflow, TimeRatio, StructSizeFun, MakeChunkFun, RecompFun, NCPUWorkers, NGPUWorkers) ->
fun(NextPid) ->
RecompPid = spawn(sk_cluster_recomp, start, [RecompFun, NextPid]),
WorkerPid = sk_utils:start_worker_hyb(Workflow, RecompPid, NCPUWorkers, NGPUWorkers),
spawn(sk_cluster_decomp, start, [fun (Input) -> hyb_cluster_decomp_default(TimeRatio, StructSizeFun, MakeChunkFun, NCPUWorkers, NGPUWorkers, Input) end,
WorkerPid])
end.

-spec make_hyb(workflow(), float(), pos_integer(), pos_integer()) -> fun((pid())->pid()).
make_hyb(Workflow, TimeRatio, NCPUWorkers, NGPUWorkers) ->
fun(NextPid) ->
RecompPid = spawn(sk_cluster_recomp, start, [fun lists:flatten/1, NextPid]),
WorkerPid = sk_utils:start_worker_hyb(Workflow, RecompPid, NCPUWorkers, NGPUWorkers),
spawn(sk_cluster_decomp, start, [fun (Input) -> hyb_cluster_decomp_default(TimeRatio, fun length/1,fun (Data,Pos) -> lists:split(Pos,Data) end, NCPUWorkers, NGPUWorkers, Input) end,
WorkerPid])
end.

12 changes: 6 additions & 6 deletions src/sk_cluster_recomp.erl
Expand Up @@ -41,7 +41,7 @@ start(Recomp, NextPid) ->
DataRecompFun = sk_data:recomp_with(Recomp),
loop(dict:new(), DataRecompFun, NextPid).

-spec loop(dict(), data_recomp_fun(), pid()) -> 'eos'.
-spec loop(dict:dict(), data_recomp_fun(), pid()) -> 'eos'.
%% @doc Worker function for {@link start/2}; recursively receives and combines
%% messages using the recomposition function under `DataRecompFun'. Once all
%% partite elements for each original input have been received and merged, the
Expand All @@ -59,7 +59,7 @@ loop(Dict, DataRecompFun, NextPid) ->
eos
end.

-spec store(reference(), pos_integer(), pos_integer(), data_message(), dict()) -> dict().
-spec store(reference(), pos_integer(), pos_integer(), data_message(), dict:dict()) -> dict:dict().
%% @doc Facilitates the storing of all partite and semi-combined messages, for each input.
%%
%% The total number of partite elements expected, the partite element data
Expand All @@ -71,7 +71,7 @@ store(Ref, Idx, NPartitions, PartitionMessage, Dict) ->
Dict2 = dict:store({Ref, Idx}, PartitionMessage, Dict1),
dict:update_counter({Ref, received}, 1, Dict2).

-spec combine_and_forward(reference(), dict(), data_recomp_fun(), pid()) -> dict().
-spec combine_and_forward(reference(), dict:dict(), data_recomp_fun(), pid()) -> dict:dict().
%% @doc Attempts to find the grouping reference under `Ref' in the given
%% dictionary. If that reference is found, all message parts are combined
%% using the recomposition function given under `DataCombinerFun'.
Expand All @@ -81,7 +81,7 @@ combine_and_forward(Ref, Dict, DataCombinerFun, NextPid) ->
{ok, NPartitions} -> combine_and_forward(Ref, Dict, NPartitions, DataCombinerFun, NextPid)
end.

-spec combine_and_forward(reference(), dict(), pos_integer(), data_recomp_fun(), pid()) -> dict().
-spec combine_and_forward(reference(), dict:dict(), pos_integer(), data_recomp_fun(), pid()) -> dict:dict().
%% @doc Worker function for {@link combine_and_forward/4}; attempts to
%% recompose a partite elements for a given original input, as indicated by
%% `Ref'. Should all partite elements be stored in the dictionary, they are
Expand All @@ -101,7 +101,7 @@ combine_and_forward(Ref, Dict, NPartitions, DataCombinerFun, NextPid) ->
Dict
end.

-spec fetch_partitions(reference(), non_neg_integer(), dict(), [any()]) -> [any()].
-spec fetch_partitions(reference(), non_neg_integer(), dict:dict(), [any()]) -> [any()].
%% @doc Retrieves and returns a list of all entries with the same reference in
%% the specified dictionary.
fetch_partitions(_Ref, 0, _Dict, Acc) ->
Expand All @@ -110,7 +110,7 @@ fetch_partitions(Ref, NPartitions, Dict, Acc) ->
{ok, Piece} = dict:find({Ref, NPartitions}, Dict),
fetch_partitions(Ref, NPartitions-1, Dict, [Piece|Acc]).

-spec purge_partitions(reference(), non_neg_integer(), dict()) -> dict().
-spec purge_partitions(reference(), non_neg_integer(), dict:dict()) -> dict:dict().
%% @doc Recursively removes all entries with the same reference in a given
%% dictionary.
purge_partitions(Ref, 0, Dict) ->
Expand Down
14 changes: 12 additions & 2 deletions src/sk_map.erl
Expand Up @@ -88,7 +88,17 @@ make(WorkFlow, NWorkers) ->
make_hyb(WorkFlowCPU, WorkFlowGPU, NCPUWorkers, NGPUWorkers) ->
fun(NextPid) ->
CombinerPid = spawn(sk_map_combiner, start, [NextPid, NCPUWorkers+NGPUWorkers]),
WorkerPids = sk_utils:start_workers_hyb(NCPUWorkers, NGPUWorkers, WorkFlowCPU, WorkFlowGPU, CombinerPid),
spawn(sk_map_partitioner, start, [man, WorkerPids, CombinerPid])
{CPUWorkerPids, GPUWorkerPids} = sk_utils:start_workers_hyb(NCPUWorkers, NGPUWorkers, WorkFlowCPU, WorkFlowGPU, CombinerPid),
spawn(sk_map_partitioner, start_hyb, [man, CPUWorkerPids, GPUWorkerPids, CombinerPid])
end.

%make_hyb(WorkFlowCPU, WorkFlowGPU) ->
% fun(NextPid) ->
% CombinerPid = spawn(sk_map_combiner, start, [NextPid]),
% spawn(sk_map_partitioner, start, [auto, [{seq, fun(X) -> sk_utils:hyb_worker(WorkFlowCPU, WorkFlowGPU, X) end}],
% CombinerPid])
% end.




12 changes: 6 additions & 6 deletions src/sk_map_combiner.erl
Expand Up @@ -50,7 +50,7 @@ start(NextPid, NWorkers) ->
loop(man, NWorkers, 0, dict:new(), recomp_with(), NextPid).


-spec loop(atom(), non_neg_integer(), non_neg_integer(), dict(), data_recomp_fun(), pid()) -> 'eos'.
-spec loop(atom(), non_neg_integer(), non_neg_integer(), dict:dict(), data_recomp_fun(), pid()) -> 'eos'.
%% @doc Recursively receives and stores messages until groups of said messages
%% may be recomposed and sent. Serves to stop all processes once all inputs
%% have been processed.
Expand Down Expand Up @@ -109,7 +109,7 @@ new_total_workers(TotWorkers, _NPartitions) ->
TotWorkers.


-spec store(reference(), pos_integer(), pos_integer(), data_message(), dict()) -> dict().
-spec store(reference(), pos_integer(), pos_integer(), data_message(), dict:dict()) -> dict:dict().
%% @doc Stores in a dictionary the total number of partitions, `NPartitions',
%% expected; all messages heretofore received; and the number said received
%% messages, for the original input under the reference given by `Ref'. The
Expand All @@ -120,7 +120,7 @@ store(Ref, Idx, NPartitions, PartitionMessage, Dict) ->
dict:update_counter({Ref, received}, 1, Dict2).


-spec combine_and_forward(reference(), dict(), data_recomp_fun(), pid()) -> dict().
-spec combine_and_forward(reference(), dict:dict(), data_recomp_fun(), pid()) -> dict:dict().
%% @doc Attempts to find the reference as given by `Ref' in the specified
%% dictionary.
%%
Expand All @@ -134,7 +134,7 @@ combine_and_forward(Ref, Dict, DataCombinerFun, NextPid) ->
end.


-spec combine_and_forward(reference(), dict(), pos_integer(), data_recomp_fun(), pid()) -> dict().
-spec combine_and_forward(reference(), dict:dict(), pos_integer(), data_recomp_fun(), pid()) -> dict:dict().
%% @doc Inner-function for {@link combine_and_forward/4} that attempts to
%% restore a decomposed list from parts in a dictionary `Dict', whose
%% reference is given by `Ref'.
Expand All @@ -157,7 +157,7 @@ combine_and_forward(Ref, Dict, NPartitions, DataCombinerFun, NextPid) ->
end.


-spec fetch_partitions(reference(), non_neg_integer(), dict(), [any()]) -> [any()].
-spec fetch_partitions(reference(), non_neg_integer(), dict:dict(), [any()]) -> [any()].
%% @doc Returns a list of all data messages in the given dictionary, whose
%% reference is `Ref'.
fetch_partitions(_Ref, 0, _Dict, Acc) ->
Expand All @@ -167,7 +167,7 @@ fetch_partitions(Ref, NPartitions, Dict, Acc) ->
fetch_partitions(Ref, NPartitions-1, Dict, [Piece|Acc]).


-spec purge_partitions(reference(), non_neg_integer(), dict()) -> dict().
-spec purge_partitions(reference(), non_neg_integer(), dict:dict()) -> dict:dict().
%% @doc Recursively removes all entries with `Ref' as their reference in the
%% given dictionary.
purge_partitions(Ref, 0, Dict) ->
Expand Down
42 changes: 41 additions & 1 deletion src/sk_map_partitioner.erl
Expand Up @@ -23,7 +23,8 @@
-module(sk_map_partitioner).

-export([
start/3
start/3,
start_hyb/4
]).

-include("skel.hrl").
Expand Down Expand Up @@ -52,6 +53,12 @@ start(man, WorkerPids, CombinerPid) when is_pid(hd(WorkerPids)) ->
sk_tracer:t(75, self(), {?MODULE, start}, [{combiner, CombinerPid}]),
loop(decomp_by(), CombinerPid, WorkerPids).

-spec start_hyb(atom(), [pid()], [pid()], pid()) -> 'eos'.
start_hyb(man, CPUWorkerPids, GPUWorkerPids, CombinerPid) ->
sk_tracer:t(75, self(), {?MODULE, start}, [{combiner, CombinerPid}]),
loop_hyb(decomp_by(), CombinerPid, CPUWorkerPids, GPUWorkerPids).


-spec loop(data_decomp_fun(), workflow(), pid(), [pid()]) -> 'eos'.
%% @doc Recursively receives inputs as messages, which are decomposed, and the
%% resulting messages sent to individual workers. `loop/4' is used in place of
Expand Down Expand Up @@ -89,6 +96,21 @@ loop(DataPartitionerFun, CombinerPid, WorkerPids) ->
eos
end.

loop_hyb(DataPartitionerFun, CombinerPid, CPUWorkerPids, GPUWorkerPids) ->
receive
{data, _, _} = DataMessage ->
PartitionMessages = DataPartitionerFun(DataMessage),
Ref = make_ref(),
sk_tracer:t(60, self(), {?MODULE, data}, [{ref, Ref}, {input, DataMessage}, {partitions, PartitionMessages}]),
hyb_dispatch(Ref, length(PartitionMessages), PartitionMessages, CPUWorkerPids, GPUWorkerPids),
loop_hyb(DataPartitionerFun, CombinerPid, CPUWorkerPids, GPUWorkerPids);
{system, eos} ->
sk_utils:stop_workers(?MODULE, CPUWorkerPids),
sk_utils:stop_workers(?MODULE, GPUWorkerPids),
eos
end.



-spec decomp_by() -> data_decomp_fun().
%% @doc Provides the decomposition function and means to split a single input
Expand Down Expand Up @@ -124,6 +146,9 @@ start_workers(_NPartitions, _WorkFlow, _CombinerPid, WorkerPids) ->
dispatch(Ref, NPartitions, PartitionMessages, WorkerPids) ->
dispatch(Ref, NPartitions, 1, PartitionMessages, WorkerPids).

hyb_dispatch(Ref, NPartitions, PartitionMessages, CPUWorkerPids, GPUWorkerPids) ->
hyb_dispatch(Ref, NPartitions, 1, PartitionMessages, CPUWorkerPids, GPUWorkerPids).


-spec dispatch(reference(), pos_integer(), pos_integer(), [data_message(),...], [pid()]) -> 'ok'.
%% @doc Inner-function for {@link dispatch/4}. Recursively sends each message
Expand All @@ -136,3 +161,18 @@ dispatch(Ref, NPartitions, Idx, [PartitionMessage|PartitionMessages], [WorkerPid
sk_tracer:t(50, self(), WorkerPid, {?MODULE, data}, [{partition, PartitionMessage1}]),
WorkerPid ! PartitionMessage1,
dispatch(Ref, NPartitions, Idx+1, PartitionMessages, WorkerPids ++ [WorkerPid]).

hyb_dispatch(_Ref,_NPartitions, _Idx, [], _, _) ->
ok;
hyb_dispatch(Ref, NPartitions, Idx, [{DataTag,{cpu,Msg},Rest}|PartitionMessages], [CPUWorkerPid|CPUWorkerPids], GPUWorkerPids) ->
PartitionMessageWithoutTag = {DataTag, Msg, Rest},
PartitionMessage1 = sk_data:push({decomp, Ref, Idx, NPartitions}, PartitionMessageWithoutTag),
sk_tracer:t(50, self(), CPUWorkerPid, {?MODULE, data}, [{partition, PartitionMessage1}]),
CPUWorkerPid ! PartitionMessage1,
hyb_dispatch(Ref, NPartitions, Idx+1, PartitionMessages, CPUWorkerPids ++ [CPUWorkerPid], GPUWorkerPids);
hyb_dispatch(Ref, NPartitions, Idx, [{DataTag,{gpu,Msg},Rest}|PartitionMessages], CPUWorkerPids, [GPUWorkerPid|GPUWorkerPids]) ->
PartitionMessageWithoutTag = {DataTag, Msg, Rest},
PartitionMessage1 = sk_data:push({decomp, Ref, Idx, NPartitions}, PartitionMessageWithoutTag),
sk_tracer:t(50, self(), GPUWorkerPid, {?MODULE, data}, [{partition, PartitionMessage1}]),
GPUWorkerPid ! PartitionMessage1,
hyb_dispatch(Ref, NPartitions, Idx+1, PartitionMessages, CPUWorkerPids, GPUWorkerPids ++ [GPUWorkerPid]).
6 changes: 3 additions & 3 deletions src/sk_ord_reorderer.erl
Expand Up @@ -39,7 +39,7 @@ start(NextPid) ->
% dict1 is a dictionary of {Identifier, Data Message} key-value pairs
% Seen is a count of the loop: how many things it has seen.

-spec loop(non_neg_integer(), dict(), pid()) -> 'eos'.
-spec loop(non_neg_integer(), dict:dict(), pid()) -> 'eos'.
%% @doc Recursively receives and stores messages until they are ready for
%% release.
loop(Seen, Dict, NextPid) ->
Expand All @@ -55,13 +55,13 @@ loop(Seen, Dict, NextPid) ->
eos
end.

-spec store(pos_integer(), data_message(), dict()) -> dict().
-spec store(pos_integer(), data_message(), dict:dict()) -> dict:dict().
%% @doc Stores the given `Idx', indicating position, and message `DataMessage'
%% in the dictionary `Dict'. Returns the resulting dictionary.
store(Idx, DataMessage, Dict) ->
dict:store(Idx, DataMessage, Dict).

-spec forward(non_neg_integer(), dict(), pid()) -> {non_neg_integer(), dict()}.
-spec forward(non_neg_integer(), dict:dict(), pid()) -> {non_neg_integer(), dict:dict()}.
%% @doc Determines if any messages in the dictionary `Dict' can be released to
%% the process `NextPid'. This decision is based upon which messages have
%% already been released as indicated by the `Seen' counter.
Expand Down

0 comments on commit 164abba

Please sign in to comment.