Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ macro static_error(msg::AbstractString)
:(static_error(Val{$(QuoteNode(sym))}()))
end

"""
is_pointerfree_type(T::Type) :: Bool

Return `true` if any instances of `T` do not contain boxed Julia objects.
"""
is_pointerfree_type(::Type{T}) where {T} = isconcretetype(T) && Base.datatype_pointerfree(T)

function ceillog2(n::Integer)
n > 0 || throw(DomainError(n))
i = trailing_zeros(n)
Expand Down
60 changes: 38 additions & 22 deletions src/workstealing.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ end
function CircularVector{T}(log2length::Int) where {T}
@assert log2length >= 0
data = Vector{T}(undef, 1 << log2length)
Nothing <: T && fill!(data, nothing) # [^speculative_load]
T <: Signed && fill!(data, typemin(T))
return CircularVector{T}(log2length, data)
end
Expand Down Expand Up @@ -51,10 +52,12 @@ mutable struct WorkStealingDeque{T,S}
end

function WorkStealingDeque{T}() where {T}
if isbitstype(T) || Base.isbitsunion(T)
# TODO: support Some{Union{Int,Nothing}} etc.
if is_pointerfree_type(T)
S = T
else
# Any element type that may contain a boxed object uses
# `CircularVector{Any}` which initialize all locations with `nothing`.
# This is for supporting "blind" `buffer[top]`. [^speculative_load]
S = Any
end
return WorkStealingDeque{T,S}(CircularVector{S}(4), 1, 1)
Expand Down Expand Up @@ -139,36 +142,49 @@ function ConcurrentCollections.trypop!(deque::WorkStealingDeque)
return r
end

function ConcurrentCollections.trypopfirst!(deque::WorkStealingDeque)
function ConcurrentCollections.trypopfirst!(deque::WorkStealingDeque{T}) where {T}
top = @atomic deque.top
bottom = @atomic deque.bottom
buffer = @atomic deque.buffer
current_size = bottom - top
if current_size <= 0
return nothing
end
if Base.allocatedinline(eltype(buffer))
r = Some(buffer[top])
if @atomicreplace(deque.top, top => top + 1)[2]
return r
else
return nothing
end
# TODO: Technically, this should be an atomic load. See below for some
# discussions. [^speculative_load]
y = buffer[top]
if @atomicreplace(deque.top, top => top + 1)[2]
y = y::T
return Some{T}(y)
else
ptr = UnsafeAtomics.load(Ptr{Ptr{Cvoid}}(pointer(buffer, top)), monotonic)
if @atomicreplace(deque.top, top => top + 1)[2]
# Safety: The above CAS verifies that the slot `buffer[top]`
# contained the valid element. We can now materialize it as a Julia
# value.
GC.@preserve buffer begin
r = Some(unsafe_pointer_to_objref(ptr))
end
return r
else
return nothing
end
return nothing
end
end
# [^speculative_load]: The difficulty here is that we don't know the validity of
# `y = buffer[top]` until the CAS below but we still need to make sure `y` is a
# ("somewhat") valid Julia object since it has to be rooted before the CAS.
# This is because successful CAS means that `buffer` may not root `y` anymore.
# This can be an issue if the compiler inserts a safepoint between `buffer[top]`
# and the CAS. The current implementation relies on the stop-the-world behavior
# of the GC (the "somewhat" part). That is to say, the object `y` is not
# *always* properly accessible at the (potential) safepoint before the CAS.
# However, since the GC (currently) starts traversing the object graph after it
# knows all worker threads reach the safepoint, the implementation above may
# actually be OK for now.
#
# Another issue is that `buffer[top]` may not be assigned and accessing it can
# throw if the element type is a boxed value. The current workaround is to fill
# `buffer` with `nothing`. Alternatively, checking the buffer with `isassigned`
# should also work. However, since dequeue can happen many times while the
# buffer is initialized less frequently (upon construction and resizes),
# the `isassigned`-based strategy is not used.
#
# TODO: The current implementation also expects that it is "OK" to load a
# pointerfree value non-atomically (it's a data race in C++20 memory model hence
# an UB). A proper solution may be to store/load these values as a sequence of
# `UInt`s with relaxed atomics and then type-pun `NTuple{_,UInt}` into the Julia
# values. See also the discussion in the C++ standard: [p0690r1: Tearable
# Atomics](http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p0690r1.html)

function Base.pop!(deque::WorkStealingDeque)
r = trypop!(deque)
Expand Down