Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
duog committed Jan 18, 2022
1 parent 8602880 commit ad02b36
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import Data.ByteString.Short.Base64 (encodeBase64)
import qualified Data.ByteString.Lazy.Char8 as Char8
import Data.ByteString.Short (ShortByteString)
import qualified Data.ByteString.Short as Short
import Data.Coerce
import Data.Foldable(for_)
import Data.IntMap (IntMap)
import qualified Data.IntMap.Strict as IntMap
Expand All @@ -35,6 +36,7 @@ import TxIn.Types
import TxIn.GenesisUTxO
import qualified TxIn.UTxODb as UTxODb
import qualified UTxODb.InMemory as UTxODb
import qualified UTxODb.Pipeline as UTxODb
import qualified UTxODb.Snapshots as UTxODb
import qualified UTxODb.Haskey.Db as UTxODb

Expand Down Expand Up @@ -421,10 +423,10 @@ measureAge =

utxodbInMemSim :: [Row] -> IO ()
utxodbInMemSim rows = do
let init_seq_no = UTxODb.SeqNo (-1)
let init_seq_no = UTxODb.SeqNo (-2)
db <- UTxODb.initTVarDb init_seq_no

init_ls <- UTxODb.addTxIns db genesisUTxO $ UTxODb.initLedgerState init_seq_no
init_ls <- UTxODb.addTxIns db genesisUTxO (UTxODb.SeqNo (-1)) $ UTxODb.initLedgerState init_seq_no
flip Strict.execStateT init_ls $ for_ (filterRowsForEBBs rows) $ \r -> do
ls0 <- Strict.get
ls <- liftIO (UTxODb.addRow db r (UTxODb.injectTables UTxODb.emptyTables ls0))
Expand All @@ -435,13 +437,18 @@ utxodbHaskeySim :: Maybe (IO UTxODb.HaskeyBackend) -> [Row] -> IO ()
utxodbHaskeySim mb_hb rows = do
hb <- fromMaybe (error "No Haskey Backend") mb_hb

let init_seq_no = UTxODb.SeqNo (-1)
let init_seq_no = UTxODb.SeqNo (-2)
db <- UTxODb.openHaskeyDb init_seq_no hb
init_ls <- UTxODb.addTxIns db genesisUTxO $ UTxODb.initLedgerState init_seq_no
flip Strict.execStateT init_ls $ for_ (filterRowsForEBBs rows) $ \r -> do
ls0 <- Strict.get
ls <- liftIO (UTxODb.addRow db r (UTxODb.injectTables UTxODb.emptyTables ls0))
Strict.put ls
init_ls <- UTxODb.addTxIns db genesisUTxO (UTxODb.SeqNo (-1)) $ UTxODb.initLedgerState init_seq_no
let
get_keys r = let
keyset = UTxODb.consumed . UTxODb.keysForRow $ r
in UTxODb.OnDiskLedgerState { UTxODb.od_utxo = UTxODb.TableKeySet keyset }
UTxODb.runPipeline db init_ls 10 (filterRowsForEBBs rows) get_keys UTxODb.rowOp
-- flip Strict.execStateT init_ls $ for_ (filterRowsForEBBs rows) $ \r -> do
-- ls0 <- Strict.get
-- ls <- liftIO (UTxODb.addRow db r (UTxODb.injectTables UTxODb.emptyTables ls0))
-- Strict.put ls
pure ()

