Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,8 @@ uuid = "8290d209-cae3-49c0-8002-c8c24d57dab5"
authors = ["Chris Elrod <elrodc@gmail.com> and contributors"]
version = "0.4.4"

[deps]
VectorizationBase = "3d5dd08c-fd9d-11e8-17fa-ed2836048c2f"

[compat]
Aqua = "0.5"
VectorizationBase = "0.19.2, 0.20"
julia = "1.5"

[extras]
Expand Down
23 changes: 17 additions & 6 deletions src/ThreadingUtilities.jl
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
module ThreadingUtilities

using VectorizationBase:
pause, StaticInt, StridedPointer, stridedpointer, offsets, cache_linesize, align, __vload, __vstore!, num_threads, assume, False, register_size, NativeTypes
"""
pause()

For use in spin-and-wait loops, like spinlocks.
"""
@inline function pause()
    # Delegates to the Julia runtime's `jl_cpu_pause`; takes no arguments and returns `nothing`.
    return ccall(:jl_cpu_pause, Cvoid, ())
end

# assume(b): pass `b` to the `llvm.assume` intrinsic. The `Bool` arrives as an `i8`
# and is truncated to `i1` before the intrinsic call. Returns `nothing`.
# Two definitions are needed because the accepted `llvmcall` IR format differs by Julia version.
if VERSION ≥ v"1.6.0-DEV.674"
    # Module form: full IR containing the declaration and a named entry function.
    @inline function assume(b::Bool)::Cvoid
        return Base.llvmcall((" declare void @llvm.assume(i1)\n\n define void @entry(i8) alwaysinline {\n top:\n %b = trunc i8 %0 to i1\ncall void @llvm.assume(i1 %b)\nret void\n }\n", "entry"), Cvoid, Tuple{Bool}, b)
    end
else
    # Legacy form: (declarations, function body) pair.
    @inline function assume(b::Bool)::Cvoid
        return Base.llvmcall(("declare void @llvm.assume(i1)", "%b = trunc i8 %0 to i1\ncall void @llvm.assume(i1 %b)\nret void"), Cvoid, Tuple{Bool}, b)
    end
end

# Thread states, backed by UInt32:
#   TASK (0) - task available
#   WAIT (1) - waiting
#   SPIN (2) - spinning
@enum ThreadState::UInt32 TASK=0 WAIT=1 SPIN=2
# Task storage; `__init__` resizes it to `nt` entries and runs `initialize_task` for each index.
const TASKS = Task[]
const LINESPACING = 256 # maximum cache-line size among contemporary CPUs.
# Number of bytes of THREADPOOL reserved per thread (`__init__` sizes the pool as
# `(THREADBUFFERSIZE ÷ sizeof(UInt)) * nt` words plus alignment slack).
const THREADBUFFERSIZE = 512
# Backing buffer of machine words; sized in `__init__`.
const THREADPOOL = UInt[]
# Set in `__init__` to the LINESPACING-aligned address inside THREADPOOL minus
# THREADBUFFERSIZE, so that thread ids can be used 1-based; C_NULL until then.
const THREADPOOLPTR = Ref{Ptr{UInt}}(C_NULL);
Expand Down Expand Up @@ -37,11 +48,11 @@ end

function __init__()
_print_exclusivity_warning()
nt = min(Threads.nthreads(),(Sys.CPU_THREADS)::Int) - 1
resize!(THREADPOOL, (THREADBUFFERSIZE ÷ sizeof(UInt)) * nt + (cache_linesize() ÷ sizeof(UInt)) - 1)
nt = min(Threads.nthreads(), (Sys.CPU_THREADS)::Int) - 1
resize!(THREADPOOL, (THREADBUFFERSIZE ÷ sizeof(UInt)) * nt + (LINESPACING ÷ sizeof(UInt)) - 1)
copyto!(THREADPOOL, zero(UInt))
THREADPOOLPTR[] = align(pointer(THREADPOOL)) - THREADBUFFERSIZE
Threads.atomic_fence() # ensure 0-initialization
# align to LINESPACING boundary, and then subtract THREADBUFFERSIZE to make the pointer 1-indexed
THREADPOOLPTR[] = reinterpret(Ptr{UInt}, (reinterpret(UInt, (pointer(THREADPOOL)))+LINESPACING-1) & (-LINESPACING)) - THREADBUFFERSIZE
resize!(TASKS, nt)
foreach(initialize_task, 1:nt)
end
Expand Down
4 changes: 0 additions & 4 deletions src/atomics.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
# TODO: Is atomic volatile really necessary?
# Early on my attempts weren't syncing / atomics
# weren't behaving atomically between threads so
# I got a bit defensive.
for (ityp,jtyp) ∈ [("i8", UInt8), ("i16", UInt16), ("i32", UInt32), ("i64", UInt64), ("i128", UInt128)]
@eval begin
@inline function _atomic_load(ptr::Ptr{$jtyp})
Expand Down
2 changes: 0 additions & 2 deletions src/threadtasks.jl
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,8 @@ function (tt::ThreadTask)()
end

