-
Notifications
You must be signed in to change notification settings - Fork 13.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-9608: Transaction Event Simulation Test #8725
Conversation
Random drop message support add request completion temporal committed offsets
2a3261c
to
61414e9
Compare
61414e9
to
4c352e0
Compare
retest this please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. I like the approach at a high level. I left a few suggestions to make the tests more interesting.
...rc/test/java/org/apache/kafka/clients/producer/internals/TransactionEventSimulationTest.java
Show resolved
Hide resolved
...rc/test/java/org/apache/kafka/clients/producer/internals/TransactionEventSimulationTest.java
Show resolved
Hide resolved
...rc/test/java/org/apache/kafka/clients/producer/internals/TransactionEventSimulationTest.java
Show resolved
Hide resolved
.../test/java/org/apache/kafka/clients/producer/internals/TransactionSimulationCoordinator.java
Show resolved
Hide resolved
void runOnce(boolean dropMessage) { | ||
Queue<ClientRequest> incomingRequests = networkClient.requests(); | ||
|
||
final boolean faultInject = faultInjectRandom.nextBoolean(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering if it would be more useful if we can control the faults more explicitly. For example, we could add a hook to make the coordinator temporarily unavailable and to restore it later.
pendingOffsets.put(key, partition.committedOffset()); | ||
errors.put(key, Errors.NONE); | ||
} else { | ||
errors.put(key, Errors.UNKNOWN_TOPIC_OR_PARTITION); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case, the client has send TxnOffsetCommit before the partition has been added to the transaction, which is an illegal state transition. Could we throw an assertion error or something directly to cause the test to fail?
); | ||
} | ||
|
||
private AbstractResponse handleProduce(ProduceRequest request, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To make this really interesting, we would need to add some sequence number bookkeeping. Really its the sequence/epoch bookkeeping which makes the implementation so complex.
// Trigger KIP-360 path. | ||
errors.put(topicPartition, new PartitionResponse(Errors.UNKNOWN_PRODUCER_ID)); | ||
} else { | ||
List<Record> sentRecords = pendingPartitionData.getOrDefault(topicPartition, new ArrayList<>()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to the offset commit path, it would be useful to validate here that each partition that was written to was first added to the transaction properly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
made an attempt on that, but since we don't have a retry mechanism yet, it seems a bit hard to assume AddPartition
to be successful before produce.
return new ProduceResponse(errors, throttleTimeMs); | ||
} | ||
|
||
private EndTxnResponse handleEndTxn(EndTxnRequest request, final boolean faultInject) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another class of failure that we can simulate is when a request reaches the broker and gets handled, but the connection is lost before the response is sent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thought we do have that coverage on L126:
networkClient.respond(response, disconnect);
Start the basic template for transaction event simulation.
Committer Checklist (excluded from commit message)