diff --git a/src/utils.jl b/src/utils.jl index 7134e9f..dcdeb52 100644 --- a/src/utils.jl +++ b/src/utils.jl @@ -15,6 +15,13 @@ macro static_error(msg::AbstractString) :(static_error(Val{$(QuoteNode(sym))}())) end +""" + is_pointerfree_type(T::Type) :: Bool + +Return `true` if any instances of `T` do not contain boxed Julia objects. +""" +is_pointerfree_type(::Type{T}) where {T} = isconcretetype(T) && Base.datatype_pointerfree(T) + function ceillog2(n::Integer) n > 0 || throw(DomainError(n)) i = trailing_zeros(n) diff --git a/src/workstealing.jl b/src/workstealing.jl index c64f022..ee83078 100644 --- a/src/workstealing.jl +++ b/src/workstealing.jl @@ -8,6 +8,7 @@ end function CircularVector{T}(log2length::Int) where {T} @assert log2length >= 0 data = Vector{T}(undef, 1 << log2length) + Nothing <: T && fill!(data, nothing) # [^speculative_load] T <: Signed && fill!(data, typemin(T)) return CircularVector{T}(log2length, data) end @@ -51,10 +52,12 @@ mutable struct WorkStealingDeque{T,S} end function WorkStealingDeque{T}() where {T} - if isbitstype(T) || Base.isbitsunion(T) - # TODO: support Some{Union{Int,Nothing}} etc. + if is_pointerfree_type(T) S = T else + # Any element type that may contain a boxed object uses + # `CircularVector{Any}` which initialize all locations with `nothing`. + # This is for supporting "blind" `buffer[top]`. [^speculative_load] S = Any end return WorkStealingDeque{T,S}(CircularVector{S}(4), 1, 1) @@ -139,7 +142,7 @@ function ConcurrentCollections.trypop!(deque::WorkStealingDeque) return r end -function ConcurrentCollections.trypopfirst!(deque::WorkStealingDeque) +function ConcurrentCollections.trypopfirst!(deque::WorkStealingDeque{T}) where {T} top = @atomic deque.top bottom = @atomic deque.bottom buffer = @atomic deque.buffer @@ -147,28 +150,41 @@ function ConcurrentCollections.trypopfirst!(deque::WorkStealingDeque) if current_size <= 0 return nothing end - if Base.allocatedinline(eltype(buffer)) - r = Some(buffer[top]) - if @atomicreplace(deque.top, top => top + 1)[2] - return r - else - return nothing - end + # TODO: Technically, this should be an atomic load. See below for some + # discussions. [^speculative_load] + y = buffer[top] + if @atomicreplace(deque.top, top => top + 1)[2] + y = y::T + return Some{T}(y) else - ptr = UnsafeAtomics.load(Ptr{Ptr{Cvoid}}(pointer(buffer, top)), monotonic) - if @atomicreplace(deque.top, top => top + 1)[2] - # Safety: The above CAS verifies that the slot `buffer[top]` - # contained the valid element. We can now materialize it as a Julia - # value. - GC.@preserve buffer begin - r = Some(unsafe_pointer_to_objref(ptr)) - end - return r - else - return nothing - end + return nothing end end +# [^speculative_load]: The difficulty here is that we don't know the validity of +# `y = buffer[top]` until the CAS below but we still need to make sure `y` is a +# ("somewhat") valid Julia object since it has to be rooted before the CAS. +# This is because successful CAS means that `buffer` may not root `y` anymore. +# This can be an issue if the compiler inserts a safepoint between `buffer[top]` +# and the CAS. The current implementation relies on the stop-the-world behavior +# of the GC (the "somewhat" part). That is to say, the object `y` is not +# *always* properly accessible at the (potential) safepoint before the CAS. +# However, since the GC (currently) starts traversing the object graph after it +# knows all worker threads reach the safepoint, the implementation above may +# actually be OK for now. +# +# Another issue is that `buffer[top]` may not be assigned and accessing it can +# throw if the element type is a boxed value. The current workaround is to fill +# `buffer` with `nothing`. Alternatively, checking the buffer with `isassigned` +# should also work. However, since dequeue can happen many times while the +# buffer is initialized less frequently (upon construction and resizes), +# the `isassigned`-based strategy is not used. +# +# TODO: The current implementation also expects that it is "OK" to load a +# pointerfree value non-atomically (it's a data race in C++20 memory model hence +# an UB). A proper solution may be to store/load these values as a sequence of +# `UInt`s with relaxed atomics and then type-pun `NTuple{_,UInt}` into the Julia +# values. See also the discussion in the C++ standard: [p0690r1: Tearable +# Atomics](http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p0690r1.html) function Base.pop!(deque::WorkStealingDeque) r = trypop!(deque)