From 590ab79493df03c951172f252f9675eeb9c724a3 Mon Sep 17 00:00:00 2001 From: Tim Watson Date: Fri, 15 Feb 2013 12:13:45 +0000 Subject: [PATCH] spike adding cpSendChan --- src/Control/Distributed/Process.hs | 15 ++++++----- .../Process/Internal/Closure/BuiltIn.hs | 25 +++++++++++++++++++ 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/src/Control/Distributed/Process.hs b/src/Control/Distributed/Process.hs index f2340e84..863ff5cf 100644 --- a/src/Control/Distributed/Process.hs +++ b/src/Control/Distributed/Process.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE ScopedTypeVariables #-} + {- | [Cloud Haskell] This is an implementation of Cloud Haskell, as described in @@ -179,6 +181,7 @@ import Control.Distributed.Process.Internal.Closure.BuiltIn , splitCP , cpLink , cpSend + , cpSendChan , cpNewChan , cpDelay ) @@ -421,14 +424,14 @@ spawnChannel :: forall a. Typeable a => Static (SerializableDict a) -> Closure (ReceivePort a -> Process ()) -> Process (SendPort a) spawnChannel dict nid proc = do - us <- getSelfPid - _ <- spawn nid (go us) - expect + (sp, rp) <- newChan + _ <- spawn nid (go sp) + r <- receiveChan rp where - go :: ProcessId -> Closure (Process ()) - go pid = cpNewChan dict + go :: SendPort (SendPort a) -> Closure (Process ()) + go chan = cpNewChan dict `bindCP` - (cpSend (sdictSendPort dict) pid `splitCP` proc) + (cpSendChan (sdictSendPort dict) chan `splitCP` proc) `bindCP` (idCP `closureCompose` staticClosure sndStatic) diff --git a/src/Control/Distributed/Process/Internal/Closure/BuiltIn.hs b/src/Control/Distributed/Process/Internal/Closure/BuiltIn.hs index 763c9d07..08b829d9 100644 --- a/src/Control/Distributed/Process/Internal/Closure/BuiltIn.hs +++ b/src/Control/Distributed/Process/Internal/Closure/BuiltIn.hs @@ -19,6 +19,7 @@ module Control.Distributed.Process.Internal.Closure.BuiltIn , cpLink , cpUnlink , cpSend + , cpSendChan , cpExpect , cpNewChan -- * Support for some CH operations @@ -59,6 +60,7 @@ import Control.Distributed.Process.Internal.Primitives , send , expect , newChan + , sendChan , monitor , unmonitor , match @@ -83,6 +85,7 @@ remoteTable = . registerStatic "$link" (toDynamic link) . registerStatic "$unlink" (toDynamic unlink) . registerStatic "$sendDict" (toDynamic (sendDict :: SerializableDict ANY -> ProcessId -> ANY -> Process ())) + . registerStatic "$sendChanDict" (toDynamic (sendChanDict :: SerializableDict ANY -> (SendPort ANY) -> ANY -> Process ())) . registerStatic "$expectDict" (toDynamic (expectDict :: SerializableDict ANY -> Process ANY)) . registerStatic "$newChanDict" (toDynamic (newChanDict :: SerializableDict ANY -> Process (SendPort ANY, ReceivePort ANY))) . registerStatic "$cpSplit" (toDynamic (cpSplit :: (ANY1 -> Process ANY3) -> (ANY2 -> Process ANY4) -> (ANY1, ANY2) -> Process (ANY3, ANY4))) @@ -98,6 +101,12 @@ remoteTable = sendDict :: forall a. SerializableDict a -> ProcessId -> a -> Process () sendDict SerializableDict = send + sendChanDict :: forall a . SerializableDict a + -> SendPort a + -> a + -> Process () + sendChanDict SerializableDict = sendChan + expectDict :: forall a. SerializableDict a -> Process a expectDict SerializableDict = expect @@ -201,6 +210,9 @@ bindCP x f = bindProcessStatic `closureApplyStatic` x `closureApply` f decodeProcessIdStatic :: Static (ByteString -> ProcessId) decodeProcessIdStatic = staticLabel "$decodeProcessId" +decodeSendPortStatic :: forall a . Typeable a => Static (ByteString -> SendPort a) +decodeSendPortStatic = staticLabel "$sdictSendPort_" + -- | 'CP' version of 'link' cpLink :: ProcessId -> Closure (Process ()) cpLink = closure (linkStatic `staticCompose` decodeProcessIdStatic) . encode @@ -229,6 +241,19 @@ cpSend dict pid = closure decoder (encode pid) => Static (SerializableDict a -> ProcessId -> a -> Process ()) sendDictStatic = staticLabel "$sendDict" +cpSendChan :: forall a. Typeable a + => Static (SerializableDict a) -> SendPort a -> CP a () +cpSendChan dict sp = closure decoder (encode sp) + where + decoder :: Static (ByteString -> a -> Process ()) + decoder = (sendChanDictStatic `staticApply` dict) + `staticCompose` + decodeSendPortStatic + + sendChanDictStatic :: Typeable a + => Static (SerializableDict a -> SendPort a -> a -> Process ()) + sendChanDictStatic = staticLabel "$sendChanDict" + -- | 'CP' version of 'expect' cpExpect :: Typeable a => Static (SerializableDict a) -> Closure (Process a) cpExpect dict = staticClosure (expectDictStatic `staticApply` dict)