-module(mergesort).
-compile(export_all).
%-export([run/2]).
-ifdef(debug).
-define(TRACE(Format, Data), io:format(string:concat("TRACE ~p:~p ", Format), [?MODULE, ?LINE] ++ Data)).
-else.
-define(TRACE(Format, Data), void).
-endif.
run(Dimensions, M) ->
TimeStart = time_microseconds(),
DataSize = round(math:pow(2, Dimensions)) * M,
random_seed(Dimensions + DataSize),
Data = [random_value(DataSize) || _ <- lists:seq(1, DataSize)],
Result = mergesort_exec(Data, Dimensions),
?TRACE("RESULT FINAL: ~p", [Result]),
ResultFlat = lists:flatten([R || {_, R} <- lists:keysort(1, Result)]),
TimeEnd = time_microseconds(),
TimeElapsed = TimeEnd - TimeStart,
io:format("data: ~p~n result: ~p~n time: ~.2f us~n", [Data, ResultFlat, TimeElapsed]).
mergesort_exec(Data, 0) ->
mergesort(Data);
mergesort_exec(Data, Dimensions) ->
UniqueID = time_microseconds(),
ProcessesTotal = dimensions_to_processes(Dimensions),
Processes = create_processes(Dimensions, ProcessesTotal, Data),
WorkerIDs = [IDs || {IDs, _} <- Processes],
WorkerPids = [Pids || {_, Pids} <- Processes],
[Process ! {init, self(), WorkerPids, UniqueID} || {_, Process} <- Processes],
finalize(WorkerIDs, WorkerPids).
mergesort(Data) when length(Data) =< 1 ->
Data;
mergesort(Data) ->
{Left, Right} = split(Data),
merge(mergesort(Left), mergesort(Right)).
merge([], Result) ->
Result;
merge(Result, []) ->
Result;
merge(Left, Right) ->
[LeftFirst | LeftRest] = Left,
[RightFirst | RightRest] = Right,
case LeftFirst =< RightFirst of
true ->
[LeftFirst | merge(LeftRest, Right)];
false ->
[RightFirst | merge(Left, RightRest)]
end.
destinations(SenderID, Dimensions, WorkerPids) ->
DestIDs = [SenderID bxor round(math:pow(2, X)) || X <- lists:seq(0, Dimensions - 1)],
DestPids = [lists:sublist(WorkerPids, S + 1, 1) || S <- DestIDs],
[{DestID, DestPid} || {DestID, [DestPid]} <- lists:zip(DestIDs, DestPids)].
compare_exchange_decode(SenderID, Dimension) ->
case (SenderID band round(math:pow(2, Dimension - 1))) > 0 of
true ->
high;
false ->
low
end.
compare_exchange(MyID, Destinations, Dimensions, Data, ID) ->
compare_exchange(MyID, Destinations, Dimensions, 1, Data, ID, 0).
compare_exchange(_, [], _, _, Data, _, _) ->
Data;
compare_exchange(MyID, [{DestID, Dest} | Destinations], Dimensions, Dimension, Data, ID, Strobe) ->
Mode = compare_exchange_decode(MyID, Dimension),
?TRACE(
"CE: myid: ~p(~p) dim: ~p/~p dest: ~p(~p) mode: ~p~n",
[MyID, self(), Dimension, Dimensions, DestID, Dest, Mode]
),
{Low, High} = {Data, Data},
case Mode of
high ->
Result = compare_exchange_high(High, Dest, DestID, ID, Strobe);
low ->
Result = compare_exchange_low(Low, Dest, DestID, ID, Strobe)
end,
compare_exchange(MyID, Destinations, Dimensions, Dimension + 1, Result, ID, Strobe + 1).
compare_exchange_high(High, Dest, _DestID, ID, Strobe) ->
?TRACE("CE-HIGH: High: ~p Dest: ~p DestID: ~p self: ~p~n", [High, Dest, _DestID, self()]),
Dest ! {ce_high, High, self(), ID, Strobe},
receive
{ce_low, Low, _From, ID, Strobe} ->
{_, Result} = split(merge(Low, High)),
?TRACE("CE-HIGH-Result:~n Low: ~p~n High: ~p~n Result: ~p~n", [Low, High, Result])
end,
Result.
compare_exchange_low(Low, Dest, _DestID, ID, Strobe) ->
?TRACE("CE-LOW: Low: ~p Dest: ~p DestID: ~p self: ~p~n", [Low, Dest, _DestID, self()]),
Dest ! {ce_low, Low, self(), ID, Strobe},
receive
{ce_high, High, _From, ID, Strobe} ->
{Result, _} = split(merge(Low, High)),
?TRACE("CE-LOW-Result:~n Low: ~p~n High: ~p~n Result: ~p~n", [Low, High, Result])
end,
Result.
create_processes(Dimensions, Processes, Data) ->
DataDistribution = data_distribution(Data, Processes),
[
{ID, spawn(fun() -> worker(ID, D, Dimensions) end)}
||
{ID, D} <- lists:zip(lists:seq(0, Processes - 1), DataDistribution)
].
data_distribution(Data, Processes) ->
data_distribution(Data, Processes, [], 1).
data_distribution(_, Processes, DataDistributed, _) when length(DataDistributed) =:= Processes ->
DataDistributed;
data_distribution(Data, Processes, DataDistributed, Index) ->
FragmentSize = length(Data) div Processes,
Fragment = lists:sublist(Data, Index, FragmentSize),
data_distribution(Data, Processes, [Fragment | DataDistributed], Index + FragmentSize).
dimensions_to_processes(Dimensions) ->
round(math:pow(2, Dimensions)).
worker(MyID, Data, Dimensions) ->
receive
{init, Parent, WorkerPids, ID} ->
Destinations = destinations(MyID, Dimensions, WorkerPids),
?TRACE("WORKER: myid: ~p(~p) destinations: ~p~n", [MyID, self(), Destinations]),
Result = compare_exchange(MyID, Destinations, Dimensions, mergesort(Data), ID),
?TRACE("SENDING: myid: ~p(~p) ~p~n", [MyID, self(), Result]),
Parent ! {done, Result, MyID, self()}
end.
samples_calc(SamplesTotal, ProcNum, ProcTotal) when ProcNum < ProcTotal ->
SamplesBase = SamplesTotal div ProcTotal,
SamplesRemainder = SamplesTotal rem ProcTotal,
if
ProcNum < SamplesRemainder ->
SamplesBase + 1;
true ->
SamplesBase
end.
finalize(IDs, Pids) ->
%?TRACE("Processes: ~p ~p~n", [IDs, Pids]),
finalize(IDs, Pids, []).
finalize([], _, Results) ->
Results;
finalize(Processes, Pids, Results) ->
receive
{done, Result, MyID, From} ->
[Process | ProcessRemainder] = Processes,
%?TRACE("HIT: process: ~p myid: ~p~n", [Process, MyID]),
IsValid = lists:member(From, Pids),
if
IsValid and (length(Results) + 1 =:= length(Processes)) ->
%?TRACE("DONE (final): from: ~p result: ~p results: ~p~n", [From, Result, Results]),
%?TRACE("length(results): ~p length(processes): ~p~n", [length(Results), length(Processes)]),
[{MyID, Result} | Results];
IsValid ->
%?TRACE("DONE: from: ~p result: ~p results: ~p~n", [From, Result, Results]),
%?TRACE("length(results): ~p length(processes): ~p~n", [length(Results), length(Processes)]),
finalize(ProcessRemainder, Pids, [{MyID, Result} | Results]);
true ->
finalize(Processes, Pids, Results)
end
end.
random_seed(X) ->
{S1, S2, S3} = now(),
random:seed(S1 + X, S2 + X, S3 + X).
random_value(DataSize) ->
round(random:uniform() * DataSize).
split(Data) ->
Midpoint = length(Data) div 2,
Left = lists:sublist(Data, Midpoint),
Right = lists:nthtail(Midpoint, Data),
{Left, Right}.
time_microseconds() ->
{MS, S, US} = now(),
(MS * 1.0e+12) + (S * 1.0e+6) + US.