-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Enhanced actor example to support multi-buffers
- Loading branch information
Lukas Convent
committed
Jun 24, 2017
1 parent
104ae03
commit 93f2559
Showing
3 changed files
with
196 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,185 @@ | ||
--- start of standard stuff --- | ||
data Maybe X = nothing | just X | ||
data Pair X Y = pair X Y | ||
|
||
reverse' : {List X -> List X -> List X} | ||
reverse' [] ys = ys | ||
reverse' (x :: xs) ys = reverse' xs (x :: ys) | ||
|
||
reverse : {List X -> List X} | ||
reverse xs = reverse' xs [] | ||
|
||
map : {{X -> Y} -> List X -> List Y} | ||
map f [] = [] | ||
map f (x :: xs) = f x :: map f xs | ||
|
||
print : {String -> [Console]Unit} | ||
print s = map ouch s; unit | ||
--- end of standard stuff --- | ||
|
||
-------------------------------------------------------------------------------- | ||
-- Queue interface and fast queue implementation | ||
-------------------------------------------------------------------------------- | ||
|
||
interface Queue S = enqueue : S -> Unit | ||
| dequeue : Maybe S | ||
|
||
-- Execute a computation along with a queue as state (generalisation of | ||
-- zipQueue from coop-factored.fk) | ||
data ZipQueue S = zipq (List S) (List S) | ||
|
||
runZipQueue: {{[Queue S]X} -> ZipQueue S -> Pair X (ZipQueue S)} | ||
runZipQueue t q = runZipQueue' t! q | ||
|
||
runZipQueue': {<Queue S>X -> ZipQueue S -> Pair X (ZipQueue S)} | ||
runZipQueue' <enqueue q -> k> (zipq ps qs) = runZipQueue' (k unit) (zipq ps (q :: qs)) | ||
runZipQueue' <dequeue -> k> (zipq [] []) = runZipQueue' (k nothing) (zipq [] []) | ||
runZipQueue' <dequeue -> k> (zipq [] qs) = runZipQueue' (k dequeue!) (zipq (reverse qs) []) | ||
runZipQueue' <dequeue -> k> (zipq (p :: ps) qs) = runZipQueue' (k (just p)) (zipq ps qs) | ||
runZipQueue' x (zipq ps qs) = pair x (zipq ps qs) | ||
|
||
evalZipQueue: {{[Queue S]X} -> ZipQueue S -> X} | ||
evalZipQueue t q = case (runZipQueue t q) { (pair x _) -> x } | ||
|
||
execZipQueue: {{[Queue S]X} -> ZipQueue S -> ZipQueue S} | ||
execZipQueue t q = case (runZipQueue t q) { (pair _ q) -> q } | ||
|
||
-------------------------------------------------------------------------------- | ||
-- Definitions of interfaces, data types | ||
-------------------------------------------------------------------------------- | ||
|
||
-- interface Co [C] = fork : {[C | Co [C|]]Unit} -> Unit | ||
interface Co = fork : {[Co]Unit} -> Unit | ||
| yield : Unit | ||
|
||
-- Single item mailbox | ||
data Mailbox X = mbox (Ref (ZipQueue X)) | ||
|
||
-- interface Actor X [A] = spawn Y: {[A | Actor Y [A|]]Unit} -> Mailbox Y | ||
interface Actor X = spawn Y: {[Actor Y]Unit} -> Mailbox Y | ||
| self: Mailbox X | ||
| recv: X | ||
| send Y: Mailbox Y -> Y -> Unit | ||
|
||
data WithSender X Y = withSender (Mailbox X) Y | ||
|
||
-------------------------------------------------------------------------------- | ||
-- Simple example actors | ||
-------------------------------------------------------------------------------- | ||
|
||
doubleActor: {[Actor (WithSender Int Int)]Unit} | ||
doubleActor! = case recv! { (withSender sender inp) -> send sender (inp + inp) } | ||
|
||
doubleSpawningActor: {[Actor Int [Console], Console]Unit} | ||
doubleSpawningActor! = let doubler = spawn doubleActor in | ||
send doubler (withSender self! 3); | ||
case recv! { 6 -> print "you truly know how to double" | ||
| _ -> print "naw" } | ||
|
||
-- This one would only work with a multi-buffer, not our single-Mailbox. | ||
divConqActor: {[Actor Int [Console], Console]Unit} | ||
divConqActor! = let doublerA = spawn doubleActor in | ||
let doublerB = spawn doubleActor in | ||
send doublerA (withSender self! 1); | ||
send doublerB (withSender self! 2); | ||
print "calculating (1+1) + (2+2)\n"; | ||
case (recv! + recv!) { 6 -> print "yay got 6" | ||
| _ -> print "naw" } | ||
|
||
soliloquistActor: {[Actor Int [Console], Console]Unit} | ||
soliloquistActor! = let me = self! in | ||
send me 42; | ||
case recv! { 42 -> print "ouh how unexpected" | ||
| _ -> print "what did I send again?" } | ||
|
||
nicePrintActor: {[Actor String [Console], Console]Unit} | ||
nicePrintActor! = let inp = recv! in | ||
print "Hey, a message came in: "; print inp | ||
|
||
writingActor: {[Actor Int [Console], Console]Unit} | ||
writingActor! = let you = spawn nicePrintActor in | ||
send you "important msg" | ||
|
||
-------------------------------------------------------------------------------- | ||
-- Turn an actor step into a process step | ||
-------------------------------------------------------------------------------- | ||
|
||
-- step: {Mailbox X -> <Actor X [E|RefState, Co[E|RefState]]>Z -> [E|RefState, Co[E|RefState]]Z} | ||
step: {Mailbox X -> <Actor X [RefState, Co[RefState]]>Unit -> [RefState, Co [RefState]]Unit} | ||
step me <self -> k> = step me (k me) | ||
step (mbox me') <recv -> k> = case (runZipQueue {dequeue!} (read me')) | ||
{ (pair nothing _) -> yield!; | ||
step (mbox me') (k recv!) | ||
| (pair (just x) q) -> write me' q; | ||
step (mbox me') (k x) } | ||
step me <send (mbox you') x -> k> = let q = read you' in | ||
case (execZipQueue {enqueue x} q) | ||
{ q' -> write you' q'; | ||
step me (k unit) } | ||
step me <spawn other -> k> = let you = mbox (new (zipq [] [])) in | ||
fork {step you other!}; | ||
step me (k you) | ||
step me x = x | ||
|
||
-------------------------------------------------------------------------------- | ||
-- Two simple process-executers for up to two processes | ||
-------------------------------------------------------------------------------- | ||
|
||
-- Execute up to two processes. If the 1st one forks, replace the 2nd by fork | ||
duoCoExec: {<Co>Unit -> <Co>Unit -> Unit} | ||
duoCoExec <fork e -> k> _ = duoCoExec (k unit) e! | ||
duoCoExec <fork e -> k> <_> = duoCoExec (k unit) e! | ||
duoCoExec <yield -> _> <fork e' -> k'> = duoCoExec (k' unit) e'! | ||
duoCoExec <yield -> k> <yield -> k'> = duoCoExec (k unit) (k' unit) -- possibly deadlock | ||
duoCoExec <yield -> k> unit = duoCoExec (k unit) unit -- possibly deadlock | ||
duoCoExec unit <fork e' -> k'> = duoCoExec (k' unit) e'! | ||
duoCoExec unit <yield -> k'> = duoCoExec (k' unit) unit -- possibly deadlock | ||
duoCoExec unit unit = unit -- done | ||
|
||
-- Same as duoCoExec, but only the 1st process is executed, the 2nd is frozen | ||
duoSuspCoExec: {<Co>Unit -> Unit} | ||
duoSuspCoExec <fork e -> k> = duoSuspCoExec' (k unit) e | ||
duoSuspCoExec <yield -> k> = unit | ||
duoSuspCoExec _ = unit | ||
|
||
duoSuspCoExec': {<Co>Unit -> {[Co]Unit} -> Unit} | ||
duoSuspCoExec' <fork e -> k> _ = duoSuspCoExec' (k unit) e | ||
duoSuspCoExec' <yield -> k> e' = duoSuspCoExec' e'! {k unit} | ||
duoSuspCoExec' unit e' = duoSuspCoExec e'! | ||
|
||
-------------------------------------------------------------------------------- | ||
-- This is the process executer from coop-factored.fk, using a queue to handle | ||
-- multiple processes | ||
-------------------------------------------------------------------------------- | ||
|
||
data Proc = proc {[Queue Proc]Unit} | ||
|
||
pushProc : {{[Queue Proc]Unit} -> [Queue Proc]Unit} | ||
pushProc p = enqueue (proc p) | ||
|
||
popProc : {[Queue Proc]Maybe Unit} | ||
popProc! = case dequeue! { (just (proc x)) -> just x! | ||
| nothing -> nothing } | ||
|
||
popProcs : {[Queue Proc]Unit} | ||
popProcs! = case popProc! { (just unit) -> popProcs! | ||
| nothing -> unit } | ||
|
||
-- Serialise a queue-tree and return a "queue-manager" in the following way: | ||
-- Given a forkable computation, | ||
-- 1) yield ==translate==> push continuation into queue, pop rest of list | ||
-- 2) fork ==translate==> push fork into queue, then carry on | ||
-- 3) unit ==translate==> pop rest of list | ||
-- roundRobin : {<Co [R | Queue Proc]>Unit -> [R | Queue Proc]Unit} | ||
roundRobin : {<Co [Queue Proc]>Unit -> [Queue Proc]Unit} | ||
roundRobin <yield -> k> = pushProc {roundRobin (k unit)}; popProcs! | ||
roundRobin <fork p -> k> = pushProc {roundRobin p!}; roundRobin (k unit) | ||
roundRobin unit = popProcs! | ||
|
||
-------------------------------------------------------------------------------- | ||
-- Test doubleSpawningActor example | ||
-------------------------------------------------------------------------------- | ||
|
||
main: {[Console, RefState]Unit} | ||
main! = let me = mbox (new (zipq [] [])) in | ||
runZipQueue' (roundRobin (step me divConqActor!)) (zipq [] []); unit |