Commit: 7 changed files with 298 additions and 0 deletions.
@@ -0,0 +1,24 @@

```yaml
name: Documentation

on:
  push:
    branches:
      - master
    tags: '*'
  pull_request:

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - uses: julia-actions/setup-julia@latest
        with:
          version: '1.4'
      - name: Install dependencies
        run: julia --project=docs/ -e 'using Pkg; Pkg.develop(PackageSpec(path=pwd())); Pkg.instantiate()'
      - name: Build and deploy
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # For authentication with GitHub Actions token
          DOCUMENTER_KEY: ${{ secrets.DOCUMENTER_KEY }} # For authentication with SSH deploy key
        run: julia --project=docs/ docs/make.jl
```
@@ -0,0 +1,2 @@

```toml
[deps]
Documenter = "e30172f5-a6a5-5a46-863b-614d45cd2de4"
```
@@ -0,0 +1,10 @@

```julia
using Documenter, Dagger

makedocs(
    sitename = "Dagger",
    pages = [
        "Home" => "index.md",
        "Processors" => "processors.md",
        "Scheduler Internals" => "scheduler-internals.md",
    ]
)
```
@@ -0,0 +1,110 @@
# A framework for out-of-core and parallel execution

## Modeling of Dagger programs

The key API for parallel and heterogeneous execution is `Dagger.delayed`.
The call signature of `Dagger.delayed` is the following:

```julia
thunk = Dagger.delayed(func)(args...)
```

This invocation constructs a single node in a computational graph. `func` is
a Julia function which takes some number of arguments, say `N` of them, of
types `Targs`. The ellipsis in `args...` indicates that any number of
arguments may be passed between the parentheses; in a correct invocation,
`args...` has length `N` and types `Targs` (or suitable subtypes of `Targs`,
for each respective argument). `thunk` is an instance of a Dagger `Thunk`,
the value used internally by Dagger to represent a node in the graph.
A `Thunk` may be "computed":

```julia
chunk = Dagger.compute(thunk)
```

Computing a `Thunk` performs roughly the same logic as the following Julia
function invocation:

```julia
result = func(args...)
```

Such an invocation calls `func` on `args...`, returning `result`. Computing
the above thunk produces a value with the same type as `result`, with the
caveat that the result is wrapped in a `Dagger.Chunk` (`chunk` in the above
example). A `Chunk` is a reference to a value stored on a compute process
within the `Distributed` cluster that Dagger operates within. A `Chunk` may
be "collected", which returns the wrapped value (here, `result`) to the
collecting process:

```julia
result = collect(chunk)
```
In order to create a graph with more than a single node, arguments to
`delayed` may themselves be `Thunk`s or `Chunk`s. For example, the sum of
the elements of the vector `[1,2,3,4]` may be represented in Dagger as
follows:

```julia
thunk1 = Dagger.delayed(+)(1, 2)
thunk2 = Dagger.delayed(+)(3, 4)
thunk3 = Dagger.delayed(+)(thunk1, thunk2)
```

A graph has now been constructed in which `thunk1` and `thunk2` are
dependencies ("inputs") of `thunk3`. Computing `thunk3` and then collecting
its resulting `Chunk` yields the expected answer:

```julia
chunk = compute(thunk3)
result = collect(chunk)
```

```julia-repl
julia> result == 10
true
```
`result` now has the `Int64` value `10`, the result of summing the elements
of the vector `[1,2,3,4]`. For convenience, computation may be performed
together with collection, like so:

```julia
result = collect(thunk3)
```

The above summation example is equivalent to the following invocation in
plain Julia:

```julia
x1 = 1 + 2
x2 = 3 + 4
result = x1 + x2
result == 10
```

However, there are key differences between performing this operation with
Dagger and without it. With Dagger, the graph is constructed separately from
its computation ("lazily"), whereas plain Julia executes each operation
immediately ("eagerly"). Dagger makes use of this lazy construction to allow
the actual execution of the overall operation to be modified in useful ways.
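This laziness can be observed directly: side effects in a delayed function do not run until the graph is computed. A minimal sketch (the anonymous function and its printed message are illustrative, not part of Dagger's API):

```julia
using Dagger

# Nothing executes here; we only construct a graph node
t = Dagger.delayed(x -> (println("running"); x + 1))(41)

# The function body runs only once the graph is computed
result = collect(t)  # prints "running"; result == 42
```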
By default, computing a Dagger graph creates an instance of a scheduler,
which is provided the graph to execute. The scheduler executes the
individual nodes of the graph on their arguments, in the order specified by
the graph (ensuring a node's dependencies are satisfied before that node
executes), on compute processes in the cluster; the scheduler process itself
typically does not execute nodes directly. Additionally, if a given set of
nodes do not depend on each other (no node's result is an input to another
node in the set), then those nodes may be executed in parallel, and the
scheduler attempts to schedule them in parallel when possible.
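For example, assuming a `Distributed` cluster has been set up with `addprocs` and Dagger is loaded on all workers, the two inner sums below share no dependencies, so the scheduler is free to run them on different workers concurrently (a sketch; actual placement is up to the scheduler):

```julia
using Distributed
addprocs(2)                 # add two worker processes
@everywhere using Dagger

# `a` and `b` do not depend on each other, so they may run in parallel
a = Dagger.delayed(sum)(1:1_000_000)
b = Dagger.delayed(sum)(1:1_000_000)
c = Dagger.delayed(+)(a, b)

result = collect(c)         # == 2 * sum(1:1_000_000)
```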
The scheduler also orchestrates data movement between compute processes, so
that the inputs to a given node are available on the compute process
scheduled to execute it. The scheduler attempts to minimize data movement
between compute processes; it does so by trying to schedule nodes which
depend on a given input onto the same compute process that computed and
retains that input.
@@ -0,0 +1,72 @@
# Processors

Dagger contains a flexible mechanism to represent CPUs, GPUs, and other
devices that the scheduler can place user work on. The individual devices
capable of computing a user operation are called "processors", and are
subtypes of `Dagger.Processor`. Processors are automatically detected by
Dagger at scheduler initialization, and placed in a hierarchy reflecting the
physical (network-, link-, or memory-based) boundaries between them. The
scheduler uses the information in this hierarchy to efficiently schedule and
partition user operations.
## Hardware capabilities, topology, and data locality

The processor hierarchy is modeled as a multi-root tree, where each root is
an `OSProc` (representing a Julia OS process), and the "children" of a root
or other branch in the tree represent the processors residing on the same
logical server as their "parent". In the common case, all roots are directly
connected to each other. The hierarchy's topology is automatically detected
and elaborated by callbacks in Dagger, which users may extend to add
detection of extra processors.
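As a rough illustration, a two-server cluster might be detected as the hierarchy below. The names follow Dagger's `OSProc` and per-thread CPU processors, but the exact set of processors detected depends on the system:

```
OSProc(1) ─┬─ ThreadProc(1, 1)
           └─ ThreadProc(1, 2)
OSProc(2) ─┬─ ThreadProc(2, 1)
           └─ ThreadProc(2, 2)
```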
Movement of data between any two processors decomposes into a sequence of
"moves" between a child and its parent, termed a "generic path move". Data
movement may also take "shortcuts" between nodes in the tree which are not
directly connected, if enabled by libraries or the user; such shortcuts may
use IPC mechanisms to transfer data more directly and efficiently (such as
InfiniBand, GPU RDMA, NVLink, etc.). All data is considered local to some
processor, and may only be operated on by another processor after an
explicit move operation to that processor.
A move between a given pair of processors is implemented as a Julia function
dispatching on the types of both processors, as well as the type of the data
being moved. Users may define custom move functions to improve data movement
efficiency, perform automatic value conversions, or make use of special IPC
facilities. Users may also define custom processors to represent processor
types which Dagger does not automatically detect, such as novel GPUs,
special OS process abstractions, FPGAs, etc.
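A custom processor and move function might be sketched as follows. The type name, its field, and the `upload_to_device` helper are hypothetical, and the exact `Dagger.move` signature should be checked against the Dagger version in use:

```julia
using Dagger

# Hypothetical processor type for a device Dagger does not auto-detect
struct MyAcceleratorProc <: Dagger.Processor
    device_id::Int
end

# Hypothetical custom move: dispatches on both processor types and the
# data type, converting an Array into a device buffer in transit
function Dagger.move(from::Dagger.OSProc, to::MyAcceleratorProc, x::Array)
    return upload_to_device(to.device_id, x)  # hypothetical helper
end
```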
### Future: Network Devices and Topology

In the future, users will be able to define network devices attached to a
given processor, which provide a direct connection to a network device on
another processor and may be used to transfer data between those processors.
Data movement rules will most likely be defined by a mechanism similar (or
identical) to the current processor move mechanism. The multi-root tree will
be expanded into a graph, to allow representing these network devices (as
they may span non-root nodes).
## Redundancy

### Fault Tolerance

Dagger has a single means of ensuring redundancy, currently called "fault
tolerance". This redundancy targets one specific failure mode: the
unexpected exit or "killing" of a worker process in the cluster. This
failure mode often presents itself when running on Linux and generating
large memory allocations, where the Out Of Memory (OOM) killer may kill user
processes to free their allocated memory for the Linux kernel to use. The
fault tolerance system mitigates the damage caused by the OOM killer acting
on one or more worker processes by detecting the fault as a process exit
exception (generated by Julia), and then moving any "lost" work to other
worker processes for re-computation.
#### Future: Multi-master, Network Failure Correction, etc.

This single redundancy mechanism helps alleviate a common issue among HPC
and scientific users; however, it does little to help when, for example, the
master node exits or a network link goes down. Such failure modes require a
more complicated detection and recovery process, including multiple master
processes, a distributed and replicated database such as etcd, and
checkpointing of the scheduler to ensure efficient recovery. Such a system
does not yet exist, but contributions toward one are welcome.
@@ -0,0 +1,79 @@
# Scheduler Internals

The scheduler is called `Dagger.Sch`. It contains a single internal instance
of type `ComputeState`, which maintains all state necessary to represent the
sets of waiting, ready, and completed (or "finished") graph nodes, cached
`Chunk`s, and maps of interdependencies between nodes. It uses Julia's task
infrastructure to asynchronously send work requests to remote compute
processes, and uses a Julia `Channel` as an inbound queue for completed
work. An outer loop drives the scheduler, executing until all nodes in the
graph have completed and the final result of the graph is ready to be
returned to the user. This outer loop continuously performs two main
operations: launching the execution of nodes which have become "ready" to
execute, and "finishing" nodes which have completed.
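In pseudocode, the outer loop described above looks roughly like this; names such as `state.inbound_queue` and `finish_node!` are illustrative, not Dagger's actual internals:

```julia
# Illustrative pseudocode only, not Dagger's actual implementation
while !all_nodes_finished(state)
    # 1. Launch every node that has become ready, asynchronously
    for node in take_ready_nodes!(state)
        @async send_to_worker!(state, node)   # remote execution request
    end
    # 2. Finish completed work arriving on the inbound Channel
    node, chunk = take!(state.inbound_queue)
    finish_node!(state, node, chunk)          # may mark children ready
end
```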
## Scheduler Initialization

At the very beginning of a scheduler's lifecycle, the `ComputeState` is
elaborated based on the computed sets of dependencies between nodes, and all
nodes are placed in the "waiting" state. Any node whose inputs are all
non-`Thunk` values is moved from "waiting" to "ready". The set of available
"workers" (the compute processes located throughout the cluster) is
recorded; call its size `Nworkers`.
## Scheduler Outer Loop

At each outer loop iteration, up to `Nworkers` nodes currently in the
"ready" state are moved into the "running" state, and asynchronously sent
(along with their input arguments) to one of the `Nworkers` processes for
execution. Subsequently, if any nodes exist in the inbound queue (i.e. nodes
which have completed execution and whose result is stored on the process
that executed them), then the most recently-queued node is removed from the
queue and "finished", placing it in the "finished" state.
## Node Execution

Executing a node (here called `Ne`) in the "ready" state comprises two
tasks. The first task is to identify which node in the set of "ready" nodes
will be `Ne` (the node to execute). This choice is based on a concept known
as "affinity", a cost-based metric used to evaluate the suitability of
executing a given node on a given process. The metric is based primarily on
the locations of the node's input arguments, as well as the arguments'
computed sizes in bytes. A fixed amount of affinity is added for each
argument housed by the process in question; further affinity is then added,
equal to a base affinity value multiplied by the argument's size in bytes.
The total affinities are then used to pick the optimal node to execute
(typically, the one with the highest affinity).
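The affinity computation described above can be sketched as follows; the constant values and accessor names are assumptions for illustration, not Dagger's actual constants or internals:

```julia
# Illustrative sketch of per-(node, process) affinity scoring
const FIXED_AFFINITY = 1.0    # flat bonus per locally-housed argument
const BASE_AFFINITY  = 1e-9   # per-byte weight for housed arguments

function affinity(args, proc)
    score = 0.0
    for arg in args
        if houses(proc, arg)                      # hypothetical locality query
            score += FIXED_AFFINITY
            score += BASE_AFFINITY * nbytes(arg)  # hypothetical size query
        end
    end
    return score   # higher means better suited to run on `proc`
end
```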
The second task is to prepare and send the node to a process for execution.
If the node has been executed in the past (because it is an argument to
multiple other nodes), then it is finished immediately and its result is
pulled from the cache. If the node has not yet been executed, it is first
checked whether it is a "meta" node. A "meta" node is explicitly designated
as such by the user or a library, and executes directly on its inputs as
chunks (the data contained in the chunks is not immediately retrieved from
the processors it resides on). Such a node is executed directly within the
scheduler, under the assumption that it is not expensive to execute. If the
node is not a "meta" node, the executing worker process chooses (in
round-robin fashion) a suitable processor to execute the node on, based on
the node's function, the input argument types, and user-defined rules for
processor selection. The input arguments are then asynchronously transferred
(via the processor move operation) to the selected processor, and the
appropriate call to the processor is made with the function and input
arguments. Once execution completes and a result is obtained, it is wrapped
as a `Chunk`, and the `Chunk`'s handle is returned to the scheduler's
inbound queue for node finishing.
## Node Finishing

"Finishing" a node (here called `Nf`) performs three main tasks. The first
task is to find all of the downstream "children" of `Nf` (the set of nodes
which use `Nf`'s result as one of their input arguments) which have had all
of their input arguments computed and are in the "waiting" state, and move
them into the "ready" state. The second task is to check each of `Nf`'s
inputs to determine whether it still has any unfinished children; inputs
with no remaining unfinished children may have their cached results freed by
the scheduler to minimize data usage. The third task is to mark `Nf` as
"finished", and to indicate to the scheduler whether another node has become
"ready" to execute.