Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14938: Fixing flaky test testConnectorBoundary #13646

Merged
merged 2 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -394,9 +394,12 @@ public void testConnectorBoundary() throws Exception {
props.put(MESSAGES_PER_POLL_CONFIG, MESSAGES_PER_POLL);
props.put(MAX_MESSAGES_PER_SECOND_CONFIG, MESSAGES_PER_SECOND);

// expect all records to be consumed and committed by the connector
connectorHandle.expectedRecords(MINIMUM_MESSAGES);
connectorHandle.expectedCommits(MINIMUM_MESSAGES);
// the connector aborts some transactions, which causes records that it has emitted (and for which
// SourceTask::commitRecord has been invoked) to be invisible to consumers; we expect the task to
// emit at most 233 records in total before 100 records have been emitted as part of one or more
// committed transactions
connectorHandle.expectedRecords(233);
connectorHandle.expectedCommits(233);

// start a source connector
connect.configureConnector(CONNECTOR_NAME, props);
Expand All @@ -413,10 +416,10 @@ public void testConnectorBoundary() throws Exception {
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
// consume all records from the source topic or fail, to ensure that they were correctly produced
ConsumerRecords<byte[], byte[]> sourceRecords = connect.kafka().consumeAll(
CONSUME_RECORDS_TIMEOUT_MS,
Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
null,
topic
CONSUME_RECORDS_TIMEOUT_MS,
Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
null,
topic
);
assertTrue("Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + sourceRecords.count(),
sourceRecords.count() >= MINIMUM_MESSAGES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,17 @@ private void maybeDefineTransactionBoundary(SourceRecord record) {
// NOTE(review): the "Expand Up @@ ..." text on the line above is diff-viewer
// residue from the page scrape, not Java source; the real code line is the
// method signature that follows it.
//
// Conditionally ends the current connector-defined transaction at this record.
// No-op unless the runtime exposes a transaction context (transactionContext()
// returns non-null) and this record's sequence number lands exactly on the
// precomputed boundary.
if (context.transactionContext() == null || seqno != nextTransactionBoundary) {
return;
}
// Size of the transaction that is about to end; used only in the log lines below.
long transactionSize = nextTransactionBoundary - priorTransactionBoundary;

// If the transaction boundary ends on an even-numbered offset, abort it
// Otherwise, commit
boolean abort = nextTransactionBoundary % 2 == 0;
// Advance the boundary before ending the transaction so subsequent records are
// measured against the new boundary. NOTE(review): presumably this updates
// priorTransactionBoundary and nextTransactionBoundary — confirm in
// calculateNextBoundary(), which is not visible in this view.
calculateNextBoundary();
if (abort) {
log.info("Aborting transaction of {} records", transactionSize);
// Abort: records emitted in this transaction (including this one) become
// invisible to read_committed consumers.
context.transactionContext().abortTransaction(record);
} else {
log.info("Committing transaction of {} records", transactionSize);
context.transactionContext().commitTransaction(record);
}
}
Expand Down