In [None]:
# add 5 worker processes to act as executors; the master process
# (the one running this notebook) is the driver
addprocs(5);

In [None]:
include("../test/daggen.jl") # helper functions for generating DAGs

using DagScheduler           # load DagScheduler
using GraphViz               # for visualizing DAGs
using Base.Test

# An earlier run may have left a ".mempool" directory behind; delete it
# (recursively) so this session starts from a clean state.
if isdir(".mempool")
    rm(".mempool"; recursive=true)
end;

In [None]:
# Options needed for large DAGs. Both entries are only used by the
# Shared Memory Metadata store; each one is applied on every process.
@everywhere DagScheduler.META_IMPL[:map_num_entries] = 100 * 1024
@everywhere DagScheduler.META_IMPL[:map_entry_sz] = 1512

# Build a run environment and register it with Dagger through the
# DagScheduler plugin; the value returned by setrunenv is kept as `runenv`.
runenv = DagScheduler.Plugin.setrunenv(RunEnv())

In [None]:
# Deep DAG: the depth to which dags are stacked is large.
# The generator is fed a vector of 6^4 = 1296 integer ones.
# NOTE(review): how gen_straight_dag interprets that vector is defined in
# daggen.jl — confirm there.
dag1 = gen_straight_dag(fill(1, 6^4))
# render the execution plan; as the last expression it is displayed by the cell
dag1 |> Dagger.show_plan |> Graph

In [None]:
# execute the deep DAG in `runenv` and collect its output;
# the test expects the collected result to equal 1
result = collect(rundag(runenv, dag1))
@test result == 1

In [None]:
# print the statistics DagScheduler reports for `runenv` after the run above
DagScheduler.print_stats(runenv)

In [None]:
# Cross connected DAG: the output of a node is consumed by multiple other nodes.
dag3 = gen_cross_dag()
# render the execution plan; as the last expression it is displayed by the cell
dag3 |> Dagger.show_plan |> Graph

In [None]:
# execute the cross connected DAG in `runenv` and collect its output;
# the test expects the collected result to equal 84
result = collect(rundag(runenv, dag3))
@test result == 84

In [None]:
# print the statistics DagScheduler reports for `runenv` after the run above
DagScheduler.print_stats(runenv)

In [None]:
# Array sorting.
# `L` is kept as a global because the later distributed-output sort reuses it.
# NOTE(review): presumably L is the number of elements to sort, and the
# remaining arguments (40, 4, 1) are defined by gen_sort_dag in daggen.jl —
# confirm there.
L = 1_000_000
dag2 = gen_sort_dag(L, 40, 4, 1)
# render the execution plan; as the last expression it is displayed by the cell
dag2 |> Dagger.show_plan |> Graph

In [None]:
# execute the sorting DAG in `runenv`, collect its output and
# verify that the collected result is in sorted order
result = collect(rundag(runenv, dag2))
@test issorted(result)

In [None]:
# cleanup MemPool remnants on every process (reference counting doesn't work fully yet)
@everywhere MemPool.cleanup()
# print the statistics DagScheduler reports for `runenv`
DagScheduler.print_stats(runenv)

In [None]:
# Sorting with distributed output: same generator and size `L` as the
# earlier sort DAG, but the last argument is 40 instead of 1.
# NOTE(review): presumably that argument is the number of output parts —
# confirm against gen_sort_dag in daggen.jl.
dag4 = gen_sort_dag(L, 40, 4, 40)
# render the execution plan; as the last expression it is displayed by the cell
dag4 |> Dagger.show_plan |> Graph

In [None]:
# The return value of dref_to_fref! is not used; dag4 itself is what gets
# run on the next line (the `!` suggests it is modified in place — confirm).
DagScheduler.dref_to_fref!(dag4) # (optional) use file refs instead, for faster inter-process IO
# execute the DAG in `runenv`; the collected output is kept in `result`
# and put back together in the next cell
result = collect(rundag(runenv, dag4))

In [None]:
# concatenate and test if it is sorted:
# the pieces in `result` are combined with `vcat` through a Dagger tree
# reduction, and the collected whole is checked with `issorted`
fullresult = collect(Dagger.treereduce(delayed(vcat), result))
@test issorted(fullresult)

In [None]:
# cleanup MemPool remnants on every process (reference counting doesn't work fully yet)
@everywhere MemPool.cleanup()
# print the statistics DagScheduler reports for `runenv`
DagScheduler.print_stats(runenv)

In [None]:
# cleanup everything: first clean up the run environment through
# DagScheduler, then delete the ".mempool" directory (recursively)
# if it still exists
DagScheduler.cleanup(runenv)
isdir(".mempool") && rm(".mempool"; recursive=true)