From 164abba114af97bb30a80ddc1e060dba82067347 Mon Sep 17 00:00:00 2001 From: Vladimir Janjic Date: Fri, 15 May 2015 20:52:27 +0200 Subject: [PATCH] New set of hybrid skeletons (hyb_cluster and hyb_map) --- src/sk_assembler.erl | 25 ++++++++++ src/sk_cluster.erl | 99 +++++++++++++++++++++++++++++++++++++- src/sk_cluster_recomp.erl | 12 ++--- src/sk_map.erl | 14 +++++- src/sk_map_combiner.erl | 12 ++--- src/sk_map_partitioner.erl | 42 +++++++++++++++- src/sk_ord_reorderer.erl | 6 +-- src/sk_reduce_decomp.erl | 2 +- src/sk_reduce_reducer.erl | 8 +-- src/sk_utils.erl | 25 ++++++---- 10 files changed, 211 insertions(+), 34 deletions(-) diff --git a/src/sk_assembler.erl b/src/sk_assembler.erl index 3e8ed5e..5b38ca8 100644 --- a/src/sk_assembler.erl +++ b/src/sk_assembler.erl @@ -13,6 +13,7 @@ -export([ make/2 + ,make_hyb/4 ,run/2 ]). @@ -32,6 +33,12 @@ make(WorkFlow, EndPid) when is_pid(EndPid) -> MakeFns = [parse(Section) || Section <- WorkFlow], lists:foldr(fun(MakeFn, Pid) -> MakeFn(Pid) end, EndPid, MakeFns). +-spec make_hyb(workflow(), pid(), pos_integer(), pos_integer()) -> pid(). +make_hyb(WorkFlow, EndPid, NCPUWorkers, NGPUWorkers) when is_pid(EndPid) -> + MakeFns = [parse_hyb(Section, NCPUWorkers, NGPUWorkers) || Section <- WorkFlow], + lists:foldr(fun(MakeFn, Pid) -> MakeFn(Pid) end, EndPid, MakeFns). + + -spec run(pid() | workflow(), input()) -> pid(). %% @doc Function to produce and start a set of processes according to the %% given workflow specification and input. @@ -43,6 +50,14 @@ run(WorkFlow, Input) when is_list(WorkFlow) -> AssembledWF = make(WorkFlow, DrainPid), run(AssembledWF, Input). +parse_hyb(Section, NCPUWorkers, NGPUWorkers) -> + case Section of + {hyb_map, WorkFlowCPU, WorkFlowGPU} -> + parse({hyb_map, WorkFlowCPU, WorkFlowGPU, NCPUWorkers, NGPUWorkers}); + Other -> parse(Other) + end. + + -spec parse(wf_item()) -> maker_fun(). %% @doc Determines the course of action to be taken according to the type of %% workflow specified. Constructs and starts specific skeleton instances. @@ -62,11 +77,21 @@ parse({map, WorkFlow}) -> sk_map:make(WorkFlow); parse({map, WorkFlow, NWorkers}) -> sk_map:make(WorkFlow, NWorkers); +parse({hyb_map, WorkFlowCPU, WorkFlowGPU}) -> + sk_map:make_hyb(WorkFlowCPU, WorkFlowGPU); parse({hyb_map, WorkFlowCPU, WorkFlowGPU, NCPUWorkers, NGPUWorkers}) -> sk_map:make_hyb(WorkFlowCPU, WorkFlowGPU, NCPUWorkers, NGPUWorkers); parse({cluster, WorkFlow, Decomp, Recomp}) when is_function(Decomp, 1), is_function(Recomp, 1) -> sk_cluster:make(WorkFlow, Decomp, Recomp); +parse({hyb_cluster, WorkFlow, Decomp, Recomp, NCPUWorkers, NGPUWorkers}) when + is_function(Decomp, 1), is_function(Recomp, 1) -> + sk_cluster:make_hyb(WorkFlow, Decomp, Recomp, NCPUWorkers, NGPUWorkers); +parse({hyb_cluster, WorkFlow, TimeRatio, NCPUWorkers, NGPUWorkers}) -> + sk_cluster:make_hyb(WorkFlow, TimeRatio, NCPUWorkers, NGPUWorkers); +parse({hyb_cluster, WorkFlow, TimeRatio, StructSizeFun, MakeChunkFun, RecompFun, NCPUWorkers, NGPUWorkers}) -> + sk_cluster:make_hyb(WorkFlow, TimeRatio, StructSizeFun, MakeChunkFun, RecompFun, NCPUWorkers, NGPUWorkers); + % parse({decomp, WorkFlow, Decomp, Recomp}) when is_function(Decomp, 1), % is_function(Recomp, 1) -> % sk_decomp:make(WorkFlow, Decomp, Recomp); diff --git a/src/sk_cluster.erl b/src/sk_cluster.erl index 13bc24c..d960d25 100644 --- a/src/sk_cluster.erl +++ b/src/sk_cluster.erl @@ -33,7 +33,10 @@ -module(sk_cluster). -export([ - make/3 + make/3, + make_hyb/4, + make_hyb/5, + make_hyb/7 ]). -include("skel.hrl"). @@ -55,3 +58,97 @@ make(WorkFlow, Decomp, Recomp) -> WorkerPid = sk_utils:start_worker(WorkFlow, RecompPid), spawn(sk_cluster_decomp, start, [Decomp, WorkerPid]) end. + +ceiling(X) -> + T = erlang:trunc(X), + case (X - T) of + Neg when Neg < 0 -> T; + Pos when Pos > 0 -> T + 1; + _ -> T + end. + + +mark_tasks([], _NCPUWorkers, _NGPUWorkers) -> + []; +mark_tasks(_Tasks, 0, 0) -> + []; +mark_tasks([Task|Tasks], 0, NGPUWorkers) -> + [{gpu, Task} | mark_tasks(Tasks, 0, NGPUWorkers-1)]; +mark_tasks([Task|Tasks], NCPUWorkers, NGPUWorkers) -> + [{cpu, Task} | mark_tasks(Tasks, NCPUWorkers-1, NGPUWorkers)]. + +hyb_cluster_decomp(Decomp, NCPUWorkers, NGPUWorkers, Input) -> + Tasks = Decomp(Input), + mark_tasks(Tasks, NCPUWorkers, NGPUWorkers). + +calculate_ratio(TimeRatio, NTasks, NCPUW, NGPUW) -> + TasksCPU = lists:seq(0, NTasks), + Time = fun(CPUTasks, GPUTasks) -> + max (ceiling(CPUTasks/NCPUW)*TimeRatio, ceiling(GPUTasks/NGPUW)) + end, + Ratio = lists:foldl(fun(Elem,Acc) -> FooBar = Time(Elem, NTasks-Elem), + if + (FooBar < element(1,Acc)) or (element(1,Acc) == -1) + -> {FooBar, Elem}; + true -> Acc + end end, + {-1,0}, TasksCPU), + {element(2,Ratio), NTasks-element(2,Ratio)}. + +calculate_chunk_sizes(NrItems, NrWorkers) -> + ChunkSize = NrItems div NrWorkers, + Remainder = NrItems rem NrWorkers, + ChunkSizes = lists:duplicate(Remainder, {ChunkSize+1}) ++ lists:duplicate(NrWorkers-Remainder, {ChunkSize}), + ChunkSizes. + +create_task_list([], [], _MakeChunkFun, _Input) -> + []; +create_task_list([CPUChunk|CPUChunks], GPUChunks, MakeChunkFun, Input) -> + CPUChunkSize = element(1,CPUChunk), + {Work, Rest} = MakeChunkFun(Input, CPUChunkSize), + [ {cpu, Work} | create_task_list(CPUChunks, GPUChunks, MakeChunkFun, Rest) ]; +create_task_list([], [GPUChunk|GPUChunks], MakeChunkFun, Input) -> + GPUChunkSize = element(1,GPUChunk), + {Work, Rest} = MakeChunkFun(Input, GPUChunkSize), + [ {gpu, Work} | create_task_list([], GPUChunks, MakeChunkFun, Rest) ]. + +hyb_cluster_decomp_default(TimeRatio, StructSizeFun, MakeChunkFun, NCPUWorkers, NGPUWorkers, Input) -> + NItems = StructSizeFun(Input), + {CPUItems, GPUItems} = if + (NCPUWorkers>0) and (NGPUWorkers>0) -> calculate_ratio(TimeRatio, NItems, NCPUWorkers, NGPUWorkers); + NGPUWorkers == 0 -> {NItems,0}; + NCPUWorkers == 0 -> {0, NItems} + end, + CPUChunkSizes = calculate_chunk_sizes(CPUItems, NCPUWorkers), + GPUChunkSizes = calculate_chunk_sizes(GPUItems, NGPUWorkers), + [create_task_list(CPUChunkSizes, GPUChunkSizes, MakeChunkFun, Input)]. + +-spec make_hyb(workflow(), decomp_fun(), recomp_fun(), pos_integer(), pos_integer()) -> fun((pid()) -> pid()). +make_hyb(Workflow, Decomp, Recomp, NCPUWorkers, NGPUWorkers) -> + fun(NextPid) -> + RecompPid = spawn(sk_cluster_recomp, start, [Recomp, NextPid]), + WorkerPid = sk_utils:start_worker_hyb(Workflow, RecompPid, NCPUWorkers, NGPUWorkers), + spawn(sk_cluster_decomp, start, [fun (Input) -> hyb_cluster_decomp(Decomp, NCPUWorkers, NGPUWorkers, Input) end, + WorkerPid]) + end. + +-spec make_hyb(workflow(), float(), fun((any()) -> pos_integer()), fun((any(),pos_integer()) -> pos_integer()), + fun((any())->any()), + pos_integer(), pos_integer()) -> fun((pid()) -> pid()). +make_hyb(Workflow, TimeRatio, StructSizeFun, MakeChunkFun, RecompFun, NCPUWorkers, NGPUWorkers) -> + fun(NextPid) -> + RecompPid = spawn(sk_cluster_recomp, start, [RecompFun, NextPid]), + WorkerPid = sk_utils:start_worker_hyb(Workflow, RecompPid, NCPUWorkers, NGPUWorkers), + spawn(sk_cluster_decomp, start, [fun (Input) -> hyb_cluster_decomp_default(TimeRatio, StructSizeFun, MakeChunkFun, NCPUWorkers, NGPUWorkers, Input) end, + WorkerPid]) + end. + +-spec make_hyb(workflow(), float(), pos_integer(), pos_integer()) -> fun((pid())->pid()). +make_hyb(Workflow, TimeRatio, NCPUWorkers, NGPUWorkers) -> + fun(NextPid) -> + RecompPid = spawn(sk_cluster_recomp, start, [fun lists:flatten/1, NextPid]), + WorkerPid = sk_utils:start_worker_hyb(Workflow, RecompPid, NCPUWorkers, NGPUWorkers), + spawn(sk_cluster_decomp, start, [fun (Input) -> hyb_cluster_decomp_default(TimeRatio, fun length/1,fun (Data,Pos) -> lists:split(Pos,Data) end, NCPUWorkers, NGPUWorkers, Input) end, + WorkerPid]) + end. + diff --git a/src/sk_cluster_recomp.erl b/src/sk_cluster_recomp.erl index d24eeb9..751c91a 100644 --- a/src/sk_cluster_recomp.erl +++ b/src/sk_cluster_recomp.erl @@ -41,7 +41,7 @@ start(Recomp, NextPid) -> DataRecompFun = sk_data:recomp_with(Recomp), loop(dict:new(), DataRecompFun, NextPid). --spec loop(dict(), data_recomp_fun(), pid()) -> 'eos'. +-spec loop(dict:dict(), data_recomp_fun(), pid()) -> 'eos'. %% @doc Worker function for {@link start/2}; recursively receives and combines %% messages using the recomposition function under `DataRecompFun'. Once all %% partite elements for each original input have been received and merged, the @@ -59,7 +59,7 @@ loop(Dict, DataRecompFun, NextPid) -> eos end. --spec store(reference(), pos_integer(), pos_integer(), data_message(), dict()) -> dict(). +-spec store(reference(), pos_integer(), pos_integer(), data_message(), dict:dict()) -> dict:dict(). %% @doc Facilitates the storing of all partite and semi-combined messages, for each input. %% %% The total number of partite elements expected, the partite element data @@ -71,7 +71,7 @@ store(Ref, Idx, NPartitions, PartitionMessage, Dict) -> Dict2 = dict:store({Ref, Idx}, PartitionMessage, Dict1), dict:update_counter({Ref, received}, 1, Dict2). --spec combine_and_forward(reference(), dict(), data_recomp_fun(), pid()) -> dict(). +-spec combine_and_forward(reference(), dict:dict(), data_recomp_fun(), pid()) -> dict:dict(). %% @doc Attempts to find the grouping reference under `Ref' in the given %% dictionary. If that reference is found, all message parts are combined %% using the recomposition function given under `DataCombinerFun'. @@ -81,7 +81,7 @@ combine_and_forward(Ref, Dict, DataCombinerFun, NextPid) -> {ok, NPartitions} -> combine_and_forward(Ref, Dict, NPartitions, DataCombinerFun, NextPid) end. --spec combine_and_forward(reference(), dict(), pos_integer(), data_recomp_fun(), pid()) -> dict(). +-spec combine_and_forward(reference(), dict:dict(), pos_integer(), data_recomp_fun(), pid()) -> dict:dict(). %% @doc Worker function for {@link combine_and_forward/4}; attempts to %% recompose a partite elements for a given original input, as indicated by %% `Ref'. Should all partite elements be stored in the dictionary, they are @@ -101,7 +101,7 @@ combine_and_forward(Ref, Dict, NPartitions, DataCombinerFun, NextPid) -> Dict end. --spec fetch_partitions(reference(), non_neg_integer(), dict(), [any()]) -> [any()]. +-spec fetch_partitions(reference(), non_neg_integer(), dict:dict(), [any()]) -> [any()]. %% @doc Retrieves and returns a list of all entries with the same reference in %% the specified dictionary. fetch_partitions(_Ref, 0, _Dict, Acc) -> @@ -110,7 +110,7 @@ fetch_partitions(Ref, NPartitions, Dict, Acc) -> {ok, Piece} = dict:find({Ref, NPartitions}, Dict), fetch_partitions(Ref, NPartitions-1, Dict, [Piece|Acc]). --spec purge_partitions(reference(), non_neg_integer(), dict()) -> dict(). +-spec purge_partitions(reference(), non_neg_integer(), dict:dict()) -> dict:dict(). %% @doc Recursively removes all entries with the same reference in a given %% dictionary. purge_partitions(Ref, 0, Dict) -> diff --git a/src/sk_map.erl b/src/sk_map.erl index 0bb5dae..471f5da 100644 --- a/src/sk_map.erl +++ b/src/sk_map.erl @@ -88,7 +88,17 @@ make(WorkFlow, NWorkers) -> make_hyb(WorkFlowCPU, WorkFlowGPU, NCPUWorkers, NGPUWorkers) -> fun(NextPid) -> CombinerPid = spawn(sk_map_combiner, start, [NextPid, NCPUWorkers+NGPUWorkers]), - WorkerPids = sk_utils:start_workers_hyb(NCPUWorkers, NGPUWorkers, WorkFlowCPU, WorkFlowGPU, CombinerPid), - spawn(sk_map_partitioner, start, [man, WorkerPids, CombinerPid]) + {CPUWorkerPids, GPUWorkerPids} = sk_utils:start_workers_hyb(NCPUWorkers, NGPUWorkers, WorkFlowCPU, WorkFlowGPU, CombinerPid), + spawn(sk_map_partitioner, start_hyb, [man, CPUWorkerPids, GPUWorkerPids, CombinerPid]) end. +%make_hyb(WorkFlowCPU, WorkFlowGPU) -> +% fun(NextPid) -> +% CombinerPid = spawn(sk_map_combiner, start, [NextPid]), +% spawn(sk_map_partitioner, start, [auto, [{seq, fun(X) -> sk_utils:hyb_worker(WorkFlowCPU, WorkFlowGPU, X) end}], +% CombinerPid]) +% end. + + + + diff --git a/src/sk_map_combiner.erl b/src/sk_map_combiner.erl index 34ee3a4..1950538 100644 --- a/src/sk_map_combiner.erl +++ b/src/sk_map_combiner.erl @@ -50,7 +50,7 @@ start(NextPid, NWorkers) -> loop(man, NWorkers, 0, dict:new(), recomp_with(), NextPid). --spec loop(atom(), non_neg_integer(), non_neg_integer(), dict(), data_recomp_fun(), pid()) -> 'eos'. +-spec loop(atom(), non_neg_integer(), non_neg_integer(), dict:dict(), data_recomp_fun(), pid()) -> 'eos'. %% @doc Recursively receives and stores messages until groups of said messages %% may be recomposed and sent. Serves to stop all processes once all inputs %% have been processed. @@ -109,7 +109,7 @@ new_total_workers(TotWorkers, _NPartitions) -> TotWorkers. --spec store(reference(), pos_integer(), pos_integer(), data_message(), dict()) -> dict(). +-spec store(reference(), pos_integer(), pos_integer(), data_message(), dict:dict()) -> dict:dict(). %% @doc Stores in a dictionary the total number of partitions, `NPartitions', %% expected; all messages heretofore received; and the number said received %% messages, for the original input under the reference given by `Ref'. The @@ -120,7 +120,7 @@ store(Ref, Idx, NPartitions, PartitionMessage, Dict) -> dict:update_counter({Ref, received}, 1, Dict2). --spec combine_and_forward(reference(), dict(), data_recomp_fun(), pid()) -> dict(). +-spec combine_and_forward(reference(), dict:dict(), data_recomp_fun(), pid()) -> dict:dict(). %% @doc Attempts to find the reference as given by `Ref' in the specified %% dictionary. %% @@ -134,7 +134,7 @@ combine_and_forward(Ref, Dict, DataCombinerFun, NextPid) -> end. --spec combine_and_forward(reference(), dict(), pos_integer(), data_recomp_fun(), pid()) -> dict(). +-spec combine_and_forward(reference(), dict:dict(), pos_integer(), data_recomp_fun(), pid()) -> dict:dict(). %% @doc Inner-function for {@link combine_and_forward/4} that attempts to %% restore a decomposed list from parts in a dictionary `Dict', whose %% reference is given by `Ref'. @@ -157,7 +157,7 @@ combine_and_forward(Ref, Dict, NPartitions, DataCombinerFun, NextPid) -> end. --spec fetch_partitions(reference(), non_neg_integer(), dict(), [any()]) -> [any()]. +-spec fetch_partitions(reference(), non_neg_integer(), dict:dict(), [any()]) -> [any()]. %% @doc Returns a list of all data messages in the given dictionary, whose %% reference is `Ref'. fetch_partitions(_Ref, 0, _Dict, Acc) -> @@ -167,7 +167,7 @@ fetch_partitions(Ref, NPartitions, Dict, Acc) -> fetch_partitions(Ref, NPartitions-1, Dict, [Piece|Acc]). --spec purge_partitions(reference(), non_neg_integer(), dict()) -> dict(). +-spec purge_partitions(reference(), non_neg_integer(), dict:dict()) -> dict:dict(). %% @doc Recursively removes all entries with `Ref' as their reference in the %% given dictionary. purge_partitions(Ref, 0, Dict) -> diff --git a/src/sk_map_partitioner.erl b/src/sk_map_partitioner.erl index 01b90eb..8213820 100644 --- a/src/sk_map_partitioner.erl +++ b/src/sk_map_partitioner.erl @@ -23,7 +23,8 @@ -module(sk_map_partitioner). -export([ - start/3 + start/3, + start_hyb/4 ]). -include("skel.hrl"). @@ -52,6 +53,12 @@ start(man, WorkerPids, CombinerPid) when is_pid(hd(WorkerPids)) -> sk_tracer:t(75, self(), {?MODULE, start}, [{combiner, CombinerPid}]), loop(decomp_by(), CombinerPid, WorkerPids). +-spec start_hyb(atom(), [pid()], [pid()], pid()) -> 'eos'. +start_hyb(man, CPUWorkerPids, GPUWorkerPids, CombinerPid) -> + sk_tracer:t(75, self(), {?MODULE, start}, [{combiner, CombinerPid}]), + loop_hyb(decomp_by(), CombinerPid, CPUWorkerPids, GPUWorkerPids). + + -spec loop(data_decomp_fun(), workflow(), pid(), [pid()]) -> 'eos'. %% @doc Recursively receives inputs as messages, which are decomposed, and the %% resulting messages sent to individual workers. `loop/4' is used in place of @@ -89,6 +96,21 @@ loop(DataPartitionerFun, CombinerPid, WorkerPids) -> eos end. +loop_hyb(DataPartitionerFun, CombinerPid, CPUWorkerPids, GPUWorkerPids) -> + receive + {data, _, _} = DataMessage -> + PartitionMessages = DataPartitionerFun(DataMessage), + Ref = make_ref(), + sk_tracer:t(60, self(), {?MODULE, data}, [{ref, Ref}, {input, DataMessage}, {partitions, PartitionMessages}]), + hyb_dispatch(Ref, length(PartitionMessages), PartitionMessages, CPUWorkerPids, GPUWorkerPids), + loop_hyb(DataPartitionerFun, CombinerPid, CPUWorkerPids, GPUWorkerPids); + {system, eos} -> + sk_utils:stop_workers(?MODULE, CPUWorkerPids), + sk_utils:stop_workers(?MODULE, GPUWorkerPids), + eos + end. + + -spec decomp_by() -> data_decomp_fun(). %% @doc Provides the decomposition function and means to split a single input @@ -124,6 +146,9 @@ start_workers(_NPartitions, _WorkFlow, _CombinerPid, WorkerPids) -> dispatch(Ref, NPartitions, PartitionMessages, WorkerPids) -> dispatch(Ref, NPartitions, 1, PartitionMessages, WorkerPids). +hyb_dispatch(Ref, NPartitions, PartitionMessages, CPUWorkerPids, GPUWorkerPids) -> + hyb_dispatch(Ref, NPartitions, 1, PartitionMessages, CPUWorkerPids, GPUWorkerPids). + -spec dispatch(reference(), pos_integer(), pos_integer(), [data_message(),...], [pid()]) -> 'ok'. %% @doc Inner-function for {@link dispatch/4}. Recursively sends each message @@ -136,3 +161,18 @@ dispatch(Ref, NPartitions, Idx, [PartitionMessage|PartitionMessages], [WorkerPid sk_tracer:t(50, self(), WorkerPid, {?MODULE, data}, [{partition, PartitionMessage1}]), WorkerPid ! PartitionMessage1, dispatch(Ref, NPartitions, Idx+1, PartitionMessages, WorkerPids ++ [WorkerPid]). + +hyb_dispatch(_Ref,_NPartitions, _Idx, [], _, _) -> + ok; +hyb_dispatch(Ref, NPartitions, Idx, [{DataTag,{cpu,Msg},Rest}|PartitionMessages], [CPUWorkerPid|CPUWorkerPids], GPUWorkerPids) -> + PartitionMessageWithoutTag = {DataTag, Msg, Rest}, + PartitionMessage1 = sk_data:push({decomp, Ref, Idx, NPartitions}, PartitionMessageWithoutTag), + sk_tracer:t(50, self(), CPUWorkerPid, {?MODULE, data}, [{partition, PartitionMessage1}]), + CPUWorkerPid ! PartitionMessage1, + hyb_dispatch(Ref, NPartitions, Idx+1, PartitionMessages, CPUWorkerPids ++ [CPUWorkerPid], GPUWorkerPids); +hyb_dispatch(Ref, NPartitions, Idx, [{DataTag,{gpu,Msg},Rest}|PartitionMessages], CPUWorkerPids, [GPUWorkerPid|GPUWorkerPids]) -> + PartitionMessageWithoutTag = {DataTag, Msg, Rest}, + PartitionMessage1 = sk_data:push({decomp, Ref, Idx, NPartitions}, PartitionMessageWithoutTag), + sk_tracer:t(50, self(), GPUWorkerPid, {?MODULE, data}, [{partition, PartitionMessage1}]), + GPUWorkerPid ! PartitionMessage1, + hyb_dispatch(Ref, NPartitions, Idx+1, PartitionMessages, CPUWorkerPids, GPUWorkerPids ++ [GPUWorkerPid]). diff --git a/src/sk_ord_reorderer.erl b/src/sk_ord_reorderer.erl index 8f9771e..4305d2b 100644 --- a/src/sk_ord_reorderer.erl +++ b/src/sk_ord_reorderer.erl @@ -39,7 +39,7 @@ start(NextPid) -> % dict1 is a dictionary of {Identifier, Data Message} key-value pairs % Seen is a count of the loop: how many things it has seen. --spec loop(non_neg_integer(), dict(), pid()) -> 'eos'. +-spec loop(non_neg_integer(), dict:dict(), pid()) -> 'eos'. %% @doc Recursively receives and stores messages until they are ready for %% release. loop(Seen, Dict, NextPid) -> @@ -55,13 +55,13 @@ loop(Seen, Dict, NextPid) -> eos end. --spec store(pos_integer(), data_message(), dict()) -> dict(). +-spec store(pos_integer(), data_message(), dict:dict()) -> dict:dict(). %% @doc Stores the given `Idx', indicating position, and message `DataMessage' %% in the dictionary `Dict'. Returns the resulting dictionary. store(Idx, DataMessage, Dict) -> dict:store(Idx, DataMessage, Dict). --spec forward(non_neg_integer(), dict(), pid()) -> {non_neg_integer(), dict()}. +-spec forward(non_neg_integer(), dict:dict(), pid()) -> {non_neg_integer(), dict:dict()}. %% @doc Determines if any messages in the dictionary `Dict' can be released to %% the process `NextPid'. This decision is based upon which messages have %% already been released as indicated by the `Seen' counter. diff --git a/src/sk_reduce_decomp.erl b/src/sk_reduce_decomp.erl index db2b18d..78aa6e9 100644 --- a/src/sk_reduce_decomp.erl +++ b/src/sk_reduce_decomp.erl @@ -19,7 +19,7 @@ -include("skel.hrl"). --type pid_pools() :: dict(). +-type pid_pools() :: dict:dict(). -spec start(decomp_fun(), reduce_fun(), pid()) -> eos. %% @doc Starts the reduce process. Takes the developer-defined reduction and diff --git a/src/sk_reduce_reducer.erl b/src/sk_reduce_reducer.erl index d14802b..cde0ee6 100644 --- a/src/sk_reduce_reducer.erl +++ b/src/sk_reduce_reducer.erl @@ -33,7 +33,7 @@ start(DataFun, NextPid) -> % 1st Case: data message. Stores items in a dictionary until they can be reduced (i.e. on every second call, dict1 is emptied). % 2nd Case: Occurs when you have an odd-length list as input. --spec loop(dict(), integer(), data_reduce_fun(), pid()) -> eos. +-spec loop(dict:dict(), integer(), data_reduce_fun(), pid()) -> eos. %% @doc The main message receiver loop. Recursively receives messages upon %% which, if said messages carry data, a reduction is attempted using %% `DataFun'. @@ -57,13 +57,13 @@ loop(Dict, EOSRecvd, DataFun, NextPid) -> loop(Dict, EOSRecvd+1, DataFun, NextPid) end. --spec store(reference(), dict(), maybe_data()) -> dict(). +-spec store(reference(), dict:dict(), maybe_data()) -> dict:dict(). %% @doc Stores the given reference `Ref' and value `Value' in the dictionary %% `Dict'. Returns the resulting dictionary. store(Ref, Dict, Value) -> dict:append(Ref, Value, Dict). --spec maybe_reduce(reference(), integer(), pid(), data_reduce_fun(), dict()) -> dict(). +-spec maybe_reduce(reference(), integer(), pid(), data_reduce_fun(), dict:dict()) -> dict:dict(). %% @doc Attempts to find the reference `Ref' in the dictionary `Dict'. If %% found, a reduction shall be attempted. Otherwise, the dictionary is simply %% returned. @@ -79,7 +79,7 @@ maybe_reduce(Ref, ReduceCount, NextPid, DataFun, Dict) -> % Case 4: Reference has two data message entries, which are then reduced. % Deletes the reference from the dictionary, result is returned. --spec reduce(reference(), integer(), pid(), [maybe_data(),...], data_reduce_fun(), dict()) -> dict(). +-spec reduce(reference(), integer(), pid(), [maybe_data(),...], data_reduce_fun(), dict:dict()) -> dict:dict(). %% @doc The reduction function. Given a list of length two containing specific %% data messages retreived from `Dict', all messages are reduced to a single %% message. Returns the (now-erased) dictionary. diff --git a/src/sk_utils.erl b/src/sk_utils.erl index ddeff68..c693080 100644 --- a/src/sk_utils.erl +++ b/src/sk_utils.erl @@ -11,9 +11,10 @@ -export([ start_workers/3 + ,start_worker_hyb/4 + ,start_workers_hyb/5 ,start_worker/2 ,stop_workers/2 - ,start_workers_hyb/5 ]). -include("skel.hrl"). @@ -23,9 +24,9 @@ start_workers(NWorkers, WorkFlow, NextPid) -> start_workers(NWorkers, WorkFlow, NextPid, []). --spec start_workers_hyb(pos_integer(), pos_integer(), workflow(), workflow(), pid()) -> [pid()]. +-spec start_workers_hyb(pos_integer(), pos_integer(), workflow(), workflow(), pid()) -> {[pid()],[pid()]}. start_workers_hyb(NCPUWorkers, NGPUWorkers, WorkFlowCPU, WorkFlowGPU, NextPid) -> - start_workers_hyb(NCPUWorkers, NGPUWorkers, WorkFlowCPU, WorkFlowGPU, NextPid, []). + start_workers_hyb(NCPUWorkers, NGPUWorkers, WorkFlowCPU, WorkFlowGPU, NextPid, {[],[]}). -spec start_workers(pos_integer(), workflow(), pid(), [pid()]) -> [pid()]. %% @doc Starts a given number NWorkers of workers as children to the @@ -37,17 +38,16 @@ start_workers(NWorkers, WorkFlow, NextPid, WorkerPids) -> NewWorker = start_worker(WorkFlow, NextPid), start_workers(NWorkers-1, WorkFlow, NextPid, [NewWorker|WorkerPids]). --spec start_workers_hyb(pos_integer(), pos_integer(), workflow(), workflow(), pid(), [pid()]) -> [pid()]. -start_workers_hyb(NCPUWorkers, NGPUWorkers, _WorkFlowCPU, _WorkFlowGPU, _NextPid, WorkerPids) +start_workers_hyb(NCPUWorkers, NGPUWorkers, _WorkFlowCPU, _WorkFlowGPU, _NextPid, Acc) when (NCPUWorkers < 1) and (NGPUWorkers < 1) -> - WorkerPids; -start_workers_hyb(NCPUWorkers, NGPUWorkers, WorkFlowCPU, WorkFlowGPU, NextPid, WorkerPids) + Acc; +start_workers_hyb(NCPUWorkers, NGPUWorkers, WorkFlowCPU, WorkFlowGPU, NextPid, {CPUWs,GPUWs}) when NCPUWorkers < 1 -> NewWorker = start_worker(WorkFlowGPU, NextPid), - start_workers_hyb(NCPUWorkers, NGPUWorkers-1, WorkFlowCPU, WorkFlowGPU, NextPid, [NewWorker|WorkerPids]); -start_workers_hyb(NCPUWorkers, NGPUWorkers, WorkFlowCPU, WorkFlowGPU, NextPid, WorkerPids) -> + start_workers_hyb(NCPUWorkers, NGPUWorkers-1, WorkFlowCPU, WorkFlowGPU, NextPid, {CPUWs, [NewWorker|GPUWs]}); +start_workers_hyb(NCPUWorkers, NGPUWorkers, WorkFlowCPU, WorkFlowGPU, NextPid, {CPUWs, GPUWs}) -> NewWorker = start_worker(WorkFlowCPU, NextPid), - start_workers_hyb(NCPUWorkers-1, NGPUWorkers, WorkFlowCPU, WorkFlowGPU, NextPid, [NewWorker|WorkerPids]). + start_workers_hyb(NCPUWorkers-1, NGPUWorkers, WorkFlowCPU, WorkFlowGPU, NextPid, {[NewWorker|CPUWs],GPUWs}). -spec start_worker(workflow(), pid()) -> pid(). %% @doc Provides a worker with its tasks, the workflow WorkFlow. @@ -56,6 +56,10 @@ start_workers_hyb(NCPUWorkers, NGPUWorkers, WorkFlowCPU, WorkFlowGPU, NextPid, W start_worker(WorkFlow, NextPid) -> sk_assembler:make(WorkFlow, NextPid). +-spec start_worker_hyb(workflow(), pid(), pos_integer(), pos_integer()) -> pid(). +start_worker_hyb(WorkFlow, NextPid, NCPUWorkers, NGPUWorkers) -> + sk_assembler:make_hyb(WorkFlow, NextPid, NCPUWorkers, NGPUWorkers). + -spec stop_workers(module(), [pid()]) -> 'eos'. %% @doc Sends the halt command to each worker in the given list of worker %% processes. @@ -65,3 +69,4 @@ stop_workers(Mod, [Worker|Rest]) -> sk_tracer:t(85, self(), Worker, {Mod, system}, [{msg, eos}]), Worker ! {system, eos}, stop_workers(Mod, Rest). +