SlowActors
took the following steps to integrate with Actors
:
using Reexport
@reexport using Actors
I the case of SlowActors
this is Mailbox
:
struct Mailbox
box::Deque
lock::ReentrantLock
A::Union{Nothing,SlowActor}
end
You then can create a new concrete Link{Yourtype}
type:
newLink() = Link(Mailbox(Deque{Any}(), ReentrantLock(), nothing),
myid(), :local)
Your actor creation primitive, e.g. spawn
must return that type too:
function spawn(bhv::Func; sticky=false)
lk = Link(
Mailbox(Deque{Any}(), ReentrantLock(),
SlowActor(nothing, sticky, _ACT())),
myid(), :local
)
lk.chn.A.act.bhv = bhv
lk.chn.A.act.self = lk
end
export newLink, spawn
Actors.send!(lk::Link{Mailbox}, msg::Msg) = (push!(lk.chn, msg); _start_on_send(lk))
Actors.send!(lk::Link{Mailbox}, msg...) = (push!(lk.chn, msg); _start_on_send(lk))
Then you can send messages between actors using the Actors
interface. Still some further primitives are missing, e.g. self()
and become!
/become
. You get those and much more if you plug in the messaging protocol of Actors
.
The _ACT
variable contains status information for each actor such as the behavior function, the actor link, init and exit functions, actor ties ...
Therefore on actor creation you create also _ACT()
. As shown above, you must set the fields bhv
and self
with the behavior and the actor link respectively:
...
lk.chn.A.act.bhv = bhv
lk.chn.A.act.self = lk
end
When your actor task starts, it puts a reference to its _ACT
variable into task_local_storage
:
function _act(mbx::Mailbox)
task_local_storage("_ACT", mbx.A.act)
magic = hash(rand(Int))
while true
msg = lock(mbx.lock) do
isempty(mbx.box) ?
magic : popfirst!(mbx.box)
end
msg == magic && break
msg isa Actors.Exit && break
onmessage(mbx.A.act, msg)
end
end
If a message of type Msg
arrives, onmessage
provides a finite state machine to process it. To call onmessage(A::_ACT, msg)
enables the Actors
messaging protocol.
You can extend Msg
with new message types or create other message types. In that case you might want to extend Actors.onmessage
to handle those other types if you don't want the standard behavior to pass it as last argument to the behavior function.
The Actors
API works with receive!
to enable easy communication with actors. You enable receive!
on your concrete link type if you extend three Base functions with your Link{Yourtype}
parameter Yourtype
, e.g.:
Base.isready(mbx::Mailbox) = !isempty(mbx)
function Base.fetch(mbx::Mailbox) # this waits for a message
timedwait(60, pollint=0.01) do
!isempty(mbx)
end == :ok ?
lock(()->first(mbx.box), mbx.lock) :
:timed_out
end
Base.take!(mbx::Mailbox) = popfirst!(mbx)