diff --git a/marconi-core/src/Marconi/Core/Experiment.hs b/marconi-core/src/Marconi/Core/Experiment.hs index ffe4041209..5b0f15d9f1 100644 --- a/marconi-core/src/Marconi/Core/Experiment.hs +++ b/marconi-core/src/Marconi/Core/Experiment.hs @@ -39,7 +39,7 @@ module Marconi.Core.Experiment where import Control.Concurrent (QSemN) import Control.Concurrent qualified as Con -import Control.Lens (makeLenses, view, (%~), (&), (<<.~), (?~), (^.)) +import Control.Lens (folded, makeLenses, view, (%~), (&), (<<.~), (?~), (^.), (^..)) import Control.Monad (forever, guard) import Control.Tracer (Tracer, traceWith) @@ -66,17 +66,24 @@ type family Point desc type family Event desc data family Result query +data TimedEvent desc = + TimedEvent + { _point :: !(Point desc) + , _event :: !(Event desc) + } + +makeLenses 'TimedEvent class Monad m => IsIndex indexer desc m where -- | index an event at a given point in time index :: Eq (Point desc) => - Point desc -> Event desc -> indexer desc -> m (indexer desc) + TimedEvent desc -> indexer desc -> m (indexer desc) -- | Index a bunch of points, associated to their event, in an indexer indexAll :: (Eq (Point desc), Foldable f) => - f (Point desc, Event desc) -> indexer desc -> m (indexer desc) - indexAll = flip (foldrM (uncurry index)) + f (TimedEvent desc) -> indexer desc -> m (indexer desc) + indexAll = flip (foldrM index) -- | Last sync of the indexer lastSyncPoint :: indexer desc -> m (Maybe (Point desc)) @@ -120,7 +127,7 @@ class Resumable indexer desc m where -- | Full in memory indexer, it uses list because I was too lazy to port the 'Vector' implementation. -- If we wanna move to these indexer, we should switch the implementation to the 'Vector' one. data InMemory desc = InMemory - { _events :: ![(Point desc, Event desc)] -- ^ Stored 'Event', associated with their history 'Point' + { _events :: ![TimedEvent desc] -- ^ Stored 'Event', associated with their history 'Point' , _latest :: !(Maybe (Point desc)) -- ^ Ease access to the latest datapoint } @@ -128,9 +135,9 @@ makeLenses 'InMemory instance (Monad m) => IsIndex InMemory desc m where - index p e ix = pure $ ix - & events %~ ((p, e):) - & latest ?~ p + index timedEvent ix = pure $ ix + & events %~ (timedEvent:) + & latest ?~ (timedEvent ^. point) lastSyncPoint = pure . view latest @@ -149,13 +156,13 @@ instance Applicative m => Rewindable InMemory desc m where cleanRecentEvents = events %~ dropWhile isEventAfterRollback isIndexBeforeRollback :: InMemory desc -> Bool isIndexBeforeRollback x = maybe True (p >=) $ x ^. latest - isEventAfterRollback :: (Point desc, a) -> Bool - isEventAfterRollback = (p <) . fst + isEventAfterRollback :: TimedEvent desc -> Bool + isEventAfterRollback = (p <) . view point instance Applicative m => Resumable InMemory desc m where syncPoints ix = let - indexPoints = fst <$> (ix ^. events) + indexPoints = ix ^.. events . folded . point -- if the latest point of the index is not a stored event, we add it to the list of points addLatestIfNeeded Nothing ps = ps addLatestIfNeeded (Just p) [] = [p] @@ -205,14 +212,14 @@ instance , IsIndex store desc m ) => IsIndex (MixedIndexer InMemory store) desc m where - index point e indexer = do + index timedEvent indexer = do let maxMemSize = fromIntegral $ indexer ^. slotsInMemory currentSize = length (indexer ^. inMemory . events) if currentSize >= maxMemSize then do indexer' <- flush indexer - inMemory (index point e) indexer' - else inMemory (index point e) indexer + inMemory (index timedEvent) indexer' + else inMemory (index timedEvent) indexer lastSyncPoint = lastSyncPoint . view inMemory @@ -246,8 +253,7 @@ instance -- | Remarkable events of an indexer data IndexerNotification desc = Rollback !(Point desc) - | Process !(Point desc) - | Issue !(Event desc) + | Index !(TimedEvent desc) -- | A tracer modifier that adds tracing to an existing indexer data WithTracer m indexer desc = @@ -262,10 +268,9 @@ instance (Monad m, IsIndex index desc m) => IsIndex (WithTracer m index) desc m where - index point event indexer = do - res <- tracedIndexer (index point event) indexer - traceWith (indexer ^. tracer) (Issue event) - traceWith (indexer ^. tracer) (Process point) + index timedEvent indexer = do + res <- tracedIndexer (index timedEvent) indexer + traceWith (indexer ^. tracer) $ Index timedEvent pure res lastSyncPoint = lastSyncPoint . view tracedIndexer @@ -295,7 +300,7 @@ data RunnerM m input point = Runner { runnerState :: !(TMVar (indexer desc)) , identifyRollback :: !(input -> m (Maybe (Point desc))) - , extractEvent :: !(input -> m (Maybe (Point desc, Event desc))) + , extractEvent :: !(input -> m (Maybe (TimedEvent desc))) } type Runner = RunnerM IO @@ -308,7 +313,7 @@ createRunner :: , point ~ Point desc) => indexer desc -> (input -> IO (Maybe point)) -> - (input -> IO (Maybe (point, Event desc))) -> + (input -> IO (Maybe (TimedEvent desc))) -> IO (TMVar (indexer desc), Runner input point) createRunner ix rb f = do mvar <- STM.atomically $ STM.newTMVar ix @@ -328,12 +333,12 @@ startRunner chan tokens (Runner ix isRollback extractEvent) = do unlockCoordinator = Con.signalQSemN tokens 1 - indexEvent p e = do + indexEvent timedEvent = do indexer <- STM.atomically $ STM.takeTMVar ix indexerLastPoint <- lastSyncPoint indexer - if maybe True (< p) indexerLastPoint + if maybe True (< timedEvent ^. point) indexerLastPoint then do - indexer' <- index p e indexer + indexer' <- index timedEvent indexer STM.atomically $ STM.putTMVar ix indexer' else STM.atomically $ STM.putTMVar ix indexer @@ -341,8 +346,8 @@ startRunner chan tokens (Runner ix isRollback extractEvent) = do me <- extractEvent input case me of Nothing -> pure () - Just (point, event) -> do - indexEvent point event + Just timedEvent -> do + indexEvent timedEvent handleRollback p = do indexer <- STM.atomically $ STM.takeTMVar ix @@ -411,7 +416,8 @@ makeLenses 'CoordinatorIndex -- A coordinator can be consider as an indexer that forwards the input to its runner instance IsIndex CoordinatorIndex desc IO where - index point event = coordinator $ \x -> step (const point) x event + index timedEvent = coordinator $ + \x -> step (const $ timedEvent ^. point) x $ timedEvent ^. event lastSyncPoint indexer = pure $ indexer ^. coordinator . lastSync @@ -441,3 +447,7 @@ instance Rewindable CoordinatorIndex desc IO where (STM.atomically (STM.putTMVar runnerState indexer) $> Nothing) ((Just r <$) . STM.atomically . STM.putTMVar runnerState) res + +-- There is no point in providing a 'Queryable' interface for 'CoordinatorIndex' though, +-- as it's sole interest would be to get the latest synchronisation points, +-- but 'query' requires a 'Point' to provide a result.