eventsByTag may miss event under certain conditions #166

Closed
notxcain opened this issue Mar 23, 2017 · 4 comments

@notxcain
Contributor

notxcain commented Mar 23, 2017

Notation: Xn is event number n of the aggregate with id X.
All events are tagged with the same tag, and there are no holes.

The order of events in the base table, by timestamp, is B1, A1, B2.
Materialized views are updated asynchronously, so the events may become visible in the view in a different order, e.g. A1, B1, B2.
The source emits A1 but never emits B1: once B1 becomes visible it sorts before A1, because the actual order in the view is B1, A1, B2 (the same as in the base table), and the query has already moved past that position.
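The scenario can be reproduced with a small, self-contained model. This is only a sketch under simplifying assumptions: `Event`, `poll` and the timestamps 1, 2, 3 are made up for illustration, the poller remembers nothing but the last timestamp it emitted, and no delayed-event handling is modeled. It is not the plugin's implementation.

```scala
// Hypothetical model, not the plugin's code: a tag view ordered by timestamp,
// read by a poller that only remembers the last timestamp it emitted.
final case class Event(name: String, timestamp: Long)

object MissedEventSketch {
  // Events strictly after `offset`, in view (timestamp) order.
  def poll(visible: Seq[Event], offset: Long): Seq[Event] =
    visible.filter(_.timestamp > offset).sortBy(_.timestamp)

  def run(): Seq[String] = {
    val b1 = Event("B1", 1L)
    val a1 = Event("A1", 2L)
    val b2 = Event("B2", 3L)

    // Events become visible in the order A1, B1, B2, one per polling round.
    val rounds = Seq(Seq(a1), Seq(a1, b1), Seq(a1, b1, b2))

    // Fold over the polling rounds, carrying (offset, names emitted so far).
    val (_, emitted) = rounds.foldLeft((0L, Vector.empty[String])) {
      case ((offset, out), visible) =>
        val next = poll(visible, offset)
        val newOffset = next.lastOption.map(_.timestamp).getOrElse(offset)
        (newOffset, out ++ next.map(_.name))
    }
    emitted
  }
}
```

In this model `MissedEventSketch.run()` yields A1 and B2 only: by the time B1 is visible, the offset is already at A1's timestamp.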

@patriknw
Member

First, I'm assuming a proper cassandra-query-journal.delayed-event-timeout setting, i.e. not 0s.

It's rather easy to write such scenarios in EventsByTagSpec https://github.com/akka/akka-persistence-cassandra/blob/master/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala#L622

    "find delayed events 2" in {
      val t1 = LocalDateTime.now(ZoneOffset.UTC).minusMinutes(5)
      val w1 = UUID.randomUUID().toString
      val w2 = UUID.randomUUID().toString

      val t2 = t1.plusSeconds(1)
      val eventA1 = PersistentRepr("A1", 1L, "a", "", writerUuid = w1)
      writeTestEvent(t2, eventA1, Set("T7"))

      val src = queries.eventsByTag(tag = "T7", offset = NoOffset)
      val probe = src.runWith(TestSink.probe[Any])
      probe.request(10)
      probe.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "A1") => e }

      // B1 is delayed: written after A1 was emitted, but its timestamp (t1) is before A1's (t2)
      val eventB1 = PersistentRepr("B1", 1L, "b", "", writerUuid = w2)
      writeTestEvent(t1, eventB1, Set("T7"))
      val t3 = t1.plusSeconds(2)
      val eventB2 = PersistentRepr("B2", 2L, "b", "", writerUuid = w2)
      writeTestEvent(t3, eventB2, Set("T7"))

      probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "B1") => e }
      probe.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B2") => e }

      probe.cancel()
    }

That fails as you describe. B1 is missed because the query doesn't know the sequence number of the first B event, so it uses the sequence number of the first B event it sees (here B2) as the starting point. As a result it doesn't detect that B1 is missing.

The following test works:

    "find delayed events 3" in {
      val t1 = LocalDateTime.now(ZoneOffset.UTC).minusMinutes(5)
      val w1 = UUID.randomUUID().toString
      val w2 = UUID.randomUUID().toString

      val eventB0 = PersistentRepr("B0", 1L, "b", "", writerUuid = w2)
      writeTestEvent(t1.minusSeconds(1), eventB0, Set("T8"))

      val t2 = t1.plusSeconds(1)
      val eventA1 = PersistentRepr("A1", 1L, "a", "", writerUuid = w1)
      writeTestEvent(t2, eventA1, Set("T8"))

      val src = queries.eventsByTag(tag = "T8", offset = NoOffset)
      val probe = src.runWith(TestSink.probe[Any])
      probe.request(10)
      probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "B0") => e }
      probe.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "A1") => e }

      // B1 is delayed: written after A1 was emitted, but its timestamp (t1) is before A1's (t2)
      val eventB1 = PersistentRepr("B1", 2L, "b", "", writerUuid = w2)
      writeTestEvent(t1, eventB1, Set("T8"))
      val t3 = t1.plusSeconds(2)
      val eventB2 = PersistentRepr("B2", 3L, "b", "", writerUuid = w2)
      writeTestEvent(t3, eventB2, Set("T8"))

      probe.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B1") => e }
      probe.expectNextPF { case e @ EventEnvelope(_, "b", 3L, "B2") => e }

      probe.cancel()
    }
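The difference between the two tests comes down to whether a sequence number is already known for persistence id "b" when B2 arrives. A minimal sketch of that bookkeeping follows; `SeqTracker`, `Verdict` and `observe` are hypothetical names chosen for illustration and are not the API of the plugin's SequenceNumbers class.

```scala
// Hypothetical tracker, loosely inspired by the SequenceNumbers history
// mentioned in this thread; names and API are assumptions.
sealed trait Verdict
case object Accepted extends Verdict
final case class GapDetected(expected: Long, got: Long) extends Verdict

final case class SeqTracker(lastSeen: Map[String, Long] = Map.empty) {
  def observe(pid: String, seqNr: Long): (SeqTracker, Verdict) =
    lastSeen.get(pid) match {
      // No history: the first seen sequence number becomes the starting point,
      // so earlier events for this pid cannot be detected as missing.
      case None =>
        (copy(lastSeen = lastSeen.updated(pid, seqNr)), Accepted)
      // The expected next sequence number.
      case Some(prev) if seqNr == prev + 1 =>
        (copy(lastSeen = lastSeen.updated(pid, seqNr)), Accepted)
      // Anything else is flagged; a real implementation would treat
      // duplicates separately from gaps.
      case Some(prev) =>
        (this, GapDetected(prev + 1, seqNr))
    }
}

object SeqTrackerDemo {
  // As in "find delayed events 2": B2 (seqNr 2) is the first B event seen.
  val (_, withoutHistory) = SeqTracker().observe("b", 2L)

  // As in "find delayed events 3": B0 (seqNr 1) is known when seqNr 3 shows up.
  val (afterB0, _) = SeqTracker().observe("b", 1L)
  val (_, withHistory) = afterB0.observe("b", 3L)
}
```

Here `withoutHistory` is `Accepted`, so nothing prompts a search for B1, while `withHistory` is `GapDetected(2, 3)`, which is the signal that lets the query go looking for the delayed event.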

To mitigate this risk we could perhaps populate the SequenceNumbers history by looking back at old events when starting up the query.

@notxcain
Contributor Author

notxcain commented Mar 31, 2017

@patriknw thanks for the investigation. What strategy for populating SequenceNumbers do you suggest? Folding over SELECT persistence_id, sequence_nr FROM eventsbytag, starting from the first timebucket, on materialization? Or folding on demand, when the first event with an untracked sequence number is observed?
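The first option, folding over (persistence_id, sequence_nr) rows, could look roughly like the following. It is a sketch only: `TagRow` and `SequenceHistory.fromRows` are hypothetical, and the rows are supplied as a plain iterator instead of coming from a Cassandra result set.

```scala
// Hypothetical row shape for the two selected columns.
final case class TagRow(persistenceId: String, sequenceNr: Long)

object SequenceHistory {
  // Highest sequence number seen per persistence id.
  def fromRows(rows: Iterator[TagRow]): Map[String, Long] =
    rows.foldLeft(Map.empty[String, Long]) { (acc, row) =>
      val highest = math.max(acc.getOrElse(row.persistenceId, 0L), row.sequenceNr)
      acc.updated(row.persistenceId, highest)
    }
}
```

Because only two small columns are read, such a fold would avoid fetching and deserializing event payloads, at the cost of scanning every time bucket up front.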

@patriknw
Member

Easiest would be to just use the existing query (EventsByTagFetcher), which is also used for finding delayed events and missing events. We could get better performance by using something that is only looking at the needed fields (e.g. not fetching and deserializing the event payload). It's probably worth doing the latter.

@patriknw patriknw added this to the 0.25 milestone Apr 3, 2017
@patriknw patriknw added the bug label Apr 3, 2017
@patriknw patriknw self-assigned this Apr 3, 2017
patriknw added a commit that referenced this issue Apr 5, 2017
* When the first event for a persistenceId was seen it used
  that sequence number as the first sequence number, but if
  such events are delayed due to the eventual consistency
  it might miss preceding events for that persistenceId.
* The solution is that when first event is seen it aborts
  the current query and performs backtracking query instead.
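The idea in this commit message can be illustrated with a toy model. The sketch below is not the code from the commit: `Tagged`, `backtrack` and `emitOnFirstSeen` are hypothetical, and the in-memory `view` stands in for a real backtracking query against Cassandra.

```scala
final case class Tagged(pid: String, seqNr: Long, timestamp: Long)

object BacktrackSketch {
  // Stand-in for a backtracking query: events of `pid` currently visible in the
  // view with a timestamp before `before`, ordered by sequence number.
  def backtrack(view: Seq[Tagged], pid: String, before: Long): Seq[Tagged] =
    view.filter(e => e.pid == pid && e.timestamp < before).sortBy(_.seqNr)

  // On the first event seen for a pid, emit whatever backtracking finds,
  // followed by the event itself.
  def emitOnFirstSeen(view: Seq[Tagged], first: Tagged): Seq[Tagged] =
    backtrack(view, first.pid, first.timestamp) :+ first
}
```

With the view from the failing test (B1 at t1, A1 at t2, B2 at t3), seeing B2 first makes `emitOnFirstSeen` return B1 followed by B2, so the delayed B1 is no longer skipped.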
patriknw added a commit that referenced this issue Apr 5, 2017
improve eventsByTag to find first delayed event, #166
@patriknw patriknw closed this as completed Apr 5, 2017
patriknw added a commit that referenced this issue Apr 5, 2017
* When the first event for a persistenceId was seen it used
  that sequence number as the first sequence number, but if
  such events are delayed due to the eventual consistency
  it might miss preceding events for that persistenceId.
* The solution is that when first event is seen it aborts
  the current query and performs backtracking query instead.

(cherry picked from commit d648a86)
@patriknw
Member

patriknw commented Apr 5, 2017

@notxcain 0.25 is released with this improvement included
