
KAFKA-5059: Implement Transactional Coordinator #2849

Closed
wants to merge 59 commits into trunk from exactly-once-tc

Conversation

@dguy (Contributor) commented Apr 13, 2017

No description provided.

guozhangwang and others added 23 commits April 11, 2017 14:05
* add transaction log message format
* add transaction timeout to initPid request
* collapse to one message type
* sub-package transaction and group classes within coordinator
* add loading and cleaning up logic
* add transaction configs
* add all broker-side configs
* check for transaction timeout value
* added one more exception type
* handling add offsets to txn
* add a pending state with prepareTransition / completeTransaction / abortTransition of state
* refactor handling logic for multiple in-flight requests
 1. Notable conflicts are with the small API changes to
DelayedOperation and the newly introduced purgeDataBefore PR.

 2. Jason's update to support streaming decompression required a bit of
an overhaul to the way we handle aborted transactions on the consumer.
Add tests for TransactionMarkerRequestCompletionHandler
@dguy (Contributor, Author) commented Apr 13, 2017

@ijuma @junrao @guozhangwang @apurvam @mjsax for reviews please.
Note: this is not the complete TransactionCoordinator; transaction expiration, TransactionalId -> PID mapping expiration, and recovery when handling InitPidRequest haven't been done yet.

@asfbot commented Apr 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2928/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot commented Apr 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2923/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot commented Apr 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2924/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot commented Apr 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2924/
Test PASSed (JDK 7 and Scala 2.10).

@junrao (Contributor) left a comment
@dguy : Thanks for the updated patch. Made another pass of the non-test files and added some more comments. Some of the issues can potentially be addressed in a follow-up patch, as long as we mark that clearly. Also, it seems that we don't have the code to (1) abort a long-running transaction; (2) expire a transactional id that hasn't been actively used for some time?

}
def sendResponseCallback(error: Errors) {
val responseBody = new EndTxnResponse(error)
trace(s"Completed ${endTxnRequest.transactionalId()}'s EndTxnRequest with command: ${endTxnRequest.command()}, errors: $error from client ${request.header.clientId()}.")
Contributor:

Be consistent about whether to use () when calling clientId?

}

private def loadTransactionMetadata(topicPartition: TopicPartition) {
def highWaterMark = replicaManager.getHighWatermark(topicPartition).getOrElse(-1L)
Contributor:

This still needs to be addressed. The loading in GroupCoordinator has the same issue.

} else if (!txnManager.validateTransactionTimeoutMs(transactionTimeoutMs)) {
// check transactionTimeoutMs is not larger than the broker configured maximum allowed value
responseCallback(initTransactionError(Errors.INVALID_TRANSACTION_TIMEOUT))
} else {
Contributor:

I am a bit worried about all those independent checks on transactional state w/o any coordinator level locking. For example, in theory, a coordinator emigration and immigration could have happened after the check in line 104. Then, the appendMetadataToLog()/initPidWithExistingMetadata() call could mess up some state.

I was thinking that another way of doing this is to maintain a read/write lock for the coordinator partition. Immigration/emigration will hold the write lock while setting the state. Other calls like initPid will hold the read lock, do the proper coordinator state check, initiate the process like appending to the log and then release the read lock (we already have such a partition level lock in Partition, not sure if it's easily reusable). This will potentially give us better protection and make things easier to reason about.

Contributor Author:

@junrao thanks - yeah, that makes sense. I was thinking about locking too, but wasn't sure of the correct level to do it at; the partition level seems OK. Will look into it. Thanks for the suggestion.

@dguy (Contributor, Author) commented Apr 25, 2017

@junrao the TC maintains multiple partitions, so we'd need to have a lock per partition. You mentioned that there is a read/write lock on Partition - I believe you are referring to leaderIsrUpdateLock... I can't see any other locks in Partition. Anyway, do we want to expose this for other classes to use? I'd probably think not.

If we maintain a lock per partition then perhaps it should be done by the TransactionStateManager and then we'd need to add/remove locks in the immigration/emigration. I think we'd also need to add another method on TransactionStateManager, say partitionLock(partitionId) that returns an Option[ReentrantReadWriteLock]. The calls in TransactionCoordinator to isCoordinatorFor could then be replaced with calls to partitionLock(partitionId) - if the lock exists they take a read lock. If it doesn't exist then respond with Errors.NOT_COORDINATOR

Does this seem sensible?
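For illustration, a minimal sketch of the per-partition lock idea above; every name here is hypothetical, not actual Kafka code:

import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.concurrent.TrieMap

class TransactionStateManager {
  // one lock per owned txn log partition, installed on immigration
  private val partitionLocks = TrieMap.empty[Int, ReentrantReadWriteLock]

  def addPartition(partitionId: Int): Unit =
    partitionLocks.putIfAbsent(partitionId, new ReentrantReadWriteLock)

  def removePartition(partitionId: Int): Unit =
    partitionLocks.remove(partitionId)

  // None means this broker is not the coordinator for the partition
  def partitionLock(partitionId: Int): Option[ReentrantReadWriteLock] =
    partitionLocks.get(partitionId)
}

A caller would then match on partitionLock(partitionId): take the read lock if it is Some, otherwise respond with Errors.NOT_COORDINATOR.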

@guozhangwang (Contributor):

Agree, maybe we can have a read-write lock on the txn metadata cache and only release the read lock after the txn log has been appended locally?

Contributor Author:

@guozhangwang per partition? or a global lock?

Contributor Author:

On second thought, I'll add a single read/write lock in the coordinator, as it is much simpler than having to maintain multiple. If that is not OK, we can revisit.

Contributor:

Most operations just need to hold a read lock. Only emigration/immigration need to hold a write lock. So, perhaps having a single lock per broker is also fine as long as we don't hold the lock for too long (i.e., we should mostly be just setting critical states while holding the lock. Any expensive stuff should be done outside the lock).
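For illustration, a minimal sketch of the single broker-level read/write lock settled on here, assuming a coordinator shaped roughly like this (method names are hypothetical):

import java.util.concurrent.locks.ReentrantReadWriteLock
import org.apache.kafka.common.protocol.Errors

class TransactionCoordinator {
  private val coordinatorLock = new ReentrantReadWriteLock

  // immigration/emigration: hold the write lock only while flipping state
  def onTxnTopicPartitionImmigrated(partitionId: Int): Unit = {
    coordinatorLock.writeLock().lock()
    try {
      // set ownership state for partitionId; nothing expensive in here
    } finally coordinatorLock.writeLock().unlock()
  }

  // regular operations: check coordinator state and initiate the log append
  // under the read lock; waiting for replication happens outside the lock
  def handleInitPid(transactionalId: String, respond: Errors => Unit): Unit = {
    coordinatorLock.readLock().lock()
    try {
      if (!isCoordinatorFor(transactionalId))
        respond(Errors.NOT_COORDINATOR)
      else
        appendMetadataToLog(transactionalId)
    } finally coordinatorLock.readLock().unlock()
  }

  private def isCoordinatorFor(transactionalId: String): Boolean = true // stub
  private def appendMetadataToLog(transactionalId: String): Unit = ()   // stub
}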


// there might be a concurrent thread that has just updated the mapping
// with the transactional id at the same time; in this case we will
// treat it as the metadata has existed and update it accordingly
Contributor:

Hmm, appendMetadataToLog() is not doing exactly the same as if the metadata has existed. It seems that we will be missing all those checks on metadata state in initPidWithExistingMetadata()?

Contributor Author:

I'm not sure what was meant by the comment, but I think you are correct in that we should do initPidWithExistingMetadata() in the case that they aren't the same. @guozhangwang any thoughts?

Contributor:

I think @junrao's comment is that we did some checking on the txn metadata's state in initPidWithExistingMetadata, whereas we did not do such checking before calling appendMetadataToLog. I have explained to him that it is because at line 129 we are assured that the metadata is newly created and hence it's always Ongoing. Maybe the comment itself has become outdated after the addition of the initPidWithExistingMetadata logic.

// with the transactional id at the same time; in this case we will
// treat it as the metadata has existed and update it accordingly
metadata synchronized {
if (!metadata.equals(newMetadata))
Contributor:

Should we just do an eq check on the reference instead?
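For context, the distinction being suggested: equals compares values, while eq compares object references, which is the cheap way to detect that another thread swapped in a different metadata object.

case class TxnKey(transactionalId: String)

val a = TxnKey("t1")
val b = TxnKey("t1")
a equals b // true: structurally equal
a eq b     // false: different objects
a eq a     // true: same reference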

case Some(partitionInfo) =>
val brokerId = partitionInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
if (currentBrokers.add(brokerId)) {
// TODO: What should we do if we get BrokerEndPointNotAvailableException?
Contributor:

We should just wait until the target broker is available.

brokerId
case None =>
// TODO: there is a rare case that the producer gets the partition info from another broker who has the newer information of the
// partition, while the TC itself has not received the propagated metadata update; do we need to block when this happens?
Contributor:

Hmm, instead of throwing IllegalStateException, it seems that we should just keep retrying until successful?

completionCallback(Errors.INVALID_TXN_STATE)
else {
val delayedTxnMarker = new DelayedTxnMarker(metadataToWrite, completionCallback)
txnMarkerPurgatory.tryCompleteElseWatch(delayedTxnMarker, Seq(metadata.pid))
Contributor:

Since the timeout in DelayedTxnMarker is infinite, I am wondering if we really need a txnMarkerPurgatory. In TransactionMarkerRequestCompletionHandler, we are already updating the pending partitions as the client response comes back. The response that removes the last pending partition can just trigger the calling of completionCallback.
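A sketch of that alternative: count down the pending partitions as responses arrive and fire the callback when the last one completes. The class and method names here are illustrative only:

import java.util.concurrent.atomic.AtomicInteger

class PendingMarkerCompletion(numPendingPartitions: Int,
                              completionCallback: () => Unit) {
  private val remaining = new AtomicInteger(numPendingPartitions)

  // invoked by the response handler once per completed partition
  def partitionCompleted(): Unit =
    if (remaining.decrementAndGet() == 0)
      completionCallback()
}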

Contributor Author:

In initPidWithExistingMetadata we also need to wait on the transaction to complete if there is an inflight transaction in the PrepareAbort or PrepareCommit phase.


def appendTransactionToLog(transactionalId: String,
txnMetadata: TransactionMetadata,
responseCallback: Errors => Unit) {
Contributor:

Currently, if the broker's message format is < V2, when appending to the log we simply convert it to the old format. In this case, we instead want to error out and respond to the client with a TransactionNotSupported error.

Contributor Author:

@junrao I'm not really sure what I'm supposed to be checking here?

Contributor:

This is about the inter-broker protocol version. Details are here:

http://kafka.apache.org/documentation/#upgrade_10_1

Maybe just leave a TODO marker and I can address it in a follow-up PR, so we don't drag this one out too long?
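A rough sketch of what that TODO might look like; the helper name, the magic-value comparison, and the error choice are all assumptions, not the eventual implementation:

import org.apache.kafka.common.protocol.Errors

// reject transactional appends when the broker's configured message format
// predates V2, the first format with transaction support
def checkTransactionSupport(messageFormatMagic: Byte,
                            responseCallback: Errors => Unit): Boolean = {
  val minTransactionalMagic: Byte = 2 // RecordBatch.MAGIC_VALUE_V2
  if (messageFormatMagic < minTransactionalMagic) {
    responseCallback(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT)
    false
  } else
    true
}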

request.request,
now,
true,
completionHandler)
Contributor:

Since we are sending a new type of request across the brokers, we need to check the inter-broker protocol and error out if the new request is not supported.

Contributor Author:

How would I do that?

Contributor:

Responded in another comment. Let's do this incrementally in another PR and just leave a TODO in this PR. Otherwise we would be looking at a 10K-line diff.


@asfbot commented Apr 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3157/
Test PASSed (JDK 7 and Scala 2.10).

@dguy (Contributor, Author) commented Apr 25, 2017

@junrao thanks for taking the time to review again.
Regarding:

Also, it seems that we don't have the code to (1) abort a long transaction; (2) expire a transactional id not being actively used some time?

Correct, they have not been done yet.

@asfbot commented Apr 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3163/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot commented Apr 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3157/
Test PASSed (JDK 8 and Scala 2.12).

@guozhangwang (Contributor) left a comment

Made a pass over the added non-test code beyond my commits, as well as the places where I got pinged.

@@ -285,7 +285,7 @@ public void forceClose() {

private ClientResponse sendAndAwaitInitPidRequest(Node node) throws IOException {
String nodeId = node.idString();
-        InitPidRequest.Builder builder = new InitPidRequest.Builder(null);
+        InitPidRequest.Builder builder = new InitPidRequest.Builder(null, Integer.MAX_VALUE);
Contributor:

In that case do we really need this change in this PR? Maybe we can just remove this change as it is actually doing the same still.

@@ -38,11 +38,12 @@
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidRequiredAcksException;
import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
Contributor:

Are these changes intentional? The original ordering seems OK to me.

if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final WriteTxnMarkersRequest that = (WriteTxnMarkersRequest) o;
return coordinatorEpoch == that.coordinatorEpoch &&
Contributor:

Thinking about this a bit more, I wonder if the coordinatorEpoch should be in the individual marker entry as well, since different txn log partitions' leader epochs, and hence their coordinator epochs, would be different?

Contributor:

EDIT: in the existing branch I already made those changes a while back: https://github.com/guozhangwang/kafka/blob/KEOS-transactions-coordinator-network-thread/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkerRequest.java

The design doc, however, is not updated.

I saw you did a groupBy on the coordinatorEpoch instead, so that each write marker request will only contain one coordinatorEpoch, but since on the broker side this coordinator epoch is checked inside the Log layer anyway, I felt it is better to make this a per-marker-entry field in the protocol.
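Roughly, the field placement being discussed: coordinatorEpoch moves from the request level into each marker entry. The case classes below are illustrative, not the actual protocol classes:

import org.apache.kafka.common.TopicPartition

case class TxnMarkerEntry(producerId: Long,
                          producerEpoch: Short,
                          coordinatorEpoch: Int, // per entry, not per request
                          partitions: Seq[TopicPartition])

case class WriteTxnMarkersRequest(markers: Seq[TxnMarkerEntry])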

Contributor Author:

So this should change back to what you previously had? We originally had your code, but during the merge with other changes it was probably removed. That is why I did the groupBy on coordinatorEpoch.

metrics,
time,
"txn-marker-channel",
Map("broker-id" -> config.brokerId.toString).asJava,
Contributor:

I did this in the original commit, but since this thread is owned by the broker only, we do not need this tag. Instead we can just pass an empty tag map.

new ManualMetadataUpdater(),
threadName,
1,
50,
Contributor:

Nice catch. Currently we do not have a broker-side reconnect.backoff config yet, so different modules just hand-code different values. But moving forward I feel we may want to introduce a new config for the inter-broker reconnect backoff.


if (responseError != Errors.NONE) {
debug(s"Updating $transactionalId's transaction state to $txnMetadata for $transactionalId failed after the transaction message " +
s"has been appended to the log since the metadata does not match anymore.")
Contributor:

The log entry was wrong; maybe just "since the appended log did not successfully replicate to all replicas". Does that sound better?

completionHandler.onComplete(disConnectedResponse)
}
}
networkClient.poll(pollTimeout, now)
Contributor:

Since we now have one queue per broker, and 1) we drain all the elements in the queue whenever trying to send, and 2) we wake up the client whenever we are adding new elements to the queue, I think it is not as critical to set lower values?

}

private[transaction]
def drainQueuedTransactionMarkers(txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker]): Iterable[RequestAndCompletionHandler] = {
Contributor:

@dguy @junrao What's the motivation for trying to drain all the queued elements? Since the max inflight requests is only 1 in the network client, even if we construct multiple requests for a certain destination, only the first request will succeed in sending, right? In that case we could just do the 1) peek-first, 2) if-ready-send-and-pop pattern?
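The pattern being suggested, as a sketch; it assumes a single sender thread drains each per-broker queue, so the peek/poll pair is not racy:

import java.util.concurrent.ConcurrentLinkedQueue

def maybeSendNext[T](queue: ConcurrentLinkedQueue[T],
                     ready: T => Boolean,
                     send: T => Unit): Unit = {
  val head = queue.peek() // 1) peek-first: look without removing
  if (head != null && ready(head)) {
    queue.poll()          // 2) ready: pop and send
    send(head)
  }                       // otherwise leave it queued for the next poll cycle
}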


@guozhangwang (Contributor) commented Apr 26, 2017

Collapsed all commits and merged to trunk.

@asfgit closed this in f69d941 Apr 26, 2017
@ijuma (Contributor) commented Apr 26, 2017

For large PRs like this, we should run the system tests before we merge. Can we post a link to a successful run for future record (assuming we've done that)?

@hachikuji commented
@ijuma I've kicked off a build here: https://jenkins.confluent.io/view/All/job/system-test-kafka-branch-builder-2/275/. Let's cross our fingers since it's already merged!

@asfbot commented Apr 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3190/
Test PASSed (JDK 7 and Scala 2.10).

@ijuma (Contributor) commented Apr 27, 2017

Thanks @hachikuji, build passed. :)

@asfbot commented Apr 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3189/
Test PASSed (JDK 8 and Scala 2.12).

@ijuma deleted the exactly-once-tc branch January 25, 2020