-
Notifications
You must be signed in to change notification settings - Fork 213
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[3] Adding ticking function test and downloading block logic [3] Block syncer working ok plus additional tests [3] Cleaning the code [3] cabal fix
- Loading branch information
1 parent
0decd6d
commit 2bc23be
Showing
3 changed files
with
275 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
{-# LANGUAGE DataKinds #-} | ||
{-# LANGUAGE OverloadedStrings #-} | ||
{-# LANGUAGE ScopedTypeVariables #-} | ||
|
||
module Cardano.Wallet.BlockSyncer | ||
( | ||
blockSyncerBracket | ||
, runBlockLayerMockup | ||
, BlocksConsumed(..) | ||
) where | ||
|
||
import Prelude | ||
|
||
|
||
import Control.Concurrent | ||
( forkIO, killThread, myThreadId, threadDelay ) | ||
import Control.Exception | ||
( SomeException, bracket, catch ) | ||
import Data.IORef | ||
( IORef, newIORef, readIORef, writeIORef ) | ||
import Data.List | ||
( delete, (!!) ) | ||
import Data.Text.Internal | ||
( showText ) | ||
import Data.Word | ||
( Word64 ) | ||
import Formatting | ||
( sformat, (%) ) | ||
import qualified Formatting as F | ||
|
||
|
||
import Cardano.Wallet.Primitive | ||
( Block (..), BlockHeader (..), Hash (..) ) | ||
|
||
|
||
|
||
newtype BlocksConsumed = BlocksConsumed [(Hash "BlockHeader")] deriving (Show, Eq) | ||
|
||
data BlockLayerMockup = BlockLayerMockup { | ||
getBlock :: (Hash "BlockHeader") -> IO Block | ||
, getEpoch :: Word64 -> IO [Block] | ||
, getNetworkTip :: IO ((Hash "BlockHeader"),BlockHeader) | ||
} | ||
|
||
runBlockLayerMockup | ||
:: Int | ||
-> [((Hash "BlockHeader"),Block)] | ||
-> Int | ||
-> IO BlockLayerMockup | ||
runBlockLayerMockup period blocks killTicks= do | ||
initialHeaderData <- newIORef 0 | ||
_ <- forkIO $ countDown initialHeaderData | ||
pure BlockLayerMockup { | ||
getBlock = _getBlock blocks | ||
, getEpoch = _getEpoch blocks | ||
, getNetworkTip = _getNetworkTip blocks initialHeaderData | ||
} | ||
where | ||
countDown | ||
:: IORef Int | ||
-> IO () | ||
countDown counter = do | ||
previousCounter <- readIORef counter | ||
if previousCounter >= killTicks then do | ||
threadID <- myThreadId | ||
killThread threadID | ||
else do | ||
threadDelay (period*1000*1000) | ||
writeIORef counter $ min (length blocks - 1) (previousCounter + 1) | ||
countDown counter | ||
|
||
|
||
_getBlock | ||
:: [((Hash "BlockHeader"),Block)] | ||
-> (Hash "BlockHeader") | ||
-> IO Block | ||
_getBlock blocks headerHash = | ||
case filter (\(h,_) -> h == headerHash) blocks of | ||
[(_,block)] -> return block | ||
_ -> error "Something wrong with the test blocks" | ||
|
||
_getEpoch | ||
:: [((Hash "BlockHeader"),Block)] | ||
-> Word64 | ||
-> IO [Block] | ||
_getEpoch blocks epoch = do | ||
let bs = filter (\(_,Block (BlockHeader theEpoch _ _) _) -> theEpoch == epoch) blocks | ||
pure $ map snd bs | ||
|
||
_getNetworkTip | ||
:: [((Hash "BlockHeader"),Block)] | ||
-> IORef Int | ||
-> IO ((Hash "BlockHeader"), BlockHeader) | ||
_getNetworkTip blocks currentPosition = do | ||
index <- readIORef currentPosition | ||
let (headerHash, Block blockHeader _ ) = (reverse blocks) !! index | ||
pure (headerHash,blockHeader) | ||
|
||
blockSyncerBracket | ||
:: IORef BlocksConsumed | ||
-> BlockLayerMockup | ||
-> Int | ||
-> IO () | ||
blockSyncerBracket blocksPersisted blockLayer period = | ||
bracket | ||
(return ()) --(putStrLn "setting up for ticking function") | ||
(\_->return ()) --(\_ -> putStrLn "cleaning for ticking function") | ||
(tickingFunction blocksPersisted blockLayer period) | ||
|
||
tickingFunction | ||
:: IORef BlocksConsumed | ||
-> BlockLayerMockup | ||
-> Int | ||
-> () | ||
-> IO () | ||
tickingFunction initialState blockLayer delayPeriod _ = | ||
go initialState | ||
`catch` | ||
(\(e :: SomeException) -> | ||
let msg = "Terminating tickingFunction of BlockSyncer due to " % F.shown | ||
in (putStrLn . showText) $ sformat msg e | ||
) | ||
where | ||
go | ||
:: IORef BlocksConsumed | ||
-> IO () | ||
go previousState = do | ||
--putStrLn "syncing the block ..." | ||
(BlocksConsumed headerHashesConsumed) <- readIORef initialState | ||
(currentHeaderHash, BlockHeader _ _ previousHeaderHash) <- getNetworkTip blockLayer | ||
updatedHeaderHashes <- | ||
case headerHashesConsumed of | ||
[] -> pure [currentHeaderHash] | ||
lastConsumed : _ | lastConsumed == currentHeaderHash -> | ||
pure headerHashesConsumed | ||
| lastConsumed == previousHeaderHash -> | ||
pure $ currentHeaderHash : headerHashesConsumed | ||
| otherwise -> do | ||
missingHashes <- retrieveMissingBlocks [previousHeaderHash] | ||
headerHashesConsumed | ||
blockLayer | ||
pure $ currentHeaderHash : (missingHashes ++ headerHashesConsumed) | ||
writeIORef initialState $ BlocksConsumed updatedHeaderHashes | ||
threadDelay (delayPeriod*1000*1000) | ||
go previousState | ||
|
||
retrieveMissingBlocks | ||
:: [(Hash "BlockHeader")] | ||
-> [(Hash "BlockHeader")] | ||
-> BlockLayerMockup | ||
-> IO [(Hash "BlockHeader")] | ||
retrieveMissingBlocks missing headerHashesConsumed layer = do | ||
let lastHeaderHash = last missing | ||
if lastHeaderHash == head headerHashesConsumed then | ||
pure $ delete lastHeaderHash missing | ||
else do | ||
(Block (BlockHeader _ _ previousHeaderHash) _) <- (getBlock layer) lastHeaderHash | ||
retrieveMissingBlocks (missing ++ [previousHeaderHash]) headerHashesConsumed layer |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
{-# LANGUAGE DataKinds #-} | ||
module Cardano.Wallet.BlockSyncerSpec | ||
( spec | ||
) where | ||
|
||
|
||
import Prelude | ||
|
||
|
||
import Control.Concurrent | ||
( forkIO, killThread, threadDelay, throwTo ) | ||
import Control.Monad.IO.Class | ||
( liftIO ) | ||
import Data.ByteString | ||
( ByteString, pack ) | ||
import Data.IORef | ||
( newIORef, readIORef ) | ||
import qualified Data.Set as Set | ||
import System.Exit | ||
( ExitCode (..) ) | ||
import Test.Hspec | ||
( Spec, describe, it ) | ||
import Test.Hspec.Expectations | ||
( shouldBe ) | ||
import Test.QuickCheck | ||
( generate, vector ) | ||
import Test.QuickCheck.Gen | ||
( Gen ) | ||
|
||
|
||
import Cardano.Wallet.BlockSyncer | ||
( BlocksConsumed (..), blockSyncerBracket, runBlockLayerMockup ) | ||
import Cardano.Wallet.Primitive | ||
( Block (..), BlockHeader (..), Hash (..) ) | ||
|
||
|
||
spec :: Spec | ||
spec = do | ||
describe "Block syncer downloads all blocks" $ do | ||
it "the bracket releases itself as expected when exception is thrown" $ do | ||
consecutiveBlocks <- liftIO $ mkConsecutiveTestBlocks 1 | ||
blockLayerMockup <- runBlockLayerMockup 2 consecutiveBlocks 2 | ||
|
||
initialHeaderData <- newIORef $ BlocksConsumed [] | ||
threadID <- forkIO $ blockSyncerBracket initialHeaderData blockLayerMockup 2 | ||
threadDelay (1*1000*1000) | ||
throwTo threadID ExitSuccess | ||
|
||
it "the bracket releases itself as expected when its thread is killed" $ do | ||
consecutiveBlocks <- liftIO $ mkConsecutiveTestBlocks 1 | ||
blockLayerMockup <- runBlockLayerMockup 2 consecutiveBlocks 2 | ||
|
||
initialHeaderData <- newIORef $ BlocksConsumed [] | ||
threadID <- forkIO $ blockSyncerBracket initialHeaderData blockLayerMockup 2 | ||
threadDelay (1*1000*1000) | ||
killThread threadID | ||
|
||
it "when the block polling's ticking function is 5x more frequent" $ do | ||
testBlocksPolling 10 1 5 | ||
it "when the block polling's ticking function is 2x more frequent" $ do | ||
testBlocksPolling 10 1 2 | ||
it "when the block polling's ticking function and mockup ticking is are the same" $ do | ||
testBlocksPolling 20 1 1 | ||
it "when the mockup ticking is 5x more frequent" $ do | ||
testBlocksPolling 10 5 1 | ||
it "when the mockup ticking is 2x more frequent" $ do | ||
testBlocksPolling 10 2 1 | ||
|
||
where | ||
testBlocksPolling | ||
:: Int | ||
-> Int | ||
-> Int | ||
-> IO () | ||
testBlocksPolling numberOfBlocks mockupPeriod blockTickerPeriod = do | ||
consecutiveBlocks <- liftIO $ mkConsecutiveTestBlocks numberOfBlocks | ||
let testTimeEvalInSec = numberOfBlocks*mockupPeriod + 2*blockTickerPeriod | ||
blockLayerMockup <- runBlockLayerMockup mockupPeriod consecutiveBlocks testTimeEvalInSec | ||
initialHeaderData <- newIORef $ BlocksConsumed [] | ||
threadID <- forkIO $ blockSyncerBracket initialHeaderData blockLayerMockup blockTickerPeriod | ||
threadDelay (testTimeEvalInSec*1000*1000) | ||
killThread threadID | ||
(BlocksConsumed finalHeaderHashes) <- readIORef initialHeaderData | ||
finalHeaderHashes `shouldBe` (map fst consecutiveBlocks) | ||
|
||
|
||
mkConsecutiveTestBlocks | ||
:: Int | ||
-> IO [((Hash "BlockHeader"),Block)] | ||
mkConsecutiveTestBlocks blockNum = | ||
loop blockNum [] | ||
where | ||
loop | ||
:: Int | ||
-> [((Hash "BlockHeader"),Block)] | ||
-> IO [((Hash "BlockHeader"),Block)] | ||
loop blockNumToGo res = do | ||
let bytelistGenerator = pack <$> vector 10 :: Gen ByteString | ||
if blockNumToGo <= 0 then | ||
return res | ||
else | ||
case res of | ||
[] -> do | ||
lastBlockHeader <- Hash <$> generate bytelistGenerator | ||
theBlockHeader <- Hash <$> generate bytelistGenerator | ||
let theEpoch = 0 | ||
let theSlot = 1 | ||
let theBlock = Block (BlockHeader theEpoch theSlot lastBlockHeader) Set.empty | ||
loop (blockNumToGo - 1) [(theBlockHeader, theBlock)] | ||
(lastBlockHeader, Block (BlockHeader lastEpoch lastSlot _) _ ):_ -> do | ||
let theBlock = Block (BlockHeader lastEpoch (lastSlot + 1) lastBlockHeader) Set.empty | ||
theBlockHeader <- Hash <$> generate bytelistGenerator | ||
loop (blockNumToGo - 1) $ (theBlockHeader, theBlock):res |