Thread-local current_task keeps garbage alive #40626

Open
maleadt opened this issue Apr 27, 2021 · 12 comments · May be fixed by #47405 or #48037
Labels
domain:multithreading Base.Threads and related functionality GC Garbage collector

Comments

@maleadt
Member

maleadt commented Apr 27, 2021

When returning values from tasks executed on another thread, those values are kept alive even though it should be possible to collect them:

julia> a = [2,2];
julia> finalizer(a) do _
         Core.println("finalizing a")
       end

julia> t = Threads.@spawn (println("returning a from thread $(Threads.threadid())"); a)
Task (runnable) @0x00007fffb73e8e80
returning a from thread 2

julia> dump(t)
Task
  next: Nothing nothing
  queue: Nothing nothing
  storage: Nothing nothing
  donenotify: Base.GenericCondition{Base.Threads.SpinLock}
    waitq: Base.InvasiveLinkedList{Task}
      head: Nothing nothing
      tail: Nothing nothing
    lock: Base.Threads.SpinLock
      owned: Int64 0
  result: Array{Int64}((2,)) [2, 2] #  <----------------- the array a is being kept alive by the task
  logstate: Nothing nothing
  code: #3 (function of type var"#3#4")
  _state: UInt8 0x01
  sticky: Bool false
  _isexception: Bool false

julia> wait(t)
julia> a = nothing
julia> t = nothing
julia> GC.gc(true)

# nothing is collected

Running under gdb reveals that the task object is being kept alive in the thread's local storage:

$ gdb --args julia -t2
(gdb) run

julia> # code from above

(gdb) call jl_(jl_all_tls_states[1].current_task)
Task(next=nothing, queue=nothing, storage=nothing,
     donenotify=Base.GenericCondition{Base.Threads.SpinLock}(waitq=Base.InvasiveLinkedList{Task}(head=nothing, tail=nothing), lock=Base.Threads.SpinLock(owned=0)),
     result=Array{Int64, (2,)}[2, 2], # <------------------- our array again
     logstate=nothing, code=Main.var"#3", _state=0x01, sticky=false, _isexception=false)

Running another task on the same thread replaces that state and allows collection of the array:

julia> # code from above

julia> t = Threads.@spawn println("doing nothing from thread $(Threads.threadid())")
Task (runnable) @0x00007fffb5b99e40
doing nothing from thread 2

julia> GC.gc(true)
finalizing a

As observed in JuliaGPU/CUDA.jl#866.
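
For running outside the REPL, the same experiment can be written as a script. This is only a sketch: the names `finalized` and `spawn_and_drop` are made up for illustration, and the printed values depend on the Julia version, the thread count, and which thread happens to pick up each task, so no particular output is claimed.

```julia
# Run with at least two threads, e.g. `julia -t2 script.jl`.
# `finalized` and `spawn_and_drop` are hypothetical names used only in this sketch.
const finalized = Threads.Atomic{Bool}(false)

function spawn_and_drop()
    a = [2, 2]
    # The finalizer only flips a flag; it does no I/O and does not switch tasks.
    finalizer(_ -> (finalized[] = true), a)
    t = Threads.@spawn a   # the task returns `a`, so `t.result` references it
    wait(t)
    return nothing         # neither `a` nor `t` escapes this function
end

spawn_and_drop()
GC.gc(true)
println("finalized after first GC: ", finalized[])

# Give the scheduler another task to run. It is not guaranteed to land on the
# thread that still holds the finished task as its current_task.
wait(Threads.@spawn nothing)
GC.gc(true)
println("finalized after follow-up task: ", finalized[])
```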

@maleadt maleadt added domain:multithreading Base.Threads and related functionality GC Garbage collector labels Apr 27, 2021
maleadt added a commit to JuliaGPU/CUDA.jl that referenced this issue Apr 28, 2021
Fixes JuliaGPU/CUDA.jl#866
[skip tests]
maleadt added a commit to JuliaGPU/CUDA.jl that referenced this issue Apr 28, 2021
@bkamins
Member

bkamins commented May 17, 2021

@quinnj - maybe this is what happens in CSV.jl?

@JosePereiraUA

Also, maybe this is happening with PyCall + PyTorch ? (See https://discourse.julialang.org/t/using-gpu-via-pycall-causes-non-reusable-memory-allocation/55140)

@tkf
Member

tkf commented Mar 1, 2022

I guess this is because we keep the current task during sleep? If so, a catch-all (and kinda horrible) workaround may be to run

Threads.@threads :static for _ in 1:Threads.nthreads()
    Timer(Returns(nothing), 0; interval = 1)
end

?

Now that we store the Task in the actual TLS and not ptls, I think we can't GC the current task even when the worker thread sleeps. We also can't unconditionally set jl_current_task->result = NULL since there may be other references to this task (and fetch can be called on it). Several options I can think of:

  1. Create a dummy task per worker thread and set it as the current task before sleep.
  2. Minimize Core.Task fields (without result field and also other auxiliary things) and wrap it in Base.Task and use it in Threads.@spawn etc. We can then GC the Base.Task while keeping Core.Task alive (via ptls).

For a short-term solution with relatively small changes, I think Option 1 is attractive. Option 2 is attractive from other points of view like task fusion where you'd want to create tasks without materializing the auxiliary fields (e.g., RNG).

@pxl-th

pxl-th commented Sep 28, 2022

Bumped into this issue with JuliaGPU/AMDGPU.jl#299, where each ROCKernelSignal spawns a waiter on a separate thread, but after the waiter finishes it prevents finalizers from being called (and thus signals are not deleted until Julia exits; same for ROCQueue).

Proposed workaround in JuliaGPU/CUDA.jl#866 does not work as those threads do not return.

Here's also a small MWE that reflects AMDGPU's situation.

quinnj added a commit to JuliaData/CSV.jl that referenced this issue Oct 25, 2022
We're surely running into JuliaLang/julia#40626.

This is my attempt to clear thread's local storage after multithreaded
parsing.
@bkamins
Member

bkamins commented Oct 25, 2022

bump

quinnj added a commit to JuliaData/CSV.jl that referenced this issue Oct 25, 2022
#1046)

* Clear thread state to ensure threads local state don't keep references

We're surely running into JuliaLang/julia#40626.

This is my attempt to clear thread's local storage after multithreaded
parsing.

* 1.6 compat

* fix again
@bkamins
Member

bkamins commented Oct 29, 2022

Maybe the simplest solution would be to make sure that, when GC is invoked, the workaround proposed by @tkf runs before the GC pass? (The point is that, AFAICT, the GC currently ensures that it does not run in parallel with anything else, but I do not know enough about the details of the current GC implementation, so this might be the wrong approach.)

@jpsamaroo
Member

I would imagine that we can teach the GC to check the task's _state field, and if it's 0x1 (done) or 0x2 (failed), then it will treat the TLS root as weak (i.e. if there are no other references to the task, it will never become accessible again except via cross-thread inspection).
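
The check described above would live in the runtime's GC code, which is not shown here. As a rough Julia-level illustration of the two states the comment mentions, the public predicates `istaskdone` and `istaskfailed` report them without touching the internal `_state` field. This sketch only observes task state; it does not change what the GC treats as a root.

```julia
# A task that finishes normally.
t = Threads.@spawn 1 + 1
wait(t)

# A task that fails with an exception.
failing = Threads.@spawn error("boom")
try
    wait(failing)                       # rethrows as a TaskFailedException
catch err
    err isa TaskFailedException || rethrow()
end

println("t done: ", istaskdone(t), ", failed: ", istaskfailed(t))
println("failing done: ", istaskdone(failing), ", failed: ", istaskfailed(failing))
```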

@quinnj
Member

quinnj commented Dec 7, 2022

Ok, I've dug into this a bit more on the CSV.jl side and here are my notes:

  • There are actually 2 ways things can remain unexpectedly referenced when using Threads.@spawn (or @async): 1) as noted above via whatever is returned from the Task, but also 2) any input arguments that are captured as part of the Task thunk closure. These latter references are actually harder to deal w/ since you can usually define your API around returning nothing from a Task, but inputs are inputs.
  • @tkf's Threads.@threads :static ... idea is tricky/problematic because now we're (semi-)arbitrarily messing with Julia's global thread state, which can cause problems for nested Threads.@spawn calls; i.e. if an inner Threads.@spawn call then calls this "thread state clearing" function after it finishes, it can mess with other peer or parent Threads.@spawn calls. We saw this problem when we tried to "clear threads state" at the end of CSV.File, but then a user ran something like asyncmap(CSV.File, files) and things got borked.
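
The first point (designing the API so the task returns nothing) can be sketched as follows. This is a minimal illustration, not code from CSV.jl: the function name `compute_without_retaining` is hypothetical, and the result is handed back through a buffered `Channel` so that the finished task's `result` field holds only `nothing`. The closure still captures the channel itself, so the second point about captured inputs is not addressed by this pattern.

```julia
# Hypothetical helper for illustration only.
function compute_without_retaining()
    out = Channel{Vector{Int}}(1)    # buffered, so put! does not block
    t = Threads.@spawn begin
        put!(out, collect(1:3))      # hand the payload over explicitly
        nothing                      # the task's own result is `nothing`
    end
    wait(t)
    return take!(out), fetch(t)
end

vals, task_result = compute_without_retaining()
println(vals, " ", task_result)
```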

Following along the lines of Julian's idea, I came up w/ the following helper macros:

"""
    @syncpreserve args... begin

    end

A macro that wraps a `@sync` block with `GC.@preserve` calls for all `args...` arguments, to ensure
they are not garbage collected for the lifetime of the `@sync` block.
"""
macro syncpreserve(args...)
    expr = args[end]
    args = args[1:end-1]
    esc(quote
        GC.@preserve $(args...) begin
            @sync $expr
        end
    end)
end

"""
    @weakrefspawn args... begin

    end

A macro that wraps a `Threads.@spawn` block with `WeakRef` calls for all `args...` arguments, allowing
them to be garbage collected once the `Task` has finished running. Must be used within a `@syncpreserve`
block to ensure input arguments are not garbage collected for the lifetime of the `@sync` block.
"""
macro weakrefspawn(args...)
    expr = args[end]
    args = args[1:end-1]
    block = Expr(:block, :(wkd = Dict()))
    unpack = Expr(:block)
    for arg in args
        push!(block.args, :(wkd[$(Meta.quot(arg))] = WeakRef($arg)))
        push!(unpack.args, :($(Symbol("_", arg)) = wkd[$(Meta.quot(arg))].value))
    end
    esc(quote
        $block
        Threads.@spawn begin
            $unpack
            $expr
        end
    end)
end

Essentially the idea is that I wrap all my input arguments to my Threads.@spawn block in WeakRef to allow them to be GCed, but also I have a @syncpreserve that will wrap the same arguments with GC.@preserve for the life of a @sync block to ensure they stay alive for as long as needed. Their usage looks like this in CSV.jl:

@syncpreserve ctx pertaskcolumns rowchunkguess rows wholecolumnslock for i = 1:ntasks
    @weakrefspawn ctx pertaskcolumns rowchunkguess i rows wholecolumnslock multithreadparse(_ctx, _pertaskcolumns, _rowchunkguess, _i, _rows, _wholecolumnslock)
end

It's a lot of argument repetition, but from my inspection, it seems to be doing the intended job: that is, even though various spawned Tasks hang around in global thread states (as inspected in lldb via jl_all_tls_states), the WeakRefs in my "arg dict" get garbage collected as expected after CSV.File calls.

So all in all, a bit clunky, but it works. Anyone see any problems with this approach? Will @jpsamaroo's PR/approach be able to do this kind of thing automatically?
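
For readers unfamiliar with the two building blocks used by these macros, here is a small standalone sketch of `WeakRef` and `GC.@preserve`. The `Payload` type and `weakref_demo` function are hypothetical names for illustration. While a strong reference exists, `w.value` returns the object. Once no strong references remain, a later collection may reset `w.value` to `nothing`, but when that happens is up to the GC, so the sketch does not assert it.

```julia
# Hypothetical type and function, used only in this sketch.
mutable struct Payload
    data::Vector{Int}
end

function weakref_demo()
    p = Payload([1, 2, 3])
    w = WeakRef(p)               # does not keep `p` alive by itself
    alive = GC.@preserve p begin
        GC.gc()                  # `p` is explicitly rooted for this block
        w.value === p            # so the weak reference still resolves
    end
    return alive
end

println(weakref_demo())
```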

I've come to see this as one of those unfortunate cleanups that we should really address; it doesn't jump up and bite you as a bug, but it's an unfortunate piece of our current design where stuff can hang around a lot longer than you expect.

quinnj added a commit to JuliaData/CSV.jl that referenced this issue Dec 7, 2022
Most complete explanation is [here](JuliaLang/julia#40626 (comment)).

Also discussed [here](#1057).

This PR proposes an alternative solution to `clear_thread_states` where that approach
can be problematic (interfering with global thread state, not working as expected in nested spawned
tasks, etc.). The previous definition also started unending `Timer` tasks that could build up over time.

The approach in this PR is to wrap spawned task closure arguments in `WeakRef` to allow them to be
GCed as expected once the tasks are finished.
@NHDaly
Member

NHDaly commented Dec 7, 2022

Excellent writeup @quinnj

We chatted about this a bit offline, but i don't think the @syncpreserve should be needed. I think the $unpack inside the Task's lifetime should be enough to ensure that the variables have GC-rooted references during the lifetime of the $expr. We think that there were some unrelated experimental issues that led to thinking @syncpreserve was needed.

But yeah, otherwise this seems like a decent workaround..

@NHDaly
Member

NHDaly commented Dec 7, 2022

I was wondering about the underlying issue:

When a thread goes to sleep, couldn't it switch to a specific "sleeper Task" that just does the sleep, so that it can free the user's Task object?

i.e. instead of the task-sleeping logic looking something like this:

function _jl_get_next_task()
     next_task_or_nothing = fetch_task()
     if next_task_or_nothing === nothing
         sleep(tls.sleep_cond)
     else
         task_switch(next_task_or_nothing::Task)
     end
end

it could instead look something like this?:

function _jl_get_next_task()
     next_task_or_nothing = fetch_task()
     if next_task_or_nothing === nothing
         task_switch(tls.sleeper_task)
     else
         task_switch(next_task_or_nothing::Task)
     end
end

where the sleeper_task would just be a per-Thread task that runs a function something like this:

function sleeper_task()
    while true
        sleep(tls.cond)
        next_task_or_nothing = fetch_task()
        if next_task_or_nothing !== nothing
            task_switch(next_task_or_nothing::Task)
        end
    end
end

Is there a reason we don't have this kind of design? It would also make CPU profiles easier to understand, since thread sleep always looks the same.

quinnj added a commit to JuliaServices/ConcurrentUtilities.jl that referenced this issue Dec 7, 2022
…e arguments in WeakRef

Fixes JuliaData/CSV.jl#1057.
Works around current Julia limitation here: JuliaLang/julia#40626.

`@wkspawn` acts just like `Threads.@spawn`, except for mutable, interpolated arguments
in the spawned expression, they will also be transparently wrapped as `WeakRef`s,
then immediately unwrapped within the spawn block. This avoids the `Task` closure
capturing mutable arguments in a more permanent way and preventing their collection
later, even after there are no more program references to the mutable argument.
@quinnj
Member

quinnj commented Dec 7, 2022

Minor update: I packaged my idea(s) above into a single @wkspawn macro in a PR here. This emulates Threads.@spawn in every way, but will wrap mutable closure arguments in WeakRef and then transparently unwrap within the spawn expression.

I added some tests that show the current, surprising behavior with Threads.@spawn and how the ref is correctly garbage collected with @wkspawn.
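
The core idea can be sketched with a plain function instead of a macro. This is not the `@wkspawn` implementation; `weak_spawn` is a hypothetical helper that handles a single argument and assumes the caller keeps `arg` alive until the task has started and unwrapped it.

```julia
# Hypothetical helper, not part of Base or ConcurrentUtilities.jl.
function weak_spawn(f, arg)
    ref = WeakRef(arg)           # the task closure captures only `f` and `ref`
    return Threads.@spawn begin
        strong = ref.value       # re-root the argument while the task runs
        strong === nothing ? nothing : f(strong)
    end
end

data = [1, 2, 3]                 # the caller holds the strong reference
t = weak_spawn(sum, data)
println(fetch(t))
```

Because the finished task holds only a `WeakRef` to `data`, a stale `current_task` entry should no longer keep `data` reachable through the closure. The return value of `f` is still stored in the task's `result` field, so returning `nothing` remains necessary when the result itself must be collectable.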

@vchuravy
Sponsor Member

vchuravy commented Dec 7, 2022

I believe we should be able to easily do #47829, this doesn't treat the result as a transitive weak reference, but as @quinnj said if that is crucial one can return nothing from the task.

quinnj added a commit to JuliaServices/ConcurrentUtilities.jl that referenced this issue Dec 7, 2022
#10)

* Add new `@wkspawn` macro for wrapping mutable `Threads.@spawn` closure arguments in WeakRef

Fixes JuliaData/CSV.jl#1057.
Works around current Julia limitation here: JuliaLang/julia#40626.

`@wkspawn` acts just like `Threads.@spawn`, except for mutable, interpolated arguments
in the spawned expression, they will also be transparently wrapped as `WeakRef`s,
then immediately unwrapped within the spawn block. This avoids the `Task` closure
capturing mutable arguments in a more permanent way and preventing their collection
later, even after there are no more program references to the mutable argument.

* Compat

* fix

* try lots of gc

* Fix

* Run with threads
quinnj added a commit to JuliaData/CSV.jl that referenced this issue Dec 8, 2022
…#1058)

* Use WeakRefs for spawned tasks to avoid holding unexpected references

Most complete explanation is [here](JuliaLang/julia#40626 (comment)).

Also discussed [here](#1057).

This PR proposes an alternative solution to `clear_thread_states` where that approach
can be problematic (interfering with global thread state, not working as expected in nested spawned
tasks, etc.). The previous definition also started unending `Timer` tasks that could build up over time.

The approach in this PR is to wrap spawned task closure arguments in `WeakRef` to allow them to be
GCed as expected once the tasks are finished.

* Try putting gc preserve inside Threads.spawn block

* Outside GC preserve manual sync block

* Make Context mutable so it gets preserved properly

* Only wrap in WeakRef if mutable

* oops

* Use `@wkspawn` from WorkerUtilities.jl package

* 1.6 compat
9 participants