fix: aggregator restart missing events #737
Restart from the beginning of conclusion events to avoid skipping events on restarts. We still need to figure out why the memory table contains `conclusion_event_order` values that are less than the data already written to `event_states`, and ideally restart where we left off. For now, we just ignore events we've seen before.
… whole we split the batch into models/MIDs and were writing both pieces to disk separately. The entire batch is ordered by `conclusion_event_order`, but the two groups' ranges could overlap. So we could flush the cache during the first write and then be left with a cache that was "going backward" in `conclusion_event_order`. On restart, those events were missed and failed to aggregate correctly.
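A minimal simulation of the failure mode described above, assuming a simplified store where rows are represented only by their `conclusion_event_order`, the in-memory table is a `Vec<u64>`, and "disk" is another `Vec<u64>`. `SimStore`, `write_group`, `resume_point`, and `crash_and_find_skipped` are hypothetical names for illustration, not the aggregator's actual API.

```rust
/// Hypothetical, simplified model of the aggregator's storage: each row is
/// represented only by its `conclusion_event_order`.
struct SimStore {
    mem: Vec<u64>,
    disk: Vec<u64>,
    max_cached_rows: usize,
}

impl SimStore {
    fn new(max_cached_rows: usize) -> Self {
        SimStore { mem: Vec::new(), disk: Vec::new(), max_cached_rows }
    }

    /// Pre-fix behaviour (sketch): every group written from a batch may
    /// trigger a flush on its own.
    fn write_group(&mut self, orders: &[u64]) {
        self.mem.extend_from_slice(orders);
        if self.mem.len() >= self.max_cached_rows {
            self.flush();
        }
    }

    /// Move everything in memory to "disk".
    fn flush(&mut self) {
        self.disk.append(&mut self.mem);
    }

    /// On restart we resume from the highest order persisted to disk.
    fn resume_point(&self) -> u64 {
        self.disk.iter().copied().max().unwrap_or(0)
    }

    /// Simulate a crash: the in-memory table is lost. Returns the lost rows
    /// strictly below the resume point, i.e. rows that are never re-read.
    fn crash_and_find_skipped(&mut self) -> Vec<u64> {
        let resume = self.resume_point();
        let lost = std::mem::take(&mut self.mem);
        lost.into_iter().filter(|o| *o < resume).collect()
    }
}
```

With `max_cached_rows = 2`, writing the models group `[2, 3]` triggers a flush, then the MIDs group `[1]` stays in memory. After a crash the resume point is 3, so order 1 is never read again.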
Force-pushed from 519f887 to 492c7b3
m0ar left a comment
I think this looks good, just some clarification requests and nitpicks ✨
```diff
 let models = self.validate_models(models).context("validating models")?;
 let mut models = self
-    .store_event_states(models)
+    .store_event_states(models, false)
```
Could you elaborate on why we prevent flush here, but not for the mids below?
So this is the crux of the fix I tried to explain (verbosely and imprecisely) in the second bullet point. Basically, we put the entire batch of conclusion events into memory (models here and MIDs below) and flush only once, so the in-memory order can never end up behind the on-disk order.
```diff
     .with_column_renamed("event_height", "previous_height")?;

-Ok(conclusion_events
+let conclusion_events = conclusion_events
```
Did we lose a pattern-matching error check here?
No, I just shadowed the name while doing some more filtering (to be idempotent if we receive the same conclusion event a second time). Instead of returning `Ok(select_things().await?)`, we now assign the result and return `Ok(conclusion_events)` at the end.
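The idempotent filtering can be sketched in plain Rust rather than the dataframe operations the PR actually uses: drop any incoming event whose ID is already persisted (preferring the on-disk copy), and keep only the first copy of any event repeated within the batch. `Event`, its fields, and `filter_seen` are hypothetical names chosen for illustration.

```rust
use std::collections::HashSet;

/// Hypothetical minimal event row: only the fields needed for deduplication.
#[derive(Debug, Clone, PartialEq)]
struct Event {
    event_id: String,
    conclusion_event_order: u64,
}

/// Keep only events that are not already persisted, and only the first
/// occurrence of any event repeated inside the batch, so replaying a batch
/// that was already processed yields nothing new.
fn filter_seen(batch: Vec<Event>, on_disk: &HashSet<String>) -> Vec<Event> {
    let mut seen_in_batch = HashSet::new();
    batch
        .into_iter()
        // Prefer the on-disk copy: drop incoming duplicates of persisted events.
        .filter(|e| !on_disk.contains(&e.event_id))
        // `insert` returns false if the ID was already seen in this batch.
        .filter(|e| seen_in_batch.insert(e.event_id.clone()))
        .collect()
}
```

Running the same batch twice, with the first run's output added to the on-disk set, returns an empty result the second time, which is what lets a restart "go backward" without double-applying events.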
```rust
if allow_cache_flush {
    self.flush_cache().await?;
    let count = self.count_cache().await?;
    tracing::debug!(%count, will_flush = %count >= self.max_cached_rows, "counts for mem table");
    // If we have enough data cached in memory write it out to persistent store
    if count >= self.max_cached_rows {
        self.flush_cache().await?;
    }
```
It would be good to document why this check is important. I don't really understand it outside of testing use cases, in particular the block on line 301 🤔
Hmm, I think the existing comment is reasonable, but I'm open to improvements: "If we have enough data cached in memory write it out to persistent store".
I'm trying to figure out how to clarify. We used to flush the cache every time we got here if the mem table held at least `max_cached_rows` rows. Now I've added an `allow_cache_flush` flag, because we call this twice while processing a single batch of conclusion events, and we don't want to flush the cache in the middle of the batch; instead we flush only once, at the end.
If we processed conclusion events strictly in order, this wouldn't be necessary. But `conclusion_event_order` is the order of arrival on the node, which only respects stream ordering: a conclusion event is guaranteed to come after all events in its stream, but not necessarily after its model, because we accept and persist events before the model is present.
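A synchronous sketch of that flag, assuming a simplified store whose rows are just `conclusion_event_order` values (the real methods are async and operate on a mem table). `store_event_states` and `allow_cache_flush` mirror names from the diff, but these bodies, `Store`, and `process_batch` are hypothetical.

```rust
/// Hypothetical simplified store: rows are `conclusion_event_order` values.
struct Store {
    mem: Vec<u64>,
    disk: Vec<u64>,
    max_cached_rows: usize,
}

impl Store {
    /// Buffer rows in memory; only run the flush check when the caller allows it.
    fn store_event_states(&mut self, rows: &[u64], allow_cache_flush: bool) {
        self.mem.extend_from_slice(rows);
        if allow_cache_flush && self.mem.len() >= self.max_cached_rows {
            // Flush the whole in-memory table to "disk".
            self.disk.append(&mut self.mem);
        }
    }

    /// Write models without allowing a flush, then MIDs with the flush check
    /// enabled, so the check runs once after the whole batch is buffered.
    fn process_batch(&mut self, models: &[u64], mids: &[u64]) {
        self.store_event_states(models, false);
        self.store_event_states(mids, true);
    }
}
```

Under this model, and assuming batches arrive in increasing order, every flush empties the whole table at a batch boundary. Whatever remains in memory therefore has a higher `conclusion_event_order` than anything on disk, and resuming from the on-disk max re-reads it. The trade-off is that the table can temporarily exceed `max_cached_rows` by up to one batch.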
```rust
// splice the 3 events together making sure each vec isn't reordered but not all in a row
let events = &events;
```
Not sure if this comment refers to the right thing
Whoops, yep, I modified this behavior. I removed the comment and moved the model into the middle so the test is more likely to reproduce the real cause.
Fix issue where the aggregator could restart and miss events, causing validation errors on valid streams.
This includes two changes:
- `process_conclusion_events_batch` and `join_event_states` are now idempotent and exclude duplicate incoming events, preferring the on-disk events when they are duplicated in the batch. This means we can go backward or see events again without causing errors.
- Events could be left in memory with a lower `conclusion_event_order` than the on-disk max, so on restart we'd skip them, since we started from our previous on-disk max. Because we'd be missing events (usually the init event), this caused aggregator errors and streams full of validation errors. This happened because we write events from the batch to disk in phases (models, then MIDs). Although the batch covered [X, Y], the two internal batches could have interleaved orders, so if the first one triggered a cache flush, the remaining events were left in memory. Now we only flush the cache once per conclusion events batch, so they are all written to disk together. This means the cache table can grow even further past its allotted size, but only briefly, and it's worth it.

With this, we shouldn't need the shutdown task to flush, but I don't see any harm in keeping it, so it's still there.