In [1]:
import Pkg
Pkg.activate(".")

[32m[1m  Activating[22m[39m project at `~/Developer/DistributedStreams`


In [2]:
using Distributed

In [3]:
addprocs(2)

2-element Vector{Int64}:
 2
 3

In [4]:
@everywhere using Base: @kwdef
@everywhere using DistributedStreams, DistributedArrays

In [5]:
@everywhere @enum MessageType begin
    start
    stop
    started
    stopped
    failed
end

@everywhere @kwdef struct ControlMessage
    message_type::MessageType
    target::Int64
    func_f::Union{Function, Nothing}
    func_in::Union{RemoteChannel, Nothing}
    func_out::Union{RemoteChannel, Nothing}
end

In [7]:
@everywhere function launch_sentinel(;workers=[2], verbose=false, buffer_size=32, timeout=1)

    distributed_control = DArray([
        @spawnat p [(worker = p, flag = Ref(false))]
        for p in workers
    ])

    function remote_worker(entries, results, control)
        # list of active workers
        active_workers = Dict{Int64}{DArray}()
        
        # controller used to modify the behaviour of a running worker, e.g.
        # shut it down gracefully
        local_control = only(localpart(control))

        # check for any failed workers -- if failures did occur, report them as
        # a `failed` type message and remove them from the active_workers list
        @async while true
            println("A")
            # check if should stop checking 
            if local_control.flag[]  # Shutdown flag raised
                if verbose
                    println("Sentinel on $(local_control.worker) is shutting down")
                end
                break
            end
            println("B")
            current_worker_list = workers()
            # println("C")
            # println("current_worker_list=$(current_worker_list)")
            # for w in keys(active_workers)
            #     println("Checking: if $(w) is still alive")
            #     if !(w in current_worker_list)
            #         @async put!(results, ControlMessage(
            #             message_type=failed,
            #             target=w,
            #             func_f=nothing,
            #             func_in=nothing,
            #             func_out=nothing
            #         ))
            #         delete!(active_workers, w)
            #         println("Worker $(w) died")
            #     end
            # end

            # we don't need to check all the time
            sleep(timeout)
        end
        
        @async while true
            println("active_workers=$(active_workers)")
            # take data from remote channel asynchronously
            t = @async take!(entries)
            # Gard against hanging `take!` calls by periodically checking if the
            # worker is flagged to be shut down. timedwait is slow, so we run
            # this in async mode. `local_control.safe` can be used to skip this
            # check entirely.
            @async while true
                # introduce timeout which will shut down the worker with
                # `local_control.flag[] == true`
                if timedwait(()->istaskdone(t), timeout) == :ok
                    break
                end
                # ALL CODE ENTERING HERE => TIMEDWAIT TIMED OUT
                if local_control.flag[]  # Shutdown flag raised
                    if verbose
                        println("Sentinel on $(local_control.worker) is shutting down")
                    end
                    sleep(2*timeout) # give everything time to quit
                    return
                end
            end
            println("a")
            # process messages in order
            message = fetch(t)
            println("a")
            if message.message_type == start
                println("Start instruction for $(message.target)")
                control = launch_consumer(
                    message.func_f, message.func_in, message.func_out;
                    workers=[message.target], verbose=true, buffer_size=32, timeout=1,
                    start_safe=false
                )
                active_workers[message.target] = control
                @async put!(results, ControlMessage(
                    message_type=started,
                    target=message.target,
                    func_f=nothing,
                    func_in=nothing,
                    func_out=nothing
                ))
            elseif message.message_type == stop
                println("Stop instruction for $(message.target)")
                if message.target in keys(active_workers)
                    make_unsafe!(active_workers[message.target]; workers=[message.target])
                    stop_workers!(active_workers[message.target]; workers=[message.target])
                    @async put!(results, ControlMessage(
                        message_type=stopped,
                        target=message.target,
                        func_f=nothing,
                        func_in=nothing,
                        func_out=nothing
                    ))
                end
            else
                # all other message types ignored by putting them directly into the
                # output channel
                @async put!(results, message)
            end
            println("c")
            sleep(timeout)
        end
    end

    control_messages  = RemoteChannel(()->Channel{ControlMessage}(buffer_size), 1)
    control_responses = RemoteChannel(()->Channel{ControlMessage}(buffer_size), 1)

    for p in workers
        remote_do(
            remote_worker, p,
            control_messages, control_responses, distributed_control
        )
    end

    return control_messages, control_responses, distributed_control
end

In [8]:
control_messages, control_responses, distributed_control = launch_sentinel(;
    workers=[2], verbose=true, buffer_size=32, timeout=1
)

      From worker 2:	A
      From worker 2:	B
      From worker 2:	active_workers=Dict{Int64, DArray}()


(RemoteChannel{Channel{ControlMessage}}(1, 1, 35), RemoteChannel{Channel{ControlMessage}}(1, 1, 36), @NamedTuple{worker::Int64, flag::Base.RefValue{Bool}}[(worker = 2, flag = Base.RefValue{Bool}(false))])

      From worker 2:	a


In [23]:
distributed_control

1-element DArray{@NamedTuple{worker::Int64, flag::Base.RefValue{Bool}}, 1, Vector{@NamedTuple{worker::Int64, flag::Base.RefValue{Bool}}}}:
 (worker = 2, flag = Base.RefValue{Bool}(false))

In [24]:
ch_in = RemoteChannel(()->Channel{Int64}(32), 1)
ch_out = RemoteChannel(()->Channel{Int64}(32), 1)

@everywhere function test_fn(i)
    println("hi there, I'm running on pid=$(myid())")
    sleep(2)
    return i+1
end

In [25]:
m = ControlMessage(
    message_type=start,
    target=3,
    func_f=test_fn,
    func_in=ch_in,
    func_out=ch_out
)

ControlMessage(start, 3, test_fn, RemoteChannel{Channel{Int64}}(1, 1, 100), RemoteChannel{Channel{Int64}}(1, 1, 101))

In [26]:
put!(control_messages, m)

RemoteChannel{Channel{ControlMessage}}(1, 1, 92)

      From worker 2:	a
      From worker 2:	Start instruction for 3
      From worker 2:	c
      From worker 2:	active_workers=Dict{Int64, DArray}(3 => @NamedTuple{worker::Int64, safe::Base.RefValue{Bool}, flag::Base.RefValue{Bool}}[(worker = 3, safe = Base.RefValue{Bool}(false), flag = Base.RefValue{Bool}(false))])
      From worker 2:	a


In [27]:
take!(control_responses)

ControlMessage(started, 3, nothing, nothing, nothing)

In [28]:
put!(ch_in, 10)

RemoteChannel{Channel{Int64}}(1, 1, 100)

      From worker 3:	hi there, I'm running on pid=3


In [29]:
take!(ch_out)

11

In [30]:
rmprocs(3)

Task (done) @0x00007fd2ebdafb70

In [31]:
t = @async take!(control_messages)

Task (runnable) @0x00007fd2ebb2d910

In [33]:
t

Task (runnable) @0x00007fd2ebb2d910