Skip to content

Commit

Permalink
spike adding cpSendChan
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Watson committed Feb 15, 2013
1 parent 6785222 commit 590ab79
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 6 deletions.
15 changes: 9 additions & 6 deletions src/Control/Distributed/Process.hs
@@ -1,3 +1,5 @@
{-# LANGUAGE ScopedTypeVariables #-}

{- | [Cloud Haskell] {- | [Cloud Haskell]
This is an implementation of Cloud Haskell, as described in This is an implementation of Cloud Haskell, as described in
Expand Down Expand Up @@ -179,6 +181,7 @@ import Control.Distributed.Process.Internal.Closure.BuiltIn
, splitCP , splitCP
, cpLink , cpLink
, cpSend , cpSend
, cpSendChan
, cpNewChan , cpNewChan
, cpDelay , cpDelay
) )
Expand Down Expand Up @@ -421,14 +424,14 @@ spawnChannel :: forall a. Typeable a => Static (SerializableDict a)
-> Closure (ReceivePort a -> Process ()) -> Closure (ReceivePort a -> Process ())
-> Process (SendPort a) -> Process (SendPort a)
spawnChannel dict nid proc = do spawnChannel dict nid proc = do
us <- getSelfPid (sp, rp) <- newChan
_ <- spawn nid (go us) _ <- spawn nid (go sp)
expect r <- receiveChan rp
where where
go :: ProcessId -> Closure (Process ()) go :: SendPort (SendPort a) -> Closure (Process ())
go pid = cpNewChan dict go chan = cpNewChan dict
`bindCP` `bindCP`
(cpSend (sdictSendPort dict) pid `splitCP` proc) (cpSendChan (sdictSendPort dict) chan `splitCP` proc)
`bindCP` `bindCP`
(idCP `closureCompose` staticClosure sndStatic) (idCP `closureCompose` staticClosure sndStatic)


Expand Down
25 changes: 25 additions & 0 deletions src/Control/Distributed/Process/Internal/Closure/BuiltIn.hs
Expand Up @@ -19,6 +19,7 @@ module Control.Distributed.Process.Internal.Closure.BuiltIn
, cpLink , cpLink
, cpUnlink , cpUnlink
, cpSend , cpSend
, cpSendChan
, cpExpect , cpExpect
, cpNewChan , cpNewChan
-- * Support for some CH operations -- * Support for some CH operations
Expand Down Expand Up @@ -59,6 +60,7 @@ import Control.Distributed.Process.Internal.Primitives
, send , send
, expect , expect
, newChan , newChan
, sendChan
, monitor , monitor
, unmonitor , unmonitor
, match , match
Expand All @@ -83,6 +85,7 @@ remoteTable =
. registerStatic "$link" (toDynamic link) . registerStatic "$link" (toDynamic link)
. registerStatic "$unlink" (toDynamic unlink) . registerStatic "$unlink" (toDynamic unlink)
. registerStatic "$sendDict" (toDynamic (sendDict :: SerializableDict ANY -> ProcessId -> ANY -> Process ())) . registerStatic "$sendDict" (toDynamic (sendDict :: SerializableDict ANY -> ProcessId -> ANY -> Process ()))
. registerStatic "$sendChanDict" (toDynamic (sendChanDict :: SerializableDict ANY -> (SendPort ANY) -> ANY -> Process ()))
. registerStatic "$expectDict" (toDynamic (expectDict :: SerializableDict ANY -> Process ANY)) . registerStatic "$expectDict" (toDynamic (expectDict :: SerializableDict ANY -> Process ANY))
. registerStatic "$newChanDict" (toDynamic (newChanDict :: SerializableDict ANY -> Process (SendPort ANY, ReceivePort ANY))) . registerStatic "$newChanDict" (toDynamic (newChanDict :: SerializableDict ANY -> Process (SendPort ANY, ReceivePort ANY)))
. registerStatic "$cpSplit" (toDynamic (cpSplit :: (ANY1 -> Process ANY3) -> (ANY2 -> Process ANY4) -> (ANY1, ANY2) -> Process (ANY3, ANY4))) . registerStatic "$cpSplit" (toDynamic (cpSplit :: (ANY1 -> Process ANY3) -> (ANY2 -> Process ANY4) -> (ANY1, ANY2) -> Process (ANY3, ANY4)))
Expand All @@ -98,6 +101,12 @@ remoteTable =
sendDict :: forall a. SerializableDict a -> ProcessId -> a -> Process () sendDict :: forall a. SerializableDict a -> ProcessId -> a -> Process ()
sendDict SerializableDict = send sendDict SerializableDict = send


sendChanDict :: forall a . SerializableDict a
-> SendPort a
-> a
-> Process ()
sendChanDict SerializableDict = sendChan

expectDict :: forall a. SerializableDict a -> Process a expectDict :: forall a. SerializableDict a -> Process a
expectDict SerializableDict = expect expectDict SerializableDict = expect


Expand Down Expand Up @@ -201,6 +210,9 @@ bindCP x f = bindProcessStatic `closureApplyStatic` x `closureApply` f
decodeProcessIdStatic :: Static (ByteString -> ProcessId) decodeProcessIdStatic :: Static (ByteString -> ProcessId)
decodeProcessIdStatic = staticLabel "$decodeProcessId" decodeProcessIdStatic = staticLabel "$decodeProcessId"


decodeSendPortStatic :: forall a . Typeable a => Static (ByteString -> SendPort a)
decodeSendPortStatic = staticLabel "$sdictSendPort_"

-- | 'CP' version of 'link' -- | 'CP' version of 'link'
cpLink :: ProcessId -> Closure (Process ()) cpLink :: ProcessId -> Closure (Process ())
cpLink = closure (linkStatic `staticCompose` decodeProcessIdStatic) . encode cpLink = closure (linkStatic `staticCompose` decodeProcessIdStatic) . encode
Expand Down Expand Up @@ -229,6 +241,19 @@ cpSend dict pid = closure decoder (encode pid)
=> Static (SerializableDict a -> ProcessId -> a -> Process ()) => Static (SerializableDict a -> ProcessId -> a -> Process ())
sendDictStatic = staticLabel "$sendDict" sendDictStatic = staticLabel "$sendDict"


cpSendChan :: forall a. Typeable a
=> Static (SerializableDict a) -> SendPort a -> CP a ()
cpSendChan dict sp = closure decoder (encode sp)
where
decoder :: Static (ByteString -> a -> Process ())
decoder = (sendChanDictStatic `staticApply` dict)
`staticCompose`
decodeSendPortStatic

sendChanDictStatic :: Typeable a
=> Static (SerializableDict a -> SendPort a -> a -> Process ())
sendChanDictStatic = staticLabel "$sendChanDict"

-- | 'CP' version of 'expect' -- | 'CP' version of 'expect'
cpExpect :: Typeable a => Static (SerializableDict a) -> Closure (Process a) cpExpect :: Typeable a => Static (SerializableDict a) -> Closure (Process a)
cpExpect dict = staticClosure (expectDictStatic `staticApply` dict) cpExpect dict = staticClosure (expectDictStatic `staticApply` dict)
Expand Down

0 comments on commit 590ab79

Please sign in to comment.