KAFKA-12763 NoSuchElementException during checkpointLogStartOffsets #10650

Closed
soarez wants to merge 1 commit

Conversation

soarez (Member) commented May 7, 2021

The following exception shows that `log.logSegments` may be empty during `checkpointLogStartOffsets`. `logSegments.headOption` should be used instead of `logSegments.head` to prevent this exception.

```
{
  "class": "java.util.NoSuchElementException",
  "msg": null,
  "stack": [
    "java.util.concurrent.ConcurrentSkipListMap$ValueIterator.next(ConcurrentSkipListMap.java:2123)",
    "scala.collection.convert.JavaCollectionWrappers$JIteratorWrapper.next(JavaCollectionWrappers.scala:38)",
    "scala.collection.IterableOps.head(Iterable.scala:218)",
    "scala.collection.IterableOps.head$(Iterable.scala:218)",
    "scala.collection.AbstractIterable.head(Iterable.scala:920)",
    "kafka.log.LogManager$$anonfun$4.applyOrElse(LogManager.scala:640)",
    "kafka.log.LogManager$$anonfun$4.applyOrElse(LogManager.scala:639)",
    "scala.collection.Iterator$$anon$7.hasNext(Iterator.scala:516)",
    "scala.collection.mutable.Growable.addAll(Growable.scala:61)",
    "scala.collection.mutable.Growable.addAll$(Growable.scala:59)",
    "scala.collection.mutable.HashMap.addAll(HashMap.scala:111)",
    "scala.collection.mutable.HashMap$.from(HashMap.scala:549)",
    "scala.collection.mutable.HashMap$.from(HashMap.scala:542)",
    "scala.collection.MapFactory$Delegate.from(Factory.scala:425)",
    "scala.collection.MapOps.collect(Map.scala:283)",
    "scala.collection.MapOps.collect$(Map.scala:282)",
    "scala.collection.AbstractMap.collect(Map.scala:375)",
    "kafka.log.LogManager.$anonfun$checkpointLogStartOffsetsInDir$2(LogManager.scala:639)",
    "kafka.log.LogManager.$anonfun$checkpointLogStartOffsetsInDir$1(LogManager.scala:636)",
    "kafka.log.LogManager.checkpointLogStartOffsetsInDir(LogManager.scala:635)",
    "kafka.log.LogManager.$anonfun$checkpointLogStartOffsets$1(LogManager.scala:600)",
    "kafka.log.LogManager.$anonfun$checkpointLogStartOffsets$1$adapted(LogManager.scala:600)",
    "scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)",
    "scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)",
    "scala.collection.AbstractIterable.foreach(Iterable.scala:920)",
    "kafka.log.LogManager.checkpointLogStartOffsets(LogManager.scala:600)",
    "kafka.log.LogManager.$anonfun$startup$6(LogManager.scala:426)",
    "kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)",
    "java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)",
    "java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)",
    "java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)",
    "java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)",
    "java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)",
    "java.lang.Thread.run(Thread.java:834)"
  ]
}
```
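
For illustration only, here is a minimal, self-contained Scala sketch of the pattern the one-line fix relies on; the `Segment` and `Log` case classes and the `checkpointable` helper are hypothetical stand-ins, not the actual `LogManager` code:

```scala
// Hedged sketch, not the real LogManager code: shows why headOption is safer
// than head when a log may (transiently) have no segments.
object HeadOptionSketch {
  final case class Segment(baseOffset: Long)                                   // hypothetical stand-in
  final case class Log(logStartOffset: Long, logSegments: Iterable[Segment])   // hypothetical stand-in

  def checkpointable(logs: Map[String, Log]): Map[String, Long] =
    logs.collect {
      // .head throws NoSuchElementException when logSegments is empty;
      // .headOption.exists(...) simply skips such a log.
      case (tp, log) if log.logSegments.headOption.exists(_.baseOffset < log.logStartOffset) =>
        tp -> log.logStartOffset
    }

  def main(args: Array[String]): Unit = {
    val logs = Map(
      "foo-0" -> Log(logStartOffset = 10L, logSegments = Seq(Segment(0L))),
      "foo-1" -> Log(logStartOffset = 5L, logSegments = Seq.empty)  // .head would crash here
    )
    println(checkpointable(logs))  // Map(foo-0 -> 10)
  }
}
```

With `.head`, the empty log would trigger exactly the NoSuchElementException above; with `headOption`, it is skipped.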

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)


soarez (Member, Author) commented May 7, 2021

@ijuma @junrao please take a look at this one-liner.

soarez (Member, Author) commented May 18, 2021

@ijuma can you have a look at this one?

dhruvilshah3 (Contributor) commented:

@soarez do you know why the log was left without any segments? Typically, we expect the log to have at least the active segment. If you have broker logs from around the time the issue happened, those may be useful to help understand it a bit better.

soarez (Member, Author) commented May 18, 2021

Hi @dhruvilshah3

This happened when a broker that was offline for a long time came back up. I agree this shouldn't happen in the first place. I have logs, but it isn't clear how this happened.

Still, given that it can happen, it seems better to avoid this exception, and I see no downside in adding the extra check.

ijuma (Contributor) commented May 18, 2021

@soarez the concern is that it may hide a problem. So, it's important to understand the sequence of events.

kowshik (Contributor) commented May 20, 2021

@soarez Thanks for the PR. The JIRA says the issue was encountered in Kafka version 2.6.2. Are you able to also reproduce this issue on AK trunk (latest code) and/or in versions released after 2.6.2?

soarez (Member, Author) commented May 21, 2021

@ijuma I can see it makes sense not to hide the underlying issue. But at the moment the exception provides no helpful information either. What do you think about adding the topic name and partition number to the exception message when this failure happens?

@kowshik No. This is a tricky one to reproduce; it looks like a race condition. However, there are some clues about what happened.

The exception happened shortly after the broker that had been offline for over a week was started again in a busy cluster. Just before the NoSuchElementException was thrown, there were many warnings of this sort:

```
[Log partition=foo-1, dir=/d3/data] Could not find offset index file corresponding to log file /d3/data/foo-1/00000000000003424067.log, recovering segment and rebuilding index files...
```

Looking at the 2.6.2 source, this message is only produced in `Log.loadSegmentFiles`, which is only called from `Log.loadSegments`. Right before that call there is a `segments.clear()`, which leaves `logSegments` empty. I think that's how this happened.

Looking at trunk, the segments are also cleared in the same way.
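
To make that window concrete, the top frames of the trace can be reproduced with a plain `ConcurrentSkipListMap` standing in for `Log.segments` (a hedged sketch under that assumption, not the actual `Log` code):

```scala
import java.util.concurrent.ConcurrentSkipListMap
import scala.jdk.CollectionConverters._

// Hedged sketch: a bare ConcurrentSkipListMap stands in for Log.segments.
object EmptySegmentsSketch {
  def main(args: Array[String]): Unit = {
    val segments = new ConcurrentSkipListMap[java.lang.Long, String]()
    segments.put(0L, "segment-0")

    // The recovery path clears the map before re-adding segments; a concurrent
    // reader calling .head inside this window hits the top frames of the
    // reported trace (ConcurrentSkipListMap$ValueIterator.next -> head).
    segments.clear()

    println(segments.values.asScala.headOption)  // None: headOption is safe
    // segments.values.asScala.head              // would throw NoSuchElementException
  }
}
```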

kowshik (Contributor) commented May 21, 2021

@soarez Nice explanation. Regarding the segments getting cleared, note that the call to `recoverLog()` will eventually ensure that there is at least the active segment (see these lines). So it is surprising that you saw the exception.
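
For context, the invariant being referred to can be sketched as follows (hypothetical names and a bare segment map; this is not the actual `recoverLog()` code):

```scala
import java.util.concurrent.ConcurrentSkipListMap

// Hedged sketch of the invariant: after recovery completes there is always at
// least one (active) segment, so .head on the segment list is expected to work.
object RecoverLogSketch {
  final case class Segment(baseOffset: Long)  // hypothetical stand-in

  def ensureActiveSegment(segments: ConcurrentSkipListMap[java.lang.Long, Segment],
                          logEndOffset: Long): Unit =
    if (segments.isEmpty)
      segments.put(logEndOffset, Segment(logEndOffset))  // roll an empty active segment

  def main(args: Array[String]): Unit = {
    val segments = new ConcurrentSkipListMap[java.lang.Long, Segment]()
    ensureActiveSegment(segments, logEndOffset = 0L)
    println(segments.firstEntry.getValue)  // Segment(0)
  }
}
```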

soarez (Member, Author) commented May 21, 2021

@kowshik I'm not sure it is surprising. The NoSuchElementException is thrown from the scheduled task kafka-log-start-offset-checkpoint (see the original stack trace), which runs in a separate thread, and as far as I can see there is no synchronization logic to ensure that it won't access the segments between the `segments.clear()` I pointed out and the subsequent lines you refer to. There is a time window where the segments are actually empty.

The scheduled task has an initial delay of 30 seconds. I'm guessing that initial delay is there to let all the logs load and recover correctly, but if there are a lot of logs to recover (as in the case of an inactive broker coming back after a long time in a busy cluster) that may not be enough.
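
For reference, the scheduling pattern described here looks roughly like the sketch below (hypothetical task body and shortened delays; this is not Kafka's `KafkaScheduler`):

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Hedged sketch of a periodic task with an initial delay: the delay only helps
// if log recovery finishes before it expires.
object CheckpointSchedulingSketch {
  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newScheduledThreadPool(1)
    scheduler.scheduleAtFixedRate(
      () => println("checkpointing log start offsets"),  // placeholder task body
      2L,   // initial delay (Kafka uses 30 seconds; shortened so the demo prints)
      10L,  // period between runs
      TimeUnit.SECONDS
    )
    Thread.sleep(3000)       // let the first run fire
    scheduler.shutdownNow()  // stop the demo
  }
}
```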

kowshik (Contributor) commented May 22, 2021

@soarez The `Log.loadSegments()` call happens in the `Log` constructor. I'm curious how the `segments.clear()` call from there would race with the kafka-log-start-offset-checkpoint periodic work, because the periodic work can't see the `Log` instance until it is instantiated and made discoverable by adding it to `LogManager.currentLogs` or `LogManager.futureLogs`.
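
To make that visibility argument concrete, here is a hedged sketch (hypothetical `DemoLog` and `currentLogs` names, not the actual `Log`/`LogManager` code) of why constructor-time clearing cannot be observed by the periodic task:

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListMap}
import scala.jdk.CollectionConverters._

// Hedged sketch: the checkpoint task only iterates logs that have already been
// published to the pool, so a clear() that happens inside the constructor
// (before publication) is never visible to it.
object PublicationSketch {
  final class DemoLog {
    val segments = new ConcurrentSkipListMap[java.lang.Long, String]()
    // Constructor-time "recovery": clear and re-add before anyone else can look.
    segments.clear()
    segments.put(0L, "active-segment")
  }

  val currentLogs = new ConcurrentHashMap[String, DemoLog]()

  def loadLog(tp: String): Unit = {
    val log = new DemoLog()   // recovery finishes inside the constructor
    currentLogs.put(tp, log)  // only now does the checkpoint task see the log
  }

  def checkpointTask(): Unit =
    currentLogs.asScala.foreach { case (tp, log) =>
      // Every published log already has at least one segment here.
      println(s"$tp -> ${log.segments.values.asScala.head}")
    }

  def main(args: Array[String]): Unit = {
    loadLog("foo-1")
    checkpointTask()  // prints: foo-1 -> active-segment
  }
}
```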

soarez (Member, Author) commented May 22, 2021

@kowshik I see, that makes sense. My theory about how this happens does not hold. The race must happen after the constructor exits.

I'm going to patch our brokers to log extra information if this exception is thrown, and I will try to reproduce this in the same cluster.

soarez (Member, Author) commented Oct 20, 2022

Closing due to #11950

soarez closed this Oct 20, 2022