Skip to content

Commit

Permalink
Restructured SMCIO parallel internal code
Browse files Browse the repository at this point in the history
Disabled Win32 testing on AppVeyor
Added benchmarking script
  • Loading branch information
awllee committed Dec 17, 2017
1 parent a32eae6 commit 658e475
Show file tree
Hide file tree
Showing 7 changed files with 316 additions and 245 deletions.
8 changes: 4 additions & 4 deletions appveyor.yml
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
environment:
matrix:
- JULIA_URL: "https://julialang-s3.julialang.org/bin/winnt/x86/0.6/julia-0.6-latest-win32.exe"
# - JULIA_URL: "https://julialang-s3.julialang.org/bin/winnt/x86/0.6/julia-0.6-latest-win32.exe"
- JULIA_URL: "https://julialang-s3.julialang.org/bin/winnt/x64/0.6/julia-0.6-latest-win64.exe"
- JULIA_URL: "https://julialangnightlies-s3.julialang.org/bin/winnt/x86/julia-latest-win32.exe"
# - JULIA_URL: "https://julialangnightlies-s3.julialang.org/bin/winnt/x86/julia-latest-win32.exe"
- JULIA_URL: "https://julialangnightlies-s3.julialang.org/bin/winnt/x64/julia-latest-win64.exe"

## uncomment the following lines to allow failures on nightly julia
## (tests will run but not make your overall status red)
#matrix:
# allow_failures:
allow_failures:
# - JULIA_URL: "https://julialangnightlies-s3.julialang.org/bin/winnt/x86/julia-latest-win32.exe"
# - JULIA_URL: "https://julialangnightlies-s3.julialang.org/bin/winnt/x64/julia-latest-win64.exe"
- JULIA_URL: "https://julialangnightlies-s3.julialang.org/bin/winnt/x64/julia-latest-win64.exe"

branches:
only:
Expand Down
12 changes: 6 additions & 6 deletions src/common.jl
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,17 @@ end

## better not to inline ; enough work should be done in the loop
function _mutateParticles!(zetas::Vec, engine::SMCRNG, p::Int64, M!::F,
zetaAncs::Vec, pScratch) where {Particle, Vec<:AbstractVector{Particle},
F<:Function}
zetaAncs::Vec, pScratch::ParticleScratch) where {Particle,
Vec<:AbstractVector{Particle}, F<:Function, ParticleScratch}
for j in eachindex(zetas)
@inbounds M!(zetas[j], engine, p, zetaAncs[j], pScratch)
end
end

## better not to inline ; enough work should be done in the loop
function _mutateParticles!(zetas::Vec, engine::SMCRNG, p::Int64, M!::F,
zetaAncs::Vec, pScratch, xref::Particle) where {Particle,
Vec<:AbstractVector{Particle}, F<:Function}
zetaAncs::Vec, pScratch::ParticleScratch, xref::Particle) where {Particle,
Vec<:AbstractVector{Particle}, F<:Function, ParticleScratch}
particleCopy!(zetas[1], xref)
for j in 2:length(zetas)
@inbounds M!(zetas[j], engine, p, zetaAncs[j], pScratch)
Expand All @@ -73,9 +73,9 @@ end

## better not to inline ; enough work should be done in the loop
function _logWeightParticles!(lws::Vec1, p::Int64, lG::F,
zetas::Vec2, pScratch, resample::Bool, oldWs::Vec1) where
zetas::Vec2, pScratch::ParticleScratch, resample::Bool, oldWs::Vec1) where
{Vec1<:AbstractVector{Float64}, F<:Function, Particle,
Vec2<:AbstractVector{Particle}}
Vec2<:AbstractVector{Particle}, ParticleScratch}
for j in eachindex(zetas)
@inbounds lws[j] = lG(p, zetas[j], pScratch)
end
Expand Down
200 changes: 111 additions & 89 deletions src/parallel.jl
Original file line number Diff line number Diff line change
@@ -1,103 +1,116 @@
@inline function _iotaParallel!(array::Vector{Int64})
Threads.@threads for i in 1:length(array)
@inbounds array[i] = i
@inline function _iotaParallel!(array::Vector{Int64}, N::Int64, nthreads::Int64,
Nperthread::Int64)
Threads.@threads for i in 1:nthreads
start::Int64 = Nperthread * (i - 1) + 1
finish::Int64 = start + Nperthread - 1
for i = start:finish
@inbounds array[i] = i
end
end
end

