From d33746b3e0bb9579f54ff11621b59412e41bf8df Mon Sep 17 00:00:00 2001 From: "Julian P. Samaroo" Date: Wed, 9 Mar 2022 15:36:53 -0600 Subject: [PATCH] benchmarks: Split out suites Split suites out into individual files Provide usage info when run without BENCHMARK env var Add option to save logs to output file Add DTable CSV/Arrow reading suite --- benchmarks/analysis.jl | 43 ++++ benchmarks/benchmark.jl | 383 +++++++++++++----------------------- benchmarks/suites/array.jl | 49 +++++ benchmarks/suites/dtable.jl | 132 +++++++++++++ benchmarks/suites/nmf.jl | 124 ++++++++++++ 5 files changed, 490 insertions(+), 241 deletions(-) create mode 100644 benchmarks/analysis.jl create mode 100644 benchmarks/suites/array.jl create mode 100644 benchmarks/suites/dtable.jl create mode 100644 benchmarks/suites/nmf.jl diff --git a/benchmarks/analysis.jl b/benchmarks/analysis.jl new file mode 100644 index 000000000..aeacb7db7 --- /dev/null +++ b/benchmarks/analysis.jl @@ -0,0 +1,43 @@ +path = ARGS[1] +start, finish = parse.(UInt64, ARGS[2:3]) + +using DataFrames, BenchmarkTools, Dagger, Serialization + +logs = deserialize(path)["logs"] + +""" + spans(logs::DataFrame) -> DataFrame + +Combines start and finish event pairs, and computes their timespan. 
+""" +function spans(logs) + df = Any[] + evs = Dict{Symbol,Dict{Any,Any}}() + for log in eachrow(logs) + _evs = get!(evs, log.core.category) do + Dict{Any,Any}() + end + if log.core.kind == :finish + if haskey(_evs, log.id) + start = pop!(_evs, log.id) + ev = merge(log, (;span=(start.core.timestamp, log.core.timestamp))) + push!(df, ev) + else + @warn "Dropped :finish for $(log.core.category)" + end + elseif log.core.kind == :start + _evs[log.id] = log + end + end + DataFrame(df) +end + +# Combine, select target range, and filter out `take` +tgt = subset(spans(logs), :core=>ByRow(x->start <= x.timestamp <= finish), + :core=>ByRow(x->x.category != :take)) + +# Sort by largest time contribution +tgt_sort = DataFrame(sort(eachrow(tgt), by=r->r.span[2] - r.span[1], rev=true)) +transform!(tgt_sort, :span=>ByRow(s->(s[2]-s[1]) / (1000^3))=>:span_s) +println("Total: $((finish-start) / (1000^3))") +foreach(println, zip(map(c->c.category, tgt_sort.core), tgt_sort.span_s)) diff --git a/benchmarks/benchmark.jl b/benchmarks/benchmark.jl index e3e8adb3c..e48aeceae 100644 --- a/benchmarks/benchmark.jl +++ b/benchmarks/benchmark.jl @@ -1,3 +1,61 @@ +usage = """ +Dagger.jl Benchmarking Platform + +This benchmarking platform is intended to test Dagger's performance on a wide range of benchmarks, and to determine where Dagger falls short on delivering near-optimal performance. + +This script is configured with environment variables. For example: + +BENCHMARK=nmf:raw,dagger+cuda julia benchmark.jl + +The above invocation will run the NMF suite without Dagger on the CPU, and with Dagger on CUDA GPUs. + +Environment Variables: +- BENCHMARK - Specifies the suites to benchmark, the execution method, and which acceleration mechanisms to use. The format is: "suite1:method1+accel1+accel2,method2+accel1,method3;suite2:method1,method2+accel1". Available execution and acceleration methods are described below. +- BENCHMARK_PROCS - Specifies the number of workers and threads to start. 
The format is: "numprocs:numthreads", which will start `numprocs` workers with `numthreads` threads each. Defaults to just the Julia process running this script, with however many threads it has available. +- BENCHMARK_REMOTES - Specifies the remote hosts to connect to, on which workers will be started. The format is the same as accepted by `Distributed.addprocs`. +- BENCHMARK_SCALE - The scaling factor to use for all suites, specified as a `UnitRange` or other Julia expression. Experimental, and subject to future removal. +- BENCHMARK_OUTPUT_FORMAT - Which output format to write results as. May be "none" for no output writing, "jls" for Julia's native `Serialization` format, or "jld" for the HDF5 format written by JLD.jl. +- BENCHMARK_VISUALIZE - Whether to run the `visualize.jl` script on the output results. May be any value that can parse as a `Bool`. +- BENCHMARK_RENDER - Which rendering mode to use. May be "live" to use the old (and soon to be removed) web renderer, "webdash" to use the DaggerWebDash renderer, or "offline" to use the old (and soon to be removed) offline renderer. The default of "" disables rendering. +- BENCHMARK_LIVE_PORT - Which port to use for web rendering. Defaults to port 8000. +- BENCHMARK_GRAPH - Whether to use dotviz graph rendering. Only useable if using "live" or "offline" rendering methods. Defaults to off, and may be any value that can parse as a `Bool`. +- BENCHMARK_PROFILE - Whether to enable real-time profiling. Defaults to off, and may be any value that can parse as a `Bool`. Currently experimental and very, very slow. +- BENCHMARK_SAVE_LOGS - Whether to save logs collected at runtime to the output file. Defaults to off, and may be any value that can parse as a `Bool`. 
+ +Execution Methods: +- "raw" - Non-Dagger execution +- "dagger" - Dagger execution + +Acceleration Methods: +- "cuda" - CUDA GPU acceleration +- "amdgpu" - AMD GPU acceleration +""" + +if !haskey(ENV, "BENCHMARK") + print(stderr, usage) + exit(1) +end + +const benches = Dict{String,Vector}() +const suites = Set{String}() +const accelerations = Set{String}() +for bench_spec in split(ENV["BENCHMARK"], ';') + suite, bench_spec_methods = split(bench_spec, ':') + if !isfile(joinpath(@__DIR__, "suites", suite*".jl")) + error("Unknown benchmark suite: $suite") + end + push!(suites, suite) + for method_spec in split(bench_spec_methods, ',') + method, accels... = split(method_spec, '+') + for accel in accels + push!(accelerations, accel) + end + accels = String.(accels) + _benches = get!(benches, suite, []) + push!(_benches, (;method, accels)) + end +end + using Distributed if haskey(ENV, "BENCHMARK_PROCS") const np, nt = parse.(Ref(Int), split(ENV["BENCHMARK_PROCS"], ":")) @@ -5,7 +63,7 @@ if haskey(ENV, "BENCHMARK_PROCS") addprocs(np; exeflags="-t $nt") end else - const np = 2 + const np = 1 const nt = 1 end if haskey(ENV, "BENCHMARK_REMOTES") @@ -20,7 +78,19 @@ end import Dagger: Computation, reduceblock using Dates, Random, Statistics, LinearAlgebra, InteractiveUtils -const output_format = get(ENV, "BENCHMARK_OUTPUT_FORMAT", "jls") +for accel in accelerations + if accel == "cuda" + @everywhere using DaggerGPU, CUDA + elseif accel == "amdgpu" + @everywhere using DaggerGPU, AMDGPU + else + error("Unknown acceleration: $accel") + end +end + +const scales = eval(Meta.parse(get(ENV, "BENCHMARK_SCALE", "1:5:50"))) + +const output_format = get(ENV, "BENCHMARK_OUTPUT_FORMAT", "none") if output_format == "jld" using JLD elseif output_format == "jls" @@ -52,220 +122,41 @@ if graph && render == "webdash" @warn "BENCHMARK_GRAPH=1 is not compatible with BENCHMARK_RENDER=webdash; disabling graphing" end const profile = parse(Bool, get(ENV, "BENCHMARK_PROFILE", "0")) - -_benches = 
get(ENV, "BENCHMARK", "cpu,cpu+dagger") -const benches = [] -for bench in split(_benches, ',') - if endswith(bench, "+dagger") - accel = split(bench, '+')[1] - dagger = true +const savelogs = if parse(Bool, get(ENV, "BENCHMARK_SAVE_LOGS", "0")) + if render == "live" || render == "offline" + @warn "BENCHMARK_SAVE_LOGS=1 is incompatible with BENCHMARK_RENDER=$render; disabling log saving" + false else - accel = bench - dagger = false + using DataFrames + true end - push!(benches, (name=bench, accel=accel, dagger=dagger)) -end - -if any(x->x.accel == "cuda", benches) - @everywhere using DaggerGPU, CUDA -elseif any(x->x.accel == "amdgpu", benches) - @everywhere using DaggerGPU, AMDGPU +else + false end -const scales = eval(Meta.parse(get(ENV, "BENCHMARK_SCALE", "1:5:50"))) - using BenchmarkTools -AmulB_compatible(x,y) = - try; size(x), size(y), x * y; true; catch err false end - -AmulB_compatible(x::Computation,y::Computation) = AmulB_compatible(domain(x), domain(y)) - -function AcmulB_compatible(x,y) - AmulB_compatible(domain(x)', domain(y)) -end - -function array_suite_inner(ctx, X, Y, f=x->x) - suite = BenchmarkGroup(["array"]) - - suite["alloc"] = @benchmarkable (compute($f($X)); GC.gc()) - - Y = compute(ctx, f(X)) - suite["X.+1.0"] = @benchmarkable (compute($ctx, $f($Y.+1)); GC.gc()) - suite["X+X"] = @benchmarkable (compute($ctx, $f($Y+$Y)); GC.gc()) - suite["X'"] = @benchmarkable (compute($ctx, $f($Y')); GC.gc()) - suite["sin.(X)"] = @benchmarkable (compute($ctx, $f(sin.($Y))); GC.gc()) - - if ndims(Y) == 2 - suite["X+X'"] = @benchmarkable (compute($ctx, $f($Y+$Y')); GC.gc()) - suite["X*X"] = @benchmarkable (compute($ctx, $f($Y*$Y)); GC.gc()) - suite["X'*X"] = @benchmarkable (compute($ctx, $f($Y'*$Y)); GC.gc()) - end - - suite -end - -function nnmf(X, W, H) - # H update - H = (H .* (W' * (X ./ (W * H))) - ./ (sum(W; dims=1))') - # W update - W = (W .* ((X ./ (W * H)) * (H')) - ./ (sum(H; dims=2)')) - # error estimate - X - W * H -end - -theory_flops(nrow, ncol, 
nfeatures) = 11 * ncol * nrow * nfeatures + 2 * (ncol + nrow) * nfeatures - -function nmf_suite(ctx; dagger, accel) - suite = BenchmarkGroup() - - #= TODO: Re-enable - pairs = Set{Tuple}() - for i in (1, 2, 5, 10, 64, 100, 1028) - for bscale in (1, 0.5, 0.3, 0.25, 0.1) - b = ceil(Int, i*bscale) - if !((b,i) in pairs) - push!(pairs, (b,i)) - X = zeros(Blocks(b), i) - Y = compute(ctx, f(X)) - suite["vector $i/$b"] = array_suite_inner(ctx, X, Y, f) - end - if !(((b,b),(i,i)) in pairs) - push!(pairs, ((b,b),(i,i))) - X = zeros(Blocks(b,b), i, i) - Y = compute(ctx, f(X)) - suite["matrix ($i,$i)/$b"] = array_suite_inner(ctx, X, Y, f) - end - end - end - =# - - X = Ref{Any}() - W = Ref{Any}() - H = Ref{Any}() - - for scale in scales - ncol = 2001 * scale - nrow = 1002 - nfeatures = 12 - - if !dagger - suite["NNMF scaled by: $scale"] = @benchmarkable begin - nnmf($X[], $W[], $H[]) - end setup=begin - _scale = $scale - @info "Starting non-Dagger NNMF (scale by $_scale)" - if $accel == "cuda" - $X[] = CUDA.rand(Float32, $nrow, $ncol) - $W[] = CUDA.rand(Float32, $nrow, $nfeatures) - $H[] = CUDA.rand(Float32, $nfeatures, $ncol) - elseif $accel == "amdgpu" - $X[] = ROCArray(rand(Float32, $nrow, $ncol)) - $W[] = ROCArray(rand(Float32, $nrow, $nfeatures)) - $H[] = ROCArray(rand(Float32, $nfeatures, $ncol)) - elseif $accel == "cpu" - $X[] = rand(Float32, $nrow, $ncol) - $W[] = rand(Float32, $nrow, $nfeatures) - $H[] = rand(Float32, $nfeatures, $ncol) - end - end teardown=begin - $X[] = nothing - $W[] = nothing - $H[] = nothing - @everywhere GC.gc() - end - else - RENDERS[scale] = Dict{Int,Vector}() - nw = length(workers()) - nsuite = BenchmarkGroup() - while nw > 0 - opts = if accel == "cuda" - Dagger.Sch.SchedulerOptions(;proctypes=[ - DaggerGPU.CuArrayDeviceProc - ]) - elseif accel == "amdgpu" - Dagger.Sch.SchedulerOptions(;proctypes=[ - DaggerGPU.ROCArrayProc - ]) - elseif accel == "cpu" - Dagger.Sch.SchedulerOptions() - else - error("Unknown accelerator $accel") - end - p = 
sum([length(Dagger.get_processors(OSProc(id))) for id in 2:(nw+1)]) - #bsz = ncol ÷ length(workers()) - bsz = ncol ÷ 64 - nsuite["Workers: $nw"] = @benchmarkable begin - _ctx = Context($ctx, workers()[1:$nw]) - compute(_ctx, nnmf($X[], $W[], $H[]); options=$opts) - end setup=begin - _nw, _scale = $nw, $scale - @info "Starting $_nw worker Dagger NNMF (scale by $_scale)" - if $accel == "cuda" - # FIXME: Allocate with CUDA.rand if possible - $X[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($bsz, $bsz), Float32, $nrow, $ncol); options=$opts)) - $W[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($bsz, $bsz), Float32, $nrow, $nfeatures); options=$opts)) - $H[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($bsz, $bsz), Float32, $nfeatures, $ncol); options=$opts)) - elseif $accel == "amdgpu" - $X[] = Dagger.mapchunks(ROCArray, compute(rand(Blocks($bsz, $bsz), Float32, $nrow, $ncol); options=$opts)) - $W[] = Dagger.mapchunks(ROCArray, compute(rand(Blocks($bsz, $bsz), Float32, $nrow, $nfeatures); options=$opts)) - $H[] = Dagger.mapchunks(ROCArray, compute(rand(Blocks($bsz, $bsz), Float32, $nfeatures, $ncol); options=$opts)) - elseif $accel == "cpu" - $X[] = compute(rand(Blocks($bsz, $bsz), Float32, $nrow, $ncol); options=$opts) - $W[] = compute(rand(Blocks($bsz, $bsz), Float32, $nrow, $nfeatures); options=$opts) - $H[] = compute(rand(Blocks($bsz, $bsz), Float32, $nfeatures, $ncol); options=$opts) - end - end teardown=begin - if render != "" && !live - Dagger.continue_rendering[] = false - for i in 1:5 - isready(Dagger.render_results) && break - sleep(1) - end - if isready(Dagger.render_results) - video_paths = take!(Dagger.render_results) - try - video_data = Dict(key=>read(video_paths[key]) for key in keys(video_paths)) - push!(get!(()->[], RENDERS[$scale], $nw), video_data) - catch err - @error "Failed to process render results" exception=(err,catch_backtrace()) - end - else - @warn "Failed to fetch render results" - end - end - $X[] = nothing - $W[] = nothing - 
$H[] = nothing - @everywhere GC.gc() - end - nw ÷= 2 - end - suite["NNMF scaled by: $scale"] = nsuite - end - end - - suite +suite_setup = Dict{String,Function}() +for suite in suites + suite_setup[suite] = include(joinpath(@__DIR__, "suites", suite*".jl")) end function main() nw = length(workers()) output_prefix = "result-$(np)workers-$(nt)threads-$(Dates.now())" - suites = Dict() + suite_trees = Dict() opts = (;profile=profile) if render == "live" opts = merge(opts, (;log_sink=Dagger.LocalEventLog())) if graph opts = merge(opts, (;log_file=output_prefix*".dot")) end - elseif render == "webdash" + elseif render == "webdash" || savelogs ml = Dagger.MultiEventLog() ml[:core] = Dagger.Events.CoreMetrics() ml[:id] = Dagger.Events.IDMetrics() - ml[:timeline] = Dagger.Events.TimelineMetrics() + # FIXME: ml[:timeline] = Dagger.Events.TimelineMetrics() profile && (ml[:profile] = DaggerWebDash.ProfileMetrics()) ml[:wsat] = Dagger.Events.WorkerSaturation() ml[:loadavg] = Dagger.Events.CPULoadAverages() @@ -274,28 +165,33 @@ function main() ml[:esat] = Dagger.Events.EventSaturation() ml[:psat] = Dagger.Events.ProcessorSaturation() lw = Dagger.Events.LogWindow(5*10^9, :core) - df = DataFrame([key=>[] for key in keys(ml.consumers)]...) - ts = Dagger.Events.TableStorage(df) + logs_df = DataFrame([key=>[] for key in keys(ml.consumers)]...) 
+ ts = Dagger.Events.TableStorage(logs_df) push!(lw.creation_handlers, ts) - d3r = DaggerWebDash.D3Renderer(live_port; seek_store=ts) - push!(lw.creation_handlers, d3r) - push!(lw.deletion_handlers, d3r) - push!(d3r, GanttPlot(:core, :id, :timeline, :esat, :psat, "Overview")) - # TODO: push!(d3r, ProfileViewer(:core, :profile, "Profile Viewer")) - push!(d3r, LinePlot(:core, :wsat, "Worker Saturation", "Running Tasks")) - push!(d3r, LinePlot(:core, :loadavg, "CPU Load Average", "Average Running Threads")) - push!(d3r, LinePlot(:core, :bytes, "Allocated Bytes", "Bytes")) - push!(d3r, LinePlot(:core, :mem, "Available Memory", "% Free")) - #push!(d3r, GraphPlot(:core, :id, :timeline, :profile, "DAG")) + if render == "webdash" + d3r = DaggerWebDash.D3Renderer(live_port; seek_store=ts) + push!(lw.creation_handlers, d3r) + push!(lw.deletion_handlers, d3r) + push!(d3r, GanttPlot(:core, :id, :timeline, :esat, :psat, "Overview")) + # TODO: push!(d3r, ProfileViewer(:core, :profile, "Profile Viewer")) + push!(d3r, LinePlot(:core, :wsat, "Worker Saturation", "Running Tasks")) + push!(d3r, LinePlot(:core, :loadavg, "CPU Load Average", "Average Running Threads")) + push!(d3r, LinePlot(:core, :bytes, "Allocated Bytes", "Bytes")) + push!(d3r, LinePlot(:core, :mem, "Available Memory", "% Free")) + #push!(d3r, GraphPlot(:core, :id, :timeline, :profile, "DAG")) + ml.aggregators[:d3r] = d3r + end ml.aggregators[:logwindow] = lw - ml.aggregators[:d3r] = d3r opts = merge(opts, (;log_sink=ml)) end - ctx = Context(collect((1:nw) .+ 1); opts...) - for bench in benches - name = bench.name - println("creating $name benchmarks") - suites[name] = nmf_suite(ctx; dagger=bench.dagger, accel=bench.accel) + ctx = Context(workers(); opts...) 
+ Dagger.Sch.EAGER_CONTEXT[] = ctx + for suite in keys(benches) + for bench in benches[suite] + name = "suite $suite, exec $(bench.method), accels $(bench.accels)" + println("creating benchmarks for $name") + suite_trees[name] = suite_setup[suite](ctx; method=bench.method, accels=bench.accels) + end end if render == "live" || render == "offline" Dagger.show_gantt(ctx; width=1800, window_length=5, delay=2, port=live_port, live=live) @@ -303,7 +199,9 @@ function main() # Make sure server code is compiled sleep(1) run(pipeline(`curl -s localhost:$live_port/`; stdout=devnull)) - run(pipeline(`curl -s localhost:$live_port/profile`; stdout=devnull)) + if profile + run(pipeline(`curl -s localhost:$live_port/profile`; stdout=devnull)) + end @info "Rendering started on port $live_port" end elseif render == "webdash" @@ -312,44 +210,47 @@ function main() run(pipeline(`curl -s localhost:$live_port/index.html`; stdout=devnull)) @info "Rendering started on port $live_port" end - res = Dict() - for bench in benches - name = bench.name - println("running $name benchmarks") + res = Dict{String,Any}() + for name in keys(suite_trees) + println("running benchmarks for $name") res[name] = try - run(suites[name]; samples=3, seconds=10*60, gcsample=true) + run(suite_trees[name]; samples=3, seconds=10*60, gcsample=true) catch err - @error "Error running $name benchmarks" exception=(err,catch_backtrace()) + @error "Error running benchmarks for $name" exception=(err,catch_backtrace()) nothing end end - for bench in benches - println("benchmark results for $(bench.name): $(minimum(res[bench.name]))") - end - - println("saving results in $output_prefix.$output_format") - if output_format == "jld" - JLD.save(output, "results", res, "peakflops", peakflops(), "renders", RENDERS) - elseif output_format == "jls" - outdict = Dict("results"=>res, "peakflops"=>peakflops(), "renders"=>RENDERS) - open(output_prefix*".jls", "w") do io - serialize(io, outdict) + for name in 
sort(collect(keys(suite_trees))) + if res[name] !== nothing + println("benchmark results for $name: $(minimum(res[name]))") end end - if parse(Bool, get(ENV, "BENCHMARK_VISUALIZE", "0")) - run(`$(Base.julia_cmd()) $(joinpath(pwd(), "visualize.jl")) -- $(output_prefix*"."*output_format)`) - end + if output_format != "none" + println("saving results in $output_prefix.$output_format") + if output_format == "jld" + JLD.save(output_prefix*".jld", "results", res, "peakflops", peakflops(), "renders", RENDERS, "logs", savelogs ? logs_df : nothing) + elseif output_format == "jls" + outdict = Dict("results"=>res, "peakflops"=>peakflops(), "renders"=>RENDERS, "logs"=>savelogs ? logs_df : nothing) + open(output_prefix*".jls", "w") do io + serialize(io, outdict) + end + end - println("Done.") + if parse(Bool, get(ENV, "BENCHMARK_VISUALIZE", "0")) + run(`$(Base.julia_cmd()) $(joinpath(@__DIR__, "visualize.jl")) -- $(output_prefix*"."*output_format)`) + end - # TODO: Compare with multiple results - if length(ARGS) == 1 - compare_file = ARGS[1] - println("Comparing with results in $compare_file") - reference_results = JLD.load(compare_file, "result") - @show judge(mean(reference_results), mean(res)) + # TODO: Compare with multiple results + if length(ARGS) == 1 + compare_file = ARGS[1] + println("comparing with results in $compare_file") + reference_results = JLD.load(compare_file, "results") + @show judge(mean(reference_results), mean(res)) + end end + + println("Done!") end main() diff --git a/benchmarks/suites/array.jl b/benchmarks/suites/array.jl new file mode 100644 index 000000000..3877810fe --- /dev/null +++ b/benchmarks/suites/array.jl @@ -0,0 +1,49 @@ +AmulB_compatible(x,y) = + try; size(x), size(y), x * y; true; catch err false end + +AmulB_compatible(x::Computation,y::Computation) = AmulB_compatible(domain(x), domain(y)) + +function AcmulB_compatible(x,y) + AmulB_compatible(domain(x)', domain(y)) +end + +function array_suite_inner(ctx, X, Y, f=x->x) + suite = BenchmarkGroup(["array"]) 
+ + suite["alloc"] = @benchmarkable (compute($f($X)); GC.gc()) + + Y = compute(ctx, f(X)) + suite["X.+1.0"] = @benchmarkable (compute($ctx, $f($Y.+1)); GC.gc()) + suite["X+X"] = @benchmarkable (compute($ctx, $f($Y+$Y)); GC.gc()) + suite["X'"] = @benchmarkable (compute($ctx, $f($Y')); GC.gc()) + suite["sin.(X)"] = @benchmarkable (compute($ctx, $f(sin.($Y))); GC.gc()) + + if ndims(Y) == 2 + suite["X+X'"] = @benchmarkable (compute($ctx, $f($Y+$Y')); GC.gc()) + suite["X*X"] = @benchmarkable (compute($ctx, $f($Y*$Y)); GC.gc()) + suite["X'*X"] = @benchmarkable (compute($ctx, $f($Y'*$Y)); GC.gc()) + end + + #= TODO: Re-enable + pairs = Set{Tuple}() + for i in (1, 2, 5, 10, 64, 100, 1028) + for bscale in (1, 0.5, 0.3, 0.25, 0.1) + b = ceil(Int, i*bscale) + if !((b,i) in pairs) + push!(pairs, (b,i)) + X = zeros(Blocks(b), i) + Y = compute(ctx, f(X)) + suite["vector $i/$b"] = array_suite_inner(ctx, X, Y, f) + end + if !(((b,b),(i,i)) in pairs) + push!(pairs, ((b,b),(i,i))) + X = zeros(Blocks(b,b), i, i) + Y = compute(ctx, f(X)) + suite["matrix ($i,$i)/$b"] = array_suite_inner(ctx, X, Y, f) + end + end + end + =# + + suite +end diff --git a/benchmarks/suites/dtable.jl b/benchmarks/suites/dtable.jl new file mode 100644 index 000000000..512e274fd --- /dev/null +++ b/benchmarks/suites/dtable.jl @@ -0,0 +1,132 @@ +@everywhere using CSV, Arrow, Random, OnlineStats, Dates, MemPool + +# n = tryparse(Int, ARGS[1]) +# max_chunksize = tryparse(Int, ARGS[2]) +# unique_values = tryparse(Int32, ARGS[3]) +# ncolumns = tryparse(Int, ARGS[4]) +# + +@everywhere function fetch_wait(dt::DTable) + println(MemPool.GLOBAL_DEVICE[]) + foreach(fetch, dt.chunks) +end + +function dtable_suite(ctx; method, accels) + @assert method == "dagger" "DTable suite does not support non-Dagger execution" + @assert isempty(accels) "DTable suite does not support acceleration" + + n = Int(2e8) + max_chunksize = Int(1e8) + unique_values = Int(1e3) + ncolumns = 4 + nchunks = (n+max_chunksize-1) ÷ max_chunksize + + 
function genchunk(rng, nchunks) + (;[Symbol("a$i") => rand(rng, Int32(1):Int32(unique_values), n÷nchunks) for i in 1:ncolumns]...) + end + + suite = BenchmarkGroup() + + #= FIXME: Way too slow + suite["DTable single CSV chunked reading"] = @benchmarkable begin + @info "Loading CSV.Chunks -> DTable" + c = CSV.Chunks( + joinpath(path, "datapart_1.csv"), + ntasks=(($n+$max_chunksize-1) ÷ $max_chunksize), + types = Int32 + ) + dt = Dagger.with_options(;storage=Dagger.tochunk(MemPool.CPURAMDevice())) do + DTable(c) + end + wait(dt) + end setup=begin + genchunk = $genchunk + @info "Writing single CSV" + path = mktempdir() + nchunks = 1 #overwrite nchunks to create one big file + [CSV.write(joinpath(path, "datapart_$i.csv"), genchunk(MersenneTwister(1111+i), nchunks)) for i in 1:nchunks] + end teardown=begin + rm(path; recursive=true) + @everywhere GC.gc() + end + =# + + function dtable_multi_memory(name, op) + suite["DTable in-memory ($name)"] = @benchmarkable begin + @info "$(time_ns()) DTable -> $($name)" + dt = $op(dt) + fetch_wait(dt) + end setup=begin + @info "$(time_ns()) Generating multiple chunks" + nchunks = $nchunks + dt = DTable([Dagger.spawn($genchunk, MersenneTwister(1111+i), nchunks) for i in 1:nchunks], NamedTuple) + fetch_wait(dt) + end teardown=begin + @info "$(time_ns()) Done" + @everywhere GC.gc() + end + end + function dtable_multi_csv(name, op) + suite["DTable multiple CSV reading ($name)"] = @benchmarkable begin + @info "$(time_ns()) Loading CSV -> DTable" + dt = DTable(x -> CSV.read(x, NamedTuple, types=Int32), readdir(path, join=true); device=MemPool.CPURAMDevice()) + @info "$(time_ns()) DTable -> $($name)" + dt = $op(dt) + fetch_wait(dt) + end setup=begin + genchunk = $genchunk + @info "$(time_ns()) Writing mutiple CSVs" + path = mktempdir() + nchunks = $nchunks + wait.([Threads.@spawn CSV.write(joinpath(path, "datapart_$i.csv"), genchunk(MersenneTwister(1111+i), nchunks)) for i in 1:nchunks]) + end teardown=begin + @info "$(time_ns()) Done" + 
rm(path; recursive=true) + @everywhere GC.gc() + end + end + function dtable_multi_arrow(name, op) + suite["DTable multiple Arrow reading ($name)"] = @benchmarkable begin + @info "$(time_ns()) Loading Arrow -> DTable" + dt = DTable(Arrow.Table, readdir(path, join=true); device=MemPool.CPURAMDevice()) + @info "$(time_ns()) DTable -> $($name)" + dt = $op(dt) + fetch_wait(dt) + end setup=begin + genchunk = $genchunk + @info "$(time_ns()) Writing multiple Arrow files" + path = mktempdir() + nchunks = $nchunks + wait.([Threads.@spawn Arrow.write(joinpath(path, "datapart_$i.arrow"), genchunk(MersenneTwister(1111+i), nchunks)) for i in 1:nchunks]) + end teardown=begin + @info "$(time_ns()) Done" + rm(path; recursive=true) + @everywhere GC.gc() + end + end + for (name, op) in (("none", identity), + ("map", dt->map(r->(;s=sum(r)), dt)), + # FIXME: ("reduce", dt->reduce(+, dt)), + ("filter", dt->filter(r->all(>(1), r), dt))) + dtable_multi_memory(name, op) + dtable_multi_arrow(name, op) + #dtable_multi_csv(name, op) + end + + #= TODO: Analyze serial tail + suite["DTable innerjoin"] = @benchmarkable begin + @info "Joining DTables" + dt = Dagger.innerjoin(d_left, d_right, on=:a1, r_unique=true) + wait(dt) + end setup=begin + @info "Generating DTable" + nchunks = $nchunks + d_left = DTable([Dagger.spawn($genchunk, MersenneTwister(1111+i), nchunks) for i in 1:nchunks], NamedTuple) + d_right = DTable((a1=Int32.(1:$unique_values), a5=.-Int32.(1:$unique_values)), Int($unique_values)) + end teardown=begin + @everywhere GC.gc() + end + =# + + suite +end diff --git a/benchmarks/suites/nmf.jl b/benchmarks/suites/nmf.jl new file mode 100644 index 000000000..b6d5afbb5 --- /dev/null +++ b/benchmarks/suites/nmf.jl @@ -0,0 +1,124 @@ +function nnmf(X, W, H) + # H update + H = (H .* (W' * (X ./ (W * H))) + ./ (sum(W; dims=1))') + # W update + W = (W .* ((X ./ (W * H)) * (H')) + ./ (sum(H; dims=2)')) + # error estimate + X - W * H +end + +theory_flops(nrow, ncol, nfeatures) = 11 * ncol * nrow 
* nfeatures + 2 * (ncol + nrow) * nfeatures + +function nmf_suite(ctx; method, accels) + suite = BenchmarkGroup() + + accel = !isempty(accels) ? only(accels) : "cpu" + + X = Ref{Any}() + W = Ref{Any}() + H = Ref{Any}() + + for scale in scales + ncol = 2001 * scale + nrow = 1002 + nfeatures = 12 + + if method == "raw" + suite["NNMF scaled by: $scale"] = @benchmarkable begin + nnmf($X[], $W[], $H[]) + end setup=begin + _scale = $scale + @info "Starting non-Dagger NNMF (scale by $_scale)" + if $accel == "cuda" + $X[] = CUDA.rand(Float32, $nrow, $ncol) + $W[] = CUDA.rand(Float32, $nrow, $nfeatures) + $H[] = CUDA.rand(Float32, $nfeatures, $ncol) + elseif $accel == "amdgpu" + $X[] = ROCArray(rand(Float32, $nrow, $ncol)) + $W[] = ROCArray(rand(Float32, $nrow, $nfeatures)) + $H[] = ROCArray(rand(Float32, $nfeatures, $ncol)) + elseif $accel == "cpu" + $X[] = rand(Float32, $nrow, $ncol) + $W[] = rand(Float32, $nrow, $nfeatures) + $H[] = rand(Float32, $nfeatures, $ncol) + end + end teardown=begin + $X[] = nothing + $W[] = nothing + $H[] = nothing + @everywhere GC.gc() + end + elseif method == "dagger" + RENDERS[scale] = Dict{Int,Vector}() + nw = length(workers()) + nsuite = BenchmarkGroup() + while nw > 0 + opts = if accel == "cuda" + Dagger.Sch.SchedulerOptions(;proctypes=[ + DaggerGPU.CuArrayDeviceProc + ]) + elseif accel == "amdgpu" + Dagger.Sch.SchedulerOptions(;proctypes=[ + DaggerGPU.ROCArrayProc + ]) + elseif accel == "cpu" + Dagger.Sch.SchedulerOptions() + else + error("Unknown accelerator $accel") + end + #bsz = ncol ÷ length(workers()) + bsz = ncol ÷ 64 + nsuite["Workers: $nw"] = @benchmarkable begin + _ctx = Context($ctx, workers()[1:$nw]) + compute(_ctx, nnmf($X[], $W[], $H[]); options=$opts) + end setup=begin + _nw, _scale = $nw, $scale + @info "Starting $_nw worker Dagger NNMF (scale by $_scale)" + if $accel == "cuda" + # FIXME: Allocate with CUDA.rand if possible + $X[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($bsz, $bsz), Float32, $nrow, $ncol); 
options=$opts)) + $W[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($bsz, $bsz), Float32, $nrow, $nfeatures); options=$opts)) + $H[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($bsz, $bsz), Float32, $nfeatures, $ncol); options=$opts)) + elseif $accel == "amdgpu" + $X[] = Dagger.mapchunks(ROCArray, compute(rand(Blocks($bsz, $bsz), Float32, $nrow, $ncol); options=$opts)) + $W[] = Dagger.mapchunks(ROCArray, compute(rand(Blocks($bsz, $bsz), Float32, $nrow, $nfeatures); options=$opts)) + $H[] = Dagger.mapchunks(ROCArray, compute(rand(Blocks($bsz, $bsz), Float32, $nfeatures, $ncol); options=$opts)) + elseif $accel == "cpu" + $X[] = compute(rand(Blocks($bsz, $bsz), Float32, $nrow, $ncol); options=$opts) + $W[] = compute(rand(Blocks($bsz, $bsz), Float32, $nrow, $nfeatures); options=$opts) + $H[] = compute(rand(Blocks($bsz, $bsz), Float32, $nfeatures, $ncol); options=$opts) + end + end teardown=begin + if render != "" && !live + Dagger.continue_rendering[] = false + for i in 1:5 + isready(Dagger.render_results) && break + sleep(1) + end + if isready(Dagger.render_results) + video_paths = take!(Dagger.render_results) + try + video_data = Dict(key=>read(video_paths[key]) for key in keys(video_paths)) + push!(get!(()->[], RENDERS[$scale], $nw), video_data) + catch err + @error "Failed to process render results" exception=(err,catch_backtrace()) + end + else + @warn "Failed to fetch render results" + end + end + $X[] = nothing + $W[] = nothing + $H[] = nothing + @everywhere GC.gc() + end + nw ÷= 2 + end + suite["NNMF scaled by: $scale"] = nsuite + end + end + + suite +end