From fb34c1f872280e023cf6c3dcdda7c2a3b4427ed7 Mon Sep 17 00:00:00 2001 From: Rodney Lorrimar Date: Wed, 13 Mar 2019 16:22:09 +1000 Subject: [PATCH] Add startBlockSyncer --- app/server/Main.hs | 8 +++ cardano-wallet.cabal | 2 + src/Cardano/ChainProducer/RustHttpBridge.hs | 13 ++++- .../ChainProducer/RustHttpBridge/Client.hs | 13 ++++- src/Cardano/Wallet/BlockSyncer.hs | 56 +++++++++++++++++-- 5 files changed, 82 insertions(+), 10 deletions(-) diff --git a/app/server/Main.hs b/app/server/Main.hs index 9407a950321..72be213c41b 100644 --- a/app/server/Main.hs +++ b/app/server/Main.hs @@ -28,6 +28,10 @@ import System.Environment import Text.Read ( readMaybe ) +import Cardano.Wallet.BlockSyncer + ( startBlockSyncer ) + +import qualified Data.Text as T -- | Command-Line Interface specification. See http://docopt.org/ cli :: Docopt @@ -64,6 +68,7 @@ main = do ",\n connecting to " ++ (show network) ++ " node on port " ++ (show nodePort) + startBlockSyncer (showNetwork network) nodePort -- Functions for parsing the values of command line options -- @@ -81,6 +86,9 @@ readNetwork "mainnet" = Right Mainnet readNetwork "testnet" = Right Testnet readNetwork s = Left $ show s ++ " is neither \"mainnet\" nor \"testnet\"." +showNetwork :: Network -> T.Text +showNetwork = T.toLower . T.pack . show + getArg :: Arguments -> Option diff --git a/cardano-wallet.cabal b/cardano-wallet.cabal index 688cb2c223a..c92721429cc 100644 --- a/cardano-wallet.cabal +++ b/cardano-wallet.cabal @@ -81,7 +81,9 @@ executable cardano-wallet-server -O2 build-depends: base + , cardano-wallet , docopt + , text hs-source-dirs: app/server main-is: diff --git a/src/Cardano/ChainProducer/RustHttpBridge.hs b/src/Cardano/ChainProducer/RustHttpBridge.hs index 24292750511..9e555b89a3c 100644 --- a/src/Cardano/ChainProducer/RustHttpBridge.hs +++ b/src/Cardano/ChainProducer/RustHttpBridge.hs @@ -9,6 +9,7 @@ module Cardano.ChainProducer.RustHttpBridge ( RustBackend , runRustBackend + , newRustBackend ) where import Control.Monad.Except @@ -21,10 +22,14 @@ import Data.Bifunctor ( first ) import Data.Maybe ( fromMaybe ) +import Data.Text + ( Text ) import Prelude import Cardano.ChainProducer ( ErrGetNextBlocks (..), MonadChainProducer (..) ) +import Cardano.ChainProducer.RustHttpBridge.Client + ( newNetworkLayer ) import Cardano.ChainProducer.RustHttpBridge.NetworkLayer ( NetworkLayer (..), NetworkLayerError ) import Cardano.Wallet.Primitive @@ -44,14 +49,18 @@ newtype RustBackend a = RustBackend { } deriving (Monad, Applicative, Functor, MonadReader (NetworkLayer IO), MonadIO) +instance MonadChainProducer RustBackend where + nextBlocks = rbNextBlocks + runRustBackend :: NetworkLayer IO -> RustBackend a -> IO a runRustBackend network action = runReaderT (runRB action) network getNetwork :: RustBackend (NetworkLayer IO) getNetwork = ask -instance MonadChainProducer RustBackend where - nextBlocks = rbNextBlocks +newRustBackend :: Text -> Int -> IO (RustBackend a -> IO a) +newRustBackend networkName port = + runRustBackend <$> newNetworkLayer networkName port -- Note: This will be quite inefficient for at least two reasons. -- 1. If the number of blocks requested is small, it will fetch the same epoch diff --git a/src/Cardano/ChainProducer/RustHttpBridge/Client.hs b/src/Cardano/ChainProducer/RustHttpBridge/Client.hs index 81ac6146d39..d48d96f3082 100644 --- a/src/Cardano/ChainProducer/RustHttpBridge/Client.hs +++ b/src/Cardano/ChainProducer/RustHttpBridge/Client.hs @@ -5,6 +5,7 @@ -- | An API client for the Cardano HTTP Bridge. module Cardano.ChainProducer.RustHttpBridge.Client ( mkNetworkLayer + , newNetworkLayer ) where import Prelude @@ -23,12 +24,14 @@ import Data.Bifunctor ( first ) import Data.ByteArray ( convert ) +import Data.Text + ( Text ) import Network.HTTP.Client - ( Manager ) + ( Manager, defaultManagerSettings, newManager ) import Servant.API ( (:<|>) (..) ) import Servant.Client - ( BaseUrl, ClientM, client, mkClientEnv, runClientM ) + ( BaseUrl (..), ClientM, Scheme (Http), client, mkClientEnv, runClientM ) import Servant.Extra.ContentTypes ( Hash (..), WithHash (..) ) @@ -81,3 +84,9 @@ hashToApi' h = case hashToApi h of Just h' -> pure h' Nothing -> throwError $ NetworkLayerError "hashToApi: Digest was of the wrong length" + +newNetworkLayer :: Text -> Int -> IO (NetworkLayer IO) +newNetworkLayer network port = do + mgr <- newManager defaultManagerSettings + let baseUrl = BaseUrl Http "localhost" port "" + pure $ mkNetworkLayer mgr baseUrl (Api.NetworkName network) diff --git a/src/Cardano/Wallet/BlockSyncer.hs b/src/Cardano/Wallet/BlockSyncer.hs index f68516dad0d..1aadc16bb5a 100644 --- a/src/Cardano/Wallet/BlockSyncer.hs +++ b/src/Cardano/Wallet/BlockSyncer.hs @@ -12,23 +12,37 @@ -- catching up). module Cardano.Wallet.BlockSyncer - ( - BlockHeadersConsumed(..) + ( BlockHeadersConsumed(..) , tickingFunction + , startBlockSyncer ) where import Prelude +import Cardano.ChainProducer + ( nextBlocks ) +import Cardano.ChainProducer.RustHttpBridge + ( newRustBackend ) import Cardano.Wallet.Primitive - ( Block (..), BlockHeader ) + ( Block (..), BlockHeader, epochIndex, headerSlot, slotNumber ) +import Cardano.Wallet.Slotting + ( SlotId (SlotId) ) import Control.Concurrent ( threadDelay ) +import Control.Monad.Except + ( runExceptT ) +import Data.IORef + ( newIORef, readIORef, writeIORef ) +import qualified Data.List as L +import Data.Text + ( Text ) import Data.Time.Units ( Millisecond, toMicroseconds ) - -import qualified Data.List as L - +import Fmt + ( fmt, (+||), (||+) ) +import System.Exit + ( die ) newtype BlockHeadersConsumed = BlockHeadersConsumed [BlockHeader] @@ -68,3 +82,33 @@ tickingFunction getNextBlocks action tickTime = go -> Bool checkIfAlreadyConsumed consumedHeaders (Block theHeader _) = theHeader `L.notElem` consumedHeaders + +-- | Start the chain producer process, consuming blocks by printing their slot. +startBlockSyncer :: Text -> Int -> IO () +startBlockSyncer networkName port = do + runRustBackend <- newRustBackend networkName port + + startSlotRef <- newIORef (SlotId 0 0) + + let chunkSize = 30000 + interval = 20000 + + produceBlocks :: IO [Block] + produceBlocks = do + start <- readIORef startSlotRef + res <- runRustBackend (runExceptT $ nextBlocks chunkSize start) + case res of + Left err -> die $ "Chain producer error: " ++ show err + Right [] -> pure [] + Right blocks -> do + let start' = headerSlot . header . last $ blocks + writeIORef startSlotRef start' + pure blocks + + logBlock :: Block -> IO () + logBlock block = putStrLn msg + where + msg = fmt ("Received block "+||epochIndex h||+"."+||slotNumber h||+"") + h = header block + + tickingFunction produceBlocks logBlock interval (BlockHeadersConsumed [])