{-------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import Data.Word (Word32, Word64)
import qualified Data.Vector as V
import GHC.Generics (Generic())
import Data.Binary (Binary())
import Data.Foldable(toList)

import qualified Data.BTree.Primitives.Value as Haskey(Value(..))
import qualified Data.BTree.Primitives.Key as Haskey(Key(..))
Expand All @@ -29,7 +30,7 @@ instance Haskey.Key TxIn


data TxOutputIds = TxOutputIds !ShortByteString !Word32 -- count
deriving (Eq, Ord)
deriving (Eq, Ord, Show)

outputTxIns :: TxOutputIds -> [TxIn]
outputTxIns (TxOutputIds h n) = [ TxIn h (i - 1) | i <- [1 .. n] ]
Expand All @@ -40,7 +41,7 @@ data Row = Row {
, rNumTx :: {-# UNPACK #-} !Int
, rConsumed :: {-# UNPACK #-} !(V.Vector TxIn)
, rCreated :: {-# UNPACK #-} !(V.Vector TxOutputIds)
}
} deriving (Show)

{-
for some slots in the Byron era there is an additional pseudo block that demarcates the
Expand All @@ -49,10 +50,13 @@ It never contains any transactions
-}
filterRowsForEBBs :: [Row] -> [Row]
filterRowsForEBBs = go Nothing where
go _ [] = []
go Nothing (x: xs) = x: go (Just x) xs
go mb_x [] = toList mb_x
go Nothing (x: xs) = x : go (Just x) xs
go (Just x) (y: ys)
| rBlockNumber x == rBlockNumber y = if rNumTx y /= 0
then error "EBB block has transactions"
else go (Just x) ys
else go Nothing ys
| rSlotNumber x == rSlotNumber y = if rNumTx y /= 0
then error "EBB block has transactions"
else go Nothing ys
| otherwise = y : go (Just y) ys
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,13 @@ import qualified UTxODb.Haskey.Tree as HaskeyDb
data LedgerState table = LedgerState
{ utxo :: !(table Db.TableTypeRW TxIn Bool)
, seq_no :: !Int64
, seq_no_offset :: !Int64
}

instance Db.HasSeqNo LedgerState where
stateSeqNo LedgerState{seq_no, seq_no_offset} = coerce (seq_no + seq_no_offset)
stateSeqNo LedgerState{seq_no} = coerce seq_no

initLedgerState :: Db.SeqNo LedgerState -> LedgerState Db.EmptyTable
initLedgerState sn = LedgerState { seq_no = coerce sn, seq_no_offset = 0, utxo = Db.EmptyTable }
initLedgerState sn = LedgerState { seq_no = coerce sn, utxo = Db.EmptyTable }

instance Db.HasTables LedgerState where
type StateTableKeyConstraint LedgerState = All
Expand Down Expand Up @@ -125,16 +124,13 @@ ledgerRules r ls@LedgerState{utxo = utxo0, seq_no = old_seq_no} = do
in throwM $ LedgerRulesException message
let
new_seq_no = fromIntegral $ rSlotNumber r
new_ls
| new_seq_no == old_seq_no = ls { utxo = utxo2, seq_no_offset = seq_no_offset ls + 1}
| otherwise = ls { utxo = utxo2, seq_no = new_seq_no}
-- unless (old_seq_no < new_seq_no) $ throwM $ LedgerRulesException $ unwords ["nonmonotonic slot no:", show old_seq_no, ">", show new_seq_no]
new_ls = ls { utxo = utxo2, seq_no = new_seq_no}
unless (old_seq_no < new_seq_no) $ throwM $ LedgerRulesException $ unwords ["nonmonotonic slot no:", show old_seq_no, ">", show new_seq_no]
pure new_ls


addTxIns :: Db.DiskDb dbhandle LedgerState => dbhandle -> Set TxIn -> LedgerState Db.EmptyTable -> IO (LedgerState Db.EmptyTable)
addTxIns handle txins ls0 = do
putStrLn $ "addTxIns: " <> show (length txins)
addTxIns :: Db.DiskDb dbhandle LedgerState => dbhandle -> Set TxIn -> Db.SeqNo state -> LedgerState Db.EmptyTable -> IO (LedgerState Db.EmptyTable)
addTxIns handle txins new_seq_no ls0 = do
let keyset = OnDiskLedgerState { od_utxo = Db.AnnTable (Db.TableKeySet txins) ()}
tracking_tables <-
Db.annotatedReadsetToTrackingTables <$> Db.readDb handle keyset
Expand All @@ -144,8 +140,7 @@ addTxIns handle txins ls0 = do
go !ls txin = case Db.lookup txin (utxo ls ) of
Nothing -> pure $ ls { utxo = Db.insert txin True (utxo ls) }
Just _ -> throwM $ LedgerRulesException $ "addTxIns: duplicate txin:" <> show txin
wrangle ls = ls { seq_no_offset = seq_no_offset ls + 1 }
in wrangle <$> foldM go init_ls txins
in (\x -> x { seq_no = coerce new_seq_no }) <$> foldM go init_ls txins
let table_diffs = Db.projectTables . Db.trackingTablesToTableDiffs $ ls1
Db.writeDb handle [Left table_diffs] (Db.stateSeqNo ls0) (Db.stateSeqNo ls1)
putStrLn $ "addTxIns: " <> show (length txins)
Expand All @@ -170,3 +165,12 @@ addRow handle r ls0 = do
<> "\t" <> show (Set.size created)

pure $ Db.injectTables Db.emptyTables ls1


-- | Pure pipeline step: apply the ledger rules to one 'Row' and convert
-- the resulting tracking tables into table diffs. Calls 'error' when the
-- rules reject the row (they signal failure via 'Nothing' here).
rowOp :: Row -> LedgerState Db.TrackingTable -> LedgerState Db.TableDiff
rowOp r ls0 =
  case ledgerRules r ls0 of
    Nothing  -> error "ledgerRules"
    Just ls1 -> Db.trackingTablesToTableDiffs ls1
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import Data.Foldable
import Data.Traversable
import Data.Maybe
import Data.Set (Set)
import qualified Data.Set as Set
import Data.Map.Strict (Map)
import Data.Coerce
import qualified Data.BTree.Alloc.Class as Haskey
Expand All @@ -51,6 +52,7 @@ import UTxODb.Snapshots
import UTxODb.Haskey.Tree
import Control.Concurrent.MVar
import Data.Functor
import Data.Monoid

data HaskeySideTable state = HaskeySideTable
{ seqId :: !(SeqNo state)
Expand Down Expand Up @@ -107,18 +109,24 @@ readTransaction keysets HaskeyRoot{..} = do
go_lookup tag (AnnTable TableQuery {tqKeySet} x) t =
go_lookup tag (AnnTable (TableKeySet tqKeySet) x) t
go_lookup tag (AnnTable (TableKeySet ks) x) (RO_ODT t) = do
result <- readTree ks t
result <- if Set.null ks
then pure Map.empty
else readTree ks t
let
trs = emptyTableReadSetRO { troPTMap = PMapRO (PMapRep result ks)}
pure $ AnnTable trs (x, seqId)
go_lookup tag (AnnTable (TableKeySet ks) x) (RW_ODT t) = do
result <- readTree ks t
result <- if Set.null ks
then pure Map.empty
else readTree ks t
let
trs = TableReadSetRW (PMapRW (PMapRep result ks))
pure $ AnnTable trs (x, seqId)

go_lookup tag (AnnTable (TableKeySet ks) x) (RWU_ODT t) = do
result <- readTree ks t
result <- if Set.null ks
then pure Map.empty
else readTree ks t
let
trs = TableReadSetRWU (PMapRWU (PMapRep result ks))
pure $ AnnTable trs (x, seqId)
Expand Down Expand Up @@ -180,10 +188,22 @@ writeTransaction changes old_seq new_seq hr@HaskeyRoot{..} = do
st <- Haskey.insert () hst { seqId = new_seq } sideTree
Haskey.commit Nothing hr { sideTree = st, innerTables = x }

-- | Total number of keys across every table key set in the state, summed
-- with the 'Sum' monoid via 'foldMapTables'. Counts both plain
-- 'TableKeySet's and the key set carried inside a 'TableQuery'.
keysetSize :: HasTables (Tables state) => AnnTableKeySets state a -> Int
keysetSize = getSum . foldMapTables go where
  go _ (AnnTable (TableKeySet ks) _) = Sum $ length ks
  go _ (AnnTable TableQuery{tqKeySet} _) = Sum $ length tqKeySet


-- isEmptyChanges :: [Either (TableDiffs state) (TableSnapshots state)] -> Bool
-- isEmptyChanges [] = True
-- isEmptyChanges (Left diffs : xs )= empty_diffs && isEmptyChanges xs where
-- empty_diffs = getSum . foldMapTables go where
-- go TableDiffRO = True
-- go (TableDiffRW (DiffMapRW m)) = Map.null m
-- go (TableDiffRWU (DiffMapRWU m)) = Map.null m
-- isEmptyChanges _ = False
instance HasHaskeyOnDiskTables state => DiskDb (HaskeyDb state) state where
readDb HaskeyDb {hdbBackend, hdbRoot} keysets =
runHaskeyBackend hdbBackend $ Haskey.transactReadOnly (readTransaction keysets) hdbRoot
readDb HaskeyDb {hdbBackend, hdbRoot} keysets = runHaskeyBackend hdbBackend $ Haskey.transactReadOnly (readTransaction keysets) hdbRoot
writeDb HaskeyDb {..} changes old_seq new_seq = do
mb_e <- runHaskeyBackend hdbBackend $ Haskey.transact (writeTransaction changes old_seq new_seq) hdbRoot
for_ mb_e throwM
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
-- |

module UTxODb.Pipeline where

import UTxODb.Snapshots hiding (Seq)
import Control.Concurrent.Async
import Control.Concurrent.STM (TVar)
import Control.Monad.STM (atomically)
import Control.Concurrent.STM.TVar (readTVar)
import Control.Concurrent.STM.TVar (newTVarIO)
import Control.Concurrent.STM.TVar (writeTVar)
import Data.Sequence (Seq)
import qualified Data.Sequence as Seq
import Control.Concurrent.STM (stateTVar, newTBQueueIO, readTBQueue, writeTBQueue)
import Data.Maybe
import Control.Monad
import Data.Foldable
import Control.Monad.Catch
import Data.Either

-- | Shared state of the read/execute pipeline: the database handle, the
-- in-memory changelog of not-yet-flushed diffs, and flush bookkeeping.
data Pipeline handle state = Pipeline
  { db :: !handle
    -- ^ on-disk database backend used by 'prepare' reads and flush writes
  , changelog :: !(TVar (DbChangelog state))
    -- ^ in-memory changelog; extended by 'submit', trimmed on flush
  , flush_seq :: !(TVar (Seq (SeqNo state)))
    -- ^ sequence numbers of states buffered in memory, newest first
    -- (entries are prepended with 'Seq.<|' in 'submit')
  , rollback_window :: !Int
    -- ^ number of recent states retained before the flush anchor advances
  , flush_window :: !Int
    -- ^ extra states buffered beyond the rollback window before flushing
  }

newtype ReadHandle state = ReadHandle (Async (AnnTableReadSets state (KeySetSanityInfo state, SeqNo state)))

-- | Kick off an asynchronous database read for @keyset@.
--
-- The key set is rewound through a consistent STM snapshot of the
-- changelog (so it is addressed against the current on-disk anchor),
-- then the actual 'readDb' runs on a background 'Async'. The result is
-- picked up later by 'submit'.
prepare :: (DiskDb handle state, HasOnDiskTables state) => Pipeline handle state -> TableKeySets state -> IO (ReadHandle state)
prepare Pipeline{db, changelog} keyset = do
  a <- async $ do
    -- Single readTVar inside 'atomically' gives a consistent snapshot;
    -- the (redundant) `r <- readDb …; pure r` bind of the original is gone.
    ann_tks <- atomically $ flip rewindTableKeySets keyset <$> readTVar changelog
    readDb db ann_tks
  pure $ ReadHandle a


-- | Wait for an in-flight read, apply @op@ to the resulting tracking
-- state, extend the in-memory changelog with the produced diffs, and
-- flush aged-out changes to disk once more than
-- @rollback_window + flush_window@ states are buffered.
--
-- The changelog update happens in one STM transaction; the (potentially
-- slow) 'writeDb' flush runs outside it.
submit :: forall handle state a. (DiskDb handle state, HasOnDiskTables state, HasSeqNo state)
  => Pipeline handle state
  -> ReadHandle state
  -> (state TrackingTable -> (state TableDiff, a))
  -> IO a
submit Pipeline{changelog, flush_seq, rollback_window, flush_window, db} (ReadHandle a) op = do
  readsets <- wait a
  (changes_to_flush, old_ondisk_anchor, new_ondisk_anchor, rollback_anchor, new_sn, l, r) <- atomically $ do
    cl0 <- readTVar changelog
    let
      -- Fail with a clear message instead of the original irrefutable
      -- pattern (`Just tbl_rs = …`), whose failure would surface only as
      -- an opaque "Irrefutable pattern failed" at forcing time.
      tbl_rs = fromMaybe (error "submit: forwardTableReadSets failed")
                         (forwardTableReadSets cl0 readsets)
      s_empty_table = currentStateDbChangeLog cl0
      s_tracking_table = readsetToTrackingTables (injectTables tbl_rs s_empty_table)
      (s_table_diff, r) = op s_tracking_table
      new_sn = stateSeqNo s_table_diff
      cl1 :: DbChangelog state
      cl1 = extendDbChangelog new_sn s_table_diff Nothing cl0
    (mb_new_anchor :: Maybe (SeqNo state), l, should_flush) <- stateTVar flush_seq $ \s0 -> let
      s1 = new_sn Seq.<| s0
      -- Keep at most rollback_window + flush_window states buffered;
      -- anything beyond that falls off the end and triggers a flush.
      (s2, to_flush) = Seq.splitAt (rollback_window + flush_window) s1
      mb_new_anchor = Seq.lookup rollback_window s2
      in ((mb_new_anchor, length s2, not $ null to_flush), s2)
    let
      cl2 = fromMaybe cl1 $ join $ advanceDbChangelog <$> mb_new_anchor <*> pure cl1
      (changes_to_flush, cl3)
        | should_flush = flushDbChangelog cl2
        | otherwise = ([], cl2)
      new_ondisk_anchor = diskAnchorDbChangelog cl3
    writeTVar changelog cl3
    pure (changes_to_flush, diskAnchorDbChangelog cl0, new_ondisk_anchor, stateAnchorDbChangelog cl3, endOfDbChangelog cl3, l, r)
  putStrLn $ unwords ["submit:"
    , "new_sn:" , show new_sn
    , "changes_to_flush:", show . length $ changes_to_flush
    , "old ondisk_anchor:", show old_ondisk_anchor
    , "new ondisk_anchor:", show new_ondisk_anchor
    , "rollback_anchor:", show rollback_anchor
    , "in memory buffer size:", show l
    ]
  unless (null changes_to_flush) $ writeDb db changes_to_flush old_ondisk_anchor new_ondisk_anchor
  pure r

-- | Drive the whole pipeline over @inputs@: reads are 'prepare'd ahead
-- of execution through a bounded queue of depth @q@, while a dedicated
-- drain thread 'submit's each prepared read in input order. A 'Nothing'
-- on the queue is the end-of-stream sentinel.
runPipeline :: (Show a, HasOnDiskTables state, DiskDb handle state, HasSeqNo state)
  => handle
  -> state EmptyTable
  -> Int                                            -- ^ pipeline depth (max reads in flight)
  -> [a]                                            -- ^ inputs, processed in order
  -> (a -> TableKeySets state)                      -- ^ keys an input will read
  -> (a -> state TrackingTable -> state TableDiff)  -- ^ state transition for an input
  -> IO ()
runPipeline h init_state q inputs get_keys do_op = do
  pipeline <- initPipeline h init_state
  chan <- newTBQueueIO $ fromIntegral q
  let drain_chan = do
        x <- atomically $ readTBQueue chan
        case x of
          Nothing -> pure ()  -- sentinel: producer is done
          Just (rh, op) -> do
            submit pipeline rh $ (,()) . op
            putStrLn "submit finished"
            drain_chan

  -- NOTE(review): catchAll prints and swallows any exception from the
  -- drain thread, so a failed submit silently stops draining while the
  -- producer may block on a full queue — consider rethrowing instead.
  -- ('print' replaces the original `putStrLn . show`; identical output.)
  withAsync (drain_chan `catchAll` print) $ \drain_handle -> do
    for_ inputs $ \i -> do
      putStrLn $ "row: " <> show i
      rh <- prepare pipeline (get_keys i)
      atomically $ writeTBQueue chan $ Just (rh, do_op i)
    atomically $ writeTBQueue chan Nothing
    wait drain_handle


-- | Construct a 'Pipeline' around a fresh changelog anchored at the
-- initial state's own sequence number, with an empty flush buffer.
initPipeline :: (HasOnDiskTables state, HasSeqNo state) => handle -> state EmptyTable -> IO (Pipeline handle state)
initPipeline h s0 = do
  cl_var <- newTVarIO (initialDbChangelog (stateSeqNo s0) s0)
  fs_var <- newTVarIO mempty
  pure Pipeline
    { db = h
    , changelog = cl_var
    , flush_seq = fs_var
    , rollback_window = 1000 -- arbitrary choice
    , flush_window = 1000    -- arbitrary choice
    }
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,13 @@ trackingTablesToTableDiffs = mapTables (const getTableDiff)
annotatedReadsetToTrackingTables :: forall a state. HasTables state => state (AnnTable TableReadSet a) -> state TrackingTable
annotatedReadsetToTrackingTables = mapTables go where
go :: forall t k v. TableTag t v -> AnnTable TableReadSet a t k v -> TrackingTable t k v
go tag (AnnTable t _) = mkTrackingTable t
go _g (AnnTable t _) = mkTrackingTable t

-- | Wrap every table of a plain read set in a tracking table via
-- 'mkTrackingTable' — presumably so subsequent updates against the read
-- values are recorded as diffs (cf. 'trackingTablesToTableDiffs');
-- confirm against 'mkTrackingTable'. Unlike
-- 'annotatedReadsetToTrackingTables', this takes an un-annotated read set.
readsetToTrackingTables :: forall state. HasTables state => state TableReadSet -> state TrackingTable
readsetToTrackingTables = mapTables go where
  go :: forall t k v. TableTag t v -> TableReadSet t k v -> TrackingTable t k v
  go _ t = mkTrackingTable t


type TableKind = TableType -> * -> * -> *
type StateKind = TableKind -> *
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ library
TxIn.UTxODb
UTxODb.Snapshots
UTxODb.InMemory
UTxODb.Pipeline
UTxODb.Haskey.Db
UTxODb.Haskey.Tree

Expand All @@ -37,8 +38,8 @@ library
exceptions,
mtl,
haskey-btree,
haskey

haskey,
async

executable txin
hs-source-dirs: app
Expand Down

0 comments on commit ad02b36

Please sign in to comment.