@inline function _cpeHelper(smcio::SMCIO)
Threads.@threads for i = 1:smcio.nthreads
@inbounds localAs::SubVectorI64 = smcio.internal.localAs[i]
@inbounds localEves::SubVectorI64 = smcio.internal.localEves[i]
_setEves!(localEves, smcio.internal.oldEves, localAs)
ip::_SMCInternalParallel = smcio.internal.parallel
oldEves::Vector{Int64} = smcio.internal.oldEves
@inbounds localAs::SubVectorI64 = ip.localAs[i]
@inbounds localEves::SubVectorI64 = ip.localEves[i]
_setEves!(localEves, oldEves, localAs)
end
end

@inline function _copyParticlesEves!(model::SMCModel, smcio::SMCIO)
@inline function _copyParticlesEves!(smcio::SMCIO{Particle}) where Particle
Threads.@threads for i = 1:smcio.nthreads
@inbounds localZetaAncs = smcio.internal.localZetaAncs[i]
@inbounds localAs = smcio.internal.localAs[i]
@inbounds localOldEves = smcio.internal.localOldEves[i]
@inbounds localEves = smcio.internal.localEves[i]
_copyParticles!(localZetaAncs, smcio.zetas, localAs)
@inbounds localOldEves .= localEves
ip::_SMCInternalParallel = smcio.internal.parallel
zetas::Vector{Particle} = smcio.zetas
@inbounds localZetaAncs::SubVector{Particle} = ip.localZetaAncs[i]
@inbounds localAs::SubVectorI64 = ip.localAs[i]
@inbounds localOldEves::SubVectorI64 = ip.localOldEves[i]
@inbounds localEves::SubVectorI64 = ip.localEves[i]
_copyParticles!(localZetaAncs, zetas, localAs)
localOldEves .= localEves
end
_cpeHelper(smcio)
end

@inline function _MutateWeightHelper(smcio::SMCIO)
@inline function _MutateWeightHelper1(p::Int64, smcio::SMCIO)
ip::_SMCInternalParallel= smcio.internal.parallel
maxlw::Float64 = maximum(ip.maxlws)
smcio.internal.maxlw = maxlw

for i = 1:smcio.nthreads
@inbounds tmp::Float64 = exp(ip.maxlws[i] - maxlw)
@inbounds ip.partialSums[i] *= tmp
@inbounds ip.partialSumSqs[i] *= tmp * tmp
end

smcio.internal.sws = sum(ip.partialSums)
smcio.internal.mws = smcio.internal.sws / smcio.N
@inbounds smcio.esses[p] = smcio.internal.mws * smcio.internal.mws *
smcio.N / sum(ip.partialSumSqs)
end

@inline function _MutateWeightHelper2(smcio::SMCIO)
Threads.@threads for i = 1:smcio.nthreads
@inbounds localWs = smcio.internal.localWs[i]
@inbounds G::Float64 = exp(smcio.internal.maxlws[i] -
smcio.internal.maxlw) / smcio.internal.mws
localWs .*= G
ip::_SMCInternalParallel = smcio.internal.parallel
@inbounds G::Float64 = exp(ip.maxlws[i] - smcio.internal.maxlw) /
smcio.internal.mws
@inbounds ip.localWs[i] .*= G
end
end

@inline function _MutateWeight!(p::Int64, model::SMCModel,
smcio::SMCIO{Particle}, ref::Union{Void, Particle} = nothing) where Particle
nthreads::Int64 = smcio.nthreads
pScratches::Vector{model.pScratch} = smcio.internal.particleScratches

Threads.@threads for i = 1:nthreads
@inbounds localZetas = smcio.internal.localZetas[i]
@inbounds localZetaAncs = smcio.internal.localZetaAncs[i]
@inbounds localLogWs = smcio.internal.localLogWs[i]
@inbounds localWs = smcio.internal.localWs[i]
smcio::SMCIO{Particle, ParticleScratch},
ref::Union{Void, Particle} = nothing) where {Particle, ParticleScratch}
Threads.@threads for i = 1:smcio.nthreads
ip::_SMCInternalParallel{Particle, ParticleScratch} = smcio.internal.parallel
@inbounds localZetas::SubVector{Particle} = ip.localZetas[i]
@inbounds localZetaAncs::SubVector{Particle} = ip.localZetaAncs[i]
@inbounds localLogWs::SubVectorF64 = ip.localLogWs[i]
@inbounds localWs::SubVectorF64 = ip.localWs[i]
@inbounds pScratch::ParticleScratch = ip.particleScratches[i]
engine::SMCRNG = getSMCRNG()

if i == 1 && ref != nothing
@inbounds _mutateParticles!(localZetas, engine, p, model.M!,
localZetaAncs, pScratches[i], ref)
_mutateParticles!(localZetas, engine, p, model.M!, localZetaAncs,
pScratch, ref)
else
@inbounds _mutateParticles!(localZetas, engine, p, model.M!,
localZetaAncs, pScratches[i])
_mutateParticles!(localZetas, engine, p, model.M!, localZetaAncs,
pScratch)
end
@inbounds _logWeightParticles!(localLogWs, p, model.lG, localZetas,
pScratches[i], p == 1 || smcio.resample[p-1], localWs)
@inbounds noresample::Bool = p == 1 || smcio.resample[p-1]
_logWeightParticles!(localLogWs, p, model.lG, localZetas, pScratch,
noresample, localWs)