# 1-based tid, pushes into task 2-nthreads()
# function wake_thread!(tid::T) where {T <: Unsigned}
@noinline function wake_thread!(_tid::T) where {T <: Integer}
tid = _tid % Int
# store!(taskpointer(_tid), TASK)
tidp1 = tid + one(tid)
assume(unsigned(length(Base.Workqueues)) > unsigned(tid))
assume(unsigned(length(TASKS)) > unsigned(tidp1))
Expand Down
165 changes: 73 additions & 92 deletions src/utils.jl
Original file line number Diff line number Diff line change
@@ -1,103 +1,84 @@
# To add support for loading/storing...
@inline function load(p::Ptr{UInt}, ::Type{T}) where {T<:NativeTypes}
__vload(Base.unsafe_convert(Ptr{T}, p), False(), register_size())
# load(p::Ptr{T}) -> T
#
# Read a value of type `T` from address `p`.
# * If `T` is allocated inline (`Base.allocatedinline(T)`), the value is read
#   directly with `unsafe_load`.
# * Otherwise the slot at `p` is read as a `Ptr{Cvoid}` and turned back into the
#   referenced object through the runtime's `jl_value_ptr`.
# The branch is chosen at code-generation time; the emitted body is marked inline.
@generated function load(p::Ptr{T}) where {T}
    read_expr = if Base.allocatedinline(T)
        :(unsafe_load(p))
    else
        :(ccall(:jl_value_ptr, Ref{$T}, (Ptr{Cvoid},), unsafe_load(Base.unsafe_convert(Ptr{Ptr{Cvoid}}, p))))
    end
    return Expr(:block, Expr(:meta, :inline), read_expr)
end
@inline function load(p::Ptr{UInt}, ::Type{T}) where {T<:Union{Ptr,Core.LLVMPtr}}
reinterpret(T, __vload(p, False(), register_size()))
@inline load(p::Ptr{UInt}, ::Type{T}) where {T} = load(reinterpret(Ptr{T}, p))
# store!(p::Ptr{T}, v::T) -> nothing
#
# Write `v` to address `p`.
# * If `T` is allocated inline (`Base.allocatedinline(T)`), the value itself is
#   written with `unsafe_store!`.
# * Otherwise only the object's address (`Base.pointer_from_objref(v)`) is written
#   into the slot, viewed as a `Ptr{Ptr{Cvoid}}`.
# The branch is chosen at code-generation time; the emitted body is marked inline.
@generated function store!(p::Ptr{T}, v::T) where {T}
    write_expr = if Base.allocatedinline(T)
        :(unsafe_store!(p, v))
    else
        :(unsafe_store!(Base.unsafe_convert(Ptr{Ptr{Cvoid}}, p), Base.pointer_from_objref(v)))
    end
    return Expr(:block, Expr(:meta, :inline), write_expr, :(return nothing))
end
# @inline function load(p::Ptr{UInt}, ::Type{T}) where {T<:NativeTypes}
# __vload(reinterpret(Core.LLVMPtr{T,0}, p), False(), register_size())
# end
# @inline function load(p::Ptr{UInt}, ::Type{T}) where {T<:Union{Ptr,Core.LLVMPtr}}
# reinterpret(T, __vload(reinterpret(Core.LLVMPtr{UInt,0}, p), False(), register_size()))
# end
@inline load(p::Ptr{UInt}, ::Type{T}) where {T} = unsafe_load(Base.unsafe_convert(Ptr{T}, p))
@inline function store!(p::Ptr{UInt}, x::T) where {T <: Union{Ptr,Core.LLVMPtr}}
__vstore!(p, reinterpret(UInt, x), False(), False(), False(), register_size())
end
@inline function store!(p::Ptr{UInt}, x::T) where {T <: NativeTypes}
__vstore!(Base.unsafe_convert(Ptr{T}, p), x, False(), False(), False(), register_size())
end
# @inline function store!(p::Ptr{UInt}, x::T) where {T <: Union{Ptr,Core.LLVMPtr}}
# __vstore!(reinterpret(Core.LLVMPtr{UInt,0}, p), reinterpret(UInt, x), False(), False(), False(), register_size())
# end
# @inline function store!(p::Ptr{UInt}, x::T) where {T <: NativeTypes}
# __vstore!(reinterpret(Core.LLVMPtr{T,0}, p), x, False(), False(), False(), register_size())
# end
@inline store!(p::Ptr{UInt}, x::T) where {T} = (unsafe_store!(Base.unsafe_convert(Ptr{T}, p), x); nothing)

