-- Parser.hs
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DisambiguateRecordFields #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
-- | The ImmutableDB doesn't care about the serialisation format, but in
-- practice we use CBOR. If we were to change the serialisation format, we
-- would have to write a new 'EpochFileParser' implementation, but the rest of
-- the ImmutableDB would be unaffected.
module Ouroboros.Storage.ImmutableDB.Parser
( -- * EpochFileParser
EpochFileError (..)
, epochFileParser
, epochFileParser'
) where
import Codec.CBOR.Decoding (Decoder)
import Data.Bifunctor (first)
import qualified Data.ByteString.Lazy as BL
import Data.Functor ((<&>))
import Data.Word (Word64)
import Streaming (Of, Stream)
import qualified Streaming as S
import qualified Streaming.Prelude as S
import Ouroboros.Network.Block (ChainHash (..), HasHeader (..),
HeaderHash, SlotNo)
import Ouroboros.Network.Point (WithOrigin (..))
import qualified Ouroboros.Consensus.Util.CBOR as Util.CBOR
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Storage.Common
import Ouroboros.Storage.FS.API (HasFS)
import Ouroboros.Storage.FS.CRC
import qualified Ouroboros.Storage.ImmutableDB.Impl.Index.Secondary as Secondary
import Ouroboros.Storage.ImmutableDB.Types
{-------------------------------------------------------------------------------
EpochFileParser
-------------------------------------------------------------------------------}
-- | The ways in which parsing an epoch file can fail. The type parameter
-- @hash@ is the type of the block hashes carried by the errors.
data EpochFileError hash =
    -- | Reading the epoch file incrementally failed; wraps the error reported
    -- by the incremental CBOR reader.
    EpochErrRead Util.CBOR.ReadIncrementalErr

    -- | The previous hash of a block did not match the hash of the previous
    -- block.
  | EpochErrHashMismatch
      (WithOrigin hash)  -- ^ The hash of the previous block
      (WithOrigin hash)  -- ^ The previous hash of the block

    -- | The integrity verification of the block with the given hash and
    -- 'BlockOrEBB' number returned 'False', indicating that the block got
    -- corrupted.
  | EpochErrCorrupt hash BlockOrEBB

    -- | The block has a slot number equal to or greater than the current slot
    -- (wall clock). This block is in the future, so we must truncate it.
  | EpochErrFutureBlock
      SlotNo  -- ^ Current slot (wall clock)
      SlotNo  -- ^ Slot number of the block
  deriving (Eq, Show)
-- | Build an 'EpochFileParser' that streams the blocks of an epoch file and
-- validates them one at a time.
--
-- Every decoded block goes through the following stages, in this order:
--
-- 1. Future check: a block whose slot is @>=@ the current slot stops the
--    stream with 'EpochErrFutureBlock'.
-- 2. Checksum check: the block's CRC is compared with the next expected
--    checksum. On a mismatch, or when the expected checksums have run out,
--    the integrity check is run instead; if that fails the stream stops with
--    'EpochErrCorrupt'.
-- 3. Hash chain check: every block after the first must have a previous hash
--    equal to the hash of the block before it, otherwise the stream stops
--    with 'EpochErrHashMismatch'.
--
-- Errors of the underlying incremental reader are wrapped in 'EpochErrRead'.
-- The errors raised by the stages above are paired with the block offset of
-- the offending block.
epochFileParser'
  :: forall m blk hash h. (IOLike m, Eq hash)
  => (blk -> SlotNo)
  -> (blk -> hash)
  -> (blk -> WithOrigin hash)  -- ^ Previous hash
  -> HasFS m h
  -> (forall s. Decoder s (BL.ByteString -> blk))
  -> (blk -> Maybe EpochNo)    -- ^ If an EBB, return the epoch number
  -> (blk -> BinaryInfo ())
  -> (blk -> Bool)             -- ^ Check integrity of the block. 'False' =
                               -- corrupt.
  -> EpochFileParser
       (EpochFileError hash)
       m
       (Secondary.Entry hash)
       hash
