# Distributed modeling with dynamic schedule
We demonstrate dynamic scheduling using 2 workers to model 8 shots across Marmousi.

In [1]:
using Distributed

In [2]:
addprocs(2)

2-element Array{Int64,1}:
 2
 3

In [3]:
@everywhere using DistributedArrays, DistributedJets, Jets, WaveFD, JetPackWaveFD, DistributedOperations, Schedulers, Random

In [4]:
v = read!("../02_marmousi_model_setup/marmousi_vp_20m_176x851.bin", Array{Float32}(undef,176,851));
dz,dx = 20.0,20.0
nz,nx = size(v)
@show dz,dx
@show nz,nx;

(dz, dx) = (20.0, 20.0)
(nz, nx) = (176, 851)


In [5]:
sx = range(0,length=8,stop=(851-1)*20)
nshots = length(sx)
@show nshots;

nshots = 8


## Broadcast the velocity (v) to workers

In [6]:
_v = bcast(v)

ArrayFutures with pids=[2, 3, 1] and type (176, 851)

## Build a list of keyword arguments to be passed to the modeling operator

In [7]:
kwargs = (reportinterval = 0, freqQ=5, srcfieldfile="", nsponge = 200)

(reportinterval = 0, freqQ = 5, srcfieldfile = "", nsponge = 200)

## Note on scratch space for temporary files
When dealing with multiple serialized nonlinear wavefields as in this example, we need to specify the location where scratch files will be written.

You may need to change this to point to a temporary directory available on your system.

In [8]:
@everywhere scratch = "/mnt/scratch"
@assert isdir(scratch)

## Define single shot function to be run by the Scheduler
Typically modeled data would be written to storage in this function, but we omit that detail here.

In [9]:
@everywhere function modelshot(isrc, sx, _v; kwargs...)
    nz,nx,dz,dx = 176,851,20.0,20.0   
    @info "modeling shot $(isrc) on $(gethostname()) with id $(myid())..."
    F = JopNlProp2DAcoIsoDenQ_DEO2_FDTD(;
        b = ones(Float32,nz,nx),
        nthreads = div(Sys.CPU_THREADS,2),
        isinterior = true,
        ntrec = 1101,
        dtrec = 0.002,
        dtmod = 0.001,
        dz = dz,
        dx = dx,
        wavelet = WaveletCausalRicker(f=5.0),
        sx = sx[isrc],
        sz = dz,
        rx = dx*[0:1:nx-1;],
        rz = 2*dz*ones(length(0:1:nx-1)),
        srcfieldfile = joinpath(scratch, "field-$isrc.$(randstring()).bin"),
        reportinterval=1000,
        kwargs...)
    
    d = F*_v
    @info "...done modeling shot $(isrc) on $(gethostname()) with id $(myid())"
    @info "extrema of shot $(isrc) is $(extrema(d))"

    # typically write to cloud storage here
    nothing
end

## Use cvxpmap to dynamically schedule tasks to workers
Note that you may see the dynamic nature of the scheduling in that the shot numbers might not necessarily map monotonically to the process identifiers of the workers. 

In [10]:
epmap(i->modelshot(i, sx, localpart(_v); kwargs...), 1:nshots)

┌ Info: running task 1 on process 2; 2 workers total; 7 tasks left in task-pool.
└ @ Schedulers /home/cvx/.julia/dev/Schedulers/src/Schedulers.jl:159
┌ Info: running task 2 on process 3; 2 workers total; 6 tasks left in task-pool.
└ @ Schedulers /home/cvx/.julia/dev/Schedulers/src/Schedulers.jl:159
[ Info: modeling shot 1 on cbox-wask-HB60rs with id 2...
[ Info: modeling shot 2 on cbox-wask-HB60rs with id 3...
[ Info: ...done modeling shot 1 on cbox-wask-HB60rs with id 2
[ Info: ...done modeling shot 2 on cbox-wask-HB60rs with id 3
[ Info: extrema of shot 1 is (-59.97956f0, 109.98762f0)
┌ Info: running task 3 on process 2; 2 workers total; 5 tasks left in task-pool.
└ @ Schedulers /home/cvx/.julia/dev/Schedulers/src/Schedulers.jl:159
[ Info: modeling shot 3 on cbox-wask-HB60rs with id 2...
[ Info: extrema of shot 2 is (-58.06025f0, 106.05499f0)
┌ Info: running task 4 on process 3; 2 workers total; 4 tasks left in task-pool.
└ @ Schedulers /home/cvx/.julia/dev/Schedulers/src/Schedulers.

## Remove workers

In [11]:
rmprocs(workers());