/
Experiment.hs
443 lines (354 loc) · 16.9 KB
/
Experiment.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
{- |
This module proposes an alternative to the index implementation provided in 'RewindableIndex.Storable'.
The points we wanted to address are the following:
- The 'Storable' implementation is designed in a way that strongly promotes indexers that rely on a mix of database and
in-memory storage. We try to propose a more generic design that would allow:
    - full in-memory indexers
    - indexers backed by a simple file
    - mock indexers, for testing purposes, with predefined behaviour
    - groups of indexers, synchronised as a single indexer
    - in-memory/database storage that relies on other query heuristics
- We want to be able to easily compose indexers to build new ones. For example, the original indexer design can be
seen as the combination of two indexers: a full in-memory indexer and a full in-database indexer.
- The original implementation considered the 'StorablePoint' as data that can be derived from 'Event',
leading to the design of synthetic events to deal with indexers that didn't index enough data.
- In Marconi, the original design uses an exotic callback design to handle `MVar` modification;
we wanted to address this point as well.
What's included in this module:
- Base type classes to define an indexer, its query interface, and the required plumbing to handle rollbacks
- A full in-memory indexer (naive), and an indexer that composes it with a SQL layer for persistence
- Tracing, as a modifier to an existing indexer (it allows us to opt in for traces if we want, indexer by indexer)
- A coordinator for indexers, which can be exposed as an indexer itself
In this module, @desc@ is the descriptor of an indexer;
it's usually an uninhabited type used to define the corresponding type families.
The idea behind @desc@ is that you may want to provide different indexers for the same descriptor,
or to provide an indexer implementation that works for different descriptors.
-}
module Marconi.Core.Experiment where
import Control.Concurrent (QSemN)
import Control.Concurrent qualified as Con
import Control.Lens (makeLenses, view, (%~), (&), (<<.~), (?~), (^.))
import Control.Monad (forever, guard)
import Control.Tracer (Tracer, traceWith)
import Control.Concurrent qualified as STM
import Control.Concurrent.STM (TChan, TMVar)
import Control.Concurrent.STM qualified as STM
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Maybe (MaybeT (MaybeT, runMaybeT))
import Data.Foldable (foldlM, foldrM, traverse_)
import Data.Functor (($>))
import Data.List (intersect)
import Database.SQLite.Simple qualified as SQL
-- | A point in time. The concrete type of a point is derived from an indexer
-- descriptor (@desc@), instead of from an event.
-- The reason is that you may not want to always carry around a point when you
-- manipulate an event.
type family Point desc
-- | An element that you want to capture from a given input.
-- A given point in time will always correspond to an event.
-- As a consequence, if a point in time can be associated with no event,
-- wrap it in a `Maybe` type;
-- if several events need to be associated to the same point in time, wrap them
-- in a `List`, etc.
type family Event desc
-- | The type of the answer to a given @query@ type, as returned by 'query'.
data family Result query
-- | An indexer: stores the 'Event's observed at given 'Point's and keeps
-- track of the last point it has been synchronised with.
class Monad m => IsIndex indexer desc m where

    -- | Store an event, observed at the given point in time, in the indexer.
    index
        :: Eq (Point desc)
        => Point desc
        -> Event desc
        -> indexer desc
        -> m (indexer desc)

    -- | Store a collection of (point, event) pairs in the indexer.
    --
    -- The default implementation is a monadic right fold ('foldrM'): the pairs
    -- are handed to 'index' starting from the LAST element of the 'Foldable'
    -- and finishing with the first one.
    indexAll
        :: (Eq (Point desc), Foldable f)
        => f (Point desc, Event desc)
        -> indexer desc
        -> m (indexer desc)
    indexAll pairs start =
        foldrM (\(point, event) acc -> index point event acc) start pairs

    -- | The last point the indexer has been synchronised with, if any.
    lastSyncPoint :: indexer desc -> m (Maybe (Point desc))
-- | The indexer can answer a query to produce the corresponding result of that query.
class Queryable indexer desc query m where
    -- | Query an indexer at a given point in time.
    -- It can be read as:
    -- "With the knowledge you have at that point in time,
    -- what is your answer to this query?"
    --
    -- Note: Not sure we shouldn't explicitly handle error when we query an invalid point here
    query :: Point desc -> query -> indexer desc -> m (Result query)
-- | We can reset an indexer to a previous `Point`.
--
-- 'rewind' yields 'Nothing' when the indexer cannot be rewound to the given
-- point; each instance decides when that happens.
class Rewindable indexer desc m where
    rewind :: Ord (Point desc) => Point desc -> indexer desc -> m (Maybe (indexer desc))
-- | The indexer can aggregate old data.
-- The main purpose is to speed up query processing.
-- If the indexer is 'Rewindable', 'Aggregable' can't 'rewind' behind the 'aggregationPoint';
-- the idea is to call 'aggregate' on points that can't be rolled back anymore.
class Aggregable indexer desc m where
    -- | Aggregate events of the indexer up to a given point in time
    aggregate :: Point desc -> indexer desc -> m (indexer desc)
    -- | The latest aggregation point: the point up to which events have been aggregated
    aggregationPoint :: indexer desc -> m (Point desc)
-- | Points from which we can restart safely.
class Resumable indexer desc m where
    -- | The points from which the indexer can safely resume
    -- (each instance defines how they are computed and ordered).
    syncPoints :: Ord (Point desc) => indexer desc -> m [Point desc]
-- | Full in-memory indexer. It uses a list because I was too lazy to port the 'Vector' implementation.
-- If we want to move to this indexer, we should switch the implementation to the 'Vector' one.
--
-- The 'IsIndex' instance prepends each new pair to '_events', so the list is
-- kept most recent first.
data InMemory desc = InMemory
    { _events :: ![(Point desc, Event desc)] -- ^ Stored 'Event', associated with their history 'Point'
    , _latest :: !(Maybe (Point desc)) -- ^ Ease access to the latest datapoint
    }
-- generates the 'events' and 'latest' lenses
makeLenses 'InMemory
-- | Indexing prepends the (point, event) pair to the stored events, so the
-- most recent pair comes first, and records the point as the latest one.
instance (Monad m) => IsIndex InMemory desc m where
    index p e = pure . (latest ?~ p) . (events %~ ((p, e) :))
    lastSyncPoint ix = pure (ix ^. latest)
-- | Rewinding an in-memory indexer to @p@ drops the leading stored events
-- whose point is strictly after @p@ and sets the latest point to @p@.
-- When the indexer has no latest point, or its latest point is not after @p@,
-- the indexer is returned unchanged. This instance never yields 'Nothing'.
instance Applicative m => Rewindable InMemory desc m where
    rewind p ix = pure (Just rewound)
      where
        rewound = case ix ^. latest of
            Nothing -> ix
            Just lastPoint | p >= lastPoint -> ix
            _ ->
                ix & events %~ dropWhile (\(q, _) -> p < q)
                   & latest ?~ p
-- | The sync points of an in-memory indexer are the points of its stored
-- events (in the order they are stored). The latest point is put in front of
-- them when it differs from the point of the first stored event, or when there
-- is no stored event at all.
instance Applicative m => Resumable InMemory desc m where
    syncPoints ix = pure $ case ix ^. latest of
        Nothing -> storedPoints
        Just lastPoint -> case storedPoints of
            firstStored : _ | lastPoint == firstStored -> storedPoints
            _ -> lastPoint : storedPoints
      where
        storedPoints = map fst (ix ^. events)
-- | An indexer backed by a SQLite database, accessed through the wrapped
-- connection.
newtype InDatabase desc = InDatabase { _con :: SQL.Connection }
-- generates the 'con' lens
makeLenses 'InDatabase
-- | An indexer that keeps recent events in memory and puts the older ones in
-- database. Its 'IsIndex' instance flushes the memory to the store once it
-- holds '_slotsInMemory' events.
-- The query interface for this indexer will always go through the database first and then aggregate
-- results present in memory.
data MixedIndexer mem store desc = MixedIndexer
    { _slotsInMemory :: !Word -- ^ How many slots do we keep in memory
    , _inMemory :: !(mem desc) -- ^ The fast storage for latest elements
    , _inDatabase :: !(store desc) -- ^ In database storage, should be similar to the original indexer
    }
-- generates the 'slotsInMemory', 'inMemory' and 'inDatabase' lenses
makeLenses 'MixedIndexer
-- | The indexer can take a result and complete it with its events.
-- It's useful for the in-memory part of a 'MixedIndexer', as it specifies
-- how we can complete the database result with the memory content.
class ResumableResult indexer desc result m where
    resumeResult :: result -> indexer desc -> m result
