Skip to content

Commit c17b86d

Browse files
committed
Sch: Minimize calls to reschedule_inputs
1 parent 2f47217 commit c17b86d

File tree

3 files changed

+9
-1
lines changed

3 files changed

+9
-1
lines changed

src/sch/Sch.jl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ Fields:
4747
- `waiting_data::Dict{Union{Thunk,Chunk},Set{Thunk}}` - Map from input `Chunk`/upstream `Thunk` to all unfinished downstream `Thunk`s, to retain caches
4848
- `ready::Vector{Thunk}` - The list of `Thunk`s that are ready to execute
4949
- `cache::WeakKeyDict{Thunk, Any}` - Maps from a finished `Thunk` to it's cached result, often a DRef
50+
- `valid::WeakKeyDict{Thunk, Nothing}` - Tracks all `Thunk`s that are in a valid scheduling state
5051
- `running::Set{Thunk}` - The set of currently-running `Thunk`s
5152
- `running_on::Dict{Thunk,OSProc}` - Map from `Thunk` to the OS process executing it
5253
- `thunk_dict::Dict{Int, WeakThunk}` - Maps from thunk IDs to a `Thunk`
@@ -70,6 +71,7 @@ struct ComputeState
7071
waiting_data::Dict{Union{Thunk,Chunk},Set{Thunk}}
7172
ready::Vector{Thunk}
7273
cache::WeakKeyDict{Thunk, Any}
74+
valid::WeakKeyDict{Thunk, Nothing}
7375
running::Set{Thunk}
7476
running_on::Dict{Thunk,OSProc}
7577
thunk_dict::Dict{Int, WeakThunk}
@@ -93,7 +95,8 @@ function start_state(deps::Dict, node_order, chan)
9395
OneToMany(),
9496
deps,
9597
Vector{Thunk}(undef, 0),
96-
Dict{Thunk, Any}(),
98+
WeakKeyDict{Thunk, Any}(),
99+
WeakKeyDict{Thunk, Nothing}(),
97100
Set{Thunk}(),
98101
Dict{Thunk,OSProc}(),
99102
Dict{Int, WeakThunk}(),
@@ -119,6 +122,7 @@ function start_state(deps::Dict, node_order, chan)
119122
else
120123
state.waiting[k] = waiting
121124
end
125+
state.valid[k] = nothing
122126
end
123127
end
124128
state

src/sch/dynamic.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ function _add_thunk!(ctx, state, task, tid, (f, args, kwargs, future, ref))
208208
# Preserve the `EagerThunkFinalizer` through `thunk`
209209
thunk.eager_ref = ref
210210
end
211+
state.valid[thunk] = nothing
211212
put!(state.chan, RescheduleSignal())
212213
timespan_finish(ctx, :add_thunk, tid, 0)
213214
return thunk_id

src/sch/util.jl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ function reschedule_inputs!(state, thunk, seen=Set{Thunk}())
101101
while !isempty(to_visit)
102102
thunk = pop!(to_visit)
103103
push!(seen, thunk)
104+
if haskey(state.valid, thunk)
105+
continue
106+
end
104107
if haskey(state.cache, thunk) || (thunk in state.ready) || (thunk in state.running)
105108
continue
106109
end

0 commit comments

Comments
 (0)