In [1]:
using Distributed
addprocs(4)
using Interpolations, Optim, BenchmarkTools
const agrid = exp.(range(log(1), stop = log(200), length = 100 ))
const xgrid = 1:20
uc(c) = c > 0 ? c^(-2.5)/-2.5 : -Inf
u(a, ap, x) =  uc(a - ap + x)
maxap(a, x) = a + x
const beta = 0.96
const P = rand(length(xgrid), length(xgrid))
P .= P ./ sum(P, dims = 2); # markov chain

In [2]:
# Bellman step given evf and x
function bellman(evf, x)
    vf, pol = similar(evf), similar(evf)
    for i in eachindex(vf)
        a = agrid[i]
        obj = ap -> -(u(a, ap, x) + beta * evf(ap))
        upper = min(maxap(a, x), agrid[end])
        sol = optimize(obj, agrid[1], .99*upper)
        vf[i], pol[i] = -sol.minimum, sol.minimizer
    end
    return vf, pol
end

function expectation(bigvf, P)
   evf = similar(bigvf)
    for a_nx in 1:size(bigvf, 1)
        for x_nx in 1:size(P, 1)
        ev = 0.
            for xp_nx in 1:size(P, 2)
                ev += bigvf[a_nx, xp_nx] * P[x_nx, xp_nx]
            end
            evf[a_nx, x_nx] = ev
        end
    end
    return evf
end
        
        

expectation (generic function with 1 method)

In [3]:
    vfall = zeros(length(agrid), length(xgrid))
    polall = zeros(length(agrid), length(xgrid))
@btime for i in 1:100
    fill!(vfall, 0)
    fill!(polall, 0)
    vfe = expectation(vfall, P)
    for x_nx in eachindex(xgrid)
        vfold = interpolate(agrid, vfe[:, x_nx], SteffenMonotonicInterpolation())
        vf, pol = bellman(vfold, xgrid[x_nx])
        vfall[:, x_nx] = vf; polall[:, x_nx] = pol
    end
    end seconds = 2

  1.124 s (624100 allocations: 53.42 MiB)


In [4]:
    vfall2 = zeros(length(agrid), length(xgrid))
    polall = zeros(length(agrid), length(xgrid))
@btime for i in 1:100
    fill!(vfall2, 0)
    fill!(polall, 0)
    vfe = expectation(vfall2, P)
    Threads.@threads for x_nx in eachindex(xgrid)
        vfold = interpolate(agrid, vfe[:, x_nx], SteffenMonotonicInterpolation())
        vf, pol = bellman(vfold, xgrid[x_nx])
        vfall2[:, x_nx] = vf; polall[:, x_nx] = pol
    end
    end seconds = 1

  295.433 ms (596174 allocations: 50.86 MiB)


In [5]:
all(vfall .== vfall2) 

true

In [6]:
# serial as above
# multithreading. use @threads over x_nx
# pmap use pmap over bellman (use a caching pool? 
# remote channels. setup channels which accept evf and x and return a vf
# -- why is this better? less overhead?

In [7]:
@everywhere module DM
using Interpolations, Optim
const agrid = exp.(range(log(1), stop = log(200), length = 100 ))
const xgrid = 1:20
uc(c) = c > 0 ? c^(-2.5)/-2.5 : -Inf
u(a, ap, x) =  uc(a - ap + x)
maxap(a, x) = a + x
const beta = 0.96
const P = rand(length(xgrid), length(xgrid))
P .= P ./ sum(P, dims = 2) # markov chain
function bellman(evf, x)
    vf, pol = similar(evf), similar(evf)
    for i in eachindex(vf)
        a = agrid[i]
        obj = ap -> -(u(a, ap, x) + beta * evf(ap))
        upper = min(maxap(a, x), agrid[end])
        sol = optimize(obj, agrid[1], .99*upper)
        vf[i], pol[i] = -sol.minimum, sol.minimizer
    end
    return vf, pol
end

function expectation(bigvf, P)
   evf = similar(bigvf)
    for a_nx in 1:size(bigvf, 1)
        for x_nx in 1:size(P, 1)
        ev = 0.
            for xp_nx in 1:size(P, 2)
                ev += bigvf[a_nx, xp_nx] * P[x_nx, xp_nx]
            end
            evf[a_nx, x_nx] = ev
        end
    end
    return evf
