Skip to content

Commit

Permalink
Add startBlockSyncer
Browse files Browse the repository at this point in the history
  • Loading branch information
rvl committed Mar 13, 2019
1 parent 05d77df commit fb34c1f
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 10 deletions.
8 changes: 8 additions & 0 deletions app/server/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ import System.Environment
import Text.Read
( readMaybe )

import Cardano.Wallet.BlockSyncer
( startBlockSyncer )

import qualified Data.Text as T

-- | Command-Line Interface specification. See http://docopt.org/
cli :: Docopt
Expand Down Expand Up @@ -64,6 +68,7 @@ main = do
",\n connecting to " ++ (show network) ++
" node on port " ++ (show nodePort)

startBlockSyncer (showNetwork network) nodePort

-- Functions for parsing the values of command line options
--
Expand All @@ -81,6 +86,9 @@ readNetwork "mainnet" = Right Mainnet
readNetwork "testnet" = Right Testnet
readNetwork s = Left $ show s ++ " is neither \"mainnet\" nor \"testnet\"."

showNetwork :: Network -> T.Text
showNetwork = T.toLower . T.pack . show

getArg
:: Arguments
-> Option
Expand Down
2 changes: 2 additions & 0 deletions cardano-wallet.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ executable cardano-wallet-server
-O2
build-depends:
base
, cardano-wallet
, docopt
, text
hs-source-dirs:
app/server
main-is:
Expand Down
13 changes: 11 additions & 2 deletions src/Cardano/ChainProducer/RustHttpBridge.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
module Cardano.ChainProducer.RustHttpBridge
( RustBackend
, runRustBackend
, newRustBackend
) where

import Control.Monad.Except
Expand All @@ -21,10 +22,14 @@ import Data.Bifunctor
( first )
import Data.Maybe
( fromMaybe )
import Data.Text
( Text )
import Prelude

import Cardano.ChainProducer
( ErrGetNextBlocks (..), MonadChainProducer (..) )
import Cardano.ChainProducer.RustHttpBridge.Client
( newNetworkLayer )
import Cardano.ChainProducer.RustHttpBridge.NetworkLayer
( NetworkLayer (..), NetworkLayerError )
import Cardano.Wallet.Primitive
Expand All @@ -44,14 +49,18 @@ newtype RustBackend a = RustBackend {
} deriving (Monad, Applicative, Functor,
MonadReader (NetworkLayer IO), MonadIO)

instance MonadChainProducer RustBackend where
nextBlocks = rbNextBlocks

runRustBackend :: NetworkLayer IO -> RustBackend a -> IO a
runRustBackend network action = runReaderT (runRB action) network

getNetwork :: RustBackend (NetworkLayer IO)
getNetwork = ask

instance MonadChainProducer RustBackend where
nextBlocks = rbNextBlocks
newRustBackend :: Text -> Int -> IO (RustBackend a -> IO a)
newRustBackend networkName port =
runRustBackend <$> newNetworkLayer networkName port

-- Note: This will be quite inefficient for at least two reasons.
-- 1. If the number of blocks requested is small, it will fetch the same epoch
Expand Down
13 changes: 11 additions & 2 deletions src/Cardano/ChainProducer/RustHttpBridge/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
-- | An API client for the Cardano HTTP Bridge.
module Cardano.ChainProducer.RustHttpBridge.Client
( mkNetworkLayer
, newNetworkLayer
) where

import Prelude
Expand All @@ -23,12 +24,14 @@ import Data.Bifunctor
( first )
import Data.ByteArray
( convert )
import Data.Text
( Text )
import Network.HTTP.Client
( Manager )
( Manager, defaultManagerSettings, newManager )
import Servant.API
( (:<|>) (..) )
import Servant.Client
( BaseUrl, ClientM, client, mkClientEnv, runClientM )
( BaseUrl (..), ClientM, Scheme (Http), client, mkClientEnv, runClientM )
import Servant.Extra.ContentTypes
( Hash (..), WithHash (..) )

Expand Down Expand Up @@ -81,3 +84,9 @@ hashToApi' h = case hashToApi h of
Just h' -> pure h'
Nothing -> throwError
$ NetworkLayerError "hashToApi: Digest was of the wrong length"

newNetworkLayer :: Text -> Int -> IO (NetworkLayer IO)
newNetworkLayer network port = do
mgr <- newManager defaultManagerSettings
let baseUrl = BaseUrl Http "localhost" port ""
pure $ mkNetworkLayer mgr baseUrl (Api.NetworkName network)
56 changes: 50 additions & 6 deletions src/Cardano/Wallet/BlockSyncer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,37 @@
-- catching up).

module Cardano.Wallet.BlockSyncer
(
BlockHeadersConsumed(..)
( BlockHeadersConsumed(..)
, tickingFunction
, startBlockSyncer
) where


import Prelude

import Cardano.ChainProducer
( nextBlocks )
import Cardano.ChainProducer.RustHttpBridge
( newRustBackend )
import Cardano.Wallet.Primitive
( Block (..), BlockHeader )
( Block (..), BlockHeader, epochIndex, headerSlot, slotNumber )
import Cardano.Wallet.Slotting
( SlotId (SlotId) )
import Control.Concurrent
( threadDelay )
import Control.Monad.Except
( runExceptT )
import Data.IORef
( newIORef, readIORef, writeIORef )
import qualified Data.List as L
import Data.Text
( Text )
import Data.Time.Units
( Millisecond, toMicroseconds )

import qualified Data.List as L

import Fmt
( fmt, (+||), (||+) )
import System.Exit
( die )

newtype BlockHeadersConsumed =
BlockHeadersConsumed [BlockHeader]
Expand Down Expand Up @@ -68,3 +82,33 @@ tickingFunction getNextBlocks action tickTime = go
-> Bool
checkIfAlreadyConsumed consumedHeaders (Block theHeader _) =
theHeader `L.notElem` consumedHeaders

-- | Start the chain producer process, consuming blocks by printing their slot.
startBlockSyncer :: Text -> Int -> IO ()
startBlockSyncer networkName port = do
runRustBackend <- newRustBackend networkName port

startSlotRef <- newIORef (SlotId 0 0)

let chunkSize = 30000
interval = 20000

produceBlocks :: IO [Block]
produceBlocks = do
start <- readIORef startSlotRef
res <- runRustBackend (runExceptT $ nextBlocks chunkSize start)
case res of
Left err -> die $ "Chain producer error: " ++ show err
Right [] -> pure []
Right blocks -> do
let start' = headerSlot . header . last $ blocks
writeIORef startSlotRef start'
pure blocks

logBlock :: Block -> IO ()
logBlock block = putStrLn msg
where
msg = fmt ("Received block "+||epochIndex h||+"."+||slotNumber h||+"")
h = header block

tickingFunction produceBlocks logBlock interval (BlockHeadersConsumed [])

0 comments on commit fb34c1f

Please sign in to comment.