Skip to content

Commit

Permalink
Merge pull request #388 from input-output-hk/rvl/144/daedalus-ipc
Browse files Browse the repository at this point in the history
Adapt NodeIPC module from cardano-sl
  • Loading branch information
KtorZ committed Jun 14, 2019
2 parents d89bed5 + 5413101 commit d9b3553
Show file tree
Hide file tree
Showing 12 changed files with 444 additions and 13 deletions.
1 change: 1 addition & 0 deletions cardano-wallet.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ executable cardano-wallet
base
, aeson
, aeson-pretty
, async
, bytestring
, cardano-wallet-cli
, cardano-wallet-core
Expand Down
10 changes: 9 additions & 1 deletion exe/wallet/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ import Cardano.Wallet.Api.Types
, WalletPostData (..)
, WalletPutData (..)
)
import Cardano.Wallet.DaedalusIPC
( daedalusIPC )
import Cardano.Wallet.HttpBridge.Compatibility
( HttpBridge, block0 )
import Cardano.Wallet.HttpBridge.Environment
Expand All @@ -78,6 +80,8 @@ import Control.Arrow
( second )
import Control.Concurrent
( threadDelay )
import Control.Concurrent.Async
( race_ )
import Control.Monad
( when )
import Data.Aeson
Expand Down Expand Up @@ -410,7 +414,11 @@ execHttpBridge args _ = do
wallet <- newWalletLayer @_ @(HttpBridge n) tracer block0 db nw tl
let logStartup port = logInfo tracer $
"Wallet backend server listening on: " <> toText port
Server.start logStartup walletListen wallet
Server.withListeningSocket walletListen $ \(port, socket) -> do
let settings = Server.mkWarpSettings logStartup port
tracer' <- appendName "DaedalusIPC" tracer
race_ (daedalusIPC tracer' port) $
Server.startOnSocket settings socket wallet

-- | Generate a random mnemonic of the given size 'n' (n = number of words),
-- and print it to stdout.
Expand Down
3 changes: 3 additions & 0 deletions lib/core/cardano-wallet-core.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ library
-Werror
build-depends:
aeson
, async
, base
, basement
, binary
Expand Down Expand Up @@ -65,6 +66,7 @@ library
, text-class
, time
, transformers
, unordered-containers
, vector
, wai
, warp
Expand All @@ -76,6 +78,7 @@ library
Cardano.Wallet.Api
Cardano.Wallet.Api.Server
Cardano.Wallet.Api.Types
Cardano.Wallet.DaedalusIPC
Cardano.Wallet.DB
Cardano.Wallet.DB.MVar
Cardano.Wallet.DB.Sqlite
Expand Down
36 changes: 25 additions & 11 deletions lib/core/src/Cardano/Wallet/Api/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
-- endpoints reachable through HTTP.

module Cardano.Wallet.Api.Server
( start
, Listen (..)
( Listen (..)
, start
, startOnSocket
, mkWarpSettings
, withListeningSocket
) where

import Prelude
Expand Down Expand Up @@ -159,14 +162,11 @@ start
-> Listen
-> WalletLayer (SeqState t) t
-> IO ()
start onStartup portOption wl =
void $ withListeningSocket portOption $ \(port, socket) -> do
let settings = Warp.defaultSettings
& Warp.setPort port
& Warp.setBeforeMainLoop (onStartup port)
startOnSocket settings socket wl
pure port
start onStartup portOpt wl =
void $ withListeningSocket portOpt $ \(port, socket) ->
startOnSocket (mkWarpSettings onStartup port) socket wl

-- | Start the application server, using the given settings and a bound socket.
startOnSocket
:: forall t. (TxId t, KeyToAddress t, EncodeAddress t, DecodeAddress t)
=> Warp.Settings
Expand All @@ -184,12 +184,26 @@ startOnSocket settings socket wl = Warp.runSettingsSocket settings socket
application :: Application
application = serve (Proxy @("v2" :> Api t)) server

-- | Create warp server settings.
mkWarpSettings
:: (Warp.Port -> IO ())
-- ^ Function to run after the listening socket is bound, just before
-- Warp enters its event loop.
-> Warp.Port
-- ^ Port that socket will be listening on.
-> Warp.Settings
mkWarpSettings onStartup port = Warp.defaultSettings
& Warp.setPort port
& Warp.setBeforeMainLoop (onStartup port)

-- | Run an action with a TCP socket bound to a port specified by the `Listen`
-- parameter.
withListeningSocket
:: Listen
-> ((Port, Socket) -> IO Port)
-> IO Port
-- ^ Whether to listen on a given port, or random port.
-> ((Port, Socket) -> IO ())
-- ^ Action to run with listening socket.
-> IO ()
withListeningSocket portOpt = bracket acquire release
where
acquire = case portOpt of
Expand Down
261 changes: 261 additions & 0 deletions lib/core/src/Cardano/Wallet/DaedalusIPC.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- |
-- Copyright: © 2018-2019 IOHK
-- License: MIT
--
-- Provides a mechanism for Daedalus to discover what port the cardano-wallet
-- server is listening on.
--
-- See <https://nodejs.org/api/child_process.html#child_process_child_process_spawn_command_args_options>
-- for more information about the message protocol.

module Cardano.Wallet.DaedalusIPC
( daedalusIPC
) where

import Prelude

import Cardano.BM.Trace
( Trace, logError, logInfo, logNotice )
import Control.Concurrent
( threadDelay )
import Control.Concurrent.Async
( concurrently_, race )
import Control.Concurrent.MVar
( MVar, newEmptyMVar, putMVar, takeMVar )
import Control.Exception
( IOException, catch, tryJust )
import Control.Monad
( forever )
import Data.Aeson
( FromJSON (..)
, ToJSON (..)
, Value (..)
, eitherDecode
, encode
, object
, withObject
, (.:)
, (.=)
)
import Data.Bifunctor
( first )
import Data.Binary.Get
( getWord32le, getWord64le, runGet )
import Data.Binary.Put
( putLazyByteString, putWord32le, putWord64le, runPut )
import Data.Functor
( ($>) )
import Data.Maybe
( fromMaybe )
import Data.Text
( Text )
import Data.Word
( Word32, Word64 )
import Fmt
( fmt, (+||), (||+) )
import GHC.IO.Handle.FD
( fdToHandle )
import System.Environment
( lookupEnv )
import System.Info
( arch )
import System.IO
( Handle, hFlush, hGetLine, hSetNewlineMode, noNewlineTranslation )
import System.IO.Error
( IOError )
import Text.Read
( readEither )

import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy.Char8 as L8
import qualified Data.Text as T

----------------------------------------------------------------------------
-- Daedalus <-> Wallet child process port discovery protocol

data MsgIn = QueryPort
deriving (Show, Eq)
data MsgOut = Started | ReplyPort Int | ParseError Text
deriving (Show, Eq)

instance FromJSON MsgIn where
parseJSON = withObject "MsgIn" $ \v -> do
(_ :: [()]) <- v .: "QueryPort"
pure QueryPort

instance ToJSON MsgOut where
toJSON Started = object [ "Started" .= Array mempty ]
toJSON (ReplyPort p) = object [ "ReplyPort" .= p ]
toJSON (ParseError e) = object [ "ParseError" .= e ]

-- | Start up the Daedalus IPC process. It's called 'daedalusIPC', but this
-- could be any nodejs program that needs to start cardano-wallet. All it does
-- is reply with a port number when asked, using a very nodejs-specific IPC
-- method.
--
-- If the IPC channel was successfully set up, this function won't return until
-- the parent process exits. Otherwise, it will return immediately. Before
-- returning, it will log an message about why it has exited.
daedalusIPC
:: Trace IO Text
-- ^ Logging object
-> Int
-- ^ Port number to send to Daedalus
-> IO ()
daedalusIPC trace port = withNodeChannel (pure . msg) action >>= \case
Right runServer -> do
logInfo trace "Daedalus IPC server starting"
runServer >>= \case
Left (NodeChannelFinished err) ->
logNotice trace $ fmt $
"Daedalus IPC finished for this reason: "+||err||+""
Right () -> logError trace "Unreachable code"
Left NodeChannelDisabled -> do
logInfo trace $ "Daedalus IPC is not enabled."
sleep
Left (NodeChannelBadFD err) ->
logError trace $ fmt $ "Problem starting Daedalus IPC: "+||err||+""
where
-- How to respond to an incoming message, or when there is an incoming
-- message that couldn't be parsed.
msg (Right QueryPort) = Just (ReplyPort port)
msg (Left e) = Just (ParseError e)

-- What to do in context of withNodeChannel
action :: (MsgOut -> IO ()) -> IO ()
action send = send Started >> sleep

sleep = threadDelay maxBound

----------------------------------------------------------------------------
-- NodeJS child_process IPC protocol
-- https://nodejs.org/api/child_process.html#child_process_child_process_spawn_command_args_options

-- | Possible reasons why the node channel can't be set up.
data NodeChannelError
= NodeChannelDisabled
-- ^ This process has not been started as a nodejs @'ipc'@ child_process.
| NodeChannelBadFD Text
-- ^ The @NODE_CHANNEL_FD@ environment variable has an incorrect value.
deriving (Show, Eq)

-- | The only way a node channel finishes on its own is if there is some error
-- reading or writing to its file descriptor.
newtype NodeChannelFinished = NodeChannelFinished IOError

-- | Communicate with a parent process using a NodeJS-specific protocol. This
-- process must have been spawned with one of @stdio@ array entries set to
-- @'ipc'@.
--
-- If the channel could be set up, then it returns a function for communicating
-- with the parent process.
withNodeChannel
:: (FromJSON msgin, ToJSON msgout)
=> (Either Text msgin -> IO (Maybe msgout))
-- ^ Handler for messages coming from the parent process. Left values are
-- for JSON parse errors. The handler can optionally return a reply
-- message.
-> ((msgout -> IO ()) -> IO a)
-- ^ Action to run with the channel. It is passed a function for sending
-- messages to the parent process.
-> IO (Either NodeChannelError (IO (Either NodeChannelFinished a)))
withNodeChannel onMsg handleMsg = fmap setup <$> lookupNodeChannel
where
setup handle = do
chan <- newEmptyMVar
let ipc = ipcListener handle onMsg chan
action' = handleMsg (putMVar chan)
race ipc action'

-- | Parse the NODE_CHANNEL_FD variable, if it's set, and convert to a
-- 'System.IO.Handle'.
lookupNodeChannel :: IO (Either NodeChannelError Handle)
lookupNodeChannel = (fromMaybe "" <$> lookupEnv "NODE_CHANNEL_FD") >>= \case
"" -> pure (Left NodeChannelDisabled)
var -> case readEither var of
Left err -> pure . Left . NodeChannelBadFD $
"unable to parse NODE_CHANNEL_FD: " <> T.pack err
Right fd -> tryJust handleBadFd (fdToHandle fd)
where
handleBadFd :: IOException -> Maybe NodeChannelError
handleBadFd = Just . NodeChannelBadFD . T.pack . show

ipcListener
:: forall msgin msgout. (FromJSON msgin, ToJSON msgout)
=> Handle
-> (Either Text msgin -> IO (Maybe msgout))
-> MVar msgout
-> IO NodeChannelFinished
ipcListener handle onMsg chan = NodeChannelFinished <$> do
hSetNewlineMode handle noNewlineTranslation
(concurrently_ replyLoop sendLoop $> unexpected) `catch` pure
where
sendLoop, replyLoop :: IO ()
replyLoop = forever (recvMsg >>= onMsg >>= maybeSend)
sendLoop = forever (takeMVar chan >>= sendMsg)

recvMsg :: IO (Either Text msgin)
recvMsg = first T.pack . eitherDecode <$> readMessage handle

sendMsg :: msgout -> IO ()
sendMsg = sendMessage handle . encode

maybeSend :: Maybe msgout -> IO ()
maybeSend = maybe (pure ()) (putMVar chan)

unexpected = userError "ipcListener: unreachable code"

readMessage :: Handle -> IO BL.ByteString
readMessage = if isWindows then windowsReadMessage else posixReadMessage

isWindows :: Bool
isWindows = arch == "windows"

windowsReadMessage :: Handle -> IO BL.ByteString
windowsReadMessage handle = do
_int1 <- readInt32 handle
_int2 <- readInt32 handle
size <- readInt64 handle
-- logInfo $ "int is: " <> (show [_int1, _int2]) <> " and blob is: " <> (show blob)
BL.hGet handle $ fromIntegral size
where
readInt64 :: Handle -> IO Word64
readInt64 hnd = do
bs <- BL.hGet hnd 8
pure $ runGet getWord64le bs

readInt32 :: Handle -> IO Word32
readInt32 hnd = do
bs <- BL.hGet hnd 4
pure $ runGet getWord32le bs

posixReadMessage :: Handle -> IO BL.ByteString
posixReadMessage = fmap L8.pack . hGetLine

sendMessage :: Handle -> BL.ByteString -> IO ()
sendMessage handle msg = send handle msg >> hFlush handle
where
send = if isWindows then sendMessageWindows else sendMessagePosix

sendMessageWindows :: Handle -> BL.ByteString -> IO ()
sendMessageWindows = sendWindowsMessage' 1 0

sendWindowsMessage' :: Word32 -> Word32 -> Handle -> BL.ByteString -> IO ()
sendWindowsMessage' int1 int2 handle blob =
L8.hPut handle $ runPut $ mconcat parts
where
blob' = blob <> "\n"
parts =
[ putWord32le int1
, putWord32le int2
, putWord64le $ fromIntegral $ BL.length blob'
, putLazyByteString blob'
]

sendMessagePosix :: Handle -> BL.ByteString -> IO ()
sendMessagePosix = L8.hPutStrLn
Loading

0 comments on commit d9b3553

Please sign in to comment.