epochFileParser' getSlotNo getHash getPrevHash hasFS decodeBlock isEBB
                 getBinaryInfo isNotCorrupt =
    EpochFileParser $ \fsPath currentSlotNo expectedChecksums k ->
      Util.CBOR.withStreamIncrementalOffsets hasFS decoder fsPath $ \blocks ->
        k
          $ checkIfHashesLineUp
          $ checkEntries expectedChecksums
          $ checkFutureSlot currentSlotNo
          $ fmap (first EpochErrRead) <$> blocks
  where
    -- | Decode a block and compute the CRC of the bytes it was decoded from.
    -- Both are forced (the block first) before the pair is returned.
    decoder :: forall s. Decoder s (BL.ByteString -> (blk, CRC))
    decoder = decodeBlock <&> \mkBlk bytes ->
        let !decoded = mkBlk bytes
            !crc     = computeCRC bytes
        in (decoded, crc)

    -- | End the stream with 'EpochErrFutureBlock' at the first block whose
    -- slot number is >= the current slot.
    --
    -- NOTE(review): a block in exactly the current slot is rejected as well;
    -- confirm that @>=@ (rather than @>@) is intended.
    checkFutureSlot
      :: SlotNo  -- ^ Current slot (wall clock).
      -> Stream (Of (Word64, (Word64, (blk, CRC))))
                m
                (Maybe (EpochFileError hash, Word64))
      -> Stream (Of (Word64, (Word64, (blk, CRC))))
                m
                (Maybe (EpochFileError hash, Word64))
    checkFutureSlot currentSlotNo = mapS rejectFuture
      where
        rejectFuture item@(offset, (_, (blk, _)))
            | blkSlot >= currentSlotNo
            = Left $ Just (EpochErrFutureBlock currentSlotNo blkSlot, offset)
            | otherwise
            = Right item
          where
            blkSlot = getSlotNo blk

    -- | Walk the blocks and the expected checksums side by side, yielding an
    -- entry (plus previous hash) for every block that is accepted.
    --
    -- A block whose checksum equals the next expected checksum is accepted
    -- right away. Otherwise (mismatch, or no expected checksum left) the
    -- expensive integrity check decides: when it passes the block is accepted
    -- and one expected checksum is dropped, when it fails the stream ends with
    -- 'EpochErrCorrupt'.
    checkEntries
      :: [CRC]
         -- ^ Expected checksums
      -> Stream (Of (Word64, (Word64, (blk, CRC))))
                m
                (Maybe (EpochFileError hash, Word64))
         -- ^ Input stream of blocks (with additional info)
      -> Stream (Of (Secondary.Entry hash, WithOrigin hash))
                m
                (Maybe (EpochFileError hash, Word64))
    checkEntries expected = mapAccumS expected verify
      where
        verify
          :: [CRC]
          -> (Word64, (Word64, (blk, CRC)))
          -> Either (Maybe (EpochFileError hash, Word64))
                    ( (Secondary.Entry hash, WithOrigin hash)
                    , [CRC]
                    )
        verify remaining item@(offset, (_, (blk, actualCrc)))
            -- The checksum is the expected one: no need to look any further
            | expectedCrc : rest <- remaining
            , expectedCrc == actualCrc
            = Right (result, rest)
            -- No expected checksum or a mismatch, but the (expensive)
            -- integrity check passed, so continue
            | isNotCorrupt blk
            = Right (result, drop 1 remaining)
            -- The block is corrupt, stop
            | otherwise
            = Left $ Just (EpochErrCorrupt headerHash blockOrEBB, offset)
          where
            result@(parsedEntry, _) = entryForBlockAndInfo item
            Secondary.Entry { headerHash, blockOrEBB } = parsedEntry

    -- | Turn a block and its offset and checksum into a secondary index entry
    -- paired with the block's previous hash.
    entryForBlockAndInfo
      :: (Word64, (Word64, (blk, CRC)))
      -> (Secondary.Entry hash, WithOrigin hash)
    entryForBlockAndInfo (offset, (_size, (blk, crc))) =
        -- Don't accidentally hold on to the block: force the previous hash
        -- and then the entry before handing out the pair.
        prevHash `seq` entry `seq` (entry, prevHash)
      where
        prevHash = getPrevHash blk
        entry    = Secondary.Entry
          { blockOffset  = Secondary.BlockOffset  offset
          , headerOffset = Secondary.HeaderOffset hdrOffset
          , headerSize   = Secondary.HeaderSize   hdrSize
          , checksum     = crc
          , headerHash   = getHash blk
          , blockOrEBB   = maybe (Block (getSlotNo blk)) EBB (isEBB blk)
          }
        BinaryInfo { headerOffset = hdrOffset, headerSize = hdrSize } =
          getBinaryInfo blk

    -- | Verify that the previous hash of every block but the first equals the
    -- hash of the block that came before it; end the stream with
    -- 'EpochErrHashMismatch' at the first block for which it does not.
    checkIfHashesLineUp
      :: Stream (Of (Secondary.Entry hash, WithOrigin hash))
                m
                (Maybe (EpochFileError hash, Word64))
      -> Stream (Of (Secondary.Entry hash, WithOrigin hash))
                m
                (Maybe (EpochFileError hash, Word64))
    checkIfHashesLineUp = mapAccumS0 startChain extendChain
      where
        -- The state threaded through the stream is the hash of the most
        -- recently accepted block. The first block only initialises it.
        startChain item@(entry, _) = Right (item, Secondary.headerHash entry)

        extendChain hashOfPrevBlock item@(entry, prevHash) =
            if prevHash == expectedPrev
              then Right (item, Secondary.headerHash entry)
              else Left $ Just
                     ( EpochErrHashMismatch expectedPrev prevHash
                     , Secondary.unBlockOffset (Secondary.blockOffset entry)
                     )
          where
            expectedPrev = At hashOfPrevBlock