localmaxlw::Float64 = maximum(localLogWs)
@inbounds smcio.internal.maxlws[i] = localmaxlw
@inbounds localWs .= exp.(localLogWs .- localmaxlw)
@inbounds smcio.internal.partialSums[i] = sum(localWs)
@inbounds smcio.internal.partialSumSqs[i] = mapreduce(x -> x*x, +, localWs)
@inbounds ip.maxlws[i] = localmaxlw
localWs .= exp.(localLogWs .- localmaxlw)
@inbounds ip.partialSums[i] = sum(localWs)
@inbounds ip.partialSumSqs[i] = mapreduce(x -> x * x, +, localWs)
end

maxlw::Float64 = maximum(smcio.internal.maxlws)
smcio.internal.maxlw = maxlw

for i = 1:nthreads
@inbounds tmp::Float64 = exp(smcio.internal.maxlws[i] - maxlw)
@inbounds smcio.internal.partialSums[i] *= tmp
@inbounds smcio.internal.partialSumSqs[i] *= tmp * tmp
end

smcio.internal.sws = sum(smcio.internal.partialSums)
smcio.internal.mws = smcio.internal.sws / smcio.N
@inbounds smcio.esses[p] = smcio.internal.mws * smcio.internal.mws *
smcio.N / sum(smcio.internal.partialSumSqs)

_MutateWeightHelper(smcio)
_MutateWeightHelper1(p, smcio)
_MutateWeightHelper2(smcio)
end

@inline function _resampleHelper(smcio::SMCIO, conditional::Bool)
Ns::Vector{Int64} = smcio.internal.Ns
NsPartial::Vector{Int64} = smcio.internal.NsPartial
Nperthread::Int64 = smcio.internal.Nperthread
Threads.@threads for i = 1:smcio.nthreads
@inbounds if Ns[i] > 0
@inbounds localScratch1 = smcio.internal.localScratch1s[i]
@inbounds localWs = smcio.internal.localWs[i]
ip::_SMCInternalParallel = smcio.internal.parallel
@inbounds if ip.Ns[i] > 0
@inbounds localScratch1 = ip.localScratch1s[i]
@inbounds localWs = ip.localWs[i]
@inbounds localN = ip.Ns[i]
engine::SMCRNG = getSMCRNG()
start::Int64 = Nperthread * (i - 1)
@inbounds offset::Int64 = i == 1 ? 0 : NsPartial[i-1]
start::Int64 = ip.Nperthread * (i - 1)
@inbounds offset::Int64 = i == 1 ? 0 : ip.NsPartial[i-1]
if i == 1 && conditional
@inbounds smcio.internal.as[1] = 1
@inbounds if Ns[i] > 1
@inbounds sampleSerial!(smcio.internal.as, engine, localWs, Ns[i]-1,
if localN > 1
sampleSerial!(smcio.internal.as, engine, localWs, localN - 1,
smcio.internal.scratch2, localScratch1, offset + 1, start)
end
else
@inbounds sampleSerial!(smcio.internal.as, engine, localWs, Ns[i],
sampleSerial!(smcio.internal.as, engine, localWs, localN,
smcio.internal.scratch2, localScratch1, offset, start)
end
end
Expand All @@ -107,37 +120,50 @@ end
@inline function _resampleParallel!(smcio::SMCIO, p::Int64, conditional::Bool)
if p != smcio.n-1 && smcio.esses[p] > smcio.essThreshold
smcio.resample[p] = false
_iotaParallel!(smcio.internal.as)
_iotaParallel!(smcio.internal.as, smcio.N, smcio.nthreads,
smcio.internal.parallel.Nperthread)
return
end
@inbounds smcio.resample[p] = true
smcio.internal.nresamples += 1

Ns = smcio.internal.Ns
NsPartial = smcio.internal.NsPartial
partialSums = smcio.internal.partialSums
ip::_SMCInternalParallel = smcio.internal.parallel
Ns::Vector{Int64} = ip.Ns
NsPartial::Vector{Int64} = ip.NsPartial
partialSums::Vector{Float64} = ip.partialSums
partialSums ./= smcio.internal.sws
if conditional
@inbounds sampleMultinomial!(smcio.N-1, partialSums, Ns, getSMCRNG())
sampleMultinomial!(smcio.N-1, partialSums, Ns, getSMCRNG())
@inbounds Ns[1] += 1
else
@inbounds sampleMultinomial!(smcio.N, partialSums, Ns, getSMCRNG())
sampleMultinomial!(smcio.N, partialSums, Ns, getSMCRNG())
end

cumsum!(NsPartial,Ns)
_resampleHelper(smcio, conditional)
end

@inline function _Vhat1Parallel(smcio::SMCIO)
@inline function _Vhat1ParallelHelper(smcio::SMCIO)
N::Int64 = smcio.N
eves::Vector{Int64} = smcio.eves
ws::Vector{Float64} = smcio.ws
vs::Vector{Float64} = smcio.internal.scratch1

v::Float64 = 0.0
for t = 1:smcio.nthreads
@inbounds v += vs[t]
end
q::Int64 = 1 + smcio.internal.nresamples
return 1 - (1-v)*(N/(N-1))^q
end

nthreads::Int64 = smcio.nthreads
Nperthread::Int64 = smcio.internal.Nperthread
vs = smcio.internal.scratch1
Threads.@threads for t = 1:nthreads
@inline function _Vhat1Parallel(smcio::SMCIO)
# N::Int64 = smcio.N
# vs::Vector{Float64} = smcio.internal.scratch1
Threads.@threads for t = 1:smcio.nthreads
N::Int64 = smcio.N
vs::Vector{Float64} = smcio.internal.scratch1
Nperthread::Int64 = smcio.internal.parallel.Nperthread
eves::Vector{Int64} = smcio.eves
ws::Vector{Float64} = smcio.ws
i::Int64 = searchsortedfirst(eves, Nperthread * (t - 1) + 1)
finish::Int64 = searchsortedlast(eves, Nperthread * t)
vLocal::Float64 = 0.0
Expand All @@ -153,22 +179,17 @@ end
end
vs[t] = vLocal
end
v::Float64 = 0.0
for t = 1:nthreads
@inbounds v += vs[t]
end
return 1 - (1-v)*(N/(N-1))^q
return _Vhat1ParallelHelper(smcio)
end

@inline function _intermediateOutputParallel!(smcio::SMCIO{Particle},
p::Int64) where Particle
Threads.@threads for i = 1:smcio.nthreads
_copyParticles!(smcio.internal.localAllZetas[p][i], smcio.internal.localZetas[i])
@inbounds smcio.internal.localAllWs[p][i] .= smcio.internal.localWs[i]
@inbounds smcio.internal.localAllEves[p][i] .= smcio.internal.localEves[i]
if p < smcio.n
@inbounds smcio.internal.localAllAs[p][i] .= smcio.internal.localAs[i]
end
ip::_SMCInternalParallel{Particle} = smcio.internal.parallel
@inbounds _copyParticles!(ip.localAllZetas[p][i], ip.localZetas[i])
@inbounds ip.localAllWs[p][i] .= ip.localWs[i]
@inbounds ip.localAllEves[p][i] .= ip.localEves[i]
@inbounds p < smcio.n && (ip.localAllAs[p][i] .= ip.localAs[i])
end
end

Expand All @@ -178,10 +199,11 @@ end
refout::Union{Void, Vector{Particle}} = nothing) where Particle
smcio.internal.nresamples = 0
lZ::Float64 = 0.0
_iotaParallel!(smcio.eves)
_iotaParallel!(smcio.eves, smcio.N, smcio.nthreads,
smcio.internal.parallel.Nperthread)

for p = 1:smcio.n
p > 1 && _copyParticlesEves!(model, smcio)
p > 1 && _copyParticlesEves!(smcio)
if ref == nothing
_MutateWeight!(p, model, smcio)
else
Expand Down
8 changes: 4 additions & 4 deletions src/serial.jl
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ end
ws = smcio.ws
as = smcio.internal.as
engine = getSMCRNG()
pScratches = smcio.internal.particleScratches
pScratch = smcio.internal.particleScratch
smcio.internal.nresamples = 0

logZhats = smcio.logZhats
Expand All @@ -78,12 +78,12 @@ end
_setEves!(smcio.eves, smcio.internal.oldEves, as)
end
if ref == nothing
_mutateParticles!(zetas, engine, p, model.M!, zetaAncs, pScratches[1])
_mutateParticles!(zetas, engine, p, model.M!, zetaAncs, pScratch)
else
_mutateParticles!(zetas, engine, p, model.M!, zetaAncs, pScratches[1], ref[p])
_mutateParticles!(zetas, engine, p, model.M!, zetaAncs, pScratch, ref[p])
end

_logWeightParticles!(lws, p, model.lG, zetas, pScratches[1],
_logWeightParticles!(lws, p, model.lG, zetas, pScratch,
p == 1 || smcio.resample[p-1], ws)

smcio.internal.maxlw = maximum(lws)
Expand Down

0 comments on commit 658e475

Please sign in to comment.