Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-5147: Add missing synchronization to TransactionManager #3132

Conversation

apurvam
Copy link
Contributor

@apurvam apurvam commented May 24, 2017

The basic idea is that exactly three collections, ie. pendingRequests, newPartitionsToBeAddedToTransaction, and partitionsInTransaction are accessed from the context of application threads. The first two are modified from the application threads, and the last is read from those threads.

So to make the TransactionManager truly thread safe, we have to ensure that all accesses to these three members are done in a synchronized block. I inspected the code, and I believe this patch puts the synchronization in all the correct places.

@apurvam
Copy link
Contributor Author

apurvam commented May 24, 2017

cc @hachikuji

@asfbot
Copy link

asfbot commented May 24, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4338/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot
Copy link

asfbot commented May 24, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4325/
Test PASSed (JDK 8 and Scala 2.12).

@hachikuji
Copy link

@apurvam Please rebase when you get a chance. Thanks!

@apurvam apurvam force-pushed the KAFKA-5147-transaction-manager-synchronization-fixes branch from 55b7197 to 3f3b036 Compare May 25, 2017 05:36
@apurvam
Copy link
Contributor Author

apurvam commented May 25, 2017

@hachikuji I rebased onto trunk and added more synchronization for both consistency and correctness as we discussed today.

@asfbot
Copy link

asfbot commented May 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4375/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot
Copy link

asfbot commented May 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4361/
Test FAILed (JDK 8 and Scala 2.12).

@@ -769,7 +769,7 @@ boolean isEndTxn() {
}

@Override
public void handleResponse(AbstractResponse response) {
public synchronized void handleResponse(AbstractResponse response) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and the other handlers should synchronize on TransactionManager.this, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. Fixed.

@hachikuji
Copy link

One other thing I noticed. The lastError field in TransactionalRequestResult seems like it should be volatile.

@apurvam
Copy link
Contributor Author

apurvam commented May 25, 2017

@hachikuji I addressed your latest comments. Please take a look.

@@ -534,7 +534,9 @@ public void onComplete(ClientResponse response) {
fatalError(response.versionMismatch());
} else if (response.hasResponse()) {
log.trace("Got transactional response for request:" + requestBuilder());
handleResponse(response.responseBody());
synchronized (TransactionManager.this) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

@@ -515,7 +515,7 @@ void fail(RuntimeException e) {
result.done();
}

void reenqueue() {
synchronized void reenqueue() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one too should synchronize on TransactionManager.this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. Thanks for pointing these out. I am learning things!

Copy link

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@hachikuji
Copy link

I'll fix the checkstyle error when I merge.

@asfbot
Copy link

asfbot commented May 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4395/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented May 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4409/
Test FAILed (JDK 7 and Scala 2.11).

asfgit pushed a commit that referenced this pull request May 25, 2017
The basic idea is that exactly three collections, ie. `pendingRequests`, `newPartitionsToBeAddedToTransaction`, and `partitionsInTransaction` are accessed from the context of application threads. The first two are modified from the application threads, and the last is read from those threads.

So to make the `TransactionManager` truly thread safe, we have to ensure that all accesses to these three members are done in a synchronized block. I inspected the code, and I believe this patch puts the synchronization in all the correct places.

Author: Apurva Mehta <apurva@confluent.io>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #3132 from apurvam/KAFKA-5147-transaction-manager-synchronization-fixes

(cherry picked from commit 02c0c3b)
Signed-off-by: Jason Gustafson <jason@confluent.io>
@asfgit asfgit closed this in 02c0c3b May 25, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants