
Add configurable protection against server-bug induced resets #1299

Merged (5 commits) on Mar 10, 2021

Conversation

@jyates jyates commented Dec 28, 2020

Includes some refactoring to pull out the common commit progress tracking
that is useful for both reset tracking and commit refreshing. I did not
want to keep state twice, so you get this little bit of twisted design
with the progress tracker, but it keeps the logic contained.

Continues to keep the same amount of overhead (that is, very little)
when commit refreshing and reset tracking are not enabled. At the
same time, it avoids double work when both are enabled.

References #1286

jyates commented Dec 28, 2020

I can't see where/how my changes broke the docs, but it does look like we would need a minor version bump for the change in the ConsumerSettings to support the new option.

jyates commented Dec 28, 2020

I'm not sure at all how verifyDocs is passing in master, given my code doesn't touch those lines at all. That said, it looks like an occurrence of scala/scala-collection-compat#203

jyates commented Dec 29, 2020

Not sure what is up with the tests now, but running locally... it seems to work fine:

$ sbt "tests/testOnly *.RebalanceExtSpec"
[info] Loading global plugins from /Users/jyates/.sbt/1.0/plugins
[info] Loading settings for project alpakka-kafka-build from plugins.sbt ...
[info] Loading project definition from /Users/jyates/dev/oss/alpakka-kafka/project
[info] Loading settings for project alpakka-kafka from build.sbt ...
[info] 
[info] ** Welcome to the Alpakka Kafka connector! **
...
[warn] There may be incompatibilities among your library dependencies; run 'evicted' to see detailed eviction warnings.
[info] Formatting 2 Scala sources...
[info] Compiling 3 Scala sources to /Users/jyates/dev/oss/alpakka-kafka/core/target/scala-2.13/classes ...
[info] Compiling 1 Java source to /Users/jyates/dev/oss/alpakka-kafka/testkit/target/scala-2.13/classes ...
[info] Compiling 55 Scala sources and 17 Java sources to /Users/jyates/dev/oss/alpakka-kafka/tests/target/scala-2.13/test-classes ...
[info] RebalanceExtSpec:
[info] Fetched records
[info] - must no messages should be lost when two consumers consume from one topic and two partitions and one consumer aborts mid-stream (12 seconds, 367 milliseconds)
[info] ScalaTest
[info] Run completed in 1 minute, 52 seconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.

Same for $ sbt "++2.12.11 tests/testOnly *.RebalanceExtSpec" and sbt "++2.13.2 tests/testOnly *.RebalanceExtSpec"

Running on jdk8

$ java -version
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)

jyates commented Dec 29, 2020

Looks like the tests are getting stuck just spinning up the KafkaContainerCluster, not sure how to go about managing that. Maybe busy build infra at the time?

jyates commented Dec 29, 2020

Any thoughts @ennru?

ennru commented Jan 7, 2021

Looks like something in TestContainers didn't work correctly. I need to look closer...

ennru (Member) left a comment

Thank you for this intricate work. I wonder how we can document it so that users who might need it can find and understand it.

The hierarchy of ConsumerProgressTracking is a bit challenging with the delegation.
I tried out some suggestions locally, let me know what you think.

The CI had a hiccup when you pushed; I restarted it and it failed in more expected ways.

import scala.jdk.CollectionConverters._

/**
* Track the current state of the consumer: what offsets its has requested and committed, filtering by the current
Member:

Suggested change
* Track the current state of the consumer: what offsets its has requested and committed, filtering by the current
* Track the current state of the consumer: what offsets it has requested and committed, filtering by the current


/**
* Track the current state of the consumer: what offsets its has requested and committed, filtering by the current
* assignments to the consumer. When a partition is assigned the consumer for the first time, its assigned offset
Member:

Suggested change
* assignments to the consumer. When a partition is assigned the consumer for the first time, its assigned offset
* assignments to the consumer. When a partition is assigned to the consumer for the first time, its assigned offset

Comment on lines 434 to 438
progressTracker = Some(progressTracker match {
case Some(tracker) => tracker
case None => new ConsumerProgressTrackerImpl()
})
progressTracker.get
Member:

This looks more nested than it needs to be.

A no-op variant would work here as well:

    if (progressTracker == ConsumerProgressTrackingNoop) {
      progressTracker = new ConsumerProgressTrackerImpl()
    }
    progressTracker

Contributor Author:

good point.

def addProgressTrackingCallback(callback: ConsumerProgressTracking): Unit = {}
}

class ConsumerProgressTrackerImpl extends ConsumerProgressTracking {
Member:

@InternalApi final

if (threshold > 0) new Impl(ref, log, threshold, progress()) else new Noop()
}

private final class Noop() extends ConsumerResetProtection {
Member:

Suggested change
private final class Noop() extends ConsumerResetProtection {
private object Noop extends ConsumerResetProtection {

Comment on lines 417 to 418
commitRefreshing = CommitRefreshing(settings.commitRefreshInterval, ensureProgressTracker)
resetProtection = ConsumerResetProtection(self, log, settings.resetProtectionThreshold, ensureProgressTracker)
Member:

Does this show the intention better?

    val progressTrackingFactory: () => ConsumerProgressTracking = ensureProgressTracker
    commitRefreshing = CommitRefreshing(settings.commitRefreshInterval, progressTrackingFactory)
    resetProtection = ConsumerResetProtection(self, log, settings.resetProtectionThreshold, progressTrackingFactory)

Comment on lines 50 to 66
val previouslyRequestedOffset = requested.offset()
val threshold = previouslyRequestedOffset - allowedMaxDeltaFromRequest
// if there are records found outside the threshold:
// 1. Drop the records returned for the partition (they are outside the threshold)
// 2. Request a seek to the latest committed offset of the partition (known to be "safe").
// 3. New records for the partition arrive with the next poll, which should be within the threshold
if (recordsExceedThreshold(threshold, partitionRecords)) {
// requested and committed are assumed to be kept in-sync, so this _should_ be safe. Fails
// catastrophically if this is not the case
val committed = progress.committedOffsets(tp)
log.warning(
s"Dropping offsets for partition $tp - received an offset which is less than "
+ s"allowed $threshold from the last requested offset "
+ s"(threshold: $allowedMaxDeltaFromRequest). "
+ s"Seeking to the latest known safe (committed or assigned) offset: $committed"
)
consumer ! Seek(Map(tp -> committed.offset()))
Member:

This is the central piece. I think it deserves a method.

Would it be better to pass the actor reference to protect instead of the instance value? (It comes as a bit of a surprise that it sends messages back to the actor that called it.)

Contributor Author:

I went back and forth on that API; I didn't have a good feel for the style for the actor refs. Can do!
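As a standalone illustration of that central piece, the decision could be modeled like this (a simplified sketch with plain types, not the PR's actual `ConsumerResetProtection` code; `protect`, `Decision`, and the parameter names are hypothetical):

```scala
// Simplified model of the reset-protection decision: a batch containing an
// offset further behind the last requested offset than the allowed delta is
// dropped, and we seek back to the last committed ("safe") offset instead.
final case class Decision(passRecords: Boolean, seekTo: Option[Long])

def protect(lastRequested: Long,
            lastCommitted: Long,
            allowedMaxDelta: Long,
            batchOffsets: Seq[Long]): Decision = {
  val threshold = lastRequested - allowedMaxDelta
  if (batchOffsets.exists(_ < threshold))
    Decision(passRecords = false, seekTo = Some(lastCommitted)) // drop + seek back
  else
    Decision(passRecords = true, seekTo = None) // within threshold: pass through
}

// A batch around offset 5 when we last requested offset 1000 (delta 100) looks
// like a broker-induced reset, so the records are dropped and we seek to 990:
// protect(1000, 990, 100, Seq(5, 6, 7)) == Decision(false, Some(990))
```

New records for the partition then arrive with the next poll, which should be within the threshold again.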

Comment on lines 114 to 117
private final class Impl(commitRefreshInterval: FiniteDuration, progress: ConsumerProgressTracking)
extends CommitRefreshing
with ConsumerProgressTracking {
progress.addProgressTrackingCallback(this)
Member:

This delegation gives me a hard time.

Contributor Author:

My preference was for the looser coupling, rather than having some sort of wonky inheritance thing. Open to suggestions though!

Contributor Author:

The constraints I came up with for the tracking were:

  • only a single object should track the requested/committed/assigned state for the consumer
  • other objects can query that state, but should not be making updates (outside of calls by the ConsumerActor)
  • commit refreshing needs to know about the changes in consumer state so it can update its internal state.

That's how I ended up with a parent tracker that calls listeners for calls to those update methods. I wanted to avoid another type in the hierarchy for the base (which listeners would implement) and then a separate one for the tracker that lets objects access the consumer progress state.

But yeah, it's a bit weird that CommitRefreshing$Impl has committedOffsets/requestedOffsets. My expectation was that it is hidden away by the interface, so it's not actually that big of a deal.

Member:

As we implement two interfaces with similar things, it becomes blurry which method is here for which reason.

Maybe an anonymous class would make that more clear?

      progress.addProgressTrackingCallback(new ConsumerProgressTracking {
        // required by the progress tracking trait, but not actually used or exposed, as this type should only be
        // accessed by its CommitRefreshing interface
        override def committedOffsets: Map[TopicPartition, OffsetAndMetadata] = null
        override def requestedOffsets: Map[TopicPartition, OffsetAndMetadata] = null

        override def revoke(revokedTps: Set[TopicPartition]): Unit = {
          refreshDeadlines = refreshDeadlines -- revokedTps
        }

        override def assignedPositions(assignedTps: Set[TopicPartition],
                                       assignedOffsets: Map[TopicPartition, Long]): Unit = {
          // assigned the partitions, so update all of the deadlines
          refreshDeadlines = refreshDeadlines ++ assignedTps.map(_ -> commitRefreshInterval.fromNow)
        }
      })

Member:

I'm having similar trouble understanding this type hierarchy. I think it would make sense to have a separate type for the listener that is implemented outside this source file and seal the rest of traits.

Contributor Author:

Which sounds similar to what I have in the latest. But I'm the first to admit the weakness in my scala-fu.

Member:

What I meant was in this type:

trait ConsumerProgressTrackingListener {
  def requested(offsets: Map[TopicPartition, OffsetAndMetadata]): Unit = {}
  def received[K, V](records: ConsumerRecords[K, V]): Unit = {}
  def committed(offsets: java.util.Map[TopicPartition, OffsetAndMetadata]): Unit = {}
  def revoke(revokedTps: Set[TopicPartition]): Unit = {}
  def assignedPositions(assignedTps: Set[TopicPartition], assignedOffsets: Map[TopicPartition, Long]): Unit = {}
}

We have default impls for the first 4 methods, but they're only ever implemented by ConsumerProgressTracking. Do they belong there instead?

Contributor Author:

My thought was that if we are going to have a listener for progress, it should be able to listen to all the progress events if it wants. It felt "lame" to only allow listening for a subset of events. That said, unless you need it, YAGNI... I'll drop them. Easy enough to add later :)

Comment on lines 19 to 20
var committedOffsets: Map[TopicPartition, OffsetAndMetadata] = _
var requestedOffsets: Map[TopicPartition, OffsetAndMetadata] = _
Member:

Should these be defs and only the implementation have access to change the values?

jyates commented Jan 8, 2021

Thank you for taking a look @ennru!

I wonder how we can document it so that users that might need it can find and understand it.

Hopefully we can use the description in the original ticket as a basis for explanation, and then the fact that it's a simple field in the config for "try to fix it if it goes over this value" should be easy enough to grok. But it's something we can definitely run by folks with a "blind" understanding... at the same time, it's a pretty "deep" issue and I have a hard time figuring out if we can create a sane "always on" default that will work for all setups.
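For the docs, that "simple field in the config" could end up looking something like this in `reference.conf` (a hypothetical sketch; the exact key name and default are assumptions based on the `resetProtectionThreshold` setting touched in this PR, not the merged configuration):

```hocon
# Hypothetical sketch; key name and default are assumptions.
akka.kafka.consumer {
  # If the consumer receives records more than this many offsets behind the
  # last requested offset (e.g. due to a server-side bug resetting positions),
  # drop them and seek back to the last committed offset.
  # A non-positive value disables the protection.
  reset-protection-threshold = 0
}
```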

ennru (Member) left a comment

Thanks for the updates.

Hopefully we can use the description in the original ticket as a basis for explanation and then the fact that its a simple field in the config for "try to fix it if it goes over this value" should be easy enough to grok.

Please add a section to the docs (maybe in "Error handling"?), the longer log warnings could refer to the docs section.


/**
* We found that we have previously requested the [[OffsetAndMetadata]] for the [[TopicPartition]], check it
* against the partition records top ensure that we haven't exceeded the threshold. If the records are within the
Member:

Suggested change
* against the partition records top ensure that we haven't exceeded the threshold. If the records are within the
* against the partition records to ensure that we haven't exceeded the threshold. If the records are within the

* We found that we have previously requested the [[OffsetAndMetadata]] for the [[TopicPartition]], check it
* against the partition records top ensure that we haven't exceeded the threshold. If the records are within the
* threshold, we just return the given records. If there are records found outside the threshold:
* 1. Drop the records returned for the partition (they are outside the threshold) by returning None)
Member:

Suggested change
* 1. Drop the records returned for the partition (they are outside the threshold) by returning None)
* 1. Drop the records returned for the partition (they are outside the threshold) by returning `None`

* 3. New records for the partition arrive with the next poll, which should be within the threshold.
*
*/
private[internal] def protectPartition[K, V](
Member:

Suggested change
private[internal] def protectPartition[K, V](
private def protectPartition[K, V](

* seeking back to the last committed offset.
* @return [[Some]] records if they can be safely passed to the consumer, returns [[None]] otherwise.
*/
private[internal] def maybeProtectRecords[K, V](
Member:

Suggested change
private[internal] def maybeProtectRecords[K, V](
private def maybeProtectRecords[K, V](

jyates commented Jan 29, 2021

Apologies for the delay, got busy at the office. The latest version squashes the last changes addressing comments onto the latest master.

It also includes support for using timestamp deltas as well as offsets (the former being much easier to understand); it did require adding another hook for tracking 'received' message high-water marks, but it fits in the same model.
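The combined offset/timestamp check can be modeled roughly like this (a toy sketch with plain tuples, not the PR's types; all names here are hypothetical):

```scala
// Toy model: a batch is suspect if either its offsets or its timestamps fall
// further behind the tracked high-water marks than the configured deltas.
def exceedsThreshold(lastRequestedOffset: Long,
                     lastReceivedTimestampMs: Long,
                     maxOffsetDelta: Long,
                     maxTimeDeltaMs: Long,
                     batch: Seq[(Long, Long)] // (offset, timestampMs) pairs
                    ): Boolean = {
  val offsetFloor = lastRequestedOffset - maxOffsetDelta
  val timeFloor = lastReceivedTimestampMs - maxTimeDeltaMs
  batch.exists { case (offset, ts) => offset < offsetFloor || ts < timeFloor }
}
```

A time-based threshold tends to be easier to reason about than an offset delta, since "records older than N minutes behind where we were" does not depend on the topic's throughput.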

jyates commented Jan 29, 2021

Looks like a flaky build system for many of the test failures: failures to set up clusters and such.

seglo (Member) left a comment

Looking good @jyates. I find the type hierarchy a little confusing; maybe we can separate that out a little. More types might be more readable in this case.

Apologies for all the nits. There's a lot of code!


import scala.jdk.CollectionConverters._

trait ConsumerResetProtection {
Member:

Suggested change
trait ConsumerResetProtection {
@InternalApi
sealed trait ConsumerResetProtection {

Member:

I think this one got missed.

Member:

sealed

Member:

Can you add a link to #1286 here so it's easy to look up.

seglo added this to the 2.1.0-RC1 milestone on Feb 3, 2021
seglo commented Feb 7, 2021

I'd like to include this in the 2.1.0-RC1 milestone. I don't have a date, but I'm aiming for some time this month.

BTW, if you rebase on master you'll pull in changes that should reduce the number of client timeouts in the build.

jyates commented Feb 8, 2021

@seglo thanks for taking a look! Been swamped with work, but planning to take some time this week to address your comments.

seglo commented Feb 9, 2021

Thanks @jyates.

seglo commented Feb 12, 2021

Thanks for the updates @jyates. I think you may have missed some of my review comments. GitHub was kind enough to hide some of them, so you'll need to expand the hidden review comments to see them (thanks, GitHub! :)

One of the CI checks failed too.

https://github.com/akka/alpakka-kafka/pull/1299/checks?check_run_id=1883909741#step:6:98

* commit - [[commitRequested]] - without regard for what had previously been assigned
* or revoked from the consumer. Thus, care should be taken when managing state of the consumer and making updates.
*
* The only case we try and be "smart" is during [[received]], where we will filter out offsets that have not been
Contributor Author:

@seglo when going through the behavior in tests, this stood out as a bit strange to me. It feels like we should either be smart for every entry point to this class or not smart at all (none of this filter-sometimes behavior). I'd prefer to be 'dumb' here and push the logic into the KafkaConsumerActor if we can. If so, then we could do the "is-assigned" filtering where we also do the reset protection (https://github.com/akka/alpakka-kafka/pull/1299/files#diff-c05b458cf33e1bf97dadc6ea4b01c56daf01a6ddb72bd207c62a7d8331441d60R617 - line 617), and drop the SourceLogicBuffer.

However, it is quite hard not to do the filtering either here or in the actor, as we might receive offsets that are not assigned anymore due to the nature of poll results. In that case, we would want to just filter out everything that is not assigned for all the methods here.

WDYT?

Member:

@seglo when going through the behavior in tests, this stood out as a bit strange to me. It feels like we should either be smart for every entry point to this class or not smart at all (none of this filter-sometimes behavior). I'd prefer to be 'dumb' here and push the logic into the KafkaConsumerActor if we can.

Good point. I'm leaning towards the "dumb" behaviour as well. What would you move to KafkaConsumerActor if we went this route?

If so, then we could do the "is-assigned" filtering where we also do the reset protection (#1299 (files) - line 617), and drop the SourceLogicBuffer.

The SourceLogicBuffer filtering is important because a buffer exists between the KafkaConsumerActor and the first user stage. If partitions are revoked for messages already in the buffer then we'll send late messages, so it would be preferred to keep that logic as well. I'm not sure I understood your point correctly though.

Member:

Bumping this. I think this is the last significant point to resolve. Would love to get this merged and released with 2.1.0-RC1.

Contributor Author:

Apologies, I was on vacation last week...

I also seem to have forgotten the cleverness I was considering when I wrote that comment. However, I'm realizing that there might be a subtle bug here: if we haven't committed the partition yet, we won't have it in requestedOffsetsImpl (which tracks the commits that have been requested... and clearly needs a slight rename), which means we won't track it yet. That is a little unexpected behavior from the outside: you would think received would just track things we receive.

All of this to say, we should probably not try to be clever here (clearly it's hard) and just eat the minuscule overhead of tracking some more partitions; it's a topic name, partition ID, and a long value for each, which is not really that bad in terms of memory compared with all the other moving parts here. If it does become bad (too much memory) we could have a chaining map that does topic -> partition -> offset... but that also seems overkill at the moment.

Contributor Author:

And now that I look more closely: if we just track the currently assigned partitions, we can filter on that instead. Since revokes and assignments are synchronous, we should be able to use that as our filtering mechanism, which to me at least makes logical sense from the outside.
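That idea (tracking the assigned set and filtering received offsets on it) can be sketched with plain collections; this is illustrative only, with `String` standing in for `TopicPartition` and all names hypothetical:

```scala
// Sketch: revokes/assignments update the assigned set synchronously, and any
// received offsets for partitions no longer assigned are simply filtered out.
final class AssignmentFilter {
  private var assigned = Set.empty[String]

  def assignedPositions(tps: Set[String]): Unit = assigned ++= tps
  def revoke(tps: Set[String]): Unit = assigned --= tps

  // Only keep offsets for partitions that are currently assigned.
  def filterReceived(offsets: Map[String, Long]): Map[String, Long] =
    offsets.filter { case (tp, _) => assigned(tp) }
}
```

Because assignment changes are applied synchronously, the filter always reflects the consumer's current view when a poll result is processed.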

Member:

Looks ok to me.

seglo (Member) left a comment

Looking good. I think everything major has been addressed. Just some small nits.

core/src/main/resources/reference.conf (outdated; resolved review comments)
) {

def this(
Member:

Bump. Let's drop this overload and add the MiMa exception.


import scala.jdk.CollectionConverters._

trait ConsumerResetProtection {
Member:

sealed


import scala.jdk.CollectionConverters._

trait ConsumerResetProtection {
Member:

Can you add a link to #1286 here so it's easy to look up.

docs/src/main/paradox/errorhandling.md (outdated; resolved review comment)
seglo (Member) left a comment

LGTM. Thanks a lot for adding this feature @jyates !