-- | Specialisation of 'epochFileParser'' to blocks with a 'HasHeader'
-- instance: the slot number, hash and previous hash are obtained through
-- 'blockSlot', 'blockHash' and 'blockPrevHash'.
epochFileParser
  :: forall m blk h. (IOLike m, HasHeader blk)
  => HasFS m h
  -> (forall s. Decoder s (BL.ByteString -> blk))
  -> (blk -> Maybe EpochNo)  -- ^ If an EBB, return the epoch number
  -> (blk -> BinaryInfo ())
  -> (blk -> Bool)           -- ^ Check integrity of the block. 'False' =
                             -- corrupt.
  -> EpochFileParser
       (EpochFileError (HeaderHash blk))
       m
       (Secondary.Entry (HeaderHash blk))
       (HeaderHash blk)
epochFileParser =
    epochFileParser' blockSlot blockHash prevHashOf
  where
    -- The previous hash of a block as a 'WithOrigin': 'GenesisHash' maps to
    -- 'Origin', any other hash is wrapped in 'At'.
    prevHashOf :: blk -> WithOrigin (HeaderHash blk)
    prevHashOf blk = case blockPrevHash blk of
      GenesisHash    -> Origin
      BlockHash hash -> At hash
{-------------------------------------------------------------------------------
Streaming utilities
-------------------------------------------------------------------------------}
-- | Map over the elements of a 'Stream' while threading a piece of state
-- through the traversal.
--
-- The handler receives the current state and the next element. Returning
-- 'Right' yields the new element and continues with the new state; returning
-- 'Left' ends the resulting stream immediately with the given value, without
-- consuming the rest of the input. When the input runs out, its own return
-- value is passed on.
mapAccumS
  :: Monad m
  => s  -- ^ Initial state
  -> (s -> a -> Either r (b, s))
  -> Stream (Of a) m r
  -> Stream (Of b) m r
mapAccumS st0 handle = loop st0
  where
    loop acc stream = do
      step <- S.lift (S.next stream)
      case step of
        -- The input is exhausted: pass on its return value
        Left done          -> return done
        Right (x, stream') ->
          case handle acc x of
            -- Early return requested by the handler
            Left early      -> return early
            Right (y, acc') -> S.yield y >> loop acc' stream'
-- | Like 'mapAccumS', but without an initial state: the first function is
-- applied to the first element of the stream and produces the initial state,
-- the second function handles every element after the first one.
mapAccumS0
  :: forall m a b r s. Monad m
  => (a -> Either r (b, s))
  -> (s -> a -> Either r (b, s))
  -> Stream (Of a) m r
  -> Stream (Of b) m r
mapAccumS0 handleFirst handleNext = mapAccumS Nothing step
  where
    -- 'Nothing' means that no element has been handled yet.
    step :: Maybe s -> a -> Either r (b, Maybe s)
    step mbSt a = do
      (b, st') <- case mbSt of
        Nothing -> handleFirst a
        Just st -> handleNext st a
      return (b, Just st')
-- | Apply a function to every element of a stream. When the function returns
-- 'Left', the resulting stream ends right there with the given value.
--
-- Implemented as a 'mapAccumS' with a trivial (unit) state.
mapS
  :: Monad m
  => (a -> Either r b)
  -> Stream (Of a) m r
  -> Stream (Of b) m r
mapS handle = mapAccumS () step
  where
    step () a = fmap (\b -> (b, ())) (handle a)