-
Notifications
You must be signed in to change notification settings - Fork 214
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adapt NodeIPC module from cardano-sl
This takes the cardano-sl NodeIPC code and splits the general NodeJS child_process IPC protocol out from the Daedalus/Cardano specific protocol.
- Loading branch information
Showing
2 changed files
with
221 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,218 @@ | ||
{-# LANGUAGE DeriveGeneric #-} | ||
{-# LANGUAGE LambdaCase #-} | ||
{-# LANGUAGE OverloadedStrings #-} | ||
{-# LANGUAGE RankNTypes #-} | ||
{-# LANGUAGE ScopedTypeVariables #-} | ||
{-# LANGUAGE StandaloneDeriving #-} | ||
|
||
module Cardano.Wallet.DaedalusIPC | ||
( daedalusIPC | ||
) where | ||
|
||
import Prelude | ||
|
||
|
||
import Control.Concurrent | ||
( threadDelay ) | ||
import Control.Concurrent.Async | ||
( race_ ) | ||
import Control.Concurrent.MVar | ||
( MVar, newEmptyMVar, putMVar, readMVar ) | ||
import Control.Exception | ||
( IOException, catch, tryJust ) | ||
import Control.Monad | ||
( forever, void, when ) | ||
import Data.Aeson | ||
( FromJSON (..) | ||
, ToJSON (..) | ||
, defaultOptions | ||
, eitherDecode | ||
, encode | ||
, genericParseJSON | ||
, genericToEncoding | ||
) | ||
import Data.Aeson.Types | ||
( Options, SumEncoding (ObjectWithSingleField), sumEncoding ) | ||
import Data.Binary.Get | ||
( getWord32le, getWord64le, runGet ) | ||
import Data.Binary.Put | ||
( putLazyByteString, putWord32le, putWord64le, runPut ) | ||
import Data.Maybe | ||
( fromMaybe ) | ||
import Data.Text | ||
( Text ) | ||
import Data.Word | ||
( Word32, Word64 ) | ||
import Distribution.System | ||
( OS (Windows), buildOS ) | ||
import GHC.Generics | ||
( Generic ) | ||
import GHC.IO.Handle.FD | ||
( fdToHandle ) | ||
import System.Environment | ||
( lookupEnv ) | ||
import System.IO | ||
( hFlush, hGetLine, hSetNewlineMode, noNewlineTranslation ) | ||
import System.IO | ||
( Handle, hPutStrLn, stderr, stdout ) | ||
import System.IO.Error | ||
( IOError, isEOFError ) | ||
import Text.Read | ||
( readEither ) | ||
|
||
import qualified Data.ByteString.Lazy as BL | ||
import qualified Data.ByteString.Lazy.Char8 as L8 | ||
import qualified Data.Text as T | ||
|
||
---------------------------------------------------------------------------- | ||
-- Daedalus <-> Wallet child process port discovery protocol | ||
|
||
data MsgIn = QueryPort | Ping | ||
deriving (Show, Eq, Generic) | ||
data MsgOut = Started | Pong | ReplyPort Int | ParseError Text | ||
deriving (Show, Eq, Generic) | ||
|
||
aesonOpts :: Options | ||
aesonOpts = defaultOptions { sumEncoding = ObjectWithSingleField } | ||
|
||
instance FromJSON MsgIn where | ||
parseJSON = genericParseJSON aesonOpts | ||
instance ToJSON MsgOut where | ||
toEncoding = genericToEncoding aesonOpts | ||
|
||
daedalusIPC :: Int -> IO (Either Text (Maybe (IO ()))) | ||
daedalusIPC port = withNodeChannel msg err action | ||
where | ||
-- How to respond to an incoming message | ||
msg QueryPort = pure (Just (ReplyPort port)) | ||
msg Ping = pure (Just Pong) | ||
|
||
-- How to respond to an incoming message that couldn't be parsed | ||
err = Just . ParseError | ||
|
||
-- What to do in context of withNodeChannel | ||
action send = do | ||
void $ send Started | ||
threadDelay maxBound | ||
|
||
---------------------------------------------------------------------------- | ||
-- NodeJS child_process IPC protocol | ||
-- https://nodejs.org/api/child_process.html#child_process_child_process_spawn_command_args_options | ||
|
||
withNodeChannel | ||
:: (FromJSON msgin, ToJSON msgout) | ||
=> (msgin -> IO (Maybe msgout)) -- ^ Incoming message handler | ||
-> (Text -> Maybe msgout) -- ^ Parse error handler | ||
-> ((msgout -> IO ()) -> IO ()) -- ^ Action to run | ||
-> IO (Either Text (Maybe (IO ()))) | ||
withNodeChannel onMsg errHandler handleMsg = lookupNodeChannel >>= \case | ||
Right (Just handle) -> pure . Right . Just $ do | ||
chan <- newEmptyMVar | ||
let ipc = ipcListener handle onMsg errHandler chan | ||
action' = handleMsg (putMVar chan) | ||
race_ action' ipc | ||
Right Nothing -> pure $ Right Nothing | ||
Left err -> pure (Left err) | ||
|
||
-- | Parse the NODE_CHANNEL_FD variable, if it's set, and convert to a | ||
-- 'System.IO.Handle'. | ||
lookupNodeChannel :: IO (Either Text (Maybe Handle)) | ||
lookupNodeChannel = (fromMaybe "" <$> lookupEnv "NODE_CHANNEL_FD") >>= \case | ||
"" -> pure (Right Nothing) | ||
var -> case readEither var of | ||
Left err -> pure . Left $ "unable to parse NODE_CHANNEL_FD: " <> T.pack err | ||
Right fd -> tryJust handleBadFd (Just <$> fdToHandle fd) | ||
where | ||
handleBadFd :: IOException -> Maybe Text | ||
handleBadFd = Just . T.pack . show | ||
|
||
ipcListener | ||
:: forall msgin msgout. (FromJSON msgin, ToJSON msgout) | ||
=> Handle | ||
-> (msgin -> IO (Maybe msgout)) | ||
-> (Text -> Maybe msgout) | ||
-> MVar msgout | ||
-> IO () | ||
ipcListener handle onMsg errHandler chan = do | ||
hSetNewlineMode handle noNewlineTranslation | ||
catch (race_ replyLoop sendLoop) onIOError | ||
where | ||
send :: msgout -> IO () | ||
send = sendMessage handle . encode | ||
|
||
maybeSend :: Maybe msgout -> IO () | ||
maybeSend = maybe (pure ()) send | ||
|
||
replyLoop :: IO () | ||
replyLoop = forever $ do | ||
line <- readMessage handle | ||
case eitherDecode line of | ||
Left err -> handleErr (T.pack err) | ||
Right msg -> handleMsg msg | ||
|
||
sendLoop :: IO () | ||
sendLoop = forever (readMVar chan >>= send) | ||
|
||
handleErr :: Text -> IO () | ||
handleErr = maybeSend . errHandler | ||
|
||
handleMsg :: msgin -> IO () | ||
handleMsg msg = onMsg msg >>= maybeSend | ||
|
||
onIOError :: IOError -> IO () | ||
onIOError err = do | ||
hPutStrLn stderr $ "exception caught in NodeIPC: " <> (show err) | ||
when (isEOFError err) $ hPutStrLn stderr "it's an eof" | ||
hFlush stdout | ||
|
||
readMessage :: Handle -> IO BL.ByteString | ||
readMessage | ||
| buildOS == Windows = windowsReadMessage | ||
| otherwise = posixReadMessage | ||
|
||
windowsReadMessage :: Handle -> IO BL.ByteString | ||
windowsReadMessage handle = do | ||
_int1 <- readInt32 handle | ||
_int2 <- readInt32 handle | ||
size <- readInt64 handle | ||
blob <- BL.hGet handle $ fromIntegral size | ||
-- logInfo $ "int is: " <> (show [_int1, _int2]) <> " and blob is: " <> (show blob) | ||
return blob | ||
where | ||
readInt64 :: Handle -> IO Word64 | ||
readInt64 hnd = do | ||
bs <- BL.hGet hnd 8 | ||
pure $ runGet getWord64le bs | ||
|
||
readInt32 :: Handle -> IO Word32 | ||
readInt32 hnd = do | ||
bs <- BL.hGet hnd 4 | ||
pure $ runGet getWord32le bs | ||
|
||
posixReadMessage :: Handle -> IO BL.ByteString | ||
posixReadMessage = fmap L8.pack . hGetLine | ||
|
||
sendMessage :: Handle -> BL.ByteString -> IO () | ||
sendMessage handle msg = send handle msg >> hFlush handle | ||
where | ||
send | ||
| buildOS == Windows = sendWindowsMessage | ||
| otherwise = sendLinuxMessage | ||
|
||
sendWindowsMessage :: Handle -> BL.ByteString -> IO () | ||
sendWindowsMessage = sendWindowsMessage' 1 0 | ||
|
||
sendWindowsMessage' :: Word32 -> Word32 -> Handle -> BL.ByteString -> IO () | ||
sendWindowsMessage' int1 int2 handle blob = | ||
L8.hPut handle $ runPut $ mconcat parts | ||
where | ||
blob' = blob <> "\n" | ||
parts = | ||
[ putWord32le int1 | ||
, putWord32le int2 | ||
, putWord64le $ fromIntegral $ BL.length blob' | ||
, putLazyByteString blob' | ||
] | ||
|
||
sendLinuxMessage :: Handle -> BL.ByteString -> IO () | ||
sendLinuxMessage = L8.hPutStrLn |