-
-
Notifications
You must be signed in to change notification settings - Fork 63
/
thunk.jl
119 lines (105 loc) · 3.19 KB
/
thunk.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
export Thunk, delayed, delayedmap
# Generator for unique Thunk ids. The counter is private to `next_id` via the
# `let` block; it wraps back to 1 once it reaches 2^30 so ids stay bounded.
# NOTE(review): not synchronized — assumes ids are only generated from one
# thread/task at a time; confirm before enabling threaded graph construction.
let counter = 0
    global next_id
    function next_id()
        if counter >= (1 << 30)
            counter = 1   # wrap around rather than grow without bound
        else
            counter += 1
        end
        return counter
    end
end
"""
    Thunk(f, xs...; kwargs...)

A unit of lazy computation: when scheduled, applies `f` to `inputs` (the
tuple `xs`, whose elements may themselves be `Thunk`s or chunks). Identity
is determined by `id` (see `Base.hash`/`Base.isequal` for `Thunk`).
"""
mutable struct Thunk
    f::Function       # function to run
    inputs::Tuple     # arguments; may contain nested Thunks/Chunks
    id::Int           # unique id (default: next_id()); basis of hash/isequal
    get_result::Bool # whether the worker should send the result or only the metadata
    meta::Bool        # presumably: pass metadata instead of realized inputs — TODO confirm with scheduler
    persist::Bool # don't `free!` result after computing
    cache::Bool # release the result giving the worker an opportunity to
    # cache it
    cache_ref::Any    # reference to the cached result, when `cache` is in effect
    affinity::Union{Nothing, Vector{Pair{OSProc, Int}}} # cached processor affinity; computed lazily by `affinity(t)`
    options::Any # stores scheduler-specific options
    # Inner constructor: all flags default to off; id is freshly generated.
    function Thunk(f, xs...;
                   id::Int=next_id(),
                   get_result::Bool=false,
                   meta::Bool=false,
                   persist::Bool=false,
                   cache::Bool=false,
                   cache_ref=nothing,
                   affinity=nothing,
                   options=nothing
                   )
        new(f,xs,id,get_result,meta,persist, cache, cache_ref, affinity, options)
    end
end
"""
    affinity(t::Thunk)

Return the processor affinity of `t` as a vector of `OSProc => size` pairs.

Uses `t.affinity` if it was set explicitly; otherwise, if the result is
cached, follows the affinity of the cached reference; otherwise accumulates
the affinities of all `Thunk`/`Chunk` inputs, summing sizes per processor.
"""
function affinity(t::Thunk)
    # Explicitly-set affinity wins.
    if t.affinity !== nothing
        # Fix: was `$(get(t.affinity))`, a leftover of the Nullable-era API.
        # `t.affinity` is a plain Vector here, so `get` with one argument is
        # a MethodError; interpolate the value directly.
        @logmsg("$t has (cached) affinity: $(t.affinity)")
        return t.affinity
    end
    if t.cache && t.cache_ref !== nothing
        # Result lives in a cache; affinity follows the cached data.
        aff_vec = affinity(t.cache_ref)
    else
        # Sum per-processor data sizes across all task/chunk inputs.
        aff = Dict{OSProc,Int}()
        for inp in inputs(t)
            if isa(inp, Union{Chunk, Thunk})
                for a in affinity(inp)
                    proc, sz = a
                    aff[proc] = get(aff, proc, 0) + sz
                end
            end
        end
        aff_vec = collect(aff)
    end
    @logmsg("$t has affinity: $aff_vec")
    aff_vec
end
# Curry `f` into a Thunk builder: `delayed(f)(args...)` constructs a Thunk
# that will apply `f` to `args` when scheduled. Keyword arguments are
# forwarded to the `Thunk` constructor.
delayed(f; kwargs...) = (xs...) -> Thunk(f, xs...; kwargs...)

# Lazy `map`: build one Thunk per element instead of computing eagerly.
function delayedmap(f, xs...)
    return map(delayed(f), xs...)
end
# Mark `t` so its result is not `free!`d after computation. Returns `t` for
# chaining.
function persist!(t::Thunk)
    t.persist = true
    return t
end

# Ask the worker to keep (cache) the result of `t`. Returns `t` for chaining.
function cache_result!(t::Thunk)
    t.cache = true
    return t
end
# @generated function compose{N}(f, g, t::NTuple{N})
# if N <= 4
# ( :(()->f(g())),
# :((a)->f(g(a))),
# :((a,b)->f(g(a,b))),
# :((a,b,c)->f(g(a,b,c))),
# :((a,b,c,d)->f(g(a,b,c,d))), )[N+1]
# else
# :((xs...) -> f(g(xs...)))
# end
# end
# function Thunk(f::Function, t::Tuple{Thunk})
# g = compose(f, t[1].f, t[1].inputs)
# @logmsg(string("FUSING ", f, "*", t[1].f))
# Thunk(g, t[1].inputs)
# end
# Hash by id only — ~30x faster than hashing the whole struct contents.
# (`% UInt` truncates the 64-bit seed on 32-bit platforms.)
function Base.hash(x::Thunk, h::UInt)
    return hash(x.id, hash(h, 0x7ad3bac49089a05f % UInt))
end

# Two Thunks are isequal iff they share an id — consistent with `hash` above,
# so Thunks work as Dict/Set keys.
Base.isequal(x::Thunk, y::Thunk) = x.id == y.id
# Compact printing for Thunks. Nesting depth is tracked via the `:lazy_level`
# IOContext key; inputs deeper than level 1 are elided as "..." so printing
# never walks a large task graph.
function Base.show(io::IO, t::Thunk)
    depth = get(io, :lazy_level, 1)
    print(io, "Thunk($(t.f), ")
    if depth >= 2
        print(io, "...")
    else
        inner = IOContext(io, :lazy_level => depth + 1)
        show(inner, t.inputs)
    end
    print(io, ")")
end
# The inputs of a Thunk; any non-Thunk value has no inputs.
inputs(t::Thunk) = t.inputs
inputs(::Any) = ()

# Predicate: is this value a lazy task node?
istask(::Thunk) = true
istask(::Any) = false