Skip to content

Commit

Permalink
Merge pull request #129 from JobJob/action_queues_redesign
Browse files Browse the repository at this point in the history
Node Creation Order Redesign
  • Loading branch information
shashi committed May 7, 2017
2 parents 9696f27 + e249069 commit 8e9d086
Show file tree
Hide file tree
Showing 17 changed files with 1,212 additions and 316 deletions.
12 changes: 12 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
v0.4.1
------
* Fix bugs in signal update ordering - see test/node_order.jl ("bfs bad", and "bfs bad, dfs bad") for examples fixed
* Fix for #123 changes the behaviour of `throttle`, for the old behaviour, use `debounce`
* Adds `bound_srcs(dest)`, and `bound_dests(src)` which return signals bound using `bind!(dest, src)`
* Performance improvements

v0.4.0
------
* API for `onerror` changed, see `?push!` for details`

v0.1.8
------
* Mix in Timing module into Reactive and remove it

87 changes: 87 additions & 0 deletions benchmark/ReactBench.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
using Reactive, GLAbstraction, GeometryTypes
import Reactive: edges, nodes
# Base.step() = Reactive.run(1)
Reactive.stop()

function test1(; use_async = true)
# Reactive.run_async(use_async)
a = Signal(0.0)

b = map(/, a, Signal(23.0))
c = map(/, a, Signal(8.0))
f = foldp(+, 0.0, b)

d = map(Vec3f0, b)
e = map(Vec3f0, c)
g = map(Vec3f0, f)

m = map(translationmatrix, d)
m2 = map(translationmatrix, e)

m3 = map(*, m, m2)
# I don't know why, but Mat*Vec is broken right now
result = map(m3, g) do a, b
r = a * Vec4f0(b, 1)
Vec3f0(r[1], r[2], r[3])
end
total_time = 0.0

# warm the cache
push!(a, 0.1) # note causes the result to be slightly different, noticable for small N
Reactive.run_till_now()
println("post warmup, length(nodes): ", length(nodes))

for i=1:N
tic()
push!(a, i)
Reactive.run(1) # only needed for async
total_time += toq()
end

@show(total_time)
@show(total_time/N)
value(result), total_time
end

function bf(a,c)
a/c
end

function test2()
total_time = 0.0
a = 0.0
accum = 0.0
function ff(x)
accum += x
end
local result
for i=1:N
tic()

a = i
b = bf(a, 23.0)
c = bf(a, 8.0)
f = ff(b)
d = Vec3f0(b)
e = Vec3f0(c)
g = Vec3f0(f)

m = translationmatrix(d)
m2 = translationmatrix(e)

m3 = m*m2
r = m3 * Vec4f0(g, 1)
result = Vec3f0(r[1], r[2], r[3])
total_time += toq()
end

@show(total_time)
@show(total_time/N)
result, total_time
end

N = 10^6
react_res, react_time = test1(use_async=true)
regular_res, regular_time = test2()
@show react_res regular_res
react_time/regular_time
46 changes: 46 additions & 0 deletions benchmark/benchmarks.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using PkgBenchmark
using Reactive

Reactive.stop()

@benchgroup "Signal creation" begin
@bench "int" Signal(0)
@bench "string" Signal($("x"))
@bench "standalone update" (push!($(Signal(0)), 1); Reactive.run_till_now())
end

@benchgroup "map 1" begin
x = Signal(0)
y = map(-, x)
@bench "2 node" (push!($x, 1); Reactive.run_till_now())

z = map(+, x)

@bench "3 nodes" (push!($x, 1); Reactive.run_till_now())

a = map(+, x, y)
@bench "4 nodes" (push!($x, 1); Reactive.run_till_now())
end

@benchgroup "map 2" begin
a = Signal(0.0)

b = map(/, a, Signal(23.0))
c = map(/, a, Signal(8.0))
f = foldp(+, 0.0, b)

d = map(Vec3f0, b)
e = map(Vec3f0, c)
g = map(Vec3f0, f)

m = map(translationmatrix, d)
m2 = map(translationmatrix, e)

m3 = map(*, m, m2)
# I don't know why, but Mat*Vec is broken right now
result = map(m3, g) do a, b
r = a * Vec4f0(b, 1)
Vec3f0(r[1], r[2], r[3])
end
@bench "10 nodes" (push!($x, 1); Reactive.run_till_now())
end
41 changes: 41 additions & 0 deletions doc/Design Overview.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
### Node Creation Order Design

When a node is `push!`ed to in user code, this library must process it and ensure signal values stay consistent with the operations users used to define the chain of signals (e.g. map, foldp, etc.).

(N.b. "node" and "Signal" are used interchangeably in this doc and the code)

The design assumes:

1. The order which nodes are created is a correct [topological ordering](https://en.wikipedia.org/wiki/Topological_ordering) (with the edges of the signal graph regarded as directed from parents to children)
2. Signals will end up in a correct state if the order in which each node is processed and their update actions (e.g. the mapped function in the case of a map) run, is the same as the order in which nodes were created.
3. Signal actions should be run for a given `push!` only if the node itself was pushed to or if one of their parents had their actions run.

This should ensure that parents of nodes update before their children, and signal values will be in a correct state after each `push!` has been processed.

#### Basics

Each node (`Signal`) is added to the end of a Vector called `nodes` on creation, so that `nodes` holds Signals in the order they were created.

Each Signal holds a field `actions` which are just 0-argument functions that update the value of the node or perform some helper function to that end. In some cases the action will update, push to, or set a Timer to update a different node.

Each Signal also has a field `active` which flags whether or not the node was `push!`ed to, or had/should have its actions run, in processing the current push. In essence it flags whether or not the node's value has been updated, or should be updated.

Nodes that are pushed to will always be set to active, other (downstream) nodes will be set to active, and their actions run if any of their parent `Signal`s were active in processing the current `push!`.

On processing each `push!`, we run through `nodes` and execute the actions of each node if it has been set to active, i.e. if it was pushed to, or if any of its parents were active.

#### Pushing to Non Input Nodes

Sometimes it is desirable to push! a value to a non-input node, e.g. a `map` on an input `Signal(...)`, rather than it attaining that value by running its action. In order for this pushed value to "stick", it's important that the map's action does not run after pushing to the node - since the map's action would update the map node to the return value of the function used to create the map, which in general would not be equal to the pushed value.

This is achieved simply by the check in `run_node` requiring an active parent in order to run the node's actions.

A consequence of this is any actions attached to a node with no parents, e.g. an input `Signal(...)` node, will not run. Accordingly, all actions that rely on an update to a node, are attached to a child of the node, and not the node itself. See [dev notes](dev%20notes.md) for more details.

#### Filter

Filter works by setting the filter node's active field to false when the filter condition is false. Downstream/descendent nodes check if at least one of their parents has been active, if none of them have been active then the node will not run its action, thus propagating the filter correctly.

#### More info

There is some info on each operator in the [dev notes](dev%20notes.md). Please feel free to open issues if things are not clear.
53 changes: 53 additions & 0 deletions doc/dev notes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
## Developer Notes

### Operators

Action updates the node whose `actions` Vector it's in with `set_value!` (pre-updates, should not run when node is pushed to, i.e. no parents active):
1. map (multiple parents)
1. filter (calls deactivate! when f(value(input)) is false)
1. filterwhen (calls deactivate! when value(input) is false)
1. foldp (single parent)
1. sampleon (sample trigger input is its only parent)
1. merge (multiple parents)
1. previous (caches the previous update)
1. droprepeats (calls deactivate! when value(input) == prev_value)
1. flatten (wire_flatten and set_flatten_val both get run whenever either the `current_node` or the `input` sigsig update. Would it be better to just add those actions to the input and current_node update respectively?)

Action on an auxilliary node connected to the input that pushes to it, this allows the action to run, even if the node is a non-input, but gets pushed to
1. throttle/debounce (the action is on a foreach on the input node, which sets up a timer to push to the throttle output node)
1. delay (the action is on a foreach on the input node, which just pushes to the delay node)
1. bind! (action is a `map` on the src node, which calls set_value! on the dest node, and returns nothing). , e.g. from test/basics.jl "non-input bind":
```
s = Signal(1; name="sig 1")
m = map(x->2x, s; name="m")
s2 = Signal(3; name="sig 2")
push!(m, 10) # s,m,s2 should be 1, 10, 3
bind!(m, s2) # s,m,s2 should be 1, 3, 3
push!(m, 6) # s,m,s2 should be 1, 6, 6
push!(s2, 10) # s,m,s2 should be 1, 10, 10
```
1. fpswhen (the action to to set up the next tick/or stop the timer is on an auxilliary node with switch and the output node as the parents)

Other
1. every (doesn't actually have an action, just creates a timer to push to itself repeatedly)

### GC and Preserve

##### Docstring

`preserve(signal::Signal)`

prevents `signal` from being garbage collected (GC'd) as long as any of its `parents` are around. Useful for when you want to do some side effects in a signal.

e.g. `preserve(map(println, x))` - this will continue to print updates to x, until x goes out of scope. `foreach` is a shorthand for `map` with `preserve`.

##### Implementation

1. `preserve(x)` iterates through the parents of `x` and increases the count of `p.preservers[x]` by 1, and calls `preserve(p)` for each parent `p` of `x`.
1. Each signal has a field `preservers`, which is a `Dict{Signal, Int}`, which basically stores the number of times `preserve(x)` has been called on each of its child nodes `x`
1. Crucially, this Dict holds an active reference to `x` which stops it from getting GC'd
1. `unpreserve(x)` reduces the count of `preservers[x]` in all of x's parents, and if the count goes to 0, deletes the entry for (reference to) `x` in the `preservers` Dict thus freeing x for garbage collection.
1. Both `preserve` and `unpreserve` are also called recursively on all parents/ancestors of `x`, this means that all ancestors of x in the signal graph will be preserved, until their parents are GC'd or `unpreserve` is called the same number of times as `preserve` was called on them, or any of their descendants.
10 changes: 3 additions & 7 deletions src/async.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ export remote_map,
Spawn a new task to run a function when input signal updates. Returns a signal of tasks and a `results` signal which updates asynchronously with the results. `init` will be used as the default value of `results`. `onerror` is the callback to be called when an error occurs, by default it is set to a callback which prints the error to STDERR. It's the same as the `onerror` argument to `push!` but is run in the spawned task.
"""
function async_map(f, init, inputs...; typ=typeof(init), onerror=print_error)

node = Signal(typ, init, inputs)
map(inputs...; init=nothing, typ=Any) do args...
node = Signal(typ, init) #results node
async_args = join(map(n->n.name, inputs), ", ")
map(inputs...; init=nothing, typ=Any, name="async_map ($async_args)") do args...
outer_task = current_task()
@async begin
try
Expand All @@ -28,9 +28,6 @@ end
Spawn a new task on process `procid` to run a function when input signal updates. Returns a signal of remote refs and a `results` signal which updates asynchronously with the results. `init` will be used as the default value of `results`. `onerror` is the callback to be called when an error occurs, by default it is set to a callback which prints the error to STDERR. It's the same as the `onerror` argument to `push!` but is run in the spawned task.
"""
function remote_map(procid, f, init, inputs...; typ=typeof(init), onerror=print_error)

node = Signal(typ, init, inputs)

node = Signal(typ, init, inputs)
map(inputs...; init=nothing, typ=Any) do args...
outer_task = current_task()
Expand All @@ -47,4 +44,3 @@ function remote_map(procid, f, init, inputs...; typ=typeof(init), onerror=print_
end
end, node
end

0 comments on commit 8e9d086

Please sign in to comment.