@inline load(p::Ptr{UInt}, ::Type{StaticInt{N}}, i) where {N} = i, StaticInt{N}()
@inline store!(p::Ptr{UInt}, ::StaticInt, i) = i

# Number of bytes by which the buffer offset advances for a value of type `T`:
# `sizeof(T)` when `T` is allocated inline, otherwise `sizeof(Int)`.
function offsetsize(::Type{T}) where {T}
    if Base.allocatedinline(T)
        return sizeof(T)
    end
    return sizeof(Int)
end




@generated function load(p::Ptr{UInt}, ::Type{StridedPointer{T,N,C,B,R,X,O}}, i) where {T,N,C,B,R,X,O}
q = quote
$(Expr(:meta,:inline))
i, ptr = load(p, Ptr{$T}, i)
end
xt = Expr(:tuple)
Xp = X.parameters
for n ∈ 1:N
x = Symbol(:x_,n)
push!(xt.args, x)
push!(q.args, :((i, $x) = load(p, $(Xp[n]), i)))
function load_aggregate(::Type{T}, offset::Int) where {T}
numfields = fieldcount(T)
call = (T <: Tuple) ? Expr(:tuple) : Expr(:new, T)
for f ∈ 1:numfields
TF = fieldtype(T, f)
if Base.issingletontype(TF)
push!(call.args, TF.instance)
elseif fieldcount(TF) ≡ 0
ptr = :(p + (offset + $offset))
ptr = TF === UInt ? ptr : :(reinterpret(Ptr{$TF}, $ptr))
push!(call.args, :(load($ptr)))
offset += offsetsize(TF)
else
arg, offset = load_aggregate(TF, offset)
push!(call.args, arg)
end
ot = Expr(:tuple)
Op = O.parameters
for n ∈ 1:N
o = Symbol(:o_,n)
push!(ot.args, o)
push!(q.args, :((i, $o) = load(p, $(Op[n]), i)))
end
push!(q.args, :((i, StridedPointer{$T,$N,$C,$B,$R}(ptr, $xt, $ot))))
q
end
@generated function store!(p::Ptr{UInt}, ptr::StridedPointer{T,N,C,B,R,X,O}, i) where {T,N,C,B,R,X,O}
q = quote
$(Expr(:meta,:inline))
i = store!(p, pointer(ptr), i)
strd = strides(ptr)
offs = offsets(ptr)
end
for n ∈ 1:N
push!(q.args, :(i = store!(p, strd[$n], i)))
end
for n ∈ 1:N
push!(q.args, :(i = store!(p, offs[$n], i)))
end
push!(q.args, :i)
q
end

@inline function load(p::Ptr{UInt}, ::Type{T}, i) where {T}
i + sizeof(T), load(p + i, T)
end
return call, offset
end
@inline function store!(p::Ptr{UInt}, x, i)
store!(p + i, x)
i + sizeof(x)
# load(p::Ptr{UInt}, ::Type{T}, offset::Int) -> (new_offset, value)
#
# Read a `T` located `offset` bytes past `p` and return the advanced offset with the value.
# Code generation distinguishes three cases:
# * singleton `T`: nothing is read; returns `(offset, T.instance)`.
# * `T` without fields: one `load` from `p + offset` (pointer reinterpreted to
#   `Ptr{T}` unless `T` is `UInt`); the offset advances by `offsetsize(T)`.
# * `T` with fields: the expression built by `load_aggregate(T, 0)` is used, and the
#   offset advances by the byte count that helper reports.
@generated function load(p::Ptr{UInt}, ::Type{T}, offset::Int) where {T}
    result = if Base.issingletontype(T)
        Expr(:tuple, :offset, T.instance)
    elseif fieldcount(T) ≡ 0
        addr = :(p + offset)
        if T !== UInt
            addr = :(reinterpret(Ptr{$T}, $addr))
        end
        :((offset + $(offsetsize(T)), load($addr)))
    else
        value, nbytes = load_aggregate(T, 0)
        Expr(:tuple, :(offset + $nbytes), value)
    end
    return Expr(:block, Expr(:meta, :inline), result)
end

