Skip to content

Commit

Permalink
Actually, Static constructor should not be exposed
Browse files Browse the repository at this point in the history
  • Loading branch information
edsko committed Aug 2, 2012
1 parent 8550105 commit 13ac8e7
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 41 deletions.
8 changes: 6 additions & 2 deletions distributed-process-azure/distributed-process-azure.cabal
Expand Up @@ -29,10 +29,14 @@ Library
distributed-process >= 0.2 && < 0.3,
binary >= 0.5 && < 0.6,
network-transport-tcp >= 0.2 && < 0.3,
optparse-applicative >= 0.2 && < 0.4
optparse-applicative >= 0.2 && < 0.4,
transformers >= 0.3 && < 0.4
Exposed-modules: Control.Distributed.Process.Backend.Azure,
Control.Distributed.Process.Backend.Azure.GenericMain
Extensions: ViewPatterns
Extensions: ViewPatterns,
RankNTypes,
ExistentialQuantification,
ScopedTypeVariables
ghc-options: -Wall
HS-Source-Dirs: src

Expand Down
@@ -1,3 +1,4 @@
{-# LANGUAGE TemplateHaskell #-}
module Control.Distributed.Process.Backend.Azure
( -- * Initialization
Backend(..)
Expand All @@ -13,13 +14,14 @@ import System.Environment (getEnv)
import System.FilePath ((</>), takeFileName)
import System.Environment.Executable (getExecutablePath)
import System.Posix.Types (Fd)
import Data.Binary (encode)
import Data.Binary (encode, decode)
import Data.Digest.Pure.MD5 (md5, MD5Digest)
import qualified Data.ByteString.Lazy as BSL (readFile)
import qualified Data.ByteString.Lazy.Char8 as BSLC (putStr)
import qualified Data.ByteString.Lazy as BSL (readFile, putStr)
import Data.Typeable (Typeable)
import Control.Applicative ((<$>))
import Control.Monad (void)
import Control.Exception (catches, Handler(Handler))
import Control.Monad.IO.Class (liftIO)

-- Azure
import Network.Azure.ServiceManagement
Expand Down Expand Up @@ -61,7 +63,23 @@ import qualified Network.SSH.Client.LibSSH2.ByteString.Lazy as SSHBS
import Control.Distributed.Process
( Closure
, Process
, Static
)
import Control.Distributed.Process.Closure
( remotable
, mkClosure
, cpBind
, SerializableDict(SerializableDict)
)
import Control.Distributed.Process.Serializable (Serializable)

encodeToStdout :: Serializable a => a -> Process ()
encodeToStdout = liftIO . BSL.putStr . encode

encodeToStdoutDict :: SerializableDict a -> a -> Process ()
encodeToStdoutDict SerializableDict = encodeToStdout

remotable ['encodeToStdoutDict]

-- | Azure backend
data Backend = Backend {
Expand All @@ -71,8 +89,8 @@ data Backend = Backend {
, copyToVM :: VirtualMachine -> IO ()
-- | Check the MD5 hash of the remote executable
, checkMD5 :: VirtualMachine -> IO Bool
-- | @runOnVM vm port p@ starts a CH node on port 'port' and runs 'p'
, runOnVM :: VirtualMachine -> String -> Closure (Process ()) -> IO ()
-- | @runOnVM vm port p bg@ starts a CH node on port 'port' and runs 'p'
, callOnVM :: forall a. Serializable a => Static (SerializableDict a) -> VirtualMachine -> String -> Closure (Process a) -> IO a
}

data AzureParameters = AzureParameters {
Expand Down Expand Up @@ -120,7 +138,7 @@ initializeBackend params = do
cloudServices = Azure.cloudServices setup
, copyToVM = apiCopyToVM params
, checkMD5 = apiCheckMD5 params
, runOnVM = apiRunOnVM params
, callOnVM = apiCallOnVM params
}

-- | Start a CH node on the given virtual machine
Expand All @@ -129,21 +147,24 @@ apiCopyToVM params vm =
void . withSSH2 params vm $ \fd s -> catchSshError s $
SSH.scpSendFile fd s 0o700 (azureSshLocalPath params) (azureSshRemotePath params)

-- | Start the executable on the remote machine
apiRunOnVM :: AzureParameters -> VirtualMachine -> String -> Closure (Process ()) -> IO ()
apiRunOnVM params vm port proc =
void . withSSH2 params vm $ \fd s -> do
let exe = "PATH=. " ++ azureSshRemotePath params
++ " onvm run "
++ " --host " ++ vmIpAddress vm
++ " --port " ++ port
++ " 2>&1"
(_, r) <- SSH.withChannel (SSH.openChannelSession s) id fd s $ \ch -> do
SSH.retryIfNeeded fd s $ SSH.channelExecute ch exe
SSHBS.writeChannel fd ch (encode proc)
SSH.channelSendEOF ch
SSHBS.readAllChannel fd ch
BSLC.putStr r
-- | Call a process on a VM
apiCallOnVM :: Serializable a => AzureParameters -> Static (SerializableDict a) -> VirtualMachine -> String -> Closure (Process a) -> IO a
apiCallOnVM params dict vm port proc =
withSSH2 params vm $ \fd s -> do
let exe = "PATH=. " ++ azureSshRemotePath params
++ " onvm run "
++ " --host " ++ vmIpAddress vm
++ " --port " ++ port
++ " 2>&1"
(_, r) <- SSH.withChannel (SSH.openChannelSession s) id fd s $ \ch -> do
SSH.retryIfNeeded fd s $ SSH.channelExecute ch exe
SSHBS.writeChannel fd ch (encode proc)
SSH.channelSendEOF ch
SSHBS.readAllChannel fd ch
return (decode r)
where
proc' :: Closure (Process ())
proc' = proc `cpBind` undefined

-- | Check the MD5 hash of the executable on the remote machine
apiCheckMD5 :: AzureParameters -> VirtualMachine -> IO Bool
Expand Down
@@ -1,16 +1,19 @@
{-# LANGUAGE TemplateHaskell #-}

import Control.Monad.IO.Class (liftIO)
import Control.Distributed.Process (Process)
import Control.Distributed.Process.Closure (remotable, mkClosure)
import Control.Distributed.Process.Backend.Azure.GenericMain (genericMain)
import Control.Distributed.Process (Process, ProcessId, getSelfPid)
import Control.Distributed.Process.Closure (remotable, mkClosure, sdictProcessId)
import Control.Distributed.Process.Backend.Azure.GenericMain
( genericMain
, ProcessPair(..)
)

cprint :: String -> Process ()
cprint = liftIO . putStrLn
getPid :: () -> Process ProcessId
getPid () = getSelfPid

remotable ['cprint]
remotable ['getPid]

main = genericMain __remoteTable $ \cmd ->
case cmd of
"hello" -> return $ $(mkClosure 'cprint) "Hi world!"
"hello" -> return $ ProcessPair ($(mkClosure 'getPid) ()) print sdictProcessId
_ -> error "unknown command"
@@ -1,5 +1,8 @@
-- | Generic main
module Control.Distributed.Process.Backend.Azure.GenericMain (genericMain) where
module Control.Distributed.Process.Backend.Azure.GenericMain
( genericMain
, ProcessPair(..)
) where

import Prelude hiding (catch)
import System.Exit (exitSuccess, exitFailure)
Expand All @@ -20,7 +23,7 @@ import Control.Distributed.Process.Backend.Azure
, cloudServices
, CloudService(cloudServiceName, cloudServiceVMs)
, VirtualMachine(vmName)
, Backend(copyToVM, checkMD5, runOnVM)
, Backend(copyToVM, checkMD5, callOnVM)
)
import qualified Network.SSH.Client.LibSSH2.Foreign as SSH
( initialize
Expand Down Expand Up @@ -49,16 +52,25 @@ import Control.Distributed.Process
, Closure
, Process
, unClosure
, Static
)
import Control.Distributed.Process.Node (newLocalNode, runProcess, initRemoteTable)
import Control.Distributed.Process.Serializable (Serializable)
import Control.Distributed.Process.Closure (SerializableDict)
import Network.Transport.TCP (createTransport, defaultTCPParameters)

--------------------------------------------------------------------------------
-- Main --
--------------------------------------------------------------------------------

genericMain :: (RemoteTable -> RemoteTable) -- ^ Standard CH remote table
-> (String -> IO (Closure (Process ()))) -- ^ Closures to support in 'run'
data ProcessPair b = forall a. Serializable a => ProcessPair {
ppairRemote :: Closure (Process a)
, ppairLocal :: a -> IO b
, ppairDict :: Static (SerializableDict a)
}

genericMain :: (RemoteTable -> RemoteTable) -- ^ Standard CH remote table
-> (String -> IO (ProcessPair ())) -- ^ Closures to support in 'run'
-> IO ()
genericMain remoteTable cmds = do
_ <- SSH.initialize True
Expand Down Expand Up @@ -90,13 +102,15 @@ genericMain remoteTable cmds = do
then exitSuccess
else exitFailure
RunOn {} -> do
closure <- cmds (closureId cmd)
params <- azureParameters (azureOptions cmd) (Just (sshOptions cmd))
backend <- initializeBackend params
css <- cloudServices backend
procPair <- cmds (closureId cmd)
params <- azureParameters (azureOptions cmd) (Just (sshOptions cmd))
backend <- initializeBackend params
css <- cloudServices backend
forM_ (findTarget (target cmd) css) $ \vm -> do
putStr (vmName vm ++ ": ") >> hFlush stdout
runOnVM backend vm (remotePort cmd) closure
case procPair of
ProcessPair rProc lProc dict ->
callOnVM backend dict vm (remotePort cmd) rProc >>= lProc
OnVmCommand (vmCmd@OnVmRun {}) -> do
onVmRun (remoteTable initRemoteTable) (onVmIP vmCmd) (onVmPort vmCmd)
SSH.exit
Expand Down
2 changes: 1 addition & 1 deletion distributed-process/ChangeLog
@@ -1,6 +1,6 @@
2012-08-02 Edsko de Vries <edsko@well-typed.com> 0.2.2.1

* Expose the constructors of Closure and Static
* Expose the constructors of Closure
* Improved docs

2012-07-31 Edsko de Vries <edsko@well-typed.com> 0.2.2.0
Expand Down
2 changes: 1 addition & 1 deletion distributed-process/src/Control/Distributed/Process.hs
Expand Up @@ -64,7 +64,7 @@ module Control.Distributed.Process
, DiedReason(..)
-- * Closures
, Closure(..)
, Static(..)
, Static
, unClosure
, RemoteTable
-- * Logging
Expand Down

0 comments on commit 13ac8e7

Please sign in to comment.