Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka4811: ReplicaFetchThread may fail to create due to existing metric #2606

Closed
wants to merge 3 commits into from

Conversation

@huxihx
Copy link
Contributor

huxihx commented Feb 28, 2017

Have fetcherThreadMap keyed off brokerId + fetcherId instead of broker + fetcherId, but did not consider the case where port is changed.

Have fetcherThreadMap keyed off brokerId + fetcherId instead of broker + fetcherId
@huxihx

This comment has been minimized.

Copy link
Contributor Author

huxihx commented Feb 28, 2017

@junrao please have a review on this PR. Thanks.

@asfbot

This comment has been minimized.

Copy link

asfbot commented Feb 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1880/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

This comment has been minimized.

Copy link

asfbot commented Feb 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1878/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

This comment has been minimized.

Copy link

asfbot commented Feb 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1877/
Test PASSed (JDK 7 and Scala 2.10).

Copy link
Contributor

junrao left a comment

@amethystic : Thanks for submitting the patch so quickly. Looks good overall. Just a couple of minor comments.

fetcherThreadMap.get(brokerAndFetcherId) match {
case Some(f) => fetcherThread = f
val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)
fetcherThreadMap.get(BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)) match {

This comment has been minimized.

Copy link
@junrao

junrao Feb 28, 2017

Contributor

Could we reuse brokerIdAndFetcherId?

This comment has been minimized.

Copy link
@huxihx

huxihx Mar 1, 2017

Author Contributor

Seems that partitionsPerFetcher will be keyed off BrokerAndFetcherId after the groupBy operation, so in the later iteration, we have to create a new BrokerIdAndFetcherId for each BrokerAndFetcherId key.

This comment has been minimized.

Copy link
@junrao

junrao Mar 2, 2017

Contributor

I was just saying that we already defined a val brokerIdAndFetcherId in line 86. Could we just use that in line 87 instead of creating a new BrokerIdAndFetcherId object?

This comment has been minimized.

Copy link
@huxihx

huxihx Mar 2, 2017

Author Contributor

@junrao Thank you the comment. Already addressed. Please review again.

case Some(f) => fetcherThread = f
val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)
fetcherThreadMap.get(BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)) match {
case Some(f) if f.sourceBroker.host == brokerAndFetcherId.broker.host =>

This comment has been minimized.

Copy link
@junrao

junrao Feb 28, 2017

Contributor

As you suggested in the jira, it would be useful to verify both host and port.

Added the port check for whether a new FetcherThread should be created
@huxihx

This comment has been minimized.

Copy link
Contributor Author

huxihx commented Mar 1, 2017

@junrao added the port check. Please take some time to review again. Thanks.

@asfbot

This comment has been minimized.

Copy link

asfbot commented Mar 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1908/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

This comment has been minimized.

Copy link

asfbot commented Mar 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1910/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

This comment has been minimized.

Copy link

asfbot commented Mar 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1907/
Test PASSed (JDK 7 and Scala 2.10).

@junrao

This comment has been minimized.

Copy link
Contributor

junrao commented Mar 2, 2017

@amethystic : Thanks for the new patch. Added a comment to your reply.

Removed unnecessary BrokerIdAndFetcher instance creation.
@asfbot

This comment has been minimized.

Copy link

asfbot commented Mar 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1943/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

This comment has been minimized.

Copy link

asfbot commented Mar 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1946/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

This comment has been minimized.

Copy link

asfbot commented Mar 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1944/
Test FAILed (JDK 8 and Scala 2.12).

@junrao

This comment has been minimized.

Copy link
Contributor

junrao commented Mar 2, 2017

@amethystic : Thanks for the patch. LGTM

@asfgit asfgit closed this in 1b902b4 Mar 2, 2017
@huxihx huxihx deleted the huxihx:kafka4811_ReplicaFetchThread_fail_create branch Mar 2, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants
You can’t perform that action at this time.