end
       
end

In [8]:
vfall3 = zeros(length(DM.agrid), length(DM.xgrid))
polall = zeros(length(DM.agrid), length(DM.xgrid))
@btime for i in 1:100
    fill!(vfall3, 0)
    fill!(polall, 0)
    vfe = DM.expectation(vfall3, DM.P)
    vfolds = [interpolate(DM.agrid, vfe[:, x_nx], SteffenMonotonicInterpolation()) for x_nx in eachindex(DM.xgrid)]
    vfs_pols = pmap((vfold, x) -> DM.bellman(vfold, x), vfolds, DM.xgrid)
    for x_nx in eachindex(DM.xgrid)
        vfall3[:, x_nx] = vfs_pols[x_nx][1]; polall[:, x_nx] = vfs_pols[x_nx][2]
    end
    end seconds = 1

  569.456 ms (230813 allocations: 32.86 MiB)


In [9]:
all(vfall .== vfall3)

true

In [10]:
example_itp = interpolate(agrid, vfall3[:, 1], SteffenMonotonicInterpolation())
intype = Tuple{typeof(example_itp), Float64, Int}
outtype = Tuple{Vector{Float64}, Vector{Float64}, Int}
const jobs = RemoteChannel(()->Channel{intype}(32));
const results = RemoteChannel(()->Channel{outtype}(32));

In [11]:
@everywhere function do_work(jobs, results) # define work function everywhere
           while true
               evf, x, x_nx = take!(jobs)
               out = (DM.bellman(evf, x)..., x_nx)
               put!(results, out)
           end
       end

In [12]:
for p in workers() # start tasks on the workers to process requests in parallel
           remote_do(do_work, p, jobs, results)
end

In [13]:
vfall4 = zeros(length(agrid), length(xgrid))
polall = zeros(length(agrid), length(xgrid))
@btime for i in 1:100
    fill!(vfall4, 0)
    fill!(polall, 0)
    vfe = expectation(vfall4, P)
    for x_nx in eachindex(xgrid)
        vfold = interpolate(agrid, vfe[:, x_nx], SteffenMonotonicInterpolation())
        put!(jobs, (vfold, xgrid[x_nx], x_nx))
    end
    ndone = 0
    while ndone < length(xgrid)
        vf, pol, x_nx = take!(results)
        vfall4[:, x_nx] = vf; polall[:, x_nx] = pol; ndone += 1
    end
    end seconds = 1

  625.244 ms (457580 allocations: 49.79 MiB)


In [14]:
all(vfall .== vfall4)

true

In [17]:
A = @btime bellman(example_itp, 1.) seconds = 1

  172.324 μs (303 allocations: 22.09 KiB)


([-0.784, -0.733205, -0.683302, -0.636186, -0.591328, -0.548648, -0.508132, -0.469845, -0.433622, -0.399572  …  -2.56121e-5, -2.24281e-5, -1.96389e-5, -1.71957e-5, -1.50557e-5, -1.31815e-5, -1.154e-5, -1.01025e-5, -8.84377e-6, -7.74156e-6], [1.0, 1.02345, 1.05256, 1.08021, 1.11393, 1.14605, 1.18285, 1.21966, 1.25952, 1.30261  …  61.9469, 65.3274, 68.8938, 72.6562, 76.6256, 80.8132, 85.231, 89.8917, 94.8087, 99.996])

In [19]:
B = @btime begin 
    put!(jobs, (example_itp, 1., 1)); take!(results)
    end seconds = 1

  431.280 μs (220 allocations: 24.95 KiB)


([-0.784, -0.733205, -0.683302, -0.636186, -0.591328, -0.548648, -0.508132, -0.469845, -0.433622, -0.399572  …  -2.56121e-5, -2.24281e-5, -1.96389e-5, -1.71957e-5, -1.50557e-5, -1.31815e-5, -1.154e-5, -1.01025e-5, -8.84377e-6, -7.74156e-6], [1.0, 1.02345, 1.05256, 1.08021, 1.11393, 1.14605, 1.18285, 1.21966, 1.25952, 1.30261  …  61.9469, 65.3274, 68.8938, 72.6562, 76.6256, 80.8132, 85.231, 89.8917, 94.8087, 99.996], 1)

Moral of the story: Distributed processing is not a good idea when your jobs take less than 1ms.