@generated function load(p::Ptr{UInt}, ::Type{T}, i) where {T<:Tuple}
q = Expr(:block, Expr(:meta,:inline))
tup = Expr(:tuple)
for (i,t) ∈ enumerate(T.parameters)
ln = Symbol(:l_,i)
push!(tup.args, ln)
push!(q.args, :((i,$ln) = load(p, $t, i)))
function store_aggregate!(q::Expr, sym, ::Type{T}, offset::Int) where {T}
gf = GlobalRef(Core,:getfield)
for f ∈ 1:fieldcount(T)
TF = fieldtype(T, f)
Base.issingletontype(TF) && continue
gfcall = Expr(:call, gf, sym, f)
if fieldcount(TF) ≡ 0
ptr = :(p + (offset + $offset))
ptr = TF === UInt ? ptr : :(reinterpret(Ptr{$TF}, $ptr))
push!(q.args, :(store!($ptr, $gfcall)))
offset += offsetsize(TF)
else
newsym = gensym(sym)
push!(q.args, Expr(:(=), newsym, gfcall))
offset = store_aggregate!(q, newsym, TF, offset)
end
push!(q.args, :(i, $tup))
q
end
@inline function store!(p::Ptr{UInt}, tup::Tuple{A,B,Vararg{Any,N}}, i) where {A,B,N}
i = store!(p, first(tup), i)
store!(p, Base.tail(tup), i)
end
return offset
end
@inline function store!(p::Ptr{UInt}, tup::Tuple{A}, i) where {A}
store!(p, first(tup), i)
# store!(p::Ptr{UInt}, x::T, offset::Int) -> new_offset
#
# Write `x` starting `offset` bytes past `p` and return the advanced offset.
# Code generation distinguishes three cases:
# * singleton `T`: nothing is written; `offset` is returned unchanged.
# * `T` without fields: one `store!` to `p + offset` (pointer reinterpreted to
#   `Ptr{T}` unless `T` is `UInt`); the offset advances by `offsetsize(T)`.
# * `T` with fields: `store_aggregate!` appends the per-field stores and reports the
#   total number of bytes by which to advance.
@generated function store!(p::Ptr{UInt}, x::T, offset::Int) where {T}
    if Base.issingletontype(T)
        return :offset
    end
    code = Expr(:block, Expr(:meta, :inline))
    nbytes = if fieldcount(T) ≡ 0
        dest = T === UInt ? :(p + offset) : :(reinterpret(Ptr{$T}, p + offset))
        push!(code.args, :(store!($dest, x)))
        offsetsize(T)
    else
        store_aggregate!(code, :x, T, 0)
    end
    # The last expression of the generated body is the new offset.
    push!(code.args, Expr(:call, +, :offset, nbytes))
    return code
end
@inline store!(p::Ptr{UInt}, tup::Tuple{}, i) = i
@inline store!(p::Ptr{UInt}, tup::Nothing, i) = i

7 changes: 5 additions & 2 deletions test/internals.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
@test ThreadingUtilities.store!(pointer(UInt[]), nothing, 1) == 1
x = zeros(UInt, 100);
GC.@preserve x begin
t1 = (1.0, C_NULL, 3, ThreadingUtilities.stridedpointer(x))
t1 = (1.0, C_NULL, Val(7), (3, UInt(17)), VectorizationBase.stridedpointer(x))
@test ThreadingUtilities.store!(pointer(x), t1, 0) === mapreduce(sizeof, +, t1)
@test ThreadingUtilities.store!(pointer(x), Val(0), 0) == 0
@test ThreadingUtilities.load(pointer(x), typeof(t1), 0) === (mapreduce(sizeof, +, t1), t1)

@test ThreadingUtilities.load(pointer(x), Val{0}, 0) === (0, Val(0))
@test ThreadingUtilities.store!(pointer(x), 0xb502916f%UInt, 72) == 72 + sizeof(Int)
@test ThreadingUtilities.load(pointer(x), UInt, 72) == (72 + sizeof(Int),0xb502916f%UInt)
nt1 = (;a = 1.0)
@test ThreadingUtilities.store!(pointer(x), nt1, 0) === sizeof(nt1)
@test ThreadingUtilities.load(pointer(x), typeof(nt1), 0) === (sizeof(nt1), nt1)
Expand Down
2 changes: 1 addition & 1 deletion test/threadpool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
@test isconst(ThreadingUtilities, :THREADPOOL) # test that ThreadingUtilities.THREADPOOL is a constant
@test ThreadingUtilities.THREADPOOL isa Vector{UInt}
@test eltype(ThreadingUtilities.THREADPOOL) === UInt
@test length(ThreadingUtilities.THREADPOOL) == (ThreadingUtilities.THREADBUFFERSIZE÷sizeof(UInt)) * (min(Threads.nthreads(),(Sys.CPU_THREADS)::Int) - 1) + (VectorizationBase.cache_linesize() ÷ sizeof(UInt)) - 1
@test length(ThreadingUtilities.THREADPOOL) == (ThreadingUtilities.THREADBUFFERSIZE÷sizeof(UInt)) * (min(Threads.nthreads(),(Sys.CPU_THREADS)::Int) - 1) + (256 ÷ sizeof(UInt)) - 1
end