Add a few queries about events
berewt committed Apr 1, 2023
1 parent 27d3e9c commit 64998e9
Showing 2 changed files with 113 additions and 22 deletions.
2 changes: 2 additions & 0 deletions marconi-core/marconi-core.cabal
@@ -67,9 +67,11 @@ library
, base >=4.7 && <5
, contra-tracer
, lens
, mtl
, primitive
, sqlite-simple
, stm
, text
, transformers
, vector

133 changes: 111 additions & 22 deletions marconi-core/src/Marconi/Core/Experiment.hs
@@ -16,35 +16,39 @@
- We want to be able to easily compose indexers to build new ones. For example, the original indexer design can be
seen as the combination of two indexers, a full in-memory indexer, and a full in database indexer.
- The original implementation considered the 'StorablePoint' as a data that can be derived from 'Event',
- The original implementation considered the 'StorablePoint' as data that can be derived from 'Event',
leading to the design of synthetic events to deal with indexers that didn't index enough data.
- In marconi, the original design use a callback design to handle `MVar` modification,
- In marconi, the original design uses a callback design to handle `MVar` modification,
we wanted to address this point as well.
What's include in this module:
What's included in this module:
- Base type classes to define an indexer, its query interface, and the required plumbing to handle rollback
- A full in-memory indexer (naive), an indexer that composes it with a SQL layer for persistence
- Tracing, as a modifier to an existing indexer (it allows us to opt in for traces if we want, indexer by indexer)
- A coordinator for indexers, that can be exposed as an indexer itself
- Some queries that can be applied to many indexers
Contrary to the original Marconi design, indexers don't have a mandatory memory representation.
Contrary to the original Marconi design, indexers don't have a unique (in-memory/sqlite) implementation.
In this module @desc@ is the descriptor of an indexer.
In this module 'desc' is the descriptor of an indexer.
It's usually an uninhabited type used to define the corresponding type families.
The idea behind @desc@ is that you may want to provide different indexer for the same descriptor,
The idea behind 'desc' is that you may want to provide different indexers for the same descriptor,
or to provide an indexer implementation that works for different descriptors.
Decoupling the descriptor enables such a decoupling.
Similarly, the 'query' type parameter often acts as a descriptor for the 'Query'/'Result' data families.
-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE UndecidableInstances #-}
module Marconi.Core.Experiment where

import Control.Concurrent (QSemN)
import Control.Concurrent qualified as Con
import Control.Lens (folded, makeLenses, view, (%~), (&), (<<.~), (?~), (^.), (^..))
import Control.Lens (filtered, folded, makeLenses, view, (%~), (&), (<<.~), (?~), (^.), (^..), (^?))
import Control.Monad (forever, guard)
import Control.Monad.Except (MonadError (catchError, throwError))
import Control.Tracer (Tracer, traceWith)

import Control.Concurrent qualified as STM
@@ -55,6 +59,7 @@ import Control.Monad.Trans.Maybe (MaybeT (MaybeT, runMaybeT))
import Data.Foldable (foldlM, foldrM, traverse_)
import Data.Functor (($>))
import Data.List (intersect)
import Data.Text (Text)
import Database.SQLite.Simple qualified as SQL


@@ -72,14 +77,19 @@ type family Point desc
-- if several events need to be associated to the same point in time, wrap a `List`, etc.
type family Event desc

-- | A result is a data family from the corresponding query type.


-- | A query is a data family that takes a query descriptor.
-- A query is tied to an indexer by a typeclass. This design choice has two main reasons:
-- * we want to be able to define different queries for the same indexer
-- (e.g. we may want to define two distinct query types for a utxo indexer:
-- one to get all the utxos for a given address,
-- another one to get all the utxos emitted at a given slot).
-- * we want to assign a query type to different indexers.
data family Result query
data family Query desc

-- | A result is a data family from the corresponding query descriptor.
data family Result desc
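
The descriptor-indexed `Query`/`Result` pair above can be illustrated with a minimal, standalone sketch. It is not taken from this commit: the descriptor `CountAbove`, its constructors, and `runCountAbove` are hypothetical names chosen for illustration, and the "indexer" is reduced to a plain list.

```haskell
{-# LANGUAGE TypeFamilies #-}

-- Data families indexed by a query descriptor, mirroring the shape used in the diff.
data family Query desc
data family Result desc

-- Hypothetical descriptor: an uninhabited type that only names a kind of query.
data CountAbove

-- The descriptor fixes both the query payload and the result payload.
newtype instance Query CountAbove = CountAboveQuery { threshold :: Int }

newtype instance Result CountAbove = CountAboveResult { count :: Int }
  deriving (Eq, Show)

-- Hypothetical stand-in for an indexer: a plain list of Ints.
-- Counts the elements strictly greater than the query threshold.
runCountAbove :: [Int] -> Query CountAbove -> Result CountAbove
runCountAbove xs q = CountAboveResult (length (filter (> threshold q) xs))
```

Because the descriptor is a separate type, the same `CountAbove` query could in principle be answered by several indexer implementations, which is the decoupling the module comment describes.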


-- | Attach an event to a point in time
@@ -112,22 +122,36 @@ class Monad m => IsIndex indexer desc m where
lastSyncPoint :: indexer desc -> m (Maybe (Point desc))
{-# MINIMAL index, lastSyncPoint #-}


-- | Errors that can occur when you query an indexer
data QueryError desc
    = AheadOfLastSync !(Maybe (Result desc))
      -- ^ The required point is ahead of the current index.
      -- The error may still provide its latest result if it makes sense.
    | NotStoredAnymore
      -- ^ The requested point is too far in the past and has been aggregated
    | IndexerError !Text
      -- ^ The indexer query failed

-- | The indexer can answer a Query to produce the corresponding result of that query.
--
-- * @indexer@ is the indexer implementation type
-- * @desc@ the descriptor of the indexer, fixing the @Point@ types
-- * @query@ the type of query we want to answer fixing the @Result@ type
-- * @query@ the type of query we want to answer
-- * @m@ the monad in which our indexer operates
class Queryable indexer desc query m where
class MonadError (QueryError query) m => Queryable indexer desc query m where

-- | Query an indexer at a given point in time
-- It can be read as:
-- "With the knowledge you have at that point in time,
-- what is your answer to this query?"
--
-- Note: Not sure we shouldn't explicitly handle error when we query an invalid point here
query :: Point desc -> query -> indexer desc -> m (Result query)
query :: Ord (Point desc) => Point desc -> Query query -> indexer desc -> m (Result query)

-- | Check that the given point is not ahead of the last sync point of an indexer.
-- Returns 'False' when the point is ahead, or when the indexer has no sync point yet.
isNotAheadOfSync ::
    (Ord (Point desc), MonadError (QueryError query) m, IsIndex indexer desc m) =>
    Point desc -> indexer desc -> m Bool
isNotAheadOfSync p indexer = maybe False (p <=) <$> lastSyncPoint indexer
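
To make the behaviour of this check concrete, here is a small standalone sketch. It assumes a pure setting where the last sync point is passed directly as a `Maybe` instead of being read from an indexer, and it uses `Either` in place of `MonadError`; the helper `guardSynced` is hypothetical and does not appear in the commit.

```haskell
-- Simplified copy of the error type from the diff, parameterised by the result
-- type directly instead of going through the 'Result' data family.
data QueryError result
  = AheadOfLastSync (Maybe result)
  | NotStoredAnymore
  | IndexerError String
  deriving (Eq, Show)

-- Pure variant of the check: the last sync point is given as an argument.
-- An indexer that has never synced ('Nothing') is treated as behind every point.
isNotAheadOfSync :: Ord point => point -> Maybe point -> Bool
isNotAheadOfSync p = maybe False (p <=)

-- Hypothetical helper: turn the boolean check into an error in 'Either'.
guardSynced :: Ord point => point -> Maybe point -> Either (QueryError result) ()
guardSynced p lastSync
  | isNotAheadOfSync p lastSync = Right ()
  | otherwise = Left (AheadOfLastSync Nothing)
```

Note that the check itself only returns a `Bool`; raising `AheadOfLastSync` is left to the caller, as the `Queryable` instances later in the diff do.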

-- | We can reset an indexer to a previous `Point`
-- * @indexer@ is the indexer implementation type
@@ -239,9 +263,11 @@ makeLenses 'MixedIndexer
-- | The indexer can take a result and complete it with its events
-- It's used by the in-memory part of a 'MixedIndexer' to specify
-- how we can complete the database result with the memory content.
class ResumableResult indexer desc result m where
class ResumableResult indexer desc query m where

resumeResult :: result -> indexer desc -> m result
resumeResult ::
Ord (Point desc) =>
Point desc -> Query query -> indexer desc -> m (Result query) -> m (Result query)



@@ -291,15 +317,12 @@ instance
rewindInStore = MaybeT . rewind p

instance
( Monad m
, ResumableResult InMemory desc (Result query) m
( ResumableResult InMemory desc query m
, Queryable store desc query m
) =>
Queryable (MixedIndexer InMemory store) desc query m where

query valid q indexer = do
res <- query valid q $ indexer ^. inDatabase
resumeResult res $ indexer ^. inMemory
query valid q indexer = resumeResult valid q (indexer ^. inMemory) (query valid q (indexer ^. inDatabase))


-- * Indexer transformer: Add effects
@@ -519,3 +542,69 @@ instance Rewindable CoordinatorIndex desc IO where
-- There is no point in providing a 'Queryable' interface for 'CoordinatorIndex' though,
-- as its sole interest would be to get the latest synchronisation points,
-- but 'query' requires a 'Point' to provide a result.


-- * Common query interfaces

-- ** Get Event at a given point in time

-- | Get the event stored by the indexer at a given point in time
data EventAt desc

data instance Query (EventAt desc) = EventAtQuery

-- | The result of an @EventAtQuery@ is always an event.
-- The error cases are handled by the query interface.
newtype instance Result (EventAt desc) =
    EventAt {getEvent :: Event desc}

instance (MonadError (QueryError (EventAt desc)) m) =>
    Queryable InMemory desc (EventAt desc) m where

    query p EventAtQuery ix = do
        let isAtPoint e p' = e ^. point == p'
        check <- isNotAheadOfSync p ix
        if check
            then maybe
                (throwError NotStoredAnymore) -- If we can't find the point and if it's in the past, we probably moved it
                (pure . EventAt)
                $ ix ^? events . folded . filtered (`isAtPoint` p) . event
            else throwError $ AheadOfLastSync Nothing

instance Queryable indexer desc (EventAt desc) m =>
    ResumableResult indexer desc (EventAt desc) m where

    resumeResult p q indexer result = result `catchError` \case
        _inDatabaseError -> query p q indexer -- If we didn't find a result in db, try in memory
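
The fallback strategy of these two instances (ask the database, and on any error ask the in-memory part) can be sketched without the type-class machinery. The following is an illustration under simplifying assumptions, not the commit's code: stores are plain lists of `(Int, String)` pairs ordered most recent first, `Either` replaces `MonadError`, and the names `Store`, `lastSync`, `eventAt`, and `eventAtMixed` are hypothetical.

```haskell
import Data.List (find)

-- Simplified error type: only the two cases this query can raise.
data QueryError = AheadOfLastSync | NotStoredAnymore
  deriving (Eq, Show)

-- Hypothetical store: events tagged with an Int point, most recent first.
type Store = [(Int, String)]

-- The last sync point is the point of the most recent event, if any.
lastSync :: Store -> Maybe Int
lastSync [] = Nothing
lastSync ((p, _) : _) = Just p

-- Look up the event stored exactly at the given point.
eventAt :: Int -> Store -> Either QueryError String
eventAt p store
  | maybe False (p <=) (lastSync store) =
      maybe (Left NotStoredAnymore) (Right . snd) (find ((== p) . fst) store)
  | otherwise = Left AheadOfLastSync

-- Mixed lookup: try the database first, fall back to memory on any error.
eventAtMixed :: Int -> Store -> Store -> Either QueryError String
eventAtMixed p memory database =
  either (const (eventAt p memory)) Right (eventAt p database)
```

As in the diff, the database error is discarded when falling back, so the error finally reported is the one produced by the in-memory lookup.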

-- ** Filtering available events

-- | Query an indexer to find all events that match a given predicate
data EventsMatching desc

newtype instance Query (EventsMatching desc) = EventsMatchingQuery {predicate :: Event desc -> Bool}

-- | The result of an @EventsMatchingQuery@
newtype instance Result (EventsMatching desc) = EventsMatching {filteredEvents :: [Event desc]}

deriving newtype instance Semigroup (Result (EventsMatching desc))

instance (MonadError (QueryError (EventsMatching desc)) m, Applicative m) =>
    Queryable InMemory desc (EventsMatching desc) m where

    query p q ix = do
        let isBefore e p' = e ^. point <= p'
        let result = EventsMatching $ ix ^.. events . folded . filtered (`isBefore` p) . event . filtered (predicate q)
        check <- isNotAheadOfSync p ix
        if check
            then pure result
            else throwError . AheadOfLastSync . Just $ result

instance Queryable indexer desc (EventsMatching desc) m =>
    ResumableResult indexer desc (EventsMatching desc) m where

    resumeResult p q indexer result = result `catchError` \case
        -- If we find an incomplete result in db, add the in memory result to it
        AheadOfLastSync (Just r) -> (<> r) <$> query p q indexer
        inDatabaseError -> throwError inDatabaseError -- For any other error, forward it
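
The way a partial database result is completed with the in-memory result can be sketched in isolation. This is a simplified illustration, not the commit's implementation: it assumes list-backed stores ordered most recent first, uses `Either` in place of `MonadError`, and the names `Store`, `lastSync`, `matching`, and `matchingMixed` are hypothetical.

```haskell
-- Simplified result type with the same Semigroup behaviour as the diff
-- (list concatenation), written out by hand instead of derived.
newtype EventsMatching = EventsMatching { filteredEvents :: [String] }
  deriving (Eq, Show)

instance Semigroup EventsMatching where
  EventsMatching a <> EventsMatching b = EventsMatching (a <> b)

data QueryError
  = AheadOfLastSync (Maybe EventsMatching)
  | NotStoredAnymore
  | IndexerError String
  deriving (Eq, Show)

-- Hypothetical store: events tagged with an Int point, most recent first.
type Store = [(Int, String)]

lastSync :: Store -> Maybe Int
lastSync [] = Nothing
lastSync ((p, _) : _) = Just p

-- All events at or before the point that satisfy the predicate.
-- If the point is ahead of the store, the partial result travels in the error.
matching :: (String -> Bool) -> Int -> Store -> Either QueryError EventsMatching
matching predicate p store
  | maybe False (p <=) (lastSync store) = Right result
  | otherwise = Left (AheadOfLastSync (Just result))
  where
    result = EventsMatching [e | (p', e) <- store, p' <= p, predicate e]

-- Mixed query: a partial database result is appended to the memory result;
-- a complete result or any other error is returned unchanged.
matchingMixed :: (String -> Bool) -> Int -> Store -> Store -> Either QueryError EventsMatching
matchingMixed predicate p memory database =
  case matching predicate p database of
    Left (AheadOfLastSync (Just r)) -> fmap (<> r) (matching predicate p memory)
    other -> other
```

One consequence of this shape, which also holds for the instance in the diff, is that when the in-memory query itself fails, the partial database result `r` is dropped and only the memory error is reported.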
