Skip to content

Commit

Permalink
Allow BlockSyncer.listen to catch up with the node
Browse files Browse the repository at this point in the history
It will now process blocks continuously, in chunks, until all the local
blocks from the node are done. Then it will start polling.
  • Loading branch information
rvl committed Mar 15, 2019
1 parent 598c3eb commit 66fb354
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 18 deletions.
35 changes: 23 additions & 12 deletions src/Cardano/NetworkLayer.hs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveFoldable #-}
{-# LANGUAGE QuantifiedConstraints #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Cardano.NetworkLayer
( NetworkLayer (..)
, tick
, TickResult(..)
, listen
) where

Expand Down Expand Up @@ -39,24 +41,35 @@ data NetworkLayer m e0 e1 = NetworkLayer
:: ExceptT e1 m (Hash "BlockHeader", BlockHeader)
}

-- | Every interval @delay@, fetches some data from a given source, and call
-- an action for each elements retrieved.
-- | Repeatedly fetch data from a given source function, and call an action for
-- each element retrieved.
--
-- If the data source indicates that it has no more data at present ('Sleep'),
-- then sleep for the interval @delay@, and then try the fetch again.
tick
:: forall st m b. (MonadIO m)
=> (st -> m ([b], st))
=> (st -> m (TickResult [b], st))
-- ^ A way to get a new elements
-> (b -> m ())
-- ^ Action to be taken on new elements
-> Millisecond
-- ^ tick time
-- ^ Tick time
-> st
-- ^ Initial state.
-> m ()
tick next action delay !st = do
(bs, !st') <- next st
mapM_ action bs
liftIO $ threadDelay $ (fromIntegral . toMicroseconds) delay
(res, !st') <- next st
case res of
GotChunk bs -> mapM_ action bs
Sleep -> liftIO $ threadDelay $ (fromIntegral . toMicroseconds) delay
tick next action delay st'

-- | The result type of the element fetch function provided to 'tick'.
data TickResult a
= GotChunk !a -- ^ Have a result, and there may be more available.
| Sleep -- ^ There is no result available now, so wait.
deriving (Show, Eq, Foldable)

-- | Retrieve blocks from a chain producer and execute some given action for
-- each block.
listen
Expand All @@ -67,16 +80,14 @@ listen
listen network action = do
tick getNextBlocks action 5000 (SlotId 0 0)
where
getNextBlocks :: SlotId -> IO ([Block], SlotId)
getNextBlocks :: SlotId -> IO (TickResult [Block], SlotId)
getNextBlocks current = do
res <- runExceptT $ nextBlocks network current
case res of
Left err ->
die $ fmt $ "Chain producer error: "+||err||+""
Right [] ->
pure ([], current)
pure (Sleep, current)
Right blocks ->
-- fixme: there are more blocks available, so we need not
-- wait for an interval to pass before getting more blocks.
let next = succ . slotId . header . last $ blocks
in pure (blocks, next)
in pure (GotChunk blocks, next)
30 changes: 24 additions & 6 deletions test/unit/Cardano/NetworkLayerSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ module Cardano.NetworkLayerSpec
import Prelude

import Cardano.NetworkLayer
( tick )
( TickResult (..), tick )
import Cardano.Wallet.Primitive
( Block (..), BlockHeader (..), Hash (..), SlotId (..) )
import Control.Concurrent
Expand All @@ -24,6 +24,8 @@ import Control.Monad
( foldM )
import Control.Monad.IO.Class
( liftIO )
import Data.Foldable
( toList )
import Data.Functor
( ($>) )
import Data.Time.Units
Expand Down Expand Up @@ -61,7 +63,7 @@ tickingFunctionTest (TickingTime tickTime, Blocks blocks) =
(readerChan, reader) <- mkReader
(writerChan, writer) <- mkWriter
waitFor writerChan $ tick writer reader tickTime blocks
takeMVar readerChan `shouldReturn` reverse (mconcat blocks)
takeMVar readerChan `shouldReturn` reverse (flatten blocks)

waitFor
:: MVar ()
Expand All @@ -73,13 +75,13 @@ waitFor done action = do
killThread threadId

mkWriter
:: st ~ [[a]] => IO (MVar (), st -> IO ([a], st))
:: st ~ [TickResult [a]] => IO (MVar (), st -> IO (TickResult [a], st))
mkWriter = do
done <- newEmptyMVar
return
( done
, \case
st@[] -> putMVar done () $> ([], st)
st@[] -> putMVar done () $> (Sleep, st)
h:st -> return (h, st)
)

Expand All @@ -92,6 +94,9 @@ mkReader = do
, \x -> modifyMVar_ ref $ return . (x :)
)

flatten :: [TickResult [Block]] -> [Block]
flatten = concat . concat . map toList

{-------------------------------------------------------------------------------
Arbitrary Instances
-------------------------------------------------------------------------------}
Expand All @@ -107,7 +112,7 @@ instance Arbitrary TickingTime where
return $ TickingTime tickTime


newtype Blocks = Blocks [[Block]]
newtype Blocks = Blocks [TickResult [Block]]
deriving Show

instance Arbitrary Blocks where
Expand All @@ -116,7 +121,8 @@ instance Arbitrary Blocks where
n <- fromIntegral . (`mod` 42) <$> arbitrary @Word8
let h0 = BlockHeader (SlotId 1 0) (Hash "initial block")
let b0 = (blockHeaderHash h0, Block h0 mempty)
Blocks <$> groups (map snd $ take n $ iterate next b0)
gs <- groups (map snd $ take n $ iterate next b0)
Blocks <$> tickResults gs
where
next :: (Hash "BlockHeader", Block) -> (Hash "BlockHeader", Block)
next (prev, b) =
Expand Down Expand Up @@ -148,3 +154,15 @@ groups = fmap reverse . foldM arbitraryGroup [[]]
choose (1 :: Int, 3) >>= \case
1 -> return $ [a]:grp:rest
_ -> return $ (grp ++ [a]):rest

-- Converts list of chunks to TickResult and inserts some Sleeps.
tickResults :: [[a]] -> Gen [TickResult [a]]
tickResults [] = pure []
tickResults (c:cs) = do
choose (1 :: Int, 4) >>= \case
1 -> do
rs <- tickResults cs
return (GotChunk c:rs)
_ -> do
rs <- tickResults (c:cs)
return (Sleep:rs)

0 comments on commit 66fb354

Please sign in to comment.