Skip to content

Commit

Permalink
spike adding cpSendChan
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Watson committed Feb 15, 2013
1 parent 6785222 commit 590ab79
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 6 deletions.
15 changes: 9 additions & 6 deletions src/Control/Distributed/Process.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{-# LANGUAGE ScopedTypeVariables #-}

{- | [Cloud Haskell]
This is an implementation of Cloud Haskell, as described in
Expand Down Expand Up @@ -179,6 +181,7 @@ import Control.Distributed.Process.Internal.Closure.BuiltIn
, splitCP
, cpLink
, cpSend
, cpSendChan
, cpNewChan
, cpDelay
)
Expand Down Expand Up @@ -421,14 +424,14 @@ spawnChannel :: forall a. Typeable a => Static (SerializableDict a)
-> Closure (ReceivePort a -> Process ())
-> Process (SendPort a)
spawnChannel dict nid proc = do
us <- getSelfPid
_ <- spawn nid (go us)
expect
(sp, rp) <- newChan
_ <- spawn nid (go sp)
r <- receiveChan rp
where
go :: ProcessId -> Closure (Process ())
go pid = cpNewChan dict
go :: SendPort (SendPort a) -> Closure (Process ())
go chan = cpNewChan dict
`bindCP`
(cpSend (sdictSendPort dict) pid `splitCP` proc)
(cpSendChan (sdictSendPort dict) chan `splitCP` proc)
`bindCP`
(idCP `closureCompose` staticClosure sndStatic)

Expand Down
25 changes: 25 additions & 0 deletions src/Control/Distributed/Process/Internal/Closure/BuiltIn.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ module Control.Distributed.Process.Internal.Closure.BuiltIn
, cpLink
, cpUnlink
, cpSend
, cpSendChan
, cpExpect
, cpNewChan
-- * Support for some CH operations
Expand Down Expand Up @@ -59,6 +60,7 @@ import Control.Distributed.Process.Internal.Primitives
, send
, expect
, newChan
, sendChan
, monitor
, unmonitor
, match
Expand All @@ -83,6 +85,7 @@ remoteTable =
. registerStatic "$link" (toDynamic link)
. registerStatic "$unlink" (toDynamic unlink)
. registerStatic "$sendDict" (toDynamic (sendDict :: SerializableDict ANY -> ProcessId -> ANY -> Process ()))
. registerStatic "$sendChanDict" (toDynamic (sendChanDict :: SerializableDict ANY -> (SendPort ANY) -> ANY -> Process ()))
. registerStatic "$expectDict" (toDynamic (expectDict :: SerializableDict ANY -> Process ANY))
. registerStatic "$newChanDict" (toDynamic (newChanDict :: SerializableDict ANY -> Process (SendPort ANY, ReceivePort ANY)))
. registerStatic "$cpSplit" (toDynamic (cpSplit :: (ANY1 -> Process ANY3) -> (ANY2 -> Process ANY4) -> (ANY1, ANY2) -> Process (ANY3, ANY4)))
Expand All @@ -98,6 +101,12 @@ remoteTable =
sendDict :: forall a. SerializableDict a -> ProcessId -> a -> Process ()
sendDict SerializableDict = send

sendChanDict :: forall a . SerializableDict a
-> SendPort a
-> a
-> Process ()
sendChanDict SerializableDict = sendChan

expectDict :: forall a. SerializableDict a -> Process a
expectDict SerializableDict = expect

Expand Down Expand Up @@ -201,6 +210,9 @@ bindCP x f = bindProcessStatic `closureApplyStatic` x `closureApply` f
decodeProcessIdStatic :: Static (ByteString -> ProcessId)
decodeProcessIdStatic = staticLabel "$decodeProcessId"

decodeSendPortStatic :: forall a . Typeable a => Static (ByteString -> SendPort a)
decodeSendPortStatic = staticLabel "$sdictSendPort_"

-- | 'CP' version of 'link'
cpLink :: ProcessId -> Closure (Process ())
cpLink = closure (linkStatic `staticCompose` decodeProcessIdStatic) . encode
Expand Down Expand Up @@ -229,6 +241,19 @@ cpSend dict pid = closure decoder (encode pid)
=> Static (SerializableDict a -> ProcessId -> a -> Process ())
sendDictStatic = staticLabel "$sendDict"

cpSendChan :: forall a. Typeable a
=> Static (SerializableDict a) -> SendPort a -> CP a ()
cpSendChan dict sp = closure decoder (encode sp)
where
decoder :: Static (ByteString -> a -> Process ())
decoder = (sendChanDictStatic `staticApply` dict)
`staticCompose`
decodeSendPortStatic

sendChanDictStatic :: Typeable a
=> Static (SerializableDict a -> SendPort a -> a -> Process ())
sendChanDictStatic = staticLabel "$sendChanDict"

-- | 'CP' version of 'expect'
cpExpect :: Typeable a => Static (SerializableDict a) -> Closure (Process a)
cpExpect dict = staticClosure (expectDictStatic `staticApply` dict)
Expand Down

0 comments on commit 590ab79

Please sign in to comment.