Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add startBlockSyncer #54

Merged
merged 5 commits into from
Mar 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions app/server/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,16 @@
-}
module Main where

import Prelude

import Cardano.Wallet.BlockSyncer
( listen )
import Cardano.Wallet.Primitive
( Block )
import Control.Monad
( when )
import Prelude
import Fmt
( build, fmt )
import System.Console.Docopt
( Arguments
, Docopt
Expand All @@ -28,6 +35,9 @@ import System.Environment
import Text.Read
( readMaybe )

import qualified Cardano.NetworkLayer.HttpBridge as HttpBridge
import qualified Data.Text as T
import qualified Data.Text.IO as T

-- | Command-Line Interface specification. See http://docopt.org/
cli :: Docopt
Expand Down Expand Up @@ -55,15 +65,15 @@ main = do
args <- parseArgsOrExit cli =<< getArgs
when (args `isPresent` (longOption "help")) $ exitWithUsage cli

network <- getArg args (longOption "network") readNetwork
networkName <- getArg args (longOption "network") readNetwork
nodePort <- getArg args (longOption "node-port") readInt
walletPort <- getArg args (longOption "wallet-server-port") readInt

putStrLn $
"TODO: start wallet on port " ++ (show walletPort) ++
",\n connecting to " ++ (show network) ++
" node on port " ++ (show nodePort)
_ <- getArg args (longOption "wallet-server-port") readInt

network <- HttpBridge.newNetworkLayer (showNetwork networkName) nodePort
listen network logBlock
where
logBlock :: Block -> IO ()
logBlock = T.putStrLn . fmt . build

-- Functions for parsing the values of command line options
--
Expand All @@ -81,6 +91,9 @@ readNetwork "mainnet" = Right Mainnet
readNetwork "testnet" = Right Testnet
readNetwork s = Left $ show s ++ " is neither \"mainnet\" nor \"testnet\"."

showNetwork :: Network -> T.Text
showNetwork = T.toLower . T.pack . show

getArg
:: Arguments
-> Option
Expand Down
3 changes: 3 additions & 0 deletions cardano-wallet.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ executable cardano-wallet-server
-O2
build-depends:
base
, cardano-wallet
, docopt
, text
, fmt
hs-source-dirs:
app/server
main-is:
Expand Down
13 changes: 9 additions & 4 deletions src/Cardano/NetworkLayer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@ import Cardano.Wallet.Primitive
( Block, BlockHeader (..), Hash (..), SlotId )
import Control.Monad.Except
( ExceptT )
import Data.Word
( Word64 )


data NetworkLayer m e0 e1 = NetworkLayer
{ nextBlocks :: Word64 -> SlotId -> ExceptT e0 m [Block]
, networkTip :: ExceptT e1 m (Hash "BlockHeader", BlockHeader)
{ nextBlocks :: SlotId -> ExceptT e0 m [Block]
-- ^ Gets some blocks from the node. It will not necessarily return all
-- the blocks that the node has, but will receive a reasonable-sized
-- chunk. It will never return blocks from before the given slot. It
-- may return an empty list if the node does not have any blocks from
-- after the starting slot.

, networkTip
:: ExceptT e1 m (Hash "BlockHeader", BlockHeader)
}
86 changes: 27 additions & 59 deletions src/Cardano/NetworkLayer/HttpBridge.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,7 @@ import Cardano.NetworkLayer
import Cardano.NetworkLayer.HttpBridge.Api
( ApiT (..), EpochIndex (..), NetworkName (..), api )
import Cardano.Wallet.Primitive
( Block (..)
, BlockHeader (..)
, Hash (..)
, Hash (..)
, SlotId (..)
, blockIsAfter
, blockIsBefore
, blockIsBetween
, slotIncr
, slotsPerEpoch
)
( Block (..), BlockHeader (..), Hash (..), SlotId (..) )
import Control.Exception
( Exception (..) )
import Control.Monad.Except
Expand All @@ -49,8 +39,6 @@ import Data.Bifunctor
( first )
import Data.ByteArray
( convert )
import Data.Maybe
( fromMaybe )
import Data.Text
( Text )
import Data.Word
Expand All @@ -69,7 +57,6 @@ import Servant.Extra.ContentTypes
import qualified Data.Text as T
import qualified Servant.Extra.ContentTypes as Api


-- | Constructs a network layer with the given cardano-http-bridge API.
mkNetworkLayer :: Monad m => HttpBridge m e -> NetworkLayer m e e
mkNetworkLayer httpBridge = NetworkLayer
Expand All @@ -82,61 +69,40 @@ mkNetworkLayer httpBridge = NetworkLayer
newNetworkLayer :: Text -> Int -> IO (NetworkLayer IO HttpBridgeError HttpBridgeError)
newNetworkLayer network port = mkNetworkLayer <$> newHttpBridge network port

-- Note: This will be quite inefficient for at least two reasons.
-- 1. If the number of blocks requested is small, it will fetch the same epoch
-- pack file repeatedly.
-- 2. Fetching the tip block and working backwards is not ideal.
-- We will keep it for now, and it can be improved later.
-- | Retrieve a chunk of blocks from cardano-http-bridge.
--
-- It will either return:
-- - an epoch pack's worth of blocks (those after the given starting slot); or
-- - all of the unstable blocks after the starting slot, if any.
rbNextBlocks
:: Monad m
=> HttpBridge m e -- ^ http-bridge API
-> Word64 -- ^ Number of blocks to retrieve
-> SlotId -- ^ Starting point
-> ExceptT e m [Block]
rbNextBlocks net numBlocks start = do
rbNextBlocks net start = do
(tipHash, tip) <- fmap slotId <$> getNetworkTip net
epochBlocks <- lift $ blocksFromPacks net tip
lastBlocks <- unstableBlocks net tipHash tip epochBlocks
epochBlocks <- lift $ blocksFromPacks 1 net tip
lastBlocks <- if null epochBlocks
then unstableBlocks net tipHash tip
else pure []
pure (epochBlocks ++ lastBlocks)
where
end = slotIncr numBlocks start

-- Grab blocks from epoch pack files
blocksFromPacks network tip = do
let epochs = epochRange numBlocks start tip
blocksFromPacks limit network tip = do
let epochs = take limit [epochIndex start .. (epochIndex tip) - 1]
epochBlocks <- getEpochs network epochs
pure $ filter (blockIsBetween start end) (concat epochBlocks)
pure $ filter (blockIsSameOrAfter start) (concat epochBlocks)

-- The next slot after the last block.
slotAfter [] = Nothing
slotAfter bs = Just . succ . slotId . header . last $ bs
-- Predicate returns true iff the block is from the given slot or a later one.
blockIsSameOrAfter :: SlotId -> Block -> Bool
blockIsSameOrAfter s = (>= s) . slotId . header

-- Grab the remaining blocks which aren't packed in epoch files,
-- starting from the tip.
unstableBlocks network tipHash tip epochBlocks = do
let start' = fromMaybe start (slotAfter epochBlocks)

lastBlocks <- if end > start' && start' <= tip
then fetchBlocksFromTip network start' tipHash
else pure []

pure $ filter (blockIsBefore end) lastBlocks

-- | Calculates which epochs to fetch, given a number of slots, and the start
-- point. It takes into account the latest block available, and that the most
-- recent epoch is never available in a pack file.
epochRange
:: Word64
-- ^ Number of slots
-> SlotId
-- ^ Start point
-> SlotId
-- ^ Latest block available
-> [Word64]
epochRange numBlocks (SlotId startEpoch startSlot) (SlotId tipEpoch _) =
[startEpoch .. min (tipEpoch - 1) (startEpoch + fromIntegral numEpochs)]
where
numEpochs = (numBlocks + fromIntegral startSlot) `div` slotsPerEpoch
unstableBlocks network tipHash tip
| start <= tip = fetchBlocksFromTip network start tipHash
| otherwise = pure []

-- | Fetch epoch blocks until one fails.
getEpochs
Expand All @@ -158,8 +124,9 @@ fetchBlocksFromTip network start tipHash =
where
workBackwards headerHash = do
block <- getBlock network headerHash
if blockIsAfter start block then do
blocks <- workBackwards $ prevBlockHash $ header block
let hdr = header block
if start < slotId hdr then do
blocks <- workBackwards (prevBlockHash hdr)
pure (block:blocks)
else
pure [block]
Expand Down Expand Up @@ -219,9 +186,10 @@ mkHttpBridge mgr baseUrl network = HttpBridge
{ getBlock = \hash -> do
hash' <- hashToApi' hash
run (getApiT <$> getBlockByHash network hash')
, getEpoch = \ep -> run (map getApiT <$>
getEpochById network (EpochIndex ep))
, getNetworkTip = run (blockHeaderHash <$> getTipBlockHeader network)
, getEpoch = \ep ->
run (map getApiT <$> getEpochById network (EpochIndex ep))
, getNetworkTip =
run (blockHeaderHash <$> getTipBlockHeader network)
}
where
run :: ClientM a -> ExceptT HttpBridgeError IO a
Expand Down
102 changes: 58 additions & 44 deletions src/Cardano/Wallet/BlockSyncer.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE QuantifiedConstraints #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- |
Expand All @@ -7,64 +9,76 @@
-- This module contains the ticking function that is responsible for invoking
-- block acquisition functionality and executing it in periodic fashion.
--
-- Known limitations: the ticking function makes sure action is not executed on
-- already consumed block, but does not check and handle block gaps (aka
-- catching up).
-- Known limitations:
-- - Blocks are produced by the network layer. They are not expected to produce
-- any duplicates and to rollback.
--
-- -

module Cardano.Wallet.BlockSyncer
(
BlockHeadersConsumed(..)
, tickingFunction
( tick
, listen
) where


import Prelude

import Cardano.NetworkLayer
( NetworkLayer (..) )
import Cardano.Wallet.Primitive
( Block (..), BlockHeader )
( Block (..), BlockHeader (..), SlotId (..) )
import Control.Concurrent
( threadDelay )
import Control.Monad.Except
( runExceptT )
import Control.Monad.IO.Class
( MonadIO, liftIO )
import Data.Time.Units
( Millisecond, toMicroseconds )
import Fmt
( fmt, (+||), (||+) )
import System.Exit
( die )

import qualified Data.List as L


newtype BlockHeadersConsumed =
BlockHeadersConsumed [BlockHeader]
deriving (Show, Eq)

storingLimit :: Int
storingLimit = 2160

tickingFunction
:: IO [Block]
-- ^ a way to get a new block
-> (Block -> IO ())
-- ^ action taken on a new block
-- | Every interval @delay@, fetches some data from a given source, and call
-- an action for each elements retrieved.
tick
:: forall st m b. (MonadIO m)
=> (st -> m ([b], st))
-- ^ A way to get a new elements
-> (b -> m ())
-- ^ Action to be taken on new elements
-> Millisecond
-- ^ tick time
-> BlockHeadersConsumed
-> IO ()
tickingFunction getNextBlocks action tickTime = go
where
go
:: BlockHeadersConsumed
-> IO ()
go (BlockHeadersConsumed headersConsumed) = do
blocksDownloaded <- getNextBlocks
let blocksToProcess = filter
(checkIfAlreadyConsumed headersConsumed)
(L.nub blocksDownloaded)
mapM_ action blocksToProcess
threadDelay $ (fromIntegral . toMicroseconds) tickTime
go $ BlockHeadersConsumed
$ take storingLimit
$ map header blocksToProcess ++ headersConsumed
-> st
-> m ()
tick next action delay !st = do
(bs, !st') <- next st
mapM_ action bs
liftIO $ threadDelay $ (fromIntegral . toMicroseconds) delay
tick next action delay st'

checkIfAlreadyConsumed
:: [BlockHeader]
-> Block
-> Bool
checkIfAlreadyConsumed consumedHeaders (Block theHeader _) =
theHeader `L.notElem` consumedHeaders
-- | Retrieve blocks from a chain producer and execute some given action for
-- each block.
listen
:: forall e0 e1. (Show e0)
=> NetworkLayer IO e0 e1
-> (Block -> IO ())
-> IO ()
listen network action = do
tick getNextBlocks action 5000 (SlotId 0 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

definitely replacing BlockHeadersConsumed with SlotId will make it easier to follow in the logs!!!

where
getNextBlocks :: SlotId -> IO ([Block], SlotId)
getNextBlocks current = do
res <- runExceptT $ nextBlocks network current
case res of
Left err ->
die $ fmt $ "Chain producer error: "+||err||+""
Right [] ->
pure ([], current)
Right blocks ->
-- fixme: there are more blocks available, so we need not
-- wait for an interval to pass before getting more blocks.
let next = succ . slotId . header . last $ blocks
in pure (blocks, next)
Loading