We have a list of states - one for each process, and we want to do M measurements of states in total and write these measurements to a file. The M measurements are divided into nw processes doing M_min+1 measurements while np-nw processes do M_min measurements.

In this minimal example we let each measurement be a number myid()x10+rand() and a sleep(2) between each measurement. This is collected in a RemoteChannel and this channel is printed to STOUT instead of saving it to disk.

In [1]:
@everywhere using Distributions
@everywhere using Base.Test
@everywhere using StatsBase
@everywhere using BenchmarkTools
using MCMCDiagnostics
cd("../Source/")
@everywhere include("types.jl")
@everywhere include("functions_neighbors.jl")
@everywhere include("functions_types.jl")
@everywhere include("functions_symmetric_energy.jl")
@everywhere include("functions_mc.jl")
@everywhere include("functions_thermalization.jl")
@everywhere include("functions_observables.jl")
include("functions_plots_and_files.jl")
cd("../Notebooks/")
using Plots
pyplot()

Plots.PyPlotBackend()

In [2]:
@everywhere function intensive(Δt::Int64)
    r = √(2)
    for i = 2:Δt
        r = mod(√(2)*r+√(7), 1)
    end
    return r
end

In [3]:
@everywhere function produceMeasurement(M::Int64, Δt::Int64, r_chan::RemoteChannel{Channel{Float64}})
    put!(r_chan, myid()*10+intensive(Δt))
    for i = 2:M
        r = intensive(Δt)
        put!(r_chan, myid()*10+r)
    end
    return
end

In [4]:
function emptyChannel(r_chan::RemoteChannel{Channel{Float64}})
    while isready(r_chan)
        println(take!(r_chan))
    end
    return
end

emptyChannel (generic function with 1 method)

In [5]:
function produceMeasurementMaster(M::Int64, Δt::Int64, r_chan::RemoteChannel{Channel{Float64}})
    r = intensive(Δt)
    println("$(myid()*10+r)")
    for i = 2:M
        #sleep(floor(Int64, Δt/2))
        r = intensive(Δt)
        println("$(myid()*10+r)")
        emptyChannel(r_chan)
        flush(STDOUT)
    end
    return
end

produceMeasurementMaster (generic function with 1 method)

In [16]:
function collection(r_chan::RemoteChannel{Channel{Float64}})
    while true
        ψ = take!(r_chan)
        println(ψ)
    end
end

function measurements(M::Int64, Δt::Int64)
    # Setup storage channel for parallel processes
    r_chan = RemoteChannel(()->Channel{Float64}(M))
    
     # Splitting the problem into np sub-problems.
    np = nprocs()-1
    # Minimum amount of work pr. process
    M_min = Int(floor(M/np))
    # Number of workers doing +1 extra work
    nw = M%np
    
    println("Minimum work: $(M_min)\n")
    @async collection(r_chan)
    @sync begin
        for i = 1:nw
            @async remotecall_fetch(produceMeasurement, i+1, M_min+1, Δt, r_chan)
        end
        for i = 1:np-nw
            @async remotecall_fetch(produceMeasurement, i+1+nw, M_min, Δt, r_chan)
        end
    end
    return
end

measurements (generic function with 1 method)

In [41]:
2^12

4096

In [17]:
@time measurements(30, 59600000)

Minimum work: 15

30.14176452568755
20.14176452568755
30.14176452568755
20.14176452568755
30.14176452568755
20.14176452568755
30.14176452568755
20.14176452568755
30.14176452568755
20.14176452568755
30.14176452568755
20.14176452568755
30.14176452568755
20.14176452568755
30.14176452568755
20.14176452568755
30.14176452568755
20.14176452568755
30.14176452568755
20.14176452568755
30.14176452568755
20.14176452568755
30.14176452568755
20.14176452568755
30.14176452568755
20.14176452568755
30.14176452568755
20.14176452568755
30.14176452568755
20.14176452568755
 30.190745 seconds (52.84 k allocations: 2.982 MiB, 0.07% gc time)


In [46]:
@time ψ_chan = RemoteChannel(()->Channel{State}(300))

  0.074757 seconds (6.87 k allocations: 376.488 KiB)


RemoteChannel{Channel{State}}(1, 1, 140937)

In [45]:
@benchmark ψ_chan = RemoteChannel(()->Channel{State}(300))

BenchmarkTools.Trial: 
  memory estimate:  1.34 KiB
  allocs estimate:  36
  --------------
  minimum time:     21.671 μs (0.00% GC)
  median time:      31.049 μs (0.00% GC)
  mean time:        31.065 μs (0.00% GC)
  maximum time:     1.227 ms (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     1

In [50]:
ψ_res = Array{State,1}(300);

In [7]:
ψ_chan = RemoteChannel(()->Channel{State}(300))

RemoteChannel{Channel{State}}(1, 1, 44)

In [8]:
close(ψ_chan)



In [None]:
wait(ψ_chan)