-
Notifications
You must be signed in to change notification settings - Fork 20
/
LgrDB.hs
435 lines (403 loc) · 16.4 KB
/
LgrDB.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
-- | Thin wrapper around the LedgerDB
module Ouroboros.Consensus.Storage.ChainDB.Impl.LgrDB (
LgrDB
-- opaque
, LedgerDB'
, LgrDbSerialiseConstraints
-- * Initialization
, LgrDbArgs (..)
, defaultArgs
, openDB
-- * 'TraceReplayEvent' decorator
, LedgerDB.decorateReplayTracerWithGoal
-- * Wrappers
, currentPoint
, getCurrent
, getDiskPolicy
, setCurrent
, takeSnapshot
, trimSnapshots
-- * Validation
, ValidateResult (..)
, validate
-- * Previously applied blocks
, garbageCollectPrevApplied
, getPrevApplied
-- * Re-exports
, LedgerDB.AnnLedgerError (..)
, LedgerDB.DiskPolicy (..)
, LedgerDB.DiskSnapshot
, LedgerDB.ExceededRollback (..)
, LedgerDB.TraceReplayEvent (..)
, LedgerDB.TraceSnapshotEvent (..)
, LedgerDB.ledgerDbCurrent
-- * Exported for testing purposes
, mkLgrDB
) where
import Codec.Serialise (Serialise (decode))
import Control.Monad.Trans.Class
import Control.Tracer
import Data.Foldable (foldl')
import Data.Set (Set)
import qualified Data.Set as Set
import Data.Word (Word64)
import GHC.Generics (Generic)
import GHC.Stack (HasCallStack)
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Config
import Ouroboros.Consensus.HeaderValidation
import Ouroboros.Consensus.Ledger.Abstract
import Ouroboros.Consensus.Ledger.Extended
import Ouroboros.Consensus.Ledger.Inspect
import Ouroboros.Consensus.Ledger.SupportsProtocol
import Ouroboros.Consensus.Protocol.Abstract
import Ouroboros.Consensus.Storage.ChainDB.API (ChainDbFailure (..))
import Ouroboros.Consensus.Storage.ChainDB.Impl.BlockCache
(BlockCache)
import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.BlockCache as BlockCache
import Ouroboros.Consensus.Storage.Common
import Ouroboros.Consensus.Storage.ImmutableDB (ImmutableDB)
import qualified Ouroboros.Consensus.Storage.ImmutableDB as ImmutableDB
import Ouroboros.Consensus.Storage.LedgerDB (LedgerDB')
import qualified Ouroboros.Consensus.Storage.LedgerDB as LedgerDB
import Ouroboros.Consensus.Storage.Serialisation
import Ouroboros.Consensus.Util.Args
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry
import System.FS.API (SomeHasFS (..), createDirectoryIfMissing)
import System.FS.API.Types (FsError, mkFsPath)
-- | Handle that bundles the in-memory ledger DB with the state and resources
-- this module needs to validate blocks against it and to manage its on-disk
-- snapshots.
data LgrDB m blk = LgrDB
  { -- | The current 'LedgerDB'.
    --
    -- INVARIANT: the tip of the 'LedgerDB' is always in sync with the tip of
    -- the current chain of the ChainDB.
    varDB :: !(StrictTVar m (LedgerDB' blk))
  , -- | Points of the blocks that 'validate' found to be valid.
    --
    -- INVARIANT: this set contains only points that are in the VolatileDB.
    --
    -- INVARIANT: all points on the current chain fragment are in this set.
    --
    -- The VolatileDB might contain invalid blocks; these will not be in this
    -- set.
    --
    -- When a garbage collection is performed on the VolatileDB, the points of
    -- the blocks eligible for garbage collection should be removed from this
    -- set.
    varPrevApplied :: !(StrictTVar m (Set (RealPoint blk)))
  , -- | Read a block from disk.
    resolveBlock :: !(RealPoint blk -> m blk)
  , -- | Top-level configuration; supplies the ledger DB configuration and the
    -- codec configuration used when writing snapshots.
    cfg :: !(TopLevelConfig blk)
  , -- | Policy returned by 'getDiskPolicy' and passed on by 'trimSnapshots'.
    diskPolicy :: !LedgerDB.DiskPolicy
  , -- | File system passed to the snapshot operations.
    hasFS :: !(SomeHasFS m)
  , -- | Tracer passed to the snapshot operations.
    tracer :: !(Tracer m (LedgerDB.TraceSnapshotEvent blk))
  }
  deriving (Generic)

-- | Uses the generic instance.
deriving instance
     (IOLike m, LedgerSupportsProtocol blk)
  => NoThunks (LgrDB m blk)
-- | The serialisation capabilities the LgrDB requires of a block type: a
-- 'Serialise' instance for its header hash, plus 'EncodeDisk' and
-- 'DecodeDisk' for the ledger state, the annotated tip and the chain-dependent
-- state.
type LgrDbSerialiseConstraints blk =
  ( Serialise (HeaderHash blk)
    -- Encoders
  , EncodeDisk blk (LedgerState blk)
  , EncodeDisk blk (AnnTip blk)
  , EncodeDisk blk (ChainDepState (BlockProtocol blk))
    -- Decoders
  , DecodeDisk blk (LedgerState blk)
  , DecodeDisk blk (AnnTip blk)
  , DecodeDisk blk (ChainDepState (BlockProtocol blk))
  )
{-------------------------------------------------------------------------------
Initialization
-------------------------------------------------------------------------------}
-- | Arguments consumed by 'openDB' (and by 'mkLgrDB').
--
-- The fields wrapped in 'HKD' are the ones 'defaultArgs' leaves as
-- 'NoDefault'.
data LgrDbArgs f m blk = LgrDbArgs
  { -- | Combined with the security parameter by 'LedgerDB.mkDiskPolicy' to
    -- obtain the 'LedgerDB.DiskPolicy'.
    lgrDiskPolicyArgs :: LedgerDB.DiskPolicyArgs
  , -- | Handed to 'LedgerDB.initLedgerDB' when initialising from disk.
    lgrGenesis :: HKD f (m (ExtLedgerState blk))
  , -- | File system used for the ledger DB's on-disk data.
    lgrHasFS :: SomeHasFS m
  , -- | Source of the ledger DB configuration, the codec configuration and
    -- the security parameter.
    lgrTopLevelConfig :: HKD f (TopLevelConfig blk)
  , -- | Tracer for 'LedgerDB'' values. NOTE(review): not consumed anywhere in
    -- this module; confirm whether callers still need it.
    lgrTraceLedger :: Tracer m (LedgerDB' blk)
  , -- | Tracer for snapshot events.
    lgrTracer :: Tracer m (LedgerDB.TraceSnapshotEvent blk)
  }
-- | Default arguments for the given file system.
--
-- Uses the default disk policy arguments and null tracers; 'lgrGenesis' and
-- 'lgrTopLevelConfig' have no default and must be filled in by the caller.
defaultArgs :: Applicative m => SomeHasFS m -> LgrDbArgs Defaults m blk
defaultArgs fs =
  LgrDbArgs
    { lgrDiskPolicyArgs = LedgerDB.defaultDiskPolicyArgs
    , lgrGenesis        = NoDefault
    , lgrHasFS          = fs
    , lgrTopLevelConfig = NoDefault
    , lgrTraceLedger    = nullTracer
    , lgrTracer         = nullTracer
    }
-- | Open the ledger DB.
--
-- Besides the 'LgrDB' handle, the result carries the number of immutable
-- blocks that were replayed.
openDB ::
     forall m blk.
     ( IOLike m
     , LedgerSupportsProtocol blk
     , LgrDbSerialiseConstraints blk
     , InspectLedger blk
     , HasCallStack
     )
  => LgrDbArgs Identity m blk
     -- ^ Stateless initialization arguments
  -> Tracer m (LedgerDB.ReplayGoal blk -> LedgerDB.TraceReplayEvent blk)
     -- ^ Used to trace the progress while replaying blocks against the
     -- ledger.
  -> ImmutableDB m blk
     -- ^ Reference to the immutable DB
     --
     -- After reading a snapshot from disk, the ledger DB will be brought up
     -- to date with the tip of the immutable DB. The corresponding ledger
     -- state can then be used as the starting point for chain selection in
     -- the ChainDB driver.
  -> (RealPoint blk -> m blk)
     -- ^ Read a block from disk
     --
     -- The block may be in the immutable DB or in the volatile DB; the ledger
     -- DB does not know where the boundary is at any given point.
  -> m (LgrDB m blk, Word64)
openDB
  args@LgrDbArgs
    { lgrHasFS = lgrHasFS@(SomeHasFS hasFS)
    , lgrTopLevelConfig
    , lgrDiskPolicyArgs
    , lgrTracer
    }
  replayTracer
  immutableDB
  getBlock = do
    -- Make sure the root directory of the ledger DB's file system exists.
    createDirectoryIfMissing hasFS True (mkFsPath [])
    (db, replayed) <- initFromDisk args replayTracer immutableDB
    -- When initializing the ledger DB from disk we:
    --
    -- - Look for the newest valid snapshot, say 'Lbs', which corresponds to
    --   the application of a block in the immutable DB, say 'b'.
    --
    -- - Push onto the ledger DB all the ledger states that result from
    --   applying blocks found in the on-disk immutable DB, starting from the
    --   successor of 'b'.
    --
    -- The anchor of 'LedgerDB' must be the oldest point we can roll back to.
    -- Following the procedure above (which 'initFromDisk' implements), the
    -- newest ledger state in 'db', say 'Lbn', corresponds to the most recent
    -- block in the immutable DB. A block being in the immutable DB means that
    -- at some point it was part of a chain that was >k blocks long, so 'Lbn'
    -- is the oldest point we can roll back to. Therefore the newest (current)
    -- state of the ledger DB must become its anchor, which pruning with a
    -- security parameter of 0 achieves.
    varDB          <- newTVarIO (LedgerDB.ledgerDbPrune (SecurityParam 0) db)
    varPrevApplied <- newTVarIO Set.empty
    let lgrDB =
          LgrDB
            { varDB
            , varPrevApplied
            , resolveBlock = getBlock
            , cfg          = lgrTopLevelConfig
            , diskPolicy   =
                LedgerDB.mkDiskPolicy
                  (configSecurityParam lgrTopLevelConfig)
                  lgrDiskPolicyArgs
            , hasFS        = lgrHasFS
            , tracer       = lgrTracer
            }
    return (lgrDB, replayed)
-- | Initialise the 'LedgerDB' via 'LedgerDB.initLedgerDB', streaming blocks
-- from the given immutable DB.
--
-- Returns the resulting ledger DB together with the replay count reported by
-- 'LedgerDB.initLedgerDB'; the initialisation log it also returns is
-- discarded. An 'FsError' thrown while doing so is rethrown wrapped by
-- 'wrapFailure'.
initFromDisk ::
     forall blk m.
     ( IOLike m
     , LedgerSupportsProtocol blk
     , LgrDbSerialiseConstraints blk
     , InspectLedger blk
     , HasCallStack
     )
  => LgrDbArgs Identity m blk
  -> Tracer m (LedgerDB.ReplayGoal blk -> LedgerDB.TraceReplayEvent blk)
  -> ImmutableDB m blk
  -> m (LedgerDB' blk, Word64)
initFromDisk
  LgrDbArgs{lgrHasFS, lgrTopLevelConfig, lgrGenesis, lgrTracer}
  replayTracer
  immutableDB =
    wrapFailure (Proxy @blk) $ do
      (_initLog, ledgerDB, numReplayed) <-
        LedgerDB.initLedgerDB
          replayTracer
          lgrTracer
          lgrHasFS
          (decodeDiskExtLedgerState (configCodec lgrTopLevelConfig))
          decode
          (LedgerDB.configLedgerDb lgrTopLevelConfig)
          lgrGenesis
          (streamAPI immutableDB)
      return (ledgerDB, numReplayed)
-- | Assemble an 'LgrDB' from already-created variables, without touching the
-- disk. For testing purposes.
--
-- The disk policy is derived from the given security parameter and the
-- 'lgrDiskPolicyArgs' of the arguments.
mkLgrDB :: StrictTVar m (LedgerDB' blk)
        -> StrictTVar m (Set (RealPoint blk))
        -> (RealPoint blk -> m blk)
        -> LgrDbArgs Identity m blk
        -> SecurityParam
        -> LgrDB m blk
mkLgrDB varDB varPrevApplied resolveBlock args k =
  LgrDB
    { varDB
    , varPrevApplied
    , resolveBlock
    , cfg        = lgrTopLevelConfig args
    , diskPolicy = LedgerDB.mkDiskPolicy k (lgrDiskPolicyArgs args)
    , hasFS      = lgrHasFS args
    , tracer     = lgrTracer args
    }
{-------------------------------------------------------------------------------
Wrappers
-------------------------------------------------------------------------------}
-- | Read the current 'LedgerDB'.
getCurrent :: IOLike m => LgrDB m blk -> STM m (LedgerDB' blk)
getCurrent = readTVar . varDB
-- | Overwrite the current 'LedgerDB'.
--
-- PRECONDITION: The new 'LedgerDB' must be the result of calling either
-- 'LedgerDB.ledgerDbSwitch' or 'LedgerDB.ledgerDbPushMany' on the current
-- 'LedgerDB'.
--
-- No strict application is used here on purpose: an earlier
-- @writeTVar $! varDB@ only forced the TVar reference (already evaluated,
-- being a strict field of 'LgrDB'), never the value being written.
-- NOTE(review): forcing of the written value is presumably handled by
-- 'StrictTVar' itself; confirm against its 'writeTVar'.
setCurrent :: IOLike m => LgrDB m blk -> LedgerDB' blk -> STM m ()
setCurrent LgrDB{varDB} = writeTVar varDB
-- | The point at the tip of the current (most recent) ledger state of the
-- given 'LedgerDB'.
currentPoint :: forall blk. UpdateLedger blk => LedgerDB' blk -> Point blk
currentPoint db =
  castPoint . ledgerTipPoint . ledgerState $ LedgerDB.ledgerDbCurrent db
-- | Write a snapshot of the anchor of the current 'LedgerDB' to disk via
-- 'LedgerDB.takeSnapshot'.
--
-- Note that it is the /anchor/ state that is written, not the state at the
-- tip. An 'FsError' thrown while doing so is rethrown wrapped by
-- 'wrapFailure'.
takeSnapshot ::
     forall m blk.
     ( IOLike m
     , LgrDbSerialiseConstraints blk
     , HasHeader blk
     , IsLedger (LedgerState blk)
     )
  => LgrDB m blk -> m (Maybe (LedgerDB.DiskSnapshot, RealPoint blk))
takeSnapshot lgrDB@LgrDB{cfg, tracer, hasFS} =
  wrapFailure (Proxy @blk) $ do
    current <- atomically (getCurrent lgrDB)
    let anchorState = LedgerDB.ledgerDbAnchor current
    LedgerDB.takeSnapshot
      tracer
      hasFS
      (encodeDiskExtLedgerState (configCodec cfg))
      anchorState
-- | Run 'LedgerDB.trimSnapshots' with this LgrDB's tracer, file system and
-- disk policy, returning the snapshots it reports.
--
-- An 'FsError' thrown while doing so is rethrown wrapped by 'wrapFailure'.
trimSnapshots ::
     forall m blk. (MonadCatch m, HasHeader blk)
  => LgrDB m blk
  -> m [LedgerDB.DiskSnapshot]
trimSnapshots lgrDB =
  wrapFailure (Proxy @blk) $
    LedgerDB.trimSnapshots (tracer lgrDB) (hasFS lgrDB) (diskPolicy lgrDB)
-- | The disk policy this 'LgrDB' was created with.
getDiskPolicy :: LgrDB m blk -> LedgerDB.DiskPolicy
getDiskPolicy LgrDB{diskPolicy} = diskPolicy
{-------------------------------------------------------------------------------
Validation
-------------------------------------------------------------------------------}
-- | Outcome of 'validate'.
data ValidateResult blk
  = -- | The switch succeeded; carries the resulting 'LedgerDB'.
    ValidateSuccessful (LedgerDB' blk)
  | -- | A ledger error was raised while applying blocks; carries the
    -- annotated error.
    ValidateLedgerError (LedgerDB.AnnLedgerError' blk)
  | -- | The switch reported that the requested rollback was exceeded.
    ValidateExceededRollBack LedgerDB.ExceededRollback
-- | Validate a candidate by switching the given 'LedgerDB' to it with
-- 'LedgerDB.ledgerDbSwitch': roll back the requested number of blocks, then
-- apply the blocks corresponding to the given headers.
--
-- For each header the block is taken from the 'BlockCache' when present and
-- otherwise passed by reference, to be resolved through the LgrDB's
-- 'resolveBlock'. Blocks whose point is already in the set of previously
-- applied points use the @Reapply*@ constructors instead of the @Apply*@ ones.
--
-- Afterwards, the points of the blocks found to be valid are added to the set
-- of previously applied points.
validate :: forall m blk. (IOLike m, LedgerSupportsProtocol blk, HasCallStack)
         => LgrDB m blk
         -> LedgerDB' blk
            -- ^ This is used as the starting point for validation, not the one
            -- in the 'LgrDB'.
         -> BlockCache blk
         -> Word64  -- ^ How many blocks to roll back
         -> (LedgerDB.UpdateLedgerDbTraceEvent blk -> m ())
         -> [Header blk]
         -> m (ValidateResult blk)
validate LgrDB{varPrevApplied, resolveBlock, cfg} ledgerDB blockCache numRollbacks trace = \hdrs -> do
    blockAps <- mkAps hdrs <$> atomically (readTVar varPrevApplied)
    result <-
      fmap rewrap $
        LedgerDB.defaultResolveWithErrors resolveBlock $
          LedgerDB.ledgerDbSwitch
            (LedgerDB.configLedgerDb cfg)
            numRollbacks
            (lift . lift . trace)
            blockAps
            ledgerDB
    let newlyValid = validBlockPoints result (map headerRealPoint hdrs)
    atomically $ modifyTVar varPrevApplied (addPoints newlyValid)
    return result
  where
    -- Flatten the nested 'Either' returned by the switch into a
    -- 'ValidateResult'.
    rewrap :: Either (LedgerDB.AnnLedgerError' blk) (Either LedgerDB.ExceededRollback (LedgerDB' blk))
           -> ValidateResult blk
    rewrap =
      either
        ValidateLedgerError
        (either ValidateExceededRollBack ValidateSuccessful)

    -- Pick, per header, how the block is to be applied: by value when it is
    -- in the block cache, by reference otherwise; as a reapplication when its
    -- point was previously applied.
    mkAps :: forall n l. l ~ ExtLedgerState blk
          => [Header blk]
          -> Set (RealPoint blk)
          -> [LedgerDB.Ap n l blk ( LedgerDB.ResolvesBlocks    n   blk
                                  , LedgerDB.ThrowsLedgerError n l blk
                                  )]
    mkAps hdrs prevApplied =
      [ case ( pt `Set.member` prevApplied
             , BlockCache.lookup (headerHash hdr) blockCache
             ) of
          (False, Nothing)  ->                   LedgerDB.ApplyRef   pt
          (True,  Nothing)  -> LedgerDB.Weaken $ LedgerDB.ReapplyRef pt
          (False, Just blk) -> LedgerDB.Weaken $ LedgerDB.ApplyVal   blk
          (True,  Just blk) -> LedgerDB.Weaken $ LedgerDB.ReapplyVal blk
      | hdr <- hdrs
      , let pt = headerRealPoint hdr
      ]

    -- | Based on the 'ValidateResult', return the points corresponding to
    -- valid blocks: none when the rollback was exceeded, all of them on
    -- success, and those preceding the offending block on a ledger error.
    validBlockPoints :: ValidateResult blk -> [RealPoint blk] -> [RealPoint blk]
    validBlockPoints result pts = case result of
      ValidateExceededRollBack _ -> []
      ValidateSuccessful       _ -> pts
      ValidateLedgerError      e -> takeWhile (/= LedgerDB.annLedgerErrRef e) pts

    -- Insert all the given points into the set.
    addPoints :: [RealPoint blk]
              -> Set (RealPoint blk) -> Set (RealPoint blk)
    addPoints pts points = foldl' (\acc pt -> Set.insert pt acc) points pts
{-------------------------------------------------------------------------------
Stream API to the immutable DB
-------------------------------------------------------------------------------}
-- | Build the 'LedgerDB.StreamAPI' that streams blocks out of the given
-- immutable DB, starting after a given point.
streamAPI ::
     forall m blk.
     (IOLike m, HasHeader blk)
  => ImmutableDB m blk -> LedgerDB.StreamAPI m blk
streamAPI immutableDB = LedgerDB.StreamAPI streamAfter
  where
    -- Open an iterator after @tip@ inside a fresh registry and hand the
    -- continuation either the point that could not be found or an action
    -- yielding the next block.
    streamAfter :: HasCallStack
                => Point blk
                -> (Either (RealPoint blk) (m (LedgerDB.NextBlock blk)) -> m a)
                -> m a
    streamAfter tip k = withRegistry $ \registry ->
      ImmutableDB.streamAfterPoint immutableDB registry GetBlock tip >>= \case
        -- Snapshot is too recent
        Left  err -> k (Left (ImmutableDB.missingBlockPoint err))
        Right itr -> k (Right (streamUsing itr))

    -- Translate one step of the iterator into a 'LedgerDB.NextBlock'.
    streamUsing :: ImmutableDB.Iterator m blk blk -> m (LedgerDB.NextBlock blk)
    streamUsing itr = do
      next <- ImmutableDB.iteratorNext itr
      case next of
        ImmutableDB.IteratorExhausted  -> return LedgerDB.NoMoreBlocks
        ImmutableDB.IteratorResult blk -> return (LedgerDB.NextBlock blk)
{-------------------------------------------------------------------------------
Previously applied blocks
-------------------------------------------------------------------------------}
-- | Read the set of previously applied points.
getPrevApplied :: IOLike m => LgrDB m blk -> STM m (Set (RealPoint blk))
getPrevApplied = readTVar . varPrevApplied
-- | Remove all points with a slot older than the given slot from the set of
-- previously applied points.
--
-- NOTE(review): 'Set.dropWhileAntitone' requires the predicate to be antitone
-- in the set's ordering, i.e. this assumes the 'Ord' instance of 'RealPoint'
-- compares the slot number first. Confirm against its definition.
garbageCollectPrevApplied :: IOLike m => LgrDB m blk -> SlotNo -> STM m ()
garbageCollectPrevApplied LgrDB{varPrevApplied} slotNo =
    modifyTVar varPrevApplied (Set.dropWhileAntitone olderThanSlot)
  where
    olderThanSlot pt = realPointSlot pt < slotNo
{-------------------------------------------------------------------------------
Error handling
-------------------------------------------------------------------------------}
-- | Run the given action, rethrowing any 'FsError' it throws as a
-- 'ChainDbFailure' built with the 'LgrDbFailure' constructor.
--
-- Only 'FsError's are intercepted; the proxy argument merely fixes the block
-- type of the resulting 'LgrDbFailure'.
wrapFailure ::
     forall m x blk. (MonadCatch m, HasHeader blk)
  => Proxy blk
  -> m x
  -> m x
wrapFailure _ action =
  action `catch` \(err :: FsError) -> throwIO (LgrDbFailure @blk err)