Skip to content

Commit

Permalink
Merge pull request #97 from JuliaParallel/s/fix-gc
Browse files Browse the repository at this point in the history
wip: Fixes to GC interaction
  • Loading branch information
shashi committed Nov 8, 2018
2 parents f808149 + fdb3ae2 commit 7701ae2
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 46 deletions.
39 changes: 2 additions & 37 deletions src/array/darray.jl
Original file line number Diff line number Diff line change
Expand Up @@ -102,50 +102,15 @@ mutable struct DArray{T,N,F} <: ArrayOp{T, N}
concat::F
freed::Threads.Atomic{UInt8}
# Inner constructor: build the DArray and initialize its `freed` flag
# (Threads.Atomic{UInt8}) to 0, i.e. "not yet freed".
function DArray{T,N,F}(domain, subdomains, chunks, concat) where {T, N,F}
A = new(domain, subdomains, chunks, concat, Threads.Atomic{UInt8}(0))
# NOTE(review): refcounting and the array-level `free!` finalizer are
# deliberately disabled here (see commented lines below) — chunk lifetime
# is presumably managed per-Chunk/on deserialization instead; confirm
# before re-enabling either line.
#refcount_chunks(A.chunks)
#finalizer(free!, A)
A
end
end

# Custom serializer for DArray: bump the distributed reference counts of the
# array's chunks before delegating to the default struct serializer, so the
# chunks are not freed while a serialized copy is in flight to another worker.
function serialize(io::AbstractSerializer, A::DArray)
# NOTE(review): the refcount increment is done in a detached @async task, so
# it is not guaranteed to complete before the bytes are written — presumably
# acceptable because the master node owns the refcounts; confirm ordering.
@async refcount_chunks(A)
# Fall through to the generic (Any) serialize method for the actual field
# encoding, bypassing this specialized method.
invoke(serialize, Tuple{AbstractSerializer,Any}, io, A)
end

# Custom deserializer for DArray. Mirrors Serialization's default struct
# deserialization (uninitialized allocation followed by per-field reads), then
# registers a `free!` finalizer on the reconstructed array so the receiving
# process releases its chunk references when this copy is garbage-collected —
# pairing with the refcount increment performed in `serialize`.
function deserialize(io::AbstractSerializer, dt::Type{DArray{T,N,F}}) where {T,N,F}
nf = fieldcount(dt)
# Allocate an uninitialized instance first so cyclic references can be
# resolved (via deserialize_cycle) before all fields are populated.
A = ccall(:jl_new_struct_uninit, Any, (Any,), dt)
Serialization.deserialize_cycle(io, A)
for i in 1:nf
tag = Int32(read(io.io, UInt8)::UInt8)
# UNDEFREF_TAG marks a field that was #undef when serialized; skip it.
if tag != Serialization.UNDEFREF_TAG
ccall(:jl_set_nth_field, Cvoid, (Any, Csize_t, Any), A, i-1, Serialization.handle_deserialize(io, tag))
end
end
finalizer(free!, A)
A
end

"""
    refcount_chunks(A::DArray)

Increment the distributed reference counts for every chunk held by `A`,
by forwarding to the collection-level `refcount_chunks` on `A.chunks`.
"""
function refcount_chunks(A::DArray)
    return refcount_chunks(A.chunks)
end
function refcount_chunks(chunks)
for c in chunks
if c isa Chunk{<:Any, DRef}
# increment refcount on the master node
addrefcount(c.handle, 1)
elseif c isa Thunk
refcount_chunks(c.inputs)
end
new(domain, subdomains, chunks, concat, Threads.Atomic{UInt8}(0))
end
end

function free_chunks(chunks)
@sync for c in chunks
if c isa Chunk{<:Any, DRef}
# increment refcount on the master node
cnt = addrefcount(c.handle, -1)
cnt <= 0 && @async free!(c.handle)
@async free!(c.handle)
elseif c isa Thunk
free_chunks(c.inputs)
end
Expand Down
17 changes: 16 additions & 1 deletion src/chunks.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

using MemPool
using Serialization

export domain, UnitDomain, project, alignfirst, ArrayDomain

Expand Down Expand Up @@ -37,7 +38,7 @@ mutable struct Chunk{T, H}
persist::Bool
# Inner constructor for Chunk: build the instance and attach a finalizer that
# (only on the master process, pid 1) frees the chunk's remote handle in a
# detached task when the Chunk is garbage-collected. `free!` is deferred via
# @async because finalizers should not perform blocking work directly.
function (::Type{Chunk{T,H}})(chunktype, domain, handle, persist) where {T,H}
c = new{T,H}(chunktype, domain, handle, persist)
# NOTE(review): BOTH the pre-change and post-change finalizer lines appear
# below (diff artifact) — only one should exist in the real source. The
# change removed the `nworkers() > 1` guard so the handle is freed even in
# a single-process session; confirm which variant is intended.
finalizer(x -> @async(myid() == 1 && nworkers() > 1 && free!(x)), c)
finalizer(x -> @async(myid() == 1 && free!(x)), c)
c
end
end
Expand Down Expand Up @@ -82,6 +83,20 @@ function affinity(r::FileRef)
end
end

# Custom deserializer for Chunk. Replicates Serialization's default struct
# deserialization (uninitialized allocation plus per-field tag reads), then —
# only on the master process (pid 1) and only when worker processes exist —
# attaches a finalizer that asynchronously frees the chunk's handle when this
# deserialized copy is garbage-collected.
function Serialization.deserialize(io::AbstractSerializer, dt::Type{Chunk{T,H}}) where {T,H}
nf = fieldcount(dt)
# Allocate uninitialized so cycles can be registered before fields are read.
c = ccall(:jl_new_struct_uninit, Any, (Any,), dt)
Serialization.deserialize_cycle(io, c)
for i in 1:nf
tag = Int32(read(io.io, UInt8)::UInt8)
# UNDEFREF_TAG means the serialized field was #undef; leave it unset.
if tag != Serialization.UNDEFREF_TAG
ccall(:jl_set_nth_field, Cvoid, (Any, Csize_t, Any), c, i-1, Serialization.handle_deserialize(io, tag))
end
end
# @async defers the actual free! to a task, since finalizers must not block.
myid() == 1 && nworkers() > 1 && finalizer(x->@async(free!(x)), c)
c
end

"""
Create a chunk from a sequential object.
"""
Expand Down
9 changes: 6 additions & 3 deletions src/scheduler.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module Sch

using Distributed

import ..Dagger: Context, Thunk, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, _thunk_dict, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs
import ..Dagger: Context, Thunk, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs

const OneToMany = Dict{Thunk, Set{Thunk}}
struct ComputeState
Expand All @@ -13,6 +13,7 @@ struct ComputeState
ready::Vector{Thunk}
cache::Dict{Thunk, Any}
running::Set{Thunk}
thunk_dict::Dict{Int, Any}
end

function cleanup(ctx)
Expand Down Expand Up @@ -60,7 +61,7 @@ function compute_dag(ctx, d::Thunk)
if isa(res, CapturedException) || isa(res, RemoteException)
rethrow(res)
end
node = _thunk_dict[thunk_id]
node = state.thunk_dict[thunk_id]
@logmsg("WORKER $(proc.pid) - $node ($(node.f)) input:$(node.inputs)")
state.cache[node] = res

Expand Down Expand Up @@ -178,6 +179,7 @@ function fire_task!(ctx, thunk, proc, state, chan, node_order)
data = map(thunk.inputs) do x
istask(x) ? state.cache[x] : x
end
state.thunk_dict[thunk.id] = thunk
async_apply(ctx, proc, thunk.id, thunk.f, data, chan, thunk.get_result, thunk.persist, thunk.cache)
end

Expand Down Expand Up @@ -224,7 +226,8 @@ function start_state(deps::Dict, node_order)
OneToMany(),
Vector{Thunk}(undef, 0),
Dict{Thunk, Any}(),
Set{Thunk}()
Set{Thunk}(),
Dict{Int, Thunk}()
)

nodes = sort(collect(keys(deps)), by=node_order)
Expand Down
6 changes: 1 addition & 5 deletions src/thunk.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ let counter=0
next_id() = (counter >= (1 << 30)) ? (counter = 1) : (counter += 1)
end

global _thunk_dict = Dict{Int, Any}()

# A thing to run
mutable struct Thunk
f::Function
Expand All @@ -28,9 +26,7 @@ mutable struct Thunk
cache_ref=nothing,
affinity=nothing
)
thunk = new(f,xs,id,get_result,meta,persist, cache, cache_ref, affinity)
_thunk_dict[id] = thunk
thunk
new(f,xs,id,get_result,meta,persist, cache, cache_ref, affinity)
end
end

Expand Down
2 changes: 2 additions & 0 deletions test/array.jl
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ end
@test !isempty(Dagger.show_plan(Dagger.Thunk(()->10)))
end

#=
@testset "darray distributed refcount" begin
D2 = remotecall_fetch(2, compute(Distribute(Blocks(10, 20), rand(40,40)))) do D
D2 = D
Expand All @@ -223,6 +224,7 @@ end
GC.gc()
@test size(collect(D2)) == (40,40)
end
=#

@testset "sharedarray" begin
A = SharedArray{Int}((1024,))
Expand Down

0 comments on commit 7701ae2

Please sign in to comment.