
KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException #8705

Merged

Conversation

rajinisivaram (Author)

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@chia7712 (Contributor)

How about making a copy of completedReceives before traversing it?

@rajinisivaram (Author)

@chia7712 I was tempted to do that initially, but that is not the pattern we use for anything else in Selector, and it has been this way for several years, so adding tests to make sure we don't break it made more sense.
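For context, the failure mode from the JIRA title can be sketched in a few lines. This is a hypothetical stand-in (the real map inside Selector holds NetworkReceive values): structurally modifying completedReceives while iterating it is exactly what raises the ConcurrentModificationException.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

public class CompletedReceivesCmeDemo {
    // Returns true if mutating the map mid-iteration raised a CME, simulating
    // close() removing a completedReceives entry while the caller of
    // Selector.poll() is still iterating over the completed receives.
    static boolean mutationDuringIterationThrows() {
        Map<String, String> completedReceives = new HashMap<>();
        completedReceives.put("channel-1", "receive-1");
        completedReceives.put("channel-2", "receive-2");
        try {
            for (String channelId : completedReceives.keySet())
                completedReceives.remove(channelId); // structural modification mid-iteration
        } catch (ConcurrentModificationException e) {
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(mutationDuringIterationThrows());
    }
}
```

HashMap's iterators are fail-fast: the next() after the removal detects the modification count change and throws, which is why the fix avoids mutating completedReceives on close.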

@@ -1742,6 +1746,12 @@ class SocketServerTest {
selector = Some(testableSelector)
testableSelector
}

override private[network] def processException(errorMessage: String, throwable: Throwable, isUncaught: Boolean): Unit = {
(Contributor)

isUncaught is used only by tests, so it is a bit awkward to have it in production code. Could you check the errorMessage instead of adding a new argument? For example:

if (errorMessage == "Processor got uncaught exception.") uncaughtExceptions += 1

@rajinisivaram (Author)

Funny you should say that: I was initially checking the error message, but felt the test wouldn't fail if the message was ever changed. Since then I have also updated the test that triggers the uncaught exception code path, so it is now safe to check the error message. I have updated the code.

@ijuma (Contributor) commented May 25, 2020

@rajinisivaram What's the implication of not removing the completed receive in doClose?

@rajinisivaram force-pushed the KAFKA-10029-completed-receive branch from a551d91 to 09589be on May 26, 2020 10:02
@rajinisivaram (Author)

@ijuma By not updating completedReceives when the channel is closed, we retain a reference to the channel until the next poll(). In terms of receives themselves, there is no impact. Looking at the code again, it seems unnecessary to use KafkaChannel as the key for the receive map. Like all other maps in Selector that store poll state for the channel, we can use the channel id instead. Updated the code.
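A minimal sketch of the key change described above, with hypothetical stand-ins for KafkaChannel and NetworkReceive: keying completedReceives by the channel id string rather than by the channel object means the map no longer pins the channel after it is closed, matching the other per-channel poll-state maps in Selector.

```java
import java.util.HashMap;
import java.util.Map;

public class ReceiveMapKeySketch {
    // Hypothetical stand-ins for KafkaChannel and NetworkReceive.
    static final class Channel {
        final String id;
        Channel(String id) { this.id = id; }
    }
    static final class Receive {
        final String source;
        Receive(String source) { this.source = source; }
    }

    // Demonstrates that with a String key the receive survives while the
    // channel object itself becomes collectible once dereferenced.
    static boolean receiveRetainedAfterChannelDereferenced() {
        Map<String, Receive> completedReceives = new HashMap<>();
        Channel channel = new Channel("node-1");
        completedReceives.put(channel.id, new Receive(channel.id));
        channel = null; // no strong reference to the channel remains in the map
        return completedReceives.containsKey("node-1");
    }

    public static void main(String[] args) {
        System.out.println(receiveRetainedAfterChannelDereferenced());
    }
}
```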

@ijuma (Contributor) commented May 27, 2020

@rajinisivaram We also retain a reference to the NetworkReceive, which is probably a bigger deal, right?

@ijuma (Contributor) commented May 27, 2020

This change is probably OK, but the way we call close while iterating over completedReceives seems a bit fragile. It would probably be safer to collect the items we need to close and close them in a separate iteration. What do you think @rajinisivaram?
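The safer pattern being suggested might look roughly like this (hypothetical names; remove() stands in for the real close() call, which mutates selector state):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CollectThenCloseSketch {
    // First pass: iterate without mutating, only collecting the channels that
    // need to be closed. Second pass: close them once iteration is finished.
    static Set<String> processAndClose(Map<String, String> completedReceives) {
        List<String> toClose = new ArrayList<>();
        for (Map.Entry<String, String> entry : completedReceives.entrySet()) {
            if (entry.getValue().startsWith("invalid"))
                toClose.add(entry.getKey()); // defer the close
        }
        for (String channelId : toClose)
            completedReceives.remove(channelId); // stands in for close(channelId)
        return completedReceives.keySet();
    }

    public static void main(String[] args) {
        Map<String, String> receives = new HashMap<>();
        receives.put("ch-1", "invalid-receive");
        receives.put("ch-2", "valid-receive");
        System.out.println(processAndClose(receives)); // only ch-2 remains
    }
}
```

Because all mutation happens after the iteration completes, no fail-fast iterator can observe a concurrent structural modification.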

@ijuma (Contributor) commented May 27, 2020

One more thing, can we improve KafkaChannel.hashCode/equals to avoid unnecessary work? The calls to Objects.equals and Objects.hash seem pointless.
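A sketch of the kind of simplification being suggested, using a hypothetical minimal channel class whose identity is its id alone (not the real KafkaChannel):

```java
public class ChannelEqualitySketch {
    // Hypothetical stand-in: identity is determined solely by the id field.
    static final class Channel {
        private final String id;
        Channel(String id) { this.id = id; }

        // Objects.hash(id) allocates a varargs Object[] on every call, and
        // Objects.equals adds a null check a non-null field cannot need;
        // delegating directly to the field avoids both.
        @Override
        public int hashCode() { return id.hashCode(); }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Channel)) return false;
            return id.equals(((Channel) o).id);
        }
    }

    public static void main(String[] args) {
        Channel a = new Channel("node-1");
        Channel b = new Channel("node-1");
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode());
    }
}
```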

@rajinisivaram (Author)

@ijuma Based on our discussion, I have added Selector#clearCompletedSends() and Selector#clearCompletedReceives() for SocketServer to clear buffers after they are processed. Also updated KafkaChannel. Can you review please? Thank you.
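A rough sketch of how SocketServer can use the new clear method (stub types; the real Selector signatures differ):

```java
import java.util.ArrayList;
import java.util.List;

public class ClearAfterProcessingSketch {
    // Minimal stub of the Selector surface involved.
    static class StubSelector {
        final List<byte[]> completedReceives = new ArrayList<>();

        void poll() {
            completedReceives.add(new byte[1024]); // pretend a receive completed
        }

        // Mirrors the Selector#clearCompletedReceives() added in this PR:
        // the server can drop buffer references as soon as it has processed
        // them, instead of holding them until the next poll().
        void clearCompletedReceives() {
            completedReceives.clear();
        }
    }

    public static void main(String[] args) {
        StubSelector selector = new StubSelector();
        selector.poll();
        for (byte[] receive : selector.completedReceives) {
            // ... SocketServer processes each completed receive here ...
        }
        selector.clearCompletedReceives(); // release the buffers promptly
        System.out.println(selector.completedReceives.isEmpty());
    }
}
```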

@rajinisivaram rajinisivaram requested a review from ijuma May 27, 2020 16:07
channelOpt.foreach { channel => map.put(channel.id, receive) }
}
cachedCompletedSends.currentPollValues.foreach(super.completedSends.add)
cachedDisconnected.currentPollValues.foreach { case (id, state) => super.disconnected.put(id, state) }
(Contributor)

Can we add a comment explaining what we're trying to do here? It's not clear why we do (for example):

cachedCompletedSends.update(super.completedSends.asScala) followed by
cachedCompletedSends.currentPollValues.foreach(super.completedSends.add)

@rajinisivaram (Author)

I have moved the second line, which updates the current result, into update(). They were done separately because each type uses a slightly different format, but it is clearer to keep them together. Added comments as well.

@@ -1807,6 +1817,7 @@ class SocketServerTest {
currentPollValues ++= newValues
} else
deferredValues ++= newValues
newValues.clear()
(Contributor)

What is the goal of this?

@rajinisivaram (Author)

We return minPerPoll results together, so the current values are cleared and then populated as necessary. The code is now in the same place, so hopefully that is clearer.

(Contributor)

The reason I was asking is that in a couple of cases we call toBuffer before calling this method. That already creates a copy, so this clear won't do anything useful, will it?

@rajinisivaram (Author)

ah, I refactored a bit to clear the original buffer in each case.

@ijuma (Contributor) left a comment

Thanks for the updates, looks good overall. I had some questions about the tests and one nit suggestion.

* Clear the results from the prior poll
* Clears completed receives. This is used by SocketServer to remove references to
* receive buffers after processing completed receives, without waiting for the next
* poll() after all results have been processed.
(Contributor)

Nit: maybe "after all results have been processed" is a bit redundant? Same for the clearCompletedSends docs.

@rajinisivaram (Author)

@ijuma Thanks for the review, have addressed the comments.

cachedCompletedSends.update(super.completedSends.asScala)
cachedDisconnected.update(super.disconnected.asScala.toBuffer)

val map: util.Map[String, NetworkReceive] = JTestUtils.fieldValue(this, classOf[Selector], "completedReceives")
(Contributor)

Should we call this completedReceivesMap or something?

@rajinisivaram (Author)

@ijuma Thanks for the review, updated.

@ijuma (Contributor) left a comment

LGTM, thanks. Just one minor suggestion below, no need for re-review.

val channelOpt = Option(super.channel(receive.source)).orElse(Option(super.closingChannel(receive.source)))
channelOpt.foreach { channel => completedReceivesMap.put(channel.id, receive) }
}

// For each result type (completedReceives/completedSends/disconnected), defer the result to a subsequent poll()
// if `minPerPoll` results are not yet available. When sufficient results are available, all available results
// including previously deferred results are returned. This allows tests to process `minPerPoll` elements as the
// results of a single poll iteration.
(Contributor)

I think your refactoring has added these comments in other places as well. Maybe we can add "This allows tests to process minPerPoll elements as the results of a single poll iteration" to the update method and remove this one?

@rajinisivaram (Author)

@ijuma @chia7712 Thanks for the reviews, merging to trunk and 2.5.

@rajinisivaram rajinisivaram merged commit 1fd195e into apache:trunk May 29, 2020
rajinisivaram added a commit that referenced this pull request May 29, 2020
…to avoid ConcurrentModificationException (#8705)

Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
rajinisivaram added a commit that referenced this pull request May 29, 2020
…to avoid ConcurrentModificationException (#8705)

Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
Kvicii pushed a commit to Kvicii/kafka that referenced this pull request May 30, 2020
ijuma added a commit to confluentinc/kafka that referenced this pull request Jun 3, 2020