Merge e5fb456 into 397bd57
jpsamaroo committed Jul 18, 2020
2 parents 397bd57 + e5fb456 commit 3c74aca
Showing 8 changed files with 306 additions and 3 deletions.
24 changes: 24 additions & 0 deletions .github/workflows/Documentation.yml
@@ -0,0 +1,24 @@
name: Documentation

on:
  push:
    branches:
      - master
    tags: '*'
  pull_request:

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - uses: julia-actions/setup-julia@latest
        with:
          version: '1.4'
      - name: Install dependencies
        run: julia --project=docs/ -e 'using Pkg; Pkg.develop(PackageSpec(path=pwd())); Pkg.instantiate()'
      - name: Build and deploy
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # For authentication with GitHub Actions token
          DOCUMENTER_KEY: ${{ secrets.DOCUMENTER_KEY }} # For authentication with SSH deploy key
        run: julia --project=docs/ docs/make.jl
1 change: 1 addition & 0 deletions .github/workflows/TagBot.yml
@@ -9,3 +9,4 @@ jobs:
      - uses: JuliaRegistries/TagBot@v1
        with:
          token: ${{ secrets.GITHUB_TOKEN }}
          ssh: ${{ secrets.DOCUMENTER_KEY }}
11 changes: 8 additions & 3 deletions README.md
@@ -1,8 +1,13 @@
# Dagger
# Dagger.jl

**A framework for out-of-core and parallel computing**.
*A framework for out-of-core and parallel computing*

[![Build Status](https://travis-ci.org/JuliaParallel/Dagger.jl.svg?branch=master)](https://travis-ci.org/JuliaParallel/Dagger.jl) [![Coverage Status](https://coveralls.io/repos/github/JuliaParallel/Dagger.jl/badge.svg?branch=master)](https://coveralls.io/github/JuliaParallel/Dagger.jl?branch=master)
| **Documentation** | **Build Status** |
|:---------------------------------------:|:-------------------------------------------------------------:|
| [![][docs-master-img]][docs-master-url] | [![Build Status](https://travis-ci.org/JuliaParallel/Dagger.jl.svg?branch=master)](https://travis-ci.org/JuliaParallel/Dagger.jl) [![Coverage Status](https://coveralls.io/repos/github/JuliaParallel/Dagger.jl/badge.svg?branch=master)](https://coveralls.io/github/JuliaParallel/Dagger.jl?branch=master) |

[docs-master-img]: https://img.shields.io/badge/docs-master-blue.svg
[docs-master-url]: https://juliagpu.github.io/Dagger.jl/

At the core of Dagger.jl is a scheduler heavily inspired by [Dask](https://docs.dask.org/en/latest/). It can run computations represented as [directed-acyclic-graphs](https://en.wikipedia.org/wiki/Directed_acyclic_graph) (DAGs) efficiently on many Julia worker processes.

2 changes: 2 additions & 0 deletions docs/Project.toml
@@ -0,0 +1,2 @@
[deps]
Documenter = "e30172f5-a6a5-5a46-863b-614d45cd2de4"
10 changes: 10 additions & 0 deletions docs/make.jl
@@ -0,0 +1,10 @@
using Documenter, Dagger

makedocs(
    sitename = "Dagger",
    pages = [
        "Home" => "index.md",
        "Processors" => "processors.md",
        "Scheduler Internals" => "scheduler-internals.md",
    ]
)
110 changes: 110 additions & 0 deletions docs/src/index.md
@@ -0,0 +1,110 @@
# A framework for out-of-core and parallel execution

## Modeling of Dagger programs

The key API for parallel and heterogeneous execution is `Dagger.delayed`.
The call signature of `Dagger.delayed` is the following:

```julia
thunk = Dagger.delayed(func)(args...)
```

This invocation constructs a single node in a computational graph. `func` is a
Julia function that normally takes `N` arguments of types `Targs`. The
ellipsis in `args...` indicates that any number of arguments may be passed
between the parentheses; when correctly invoked, `args...` has length `N` and
types `Targs` (or suitable subtypes of `Targs`, for each respective argument).
`thunk` is an instance of Dagger's `Thunk` type, the value used internally by
Dagger to represent a node in the graph.

A `Thunk` may be "computed":

```julia
chunk = Dagger.compute(thunk)
```

Computing a `Thunk` performs roughly the same logic as the following Julia
function invocation:

```julia
result = func(args...)
```

Such an invocation calls `func` on `args...` and returns `result`. Computing
the above thunk produces a value with the same type as `result`, with the
caveat that the result is wrapped in a `Dagger.Chunk` (`chunk` in the above
example). A `Chunk` is a reference to a value stored on a compute process
within the `Distributed` cluster that Dagger is operating in. A `Chunk` may be
"collected", which returns the wrapped value to the collecting process (in the
above example, `result`):

```julia
result = collect(chunk)
```

To create a graph with more than a single node, arguments to `delayed` may
themselves be `Thunk`s or `Chunk`s. For example, the sum of the elements of
the vector `[1,2,3,4]` may be represented in Dagger as follows:

```julia
thunk1 = Dagger.delayed(+)(1, 2)
thunk2 = Dagger.delayed(+)(3, 4)
thunk3 = Dagger.delayed(+)(thunk1, thunk2)
```

A graph has now been constructed, where `thunk1` and `thunk2` are dependencies
("inputs") of `thunk3`. Computing `thunk3` and then collecting its resulting
`Chunk` yields the expected answer:

```julia
chunk = compute(thunk3)
result = collect(chunk)
```

```julia-repl
julia> result == 10
true
```

`result` now has the `Int64` value `10`, which is the result of summing the
elements of the vector `[1,2,3,4]`. For convenience, computation may be
performed together with collection, like so:

```julia
result = collect(thunk3)
```

The above summation example is equivalent to the following invocation in plain
Julia:

```julia
x1 = 1 + 2
x2 = 3 + 4
result = x1 + x2
result == 10
```

However, there are key differences between performing this operation with
Dagger and performing it without. With Dagger, the graph is constructed
separately from computing it ("lazily"), whereas plain Julia executes each
operation immediately ("eagerly"). Dagger uses this lazy construction approach
to allow the actual execution of the overall operation to be modified in
useful ways.
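
As a minimal sketch of this laziness, using only the `delayed` and `collect`
calls introduced above:

```julia
# Constructing the node returns immediately; no sleeping happens here.
t = Dagger.delayed(sleep)(5)

# The five-second sleep is only paid now, when the graph is actually computed.
collect(t)
```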

By default, computing a Dagger graph creates an instance of a scheduler, which
is provided the graph to execute. The scheduler executes the individual nodes
of the graph on their arguments, in the order specified by the graph (ensuring
that a node's dependencies are satisfied before executing that node), on
compute processes in the cluster; the scheduler process itself typically does
not execute the nodes directly. Additionally, if the nodes in a given set do
not depend on each other (no value generated by one node is an input to
another node in the set), then those nodes may be executed in parallel, and
the scheduler attempts to schedule them in parallel when possible.
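
For example, in a sketch like the following (assuming a `Distributed` cluster
has been set up with `addprocs` and that `Dagger` is loaded on every worker),
the two independent additions may be executed on different workers at the same
time:

```julia
using Distributed
addprocs(2)                 # two worker processes; adjust to your machine
@everywhere using Dagger

# A deliberately slow function, defined on all processes
@everywhere slow_add(x, y) = (sleep(1); x + y)

a = Dagger.delayed(slow_add)(1, 2)   # independent of `b`...
b = Dagger.delayed(slow_add)(3, 4)   # ...so both may run in parallel
c = Dagger.delayed(slow_add)(a, b)   # runs only after `a` and `b` finish

collect(c)                           # 10; ideally ~2s of sleeping, not 3s
```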

The scheduler also orchestrates data movement between compute processes, so
that the inputs to a given node are available on the compute process scheduled
to execute that node. The scheduler attempts to minimize data movement between
compute processes; it does so by trying to schedule nodes which depend on a
given input on the same compute process that computed and still holds that
input.
72 changes: 72 additions & 0 deletions docs/src/processors.md
@@ -0,0 +1,72 @@
# Processors

Dagger contains a flexible mechanism to represent CPUs, GPUs, and other
devices that the scheduler can place user work on. The individual devices
capable of computing a user operation are called "processors", and are
subtypes of `Dagger.Processor`. Processors are automatically detected by
Dagger at scheduler initialization and placed in a hierarchy reflecting the
physical (network-, link-, or memory-based) boundaries between them. The
scheduler uses the information in this hierarchy to efficiently schedule and
partition user operations.

## Hardware capabilities, topology, and data locality

The processor hierarchy is modeled as a multi-root tree, where each root is an
`OSProc`, which represents a Julia OS process, and the "children" of the root
or of some other branch in the tree represent the processors residing on the
same logical server as the "parent" branch. In the common case, all roots are
directly connected to each other. The processor hierarchy's topology is
automatically detected and elaborated by callbacks in Dagger, which users may
manipulate to add detection of extra processors.
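
As a rough sketch of this shape (toy types invented for illustration; only the
`OSProc` name above comes from Dagger itself):

```julia
# A toy multi-root tree: each root stands in for one Julia OS process
# (an `OSProc`), and its children are the devices local to that process.
abstract type ToyProc end

struct ToyRootProc <: ToyProc
    pid::Int
    children::Vector{ToyProc}
end

struct ToyCPUCore <: ToyProc
    pid::Int
    core::Int
end

# Two roots (two OS processes), each owning two local CPU cores; the roots
# are considered directly connected to each other.
hierarchy = [ToyRootProc(1, ToyProc[ToyCPUCore(1, 1), ToyCPUCore(1, 2)]),
             ToyRootProc(2, ToyProc[ToyCPUCore(2, 1), ToyCPUCore(2, 2)])]
```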

Movement of data between any two processors is decomposable into a sequence of
"moves" between a child and its parent, termed a "generic path move". If
enabled by libraries or the user, data movement may also take "shortcuts"
between nodes in the tree that are not directly connected, using IPC
mechanisms (such as InfiniBand, GPU RDMA, or NVLink) to transfer data more
directly and efficiently. All data is considered local to some processor, and
may only be operated on by another processor after an explicit move operation
to that processor.

A move between a given pair of processors is implemented as a Julia function
dispatching on the types of each processor, as well as the type of the data
being moved. Users are permitted to define custom move functions to improve
data movement efficiency, perform automatic value conversions, or even make
use of special IPC facilities. Custom processors may also be defined by the
user to represent a processor type which is not automatically detected by
Dagger, such as novel GPUs, special OS process abstractions, FPGAs, etc.
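
The dispatch pattern described above can be sketched as follows; these are toy
names only (not Dagger's actual move function or processor types), intended
just to show dispatch on the processor pair and the data type:

```julia
# Toy processor types standing in for automatically-detected and
# user-defined processors.
abstract type ToyProcessor end
struct ToyCPUProc <: ToyProcessor end
struct ToyGPUProc <: ToyProcessor
    device_id::Int
end

# Generic fallback: many moves need no special handling.
toy_move(from::ToyProcessor, to::ToyProcessor, x) = x

# A "custom move" specialized on one processor pair and one data type,
# standing in for e.g. an explicit host-to-device upload:
function toy_move(from::ToyCPUProc, to::ToyGPUProc, x::Array)
    println("uploading $(sizeof(x)) bytes to device $(to.device_id)")
    return copy(x)   # a real method would return a device-resident array
end

toy_move(ToyCPUProc(), ToyGPUProc(0), rand(4))   # hits the specialized method
```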

### Future: Network Devices and Topology

In the future, users will be able to define network devices attached to a
given processor, which provide a direct connection to a network device on
another processor and may be used to transfer data between said processors.
Data movement rules will most likely be defined by a similar (or even
identical) mechanism to the current processor move mechanism. The multi-root
tree will be expanded to a graph to allow representing these network devices
(as they may potentially span non-root nodes).

## Redundancy

### Fault Tolerance

Dagger has a single means for ensuring redundancy, currently called "fault
tolerance". This redundancy targets only a specific failure mode: the
unexpected exit or "killing" of a worker process in the cluster. This failure
mode often presents itself when running on Linux and generating large memory
allocations, where the Out Of Memory (OOM) killer can terminate user processes
to free their allocated memory for the Linux kernel to use. The fault
tolerance system mitigates the damage caused by the OOM killer acting on one
or more worker processes by detecting the fault as a process exit exception
(generated by Julia), and then moving any "lost" work to other worker
processes for re-computation.

#### Future: Multi-master, Network Failure Correction, etc.

This single redundancy mechanism helps alleviate a common issue among HPC and
scientific users; however, it does little to help when, for example, the
master node exits or a network link goes down. Such failure modes require a
more complicated detection and recovery process, including multiple master
processes, a distributed and replicated database such as etcd, and
checkpointing of the scheduler to ensure an efficient recovery. Such a system
does not yet exist, but contributions toward one are desired.
79 changes: 79 additions & 0 deletions docs/src/scheduler-internals.md
@@ -0,0 +1,79 @@
# Scheduler Internals

The scheduler is called `Dagger.Sch`. It contains a single internal instance
of type `ComputeState`, which maintains all necessary state to represent the
sets of waiting, ready, and completed (or "finished") graph nodes, cached
`Chunk`s, and maps of interdependencies between nodes. It uses Julia's task
infrastructure to asynchronously send work requests to remote compute
processes, and a Julia `Channel` as an inbound queue for completed work. An
outer loop drives the scheduler, continuing to execute until all nodes in the
graph have completed and the final result of the graph is ready to be returned
to the user. This outer loop continuously performs two main operations: the
first is to launch the execution of nodes which have become "ready" to
execute; the second is to "finish" nodes which have completed.

## Scheduler Initialization

At the very beginning of a scheduler's lifecycle, the `ComputeState` is
elaborated based on the computed sets of dependencies between nodes, and all
nodes are placed in a "waiting" state. Any node whose inputs are all
non-`Thunk` values is moved from "waiting" to "ready". The set of available
"workers" (the compute processes located throughout the cluster) is recorded;
its size is denoted `Nworkers`.

## Scheduler Outer Loop

At each outer loop iteration, up to `Nworkers` nodes that are currently in the
"ready" state are moved into the "running" state and asynchronously sent
(along with their input arguments) to one of the `Nworkers` processes for
execution. Subsequently, if any nodes exist in the inbound queue (i.e. nodes
which have completed execution and whose results are stored on the processes
that executed them), then the most recently-queued node is removed from the
queue and "finished", placing it in the "finished" state.
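
Putting the initialization and outer loop together, here is a deliberately
simplified, self-contained toy model of this flow (none of these names or
types are Dagger's, and affinity, data movement, and error handling are all
omitted):

```julia
# Toy model of the waiting -> ready -> running -> finished flow. Plain
# `@async` tasks stand in for remote workers.
struct ToyNode
    f::Function
    inputs::Vector{Any}   # ToyNodes or plain values
end

function toy_schedule(root::ToyNode; nworkers::Int = 2)
    # "Initialization": walk the graph; nodes with no ToyNode inputs are ready
    nodes = ToyNode[]
    stack = ToyNode[root]
    while !isempty(stack)
        n = pop!(stack)
        n in nodes && continue
        push!(nodes, n)
        append!(stack, ToyNode[i for i in n.inputs if i isa ToyNode])
    end
    waiting = Set(n for n in nodes if any(i -> i isa ToyNode, n.inputs))
    ready   = Set(n for n in nodes if !(n in waiting))
    cache   = IdDict{ToyNode,Any}()
    inbound = Channel{Tuple{ToyNode,Any}}(length(nodes))
    running = 0

    # "Outer loop": launch ready nodes (up to nworkers at a time), then finish
    # whatever arrives on the inbound queue
    while length(cache) < length(nodes)
        while !isempty(ready) && running < nworkers
            node = pop!(ready)
            running += 1
            let node = node
                @async begin
                    args = Any[i isa ToyNode ? cache[i] : i for i in node.inputs]
                    put!(inbound, (node, node.f(args...)))
                end
            end
        end
        finished_node, result = take!(inbound)
        cache[finished_node] = result
        running -= 1
        # Children whose inputs are now all computed move from waiting to ready
        for w in collect(waiting)
            if all(i -> !(i isa ToyNode) || haskey(cache, i), w.inputs)
                delete!(waiting, w)
                push!(ready, w)
            end
        end
    end
    return cache[root]
end

# Usage, mirroring the summation example from the index page:
t1 = ToyNode(+, Any[1, 2])
t2 = ToyNode(+, Any[3, 4])
t3 = ToyNode(+, Any[t1, t2])
toy_schedule(t3) == 10   # true
```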

## Node Execution

Executing a node (here called `Ne`) in the "ready" state comprises two tasks.
The first task is to identify which node in the set of "ready" nodes will be
`Ne` (the node to execute). This choice is based on a concept known as
"affinity", a cost-based metric used to evaluate the suitability of executing
a given node on a given process. The metric is based primarily on the
locations of the node's input arguments, as well as their computed sizes in
bytes. A fixed amount of affinity is added for each argument housed by the
process in question; further affinity is then added as a base affinity value
multiplied by the argument's size in bytes. The total affinities are then used
to pick the best node to execute (typically, the one with the highest
affinity).
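
As a sketch of the affinity calculation just described (the constants and
helper below are made up for illustration and do not reflect Dagger's actual
values):

```julia
const FIXED_AFFINITY = 1_000_000   # added once per argument housed locally
const BASE_AFFINITY  = 1           # multiplied by each such argument's size

# `args` is a vector of (housed_locally::Bool, size_in_bytes::Int) pairs
toy_affinity(args) =
    sum(housed ? FIXED_AFFINITY + BASE_AFFINITY * sz : 0 for (housed, sz) in args)

a = toy_affinity([(true, 8), (false, 1024)])   # one input lives elsewhere
b = toy_affinity([(true, 8), (true, 1024)])    # both inputs are already local
b > a   # true: the node whose inputs are local is preferred
```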

The second task is to prepare and send the node to a process for execution. If
the node has been executed in the past (because it is an argument to multiple
other nodes), then it is finished immediately and its result is pulled from
the cache. If the node has not yet been executed, the scheduler first checks
whether it is a "meta" node. A "meta" node is explicitly designated as such by
the user or a library, and operates directly on its inputs as chunks (the data
contained in the chunks is not immediately retrieved from the processors it
resides on). Such a node is executed directly within the scheduler, under the
assumption that it is not expensive to execute. If the node is not a "meta"
node, the executing worker process chooses (in round-robin fashion) a suitable
processor to execute the node on, based on the node's function, the input
argument types, and user-defined rules for processor selection. The input
arguments are then asynchronously transferred (via a processor move operation)
to the selected processor, and the appropriate call to the processor is made
with the function and input arguments. Once execution completes and a result
is obtained, it is wrapped as a `Chunk`, and the `Chunk`'s handle is returned
to the scheduler's inbound queue for node finishing.

## Node Finishing

"Finishing" a node (here called `Nf`) performs three main tasks. The first
task is to find all of the downstream "children" nodes of `Nf` (the set of
nodes which use `Nf`'s result as one of their input arguments) that have had
all of their input arguments computed and are in the "waiting" state, and move
them into the "ready" state. The second task is to check all of the inputs to
`Nf` to determine if any of them no longer have children nodes which have not
been finished; if such inputs match this pattern, their cached result may be
freed by the scheduler to minimize data usage. The third task is to mark `Nf`
as "finished", and also to indicate to the scheduler whether another node has
become "ready" to execute.
