perf: Scalable slice queries for many consumers #31957
Conversation
final class SlowConsumerException(message: String) extends RuntimeException(message) with NoStackTrace

final case class FirehoseKey(pluginId: String, entityType: String, sliceRange: Range)
I intend to create a follow-up ticket about colocating shards. When several projections are started with ShardedDaemonProcess, they would try to allocate slice ranges to the same nodes, so that the firehose for a slice range is not started in more places than necessary.
That was #31967.
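For context on slice ranges: Akka's events-by-slice queries partition entities into 1024 slices (0 to 1023). A minimal sketch, using a hypothetical helper (not the actual Akka API), of how that slice space splits into contiguous ranges that ShardedDaemonProcess instances could each own:

```scala
// Akka's events-by-slice queries partition entities into 1024 slices (0-1023).
// Hypothetical helper: split the slice space into `numberOfRanges` contiguous
// ranges, mirroring how ShardedDaemonProcess instances would each own one range.
def sliceRanges(numberOfRanges: Int, totalSlices: Int = 1024): Seq[Range] = {
  val rangeSize = totalSlices / numberOfRanges
  require(
    numberOfRanges * rangeSize == totalSlices,
    "numberOfRanges must evenly divide the slice space")
  (0 until numberOfRanges).map { i =>
    (i * rangeSize) until ((i + 1) * rangeSize)
  }
}
```

Colocation would then mean trying to allocate the shards for one such range to the same node, so a firehose instance for that range runs in as few places as possible.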
Looking good so far (also reviewed several rounds in the original PR).
I did a first pass last Friday. I have a bunch of questions but need to re-review and refine them.
I'm leaving a nitpick formatting suggestion though. 😄
Looking really good.
Leaving some comments already.
I want to do a second pass to review the slow consumer detection in more detail.
else if (diffFastest < 0) s"ahead of fastest [$diffFastest] ms" // not possible
else "same as fastest"
val diffSlowest = slowestConsumer.offsetTimestamp.toEpochMilli - tracking.offsetTimestamp.toEpochMilli
val diffSlowestStr =
  if (diffSlowest > 0) s"behind slowest [$diffSlowest] ms" // not possible
Agree it's not possible, but what if we have a bug and it shows up in logs. Wouldn't that be confusing?
Shall we log something else or fail the stream?
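One way to make such a bug visible, sketched here as a hypothetical helper rather than the actual code: render the "impossible" branch with an explicit flag so that, if it ever fires, the log line stands out instead of reading like a normal status message.

```scala
// Hypothetical sketch of the idea discussed above: the "impossible" diff is
// still rendered, but flagged so that a bug reaching this branch stands out
// in the logs instead of silently producing a confusing message.
// diffSlowest = slowestConsumer.offsetTimestamp - tracking.offsetTimestamp,
// so a positive value would mean this consumer is behind the slowest one.
def describeDiffSlowest(diffSlowest: Long): String =
  if (diffSlowest > 0)
    s"behind slowest [$diffSlowest] ms (unexpected, please report a bug)"
  else if (diffSlowest < 0)
    s"ahead of slowest [${-diffSlowest}] ms"
  else
    "same as slowest"
```

Failing the stream is the stricter alternative; flagging in the log keeps the firehose running while still surfacing the inconsistency.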
I'm finding it a really cool feature.
More detailed review of the slow-consumer-detection.
if (existing.history.size <= settings.broadcastBufferSize)
  existing.history :+ offset
else
  existing.history.tail :+ offset // drop one, add one
Expensive ops with vector tail + append for each element; is there a better data structure we could use, some form of queue?
I don't think `tail` is that bad; it's still listed as eC (effectively constant time). Scala's immutable Queue is probably faster for this, but then we also use `size`, which is not good for Queue since it delegates to the underlying Lists. I think I'll leave it as Vector for now.
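The bounded-history update under discussion, as a self-contained sketch (names are illustrative, not the actual internals):

```scala
// Illustrative sketch of the bounded history discussed above: append the new
// element and drop the oldest entry once the buffer size is exceeded.
// Vector's `tail` and `:+` are both effectively constant time (eC), which is
// the reasoning for keeping Vector rather than switching to an immutable
// Queue, whose `size` walks the two underlying Lists.
def appendBounded[A](history: Vector[A], elem: A, maxSize: Int): Vector[A] =
  if (history.size <= maxSize) history :+ elem
  else history.tail :+ elem // drop one, add one
```

Mirroring the quoted code, the history can momentarily grow to maxSize + 1 elements before the drop-one branch kicks in.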
val slowestConsumer = trackingValues.minBy(_.offsetTimestamp)
val fastestConsumer = trackingValues.maxBy(_.offsetTimestamp)

val behind = elementsBehind(fastestConsumer.history, slowestConsumer.history)
Hmmm, the slowest consumer will hold back the fastest, so the fastest should always have the slowest one's timestamp in its history, because we keep an offset history as long as the size of the buffer?
Exactly. It's not great to have to keep the history for each consumer, but I couldn't think of another way. They're just small objects, and probably held in memory by other things anyway.
There is a case where the fastest is a new consumer and wouldn't have the full history yet. Then it keeps going.
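A hedged sketch of what a check like `elementsBehind` could look like (the name appears in the quoted code above, but this body is an assumption, not the actual implementation): count how many entries in the fastest consumer's history come after the slowest consumer's latest known offset timestamp.

```scala
import java.time.Instant

// Hypothetical sketch: given the offset-timestamp histories of the fastest
// and slowest consumers, count how many entries the fastest has seen after
// the slowest consumer's latest timestamp. Because the slowest consumer
// holds back the firehose, the fastest consumer's bounded history normally
// still covers the slowest one's position; a new consumer with an empty
// history is simply treated as fully behind.
def elementsBehind(fastestHistory: Vector[Instant], slowestHistory: Vector[Instant]): Int =
  slowestHistory.lastOption match {
    case Some(slowestLatest) => fastestHistory.count(_.isAfter(slowestLatest))
    case None                => fastestHistory.size
  }
```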
* it's also possible to have more than one events-by-slice-firehose; each can have its own underlying plugin and creates its own Firehose instance
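As a hedged illustration of that bullet (the key names follow the pattern of Akka's reference configuration, but verify against the current docs before relying on them), two firehose instances each delegating to their own underlying query plugin might be configured like:

```hocon
# Hypothetical configuration sketch: two events-by-slice-firehose instances,
# each delegating to its own underlying query plugin. Key names are
# assumptions modeled on Akka's reference.conf; verify before use.
first-firehose = ${akka.persistence.query.events-by-slice-firehose} {
  delegate-query-plugin-id = "akka.persistence.r2dbc.query"
}
second-firehose = ${akka.persistence.query.events-by-slice-firehose} {
  delegate-query-plugin-id = "second-plugin.query"
}
```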
Co-authored-by: Francisco Lopez-Sancho <franciscolopezsancho@gmail.com> Co-authored-by: Renato Cavalcanti <renato@cavalcanti.be>
LGTM, great work @patriknw