Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adapt NodeIPC module from cardano-sl #388

Merged
merged 12 commits into from
Jun 14, 2019
1 change: 1 addition & 0 deletions cardano-wallet.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ executable cardano-wallet
base
, aeson
, aeson-pretty
, async
, bytestring
, cardano-wallet-cli
, cardano-wallet-core
Expand Down
10 changes: 9 additions & 1 deletion exe/wallet/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ import Cardano.Wallet.Api.Types
, WalletPostData (..)
, WalletPutData (..)
)
import Cardano.Wallet.DaedalusIPC
( daedalusIPC )
import Cardano.Wallet.HttpBridge.Compatibility
( HttpBridge, block0 )
import Cardano.Wallet.HttpBridge.Environment
Expand All @@ -78,6 +80,8 @@ import Control.Arrow
( second )
import Control.Concurrent
( threadDelay )
import Control.Concurrent.Async
( race_ )
import Control.Monad
( when )
import Data.Aeson
Expand Down Expand Up @@ -410,7 +414,11 @@ execHttpBridge args _ = do
wallet <- newWalletLayer @_ @(HttpBridge n) tracer block0 db nw tl
let logStartup port = logInfo tracer $
"Wallet backend server listening on: " <> toText port
Server.start logStartup walletListen wallet
Server.withListeningSocket walletListen $ \(port, socket) -> do
let settings = Server.mkWarpSettings logStartup port
tracer' <- appendName "DaedalusIPC" tracer
race_ (daedalusIPC tracer' port) $
Server.startOnSocket settings socket wallet

-- | Generate a random mnemonic of the given size 'n' (n = number of words),
-- and print it to stdout.
Expand Down
3 changes: 3 additions & 0 deletions lib/core/cardano-wallet-core.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ library
-Werror
build-depends:
aeson
, async
, base
, basement
, binary
Expand Down Expand Up @@ -65,6 +66,7 @@ library
, text-class
, time
, transformers
, unordered-containers
, vector
, wai
, warp
Expand All @@ -76,6 +78,7 @@ library
Cardano.Wallet.Api
Cardano.Wallet.Api.Server
Cardano.Wallet.Api.Types
Cardano.Wallet.DaedalusIPC
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a big fan of referring to Daedalus here. The IPC stuff can be seen as a low-level protocol to allow a client to perform some process management task of the wallet server. Of course, the primary user is Daedalus, but I don't want us to fall in the trap of "making stuff just for the frontend" as a Daedalus*** module would suggest. This already caused us quite some pain with the old API that has some non-sensical stuff into it.

In short: let's build more generic interfaces that serve a well-defined purpose.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that NodeIPC was rather confusing, so maybe have something like NodeJSIPC or JSClientIPC

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of our main requirements is working with the Daedalus frontend. This ipc protocol is unique to Daedalus. I'm not aware of any requirements for integrating with some imaginary software that doesn't exist yet, but we can change the name if that happens.

It would be better to document the ways of starting cardano-wallet server on a port, so that prospective users can assess whether any of the schemes would work for them.

Cardano.Wallet.DB
Cardano.Wallet.DB.MVar
Cardano.Wallet.DB.Sqlite
Expand Down
36 changes: 25 additions & 11 deletions lib/core/src/Cardano/Wallet/Api/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
-- endpoints reachable through HTTP.

module Cardano.Wallet.Api.Server
( start
, Listen (..)
( Listen (..)
, start
, startOnSocket
, mkWarpSettings
, withListeningSocket
) where

import Prelude
Expand Down Expand Up @@ -159,14 +162,11 @@ start
-> Listen
-> WalletLayer (SeqState t) t
-> IO ()
start onStartup portOption wl =
void $ withListeningSocket portOption $ \(port, socket) -> do
let settings = Warp.defaultSettings
& Warp.setPort port
& Warp.setBeforeMainLoop (onStartup port)
startOnSocket settings socket wl
pure port
start onStartup portOpt wl =
void $ withListeningSocket portOpt $ \(port, socket) ->
startOnSocket (mkWarpSettings onStartup port) socket wl

-- | Start the application server, using the given settings and a bound socket.
startOnSocket
:: forall t. (TxId t, KeyToAddress t, EncodeAddress t, DecodeAddress t)
=> Warp.Settings
Expand All @@ -184,12 +184,26 @@ startOnSocket settings socket wl = Warp.runSettingsSocket settings socket
application :: Application
application = serve (Proxy @("v2" :> Api t)) server

-- | Create warp server settings.
mkWarpSettings
:: (Warp.Port -> IO ())
-- ^ Function to run after the listening socket is bound, just before
-- Warp enters its event loop.
-> Warp.Port
-- ^ Port that socket will be listening on.
-> Warp.Settings
mkWarpSettings onStartup port = Warp.defaultSettings
& Warp.setPort port
& Warp.setBeforeMainLoop (onStartup port)

-- | Run an action with a TCP socket bound to a port specified by the `Listen`
-- parameter.
withListeningSocket
:: Listen
-> ((Port, Socket) -> IO Port)
-> IO Port
-- ^ Whether to listen on a given port, or random port.
-> ((Port, Socket) -> IO ())
-- ^ Action to run with listening socket.
-> IO ()
withListeningSocket portOpt = bracket acquire release
where
acquire = case portOpt of
Expand Down
261 changes: 261 additions & 0 deletions lib/core/src/Cardano/Wallet/DaedalusIPC.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- |
-- Copyright: © 2018-2019 IOHK
-- License: MIT
--
-- Provides a mechanism for Daedalus to discover what port the cardano-wallet
-- server is listening on.
--
-- See <https://nodejs.org/api/child_process.html#child_process_child_process_spawn_command_args_options>
-- for more information about the message protocol.

module Cardano.Wallet.DaedalusIPC
( daedalusIPC
) where

import Prelude

import Cardano.BM.Trace
( Trace, logError, logInfo, logNotice )
import Control.Concurrent
( threadDelay )
import Control.Concurrent.Async
( concurrently_, race )
import Control.Concurrent.MVar
( MVar, newEmptyMVar, putMVar, takeMVar )
import Control.Exception
( IOException, catch, tryJust )
import Control.Monad
( forever )
import Data.Aeson
( FromJSON (..)
, ToJSON (..)
, Value (..)
, eitherDecode
, encode
, object
, withObject
, (.:)
, (.=)
)
import Data.Bifunctor
( first )
import Data.Binary.Get
( getWord32le, getWord64le, runGet )
import Data.Binary.Put
( putLazyByteString, putWord32le, putWord64le, runPut )
import Data.Functor
( ($>) )
import Data.Maybe
( fromMaybe )
import Data.Text
( Text )
import Data.Word
( Word32, Word64 )
import Fmt
( fmt, (+||), (||+) )
import GHC.IO.Handle.FD
( fdToHandle )
import System.Environment
( lookupEnv )
import System.Info
( arch )
import System.IO
( Handle, hFlush, hGetLine, hSetNewlineMode, noNewlineTranslation )
import System.IO.Error
( IOError )
import Text.Read
( readEither )

import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy.Char8 as L8
import qualified Data.Text as T

----------------------------------------------------------------------------
-- Daedalus <-> Wallet child process port discovery protocol

data MsgIn = QueryPort
deriving (Show, Eq)
data MsgOut = Started | ReplyPort Int | ParseError Text
deriving (Show, Eq)

instance FromJSON MsgIn where
parseJSON = withObject "MsgIn" $ \v -> do
(_ :: [()]) <- v .: "QueryPort"
pure QueryPort

instance ToJSON MsgOut where
toJSON Started = object [ "Started" .= Array mempty ]
toJSON (ReplyPort p) = object [ "ReplyPort" .= p ]
toJSON (ParseError e) = object [ "ParseError" .= e ]

-- | Start up the Daedalus IPC process. It's called 'daedalusIPC', but this
-- could be any nodejs program that needs to start cardano-wallet. All it does
-- is reply with a port number when asked, using a very nodejs-specific IPC
-- method.
--
-- If the IPC channel was successfully set up, this function won't return until
-- the parent process exits. Otherwise, it will return immediately. Before
-- returning, it will log an message about why it has exited.
daedalusIPC
:: Trace IO Text
-- ^ Logging object
-> Int
-- ^ Port number to send to Daedalus
-> IO ()
daedalusIPC trace port = withNodeChannel (pure . msg) action >>= \case
Right runServer -> do
logInfo trace "Daedalus IPC server starting"
runServer >>= \case
Left (NodeChannelFinished err) ->
logNotice trace $ fmt $
"Daedalus IPC finished for this reason: "+||err||+""
Right () -> logError trace "Unreachable code"
Left NodeChannelDisabled -> do
logInfo trace $ "Daedalus IPC is not enabled."
sleep
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why sleeping here? What would be the consequence of returning () ? Do we expect daedalusIPC to never return? If so, I'd rather have it mentioned in a function comment to explain this behavior.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it should have been commented. This is the error handling I want:

  1. If set up is unsuccessful, return immediately, causing the cardano-wallet process to exit. Daedalus does not want zombie cardano-wallet processes hanging around that it can't communicate with.
  2. If set up is successful, never return until Daedalus exits.
  3. If not running within a nodejs channel, never return (simplifies calling code)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, so we really assume that any termination is an error, hence the sleep when there's no node.js channel (not an error so to speak).

Left (NodeChannelBadFD err) ->
logError trace $ fmt $ "Problem starting Daedalus IPC: "+||err||+""
where
-- How to respond to an incoming message, or when there is an incoming
-- message that couldn't be parsed.
msg (Right QueryPort) = Just (ReplyPort port)
msg (Left e) = Just (ParseError e)

-- What to do in context of withNodeChannel
action :: (MsgOut -> IO ()) -> IO ()
action send = send Started >> sleep

sleep = threadDelay maxBound

----------------------------------------------------------------------------
-- NodeJS child_process IPC protocol
-- https://nodejs.org/api/child_process.html#child_process_child_process_spawn_command_args_options

-- | Possible reasons why the node channel can't be set up.
data NodeChannelError
= NodeChannelDisabled
-- ^ This process has not been started as a nodejs @'ipc'@ child_process.
| NodeChannelBadFD Text
-- ^ The @NODE_CHANNEL_FD@ environment variable has an incorrect value.
deriving (Show, Eq)

-- | The only way a node channel finishes on its own is if there is some error
-- reading or writing to its file descriptor.
newtype NodeChannelFinished = NodeChannelFinished IOError

-- | Communicate with a parent process using a NodeJS-specific protocol. This
-- process must have been spawned with one of @stdio@ array entries set to
-- @'ipc'@.
--
-- If the channel could be set up, then it returns a function for communicating
-- with the parent process.
withNodeChannel
:: (FromJSON msgin, ToJSON msgout)
=> (Either Text msgin -> IO (Maybe msgout))
-- ^ Handler for messages coming from the parent process. Left values are
-- for JSON parse errors. The handler can optionally return a reply
-- message.
-> ((msgout -> IO ()) -> IO a)
-- ^ Action to run with the channel. It is passed a function for sending
-- messages to the parent process.
-> IO (Either NodeChannelError (IO (Either NodeChannelFinished a)))
withNodeChannel onMsg handleMsg = fmap setup <$> lookupNodeChannel
where
setup handle = do
chan <- newEmptyMVar
let ipc = ipcListener handle onMsg chan
action' = handleMsg (putMVar chan)
race ipc action'

-- | Parse the NODE_CHANNEL_FD variable, if it's set, and convert to a
-- 'System.IO.Handle'.
lookupNodeChannel :: IO (Either NodeChannelError Handle)
lookupNodeChannel = (fromMaybe "" <$> lookupEnv "NODE_CHANNEL_FD") >>= \case
"" -> pure (Left NodeChannelDisabled)
var -> case readEither var of
Left err -> pure . Left . NodeChannelBadFD $
"unable to parse NODE_CHANNEL_FD: " <> T.pack err
Right fd -> tryJust handleBadFd (fdToHandle fd)
where
handleBadFd :: IOException -> Maybe NodeChannelError
handleBadFd = Just . NodeChannelBadFD . T.pack . show

ipcListener
:: forall msgin msgout. (FromJSON msgin, ToJSON msgout)
=> Handle
-> (Either Text msgin -> IO (Maybe msgout))
-> MVar msgout
-> IO NodeChannelFinished
ipcListener handle onMsg chan = NodeChannelFinished <$> do
hSetNewlineMode handle noNewlineTranslation
(concurrently_ replyLoop sendLoop $> unexpected) `catch` pure
where
sendLoop, replyLoop :: IO ()
replyLoop = forever (recvMsg >>= onMsg >>= maybeSend)
sendLoop = forever (takeMVar chan >>= sendMsg)

recvMsg :: IO (Either Text msgin)
recvMsg = first T.pack . eitherDecode <$> readMessage handle

sendMsg :: msgout -> IO ()
sendMsg = sendMessage handle . encode

maybeSend :: Maybe msgout -> IO ()
maybeSend = maybe (pure ()) (putMVar chan)

unexpected = userError "ipcListener: unreachable code"

readMessage :: Handle -> IO BL.ByteString
readMessage = if isWindows then windowsReadMessage else posixReadMessage

isWindows :: Bool
isWindows = arch == "windows"

windowsReadMessage :: Handle -> IO BL.ByteString
windowsReadMessage handle = do
_int1 <- readInt32 handle
_int2 <- readInt32 handle
size <- readInt64 handle
-- logInfo $ "int is: " <> (show [_int1, _int2]) <> " and blob is: " <> (show blob)
BL.hGet handle $ fromIntegral size
where
readInt64 :: Handle -> IO Word64
readInt64 hnd = do
bs <- BL.hGet hnd 8
pure $ runGet getWord64le bs

readInt32 :: Handle -> IO Word32
readInt32 hnd = do
bs <- BL.hGet hnd 4
pure $ runGet getWord32le bs

posixReadMessage :: Handle -> IO BL.ByteString
posixReadMessage = fmap L8.pack . hGetLine

sendMessage :: Handle -> BL.ByteString -> IO ()
sendMessage handle msg = send handle msg >> hFlush handle
where
send = if isWindows then sendMessageWindows else sendMessagePosix

sendMessageWindows :: Handle -> BL.ByteString -> IO ()
sendMessageWindows = sendWindowsMessage' 1 0

sendWindowsMessage' :: Word32 -> Word32 -> Handle -> BL.ByteString -> IO ()
sendWindowsMessage' int1 int2 handle blob =
L8.hPut handle $ runPut $ mconcat parts
where
blob' = blob <> "\n"
parts =
[ putWord32le int1
, putWord32le int2
, putWord64le $ fromIntegral $ BL.length blob'
, putLazyByteString blob'
]

sendMessagePosix :: Handle -> BL.ByteString -> IO ()
sendMessagePosix = L8.hPutStrLn
Loading