-- | Move every in-memory event of a 'MixedIndexer' into its store.
--
-- The in-memory event list is emptied and its previous content is given to
-- 'indexAll' on the store. The in-memory latest point is not modified.
flush ::
    ( Monad m
    , IsIndex store desc m
    , Eq (Point desc)
    ) => MixedIndexer InMemory store desc ->
    m (MixedIndexer InMemory store desc)
flush indexer = do
    -- '<<.~' returns the replaced list together with the updated indexer
    let (pending, emptied) = indexer & inMemory . events <<.~ []
    inDatabase (indexAll pending) emptied
-- | New events always go to the in-memory part. When that part already holds
-- '_slotsInMemory' events (or more), it is first emptied into the store with
-- 'flush'. The last sync point is the one of the in-memory part.
instance
    ( Monad m
    , IsIndex InMemory desc m
    , IsIndex store desc m
    ) => IsIndex (MixedIndexer InMemory store) desc m where

    index point e indexer = makeRoom >>= inMemory (index point e)
      where
        isFull =
            length (indexer ^. inMemory . events)
                >= fromIntegral (indexer ^. slotsInMemory)
        makeRoom
            | isFull = flush indexer
            | otherwise = pure indexer

    lastSyncPoint indexer = lastSyncPoint (indexer ^. inMemory)
-- | Rewinding first rewinds the in-memory part. If in-memory events remain
-- afterwards, the store is left untouched; otherwise the store is rewound too.
-- The result is 'Nothing' as soon as one of those rewinds fails.
instance
    ( Monad m
    , Rewindable store desc m
    ) => Rewindable (MixedIndexer InMemory store) desc m where

    rewind p indexer = runMaybeT $ do
        afterMemory <- inMemory rewindPart indexer
        if null (afterMemory ^. inMemory . events)
            then inDatabase rewindPart afterMemory
            else pure afterMemory
      where
        rewindPart :: Rewindable part desc m => part desc -> MaybeT m (part desc)
        rewindPart = MaybeT . rewind p
-- | A query is first answered by the store; the in-memory part then completes
-- that answer through 'resumeResult'.
--
-- NOTE(review): the query point is only forwarded to the store. 'resumeResult'
-- does not receive it, so in-memory events are not filtered by that point
-- here — confirm this is intended.
instance
    ( Monad m
    , ResumableResult InMemory desc (Result query) m
    , Queryable store desc query m
    ) =>
    Queryable (MixedIndexer InMemory store) desc query m where

    query valid q indexer =
        query valid q (indexer ^. inDatabase)
            >>= \fromStore -> resumeResult fromStore (indexer ^. inMemory)
-- | Remarkable events of an indexer, as sent to a tracer by 'WithTracer'.
data IndexerNotification desc
    = Rollback !(Point desc) -- ^ the indexer has been rewound to this point
    | Process !(Point desc) -- ^ the indexer has processed this point
    | Issue !(Event desc) -- ^ emitted by 'WithTracer' for each event passed to 'index'
-- | A tracer modifier that adds tracing to an existing indexer.
data WithTracer m indexer desc =
    WithTracer
        { _tracedIndexer :: !(indexer desc) -- ^ the wrapped indexer
        , _tracer :: !(Tracer m (IndexerNotification desc)) -- ^ receives the notifications
        }
-- generates the 'tracedIndexer' and 'tracer' lenses
makeLenses 'WithTracer
-- | Indexing is delegated to the wrapped indexer. Afterwards, an 'Issue'
-- notification for the event is emitted, followed by a 'Process' notification
-- for the point.
instance
    (Monad m, IsIndex index desc m) =>
    IsIndex (WithTracer m index) desc m where

    index point event indexer = do
        let notify = traceWith (indexer ^. tracer)
        updated <- tracedIndexer (index point event) indexer
        notify (Issue event)
        notify (Process point)
        pure updated

    lastSyncPoint indexer = lastSyncPoint (indexer ^. tracedIndexer)
-- | Rewinding is delegated to the wrapped indexer. A 'Rollback' notification
-- is emitted only when that rewind succeeds.
instance
    ( Monad m
    , Rewindable index desc m
    ) => Rewindable (WithTracer m index) desc m where

    rewind p indexer =
        runMaybeT (tracedIndexer (MaybeT . rewind p) indexer)
            >>= traverse (\rewound -> traceWith (rewound ^. tracer) (Rollback p) $> rewound)
-- | A runner encapsulates an indexer in an opaque type, which allows plugging different indexers into the same stream of
-- input data.
data RunnerM m input point =
    forall indexer desc.
    ( IsIndex indexer desc m
    , Resumable indexer desc m
    , Rewindable indexer desc m
    , Point desc ~ point
    ) =>
    Runner
        { runnerState :: !(TMVar (indexer desc)) -- ^ holds the current indexer
        , identifyRollback :: !(input -> m (Maybe (Point desc))) -- ^ the rollback point carried by an input, if any
        , extractEvent :: !(input -> m (Maybe (Point desc, Event desc))) -- ^ the (point, event) carried by an input, if any
        }
-- | A runner operating in 'IO'.
type Runner = RunnerM IO
-- | Create a runner for an indexer. Returns the runner together with the
-- 'TMVar' it uses internally, which initially holds the given indexer.
createRunner ::
    ( IsIndex indexer desc IO
    , Resumable indexer desc IO
    , Rewindable indexer desc IO
    , point ~ Point desc) =>
    indexer desc ->
    (input -> IO (Maybe point)) ->
    (input -> IO (Maybe (point, Event desc))) ->
    IO (TMVar (indexer desc), Runner input point)
createRunner ix rb f =
    withRunner <$> STM.atomically (STM.newTMVar ix)
  where
    withRunner var = (var, Runner var rb f)
-- | Start a runner: subscribe it to the coordinator's broadcast channel, then
-- process, in a dedicated thread, every input written to that channel.
--
-- For each input, the runner does one of two things:
--
-- * rewinds its indexer, when @identifyRollback@ yields a point;
-- * otherwise, indexes the extracted event, when @extractEvent@ yields one
--   whose point is strictly after the indexer's last sync point.
--
-- Once the input is handled, the runner signals @tokens@ once. This lets the
-- coordinator wait until every runner has acknowledged the input.
--
-- The subscription ('STM.dupTChan') happens before this function returns, so
-- no input written to @chan@ after the call can be missed. The processing loop
-- runs in a thread forked with 'Con.forkIO', so this call returns immediately.
--
-- NOTE(review): if processing throws, the indexer is not put back in its
-- 'TMVar' and the coordinator is never signalled.
startRunner :: Ord point => TChan input -> QSemN -> Runner input point -> IO ()
startRunner chan tokens (Runner ix isRollback extractEvent) = do
    -- Subscribe synchronously. A duplicated broadcast channel only sees the
    -- inputs written after the duplication.
    chan' <- STM.atomically $ STM.dupTChan chan
    _ <- Con.forkIO $ forever $ do
        -- read a fresh input on every iteration
        input <- STM.atomically $ STM.readTChan chan'
        rollBackPoint <- isRollback input
        maybe (handleInsert input) handleRollback rollBackPoint
        -- acknowledge only once the input has been fully processed
        unlockCoordinator
    pure ()
  where
    unlockCoordinator = Con.signalQSemN tokens 1

    -- index the event only if it is strictly after the indexer's last sync point
    indexEvent p e = do
        indexer <- STM.atomically $ STM.takeTMVar ix
        indexerLastPoint <- lastSyncPoint indexer
        indexer' <-
            if maybe True (< p) indexerLastPoint
                then index p e indexer
                else pure indexer
        STM.atomically $ STM.putTMVar ix indexer'

    handleInsert input = do
        me <- extractEvent input
        case me of
            Nothing -> pure ()
            Just (point, event) -> indexEvent point event

    -- a failed rewind leaves the indexer unchanged
    handleRollback p = do
        indexer <- STM.atomically $ STM.takeTMVar ix
        mindexer <- rewind p indexer
        STM.atomically $ STM.putTMVar ix (maybe indexer id mindexer)
-- | Coordinates a group of runners that are fed from a single broadcast channel.
data Coordinator input point = Coordinator
    { _lastSync :: !(Maybe point) -- ^ the last common sync point for the runners
    , _runners :: ![Runner input point] -- ^ the list of runners managed by this coordinator
    , _tokens :: !QSemN -- ^ used to synchronise the runners
    , _channel :: !(TChan input) -- ^ to dispatch input to runners
    , _nbRunners :: !Int -- ^ how many runners we are waiting for, should always be equal to @length runners@
    }
-- generates the 'lastSync', 'runners', 'tokens', 'channel' and 'nbRunners' lenses
makeLenses 'Coordinator
-- | Get the common sync points of a group of runners. These are the sync
-- points of the first runner that are also sync points of every other runner,
-- in the order reported by the first runner. An empty group has none.
--
-- Each runner's indexer is taken out of its 'TMVar' while its 'syncPoints' are
-- computed, then put back.
runnerSyncPoints :: Ord point => [Runner input point] -> IO [point]
runnerSyncPoints allRunners = case allRunners of
    [] -> pure []
    firstRunner : others -> do
        initial <- pointsOf firstRunner
        foldlM (\common r -> intersect common <$> pointsOf r) initial others
  where
    pointsOf :: Ord point => Runner input point -> IO [point]
    pointsOf (Runner ix _ _) = do
        indexer <- STM.atomically $ STM.takeTMVar ix
        points <- syncPoints indexer
        STM.atomically $ STM.putTMVar ix indexer
        pure points
-- | Create a coordinator for the given runners and start each of them with
-- 'startRunner', in list order. This returns once every 'startRunner' call
-- has returned.
--
-- All runners share a fresh broadcast channel and a semaphore that starts at
-- zero. Runners signal the semaphore (see 'startRunner') and 'step' waits on
-- it. The coordinator starts with no last sync point.
start :: Ord point => [Runner input point] -> IO (Coordinator input point)
start runners' = do
    tokens' <- STM.newQSemN 0
    channel' <- STM.newBroadcastTChanIO
    traverse_ (startRunner channel' tokens') runners'
    pure $ Coordinator
        { _lastSync = Nothing
        , _runners = runners'
        , _tokens = tokens'
        , _channel = channel'
        , _nbRunners = length runners'
        }
-- | A coordinator step. It broadcasts an input to the runners and waits until
-- the semaphore has been signalled once per runner ('_nbRunners' units). It
-- then records the point of the input as the coordinator's last sync point.
step :: (input -> point) -> Coordinator input point -> input -> IO (Coordinator input point)
step getPoint coordinator input = do
    STM.atomically $ STM.writeTChan (coordinator ^. channel) input
    Con.waitQSemN (coordinator ^. tokens) (coordinator ^. nbRunners)
    pure (coordinator & lastSync ?~ getPoint input)
-- | A coordinator can be seen as an indexer. This wrapper fixes the
-- coordinator's inputs to the events of @desc@ and its points to the points of @desc@.
newtype CoordinatorIndex desc =
    CoordinatorIndex
        { _coordinator :: Coordinator (Event desc) (Point desc)
        }
-- generates the 'coordinator' optic
makeLenses 'CoordinatorIndex
-- | A coordinator can be considered as an indexer that forwards the input to
-- its runners. Indexing an event runs one coordinator 'step' with that event
-- and records the given point as the last sync point.
instance IsIndex CoordinatorIndex desc IO where
    index point event (CoordinatorIndex c) =
        CoordinatorIndex <$> step (const point) c event
    lastSyncPoint = pure . view (coordinator . lastSync)
-- | To rewind a coordinator, we try and rewind all the runners.
--
-- The rewind only proceeds when @p@ is one of the sync points shared by all
-- the runners (see 'runnerSyncPoints'). Otherwise the result is 'Nothing' and
-- no runner is touched. When every runner has been rewound, the coordinator's
-- last sync point is moved back to @p@ as well, so that 'lastSyncPoint'
-- matches the state of the runners.
--
-- NOTE(review): if a runner's rewind fails anyway, the result is 'Nothing',
-- but the runners rewound before it stay rewound.
instance Rewindable CoordinatorIndex desc IO where
    rewind p = runMaybeT . coordinator rewindRunners
      where
        rewindRunners ::
            Coordinator (Event desc) (Point desc) ->
            MaybeT IO (Coordinator (Event desc) (Point desc))
        rewindRunners c = do
            availableSyncs <- lift $ runnerSyncPoints $ c ^. runners
            -- we start by checking if the given point is a valid sync point
            guard $ p `elem` availableSyncs
            rewound <- runners (traverse $ MaybeT . rewindRunner) c
            -- the runners are now at @p@: keep the coordinator consistent with them
            pure $ rewound & lastSync ?~ p

        rewindRunner ::
            Runner (Event desc) (Point desc) ->
            IO (Maybe (Runner (Event desc) (Point desc)))
        rewindRunner r@Runner{runnerState} = do
            indexer <- STM.atomically $ STM.takeTMVar runnerState
            res <- rewind p indexer
            maybe
                -- the Nothing case should not happen
                -- as we check that the sync point is a valid one
                (STM.atomically (STM.putTMVar runnerState indexer) $> Nothing)
                ((Just r <$) . STM.atomically . STM.putTMVar runnerState)
                res