Skip to content

Commit

Permalink
Merge 27e7c86 into 61ff302
Browse files Browse the repository at this point in the history
  • Loading branch information
jpsamaroo committed Dec 2, 2020
2 parents 61ff302 + 27e7c86 commit 12c8413
Show file tree
Hide file tree
Showing 11 changed files with 439 additions and 90 deletions.
1 change: 1 addition & 0 deletions docs/make.jl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ makedocs(;
"Processors" => "processors.md",
"Scheduler Internals" => "scheduler-internals.md",
"Logging and Graphing" => "logging.md",
"Dynamic Scheduler Control" => "dynamic.md",
]
)

Expand Down
30 changes: 30 additions & 0 deletions docs/src/dynamic.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Dynamic Scheduler Control

Normally, Dagger executes static graphs defined with `delayed` and `@par`.
However, it is possible for thunks to dynamically modify the graph at runtime,
and to generally exert direct control over the scheduler's internal state. The
`Dagger.sch_handle` function provides this functionality within a thunk:

```julia
function mythunk(x)
h = Dagger.sch_handle()
Dagger.halt!(h)
return x
end
```

The above example prematurely halts a running scheduler at the next
opportunity using `Dagger.halt!`:

[`Dagger.halt!`](@ref)

There are a variety of other built-in functions available for
various uses:

[`Dagger.get_dag_ids`](@ref)
[`Dagger.add_thunk!`](@ref)
[`Dagger.set_return!`](@ref)

Users with needs not covered by the built-in functions should use the `Dagger.exec!` function to pass a user-defined function, closure, or callable struct to the scheduler, along with a payload which will be provided to that function:

[`Dagger.exec!`](@ref)
2 changes: 1 addition & 1 deletion src/Dagger.jl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ include("chunks.jl")

# Task scheduling
include("compute.jl")
include("scheduler.jl")
include("sch/Sch.jl"); using .Sch

# Array computations
include("array/darray.jl")
Expand Down
24 changes: 16 additions & 8 deletions src/processor.jl
Original file line number Diff line number Diff line change
Expand Up @@ -179,16 +179,24 @@ iscompatible(proc::ThreadProc, opts, f, args...) = true
iscompatible_func(proc::ThreadProc, opts, f) = true
iscompatible_arg(proc::ThreadProc, opts, x) = true
@static if VERSION >= v"1.3.0-DEV.573"
execute!(proc::ThreadProc, f, args...) = fetch(Threads.@spawn begin
task_local_storage(:processor, proc)
f(args...)
end)
function execute!(proc::ThreadProc, f, args...)
sch_handle = task_local_storage(:sch_handle)
fetch(Threads.@spawn begin
task_local_storage(:processor, proc)
task_local_storage(:sch_handle, sch_handle)
f(args...)
end)
end
else
# TODO: Use Threads.@threads?
execute!(proc::ThreadProc, f, args...) = fetch(@async begin
task_local_storage(:processor, proc)
f(args...)
end)
function execute!(proc::ThreadProc, f, args...)
sch_handle = task_local_storage(:sch_handle)
fetch(@async begin
task_local_storage(:processor, proc)
task_local_storage(:sch_handle, sch_handle)
f(args...)
end)
end
end
get_parent(proc::ThreadProc) = OSProc(proc.owner)
default_enabled(proc::ThreadProc) = true
Expand Down
Loading

0 comments on commit 12c8413

Please sign in to comment.