
KAFKA-14938: Fixing flaky test testConnectorBoundary #13646

Merged
merged 2 commits into apache:trunk from KAFKA-14938 on Jul 12, 2023

Conversation

sambhav-jain-16
Contributor

@sambhav-jain-16 sambhav-jain-16 commented Apr 27, 2023

Fixes the flaky org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary test by using consume() in place of consumeAll(), since consume() lets us specify the minimum number of records to consume, which is sufficient for the test's assertion.

Ran the test successfully ~100 times

BUILD SUCCESSFUL in 11s
75 actionable tasks: 1 executed, 74 up-to-date
Completed run: 110


@sagarrao12

Thanks @sambhav-jain-16. This seems to have fixed the test, going by the build, but I'm still curious why it used to fail expecting 100+ records but got only 72. How would this fix be any different, i.e. if consume() can't get 100 messages within the timeout, won't it still fail?

@sambhav-jain-16
Contributor Author

sambhav-jain-16 commented Apr 28, 2023

@sagarrao12
The consumeAll method consumes only the records available at the moment we start consuming, i.e. it stores the end offsets up front and then consumes up to those stored end offsets (for each partition).
The consume method has no such restriction; it consumes until it receives the specified number of records or until the timeout expires.

What I suspect is happening is that, when the method initially stores the end offsets of the partitions, the connector hasn't produced 100 records yet, and therefore the method doesn't consume enough even though messages are still being produced by the connector.
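To make the difference concrete, here is a rough, self-contained sketch of the two strategies (the class and method names are made up for illustration; this is not the actual EmbeddedKafkaCluster code):

import java.time.Duration;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Illustrative only; approximates the two consumption strategies being discussed.
public class ConsumeStrategies {

    // consumeAll-style: snapshot the end offsets once, then poll until every partition's
    // position reaches its snapshotted end offset. Records produced after the snapshot
    // are not waited for.
    public static int consumeToSnapshot(Consumer<byte[], byte[]> consumer,
                                        Set<TopicPartition> partitions,
                                        long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);
        int consumed = 0;
        while (partitions.stream().anyMatch(tp -> consumer.position(tp) < endOffsets.get(tp))) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError("Timed out before reaching the snapshotted end offsets");
            }
            consumed += consumer.poll(Duration.ofMillis(500)).count();
        }
        return consumed;
    }

    // consume-style: keep polling until at least minRecords records have actually been
    // returned to the caller, regardless of where the end offsets were when we started.
    public static int consumeAtLeast(Consumer<byte[], byte[]> consumer,
                                     Set<TopicPartition> partitions,
                                     int minRecords,
                                     long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);
        int consumed = 0;
        while (consumed < minRecords) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError("Timed out after consuming only " + consumed + " records");
            }
            consumed += consumer.poll(Duration.ofMillis(500)).count();
        }
        return consumed;
    }
}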

@sambhav-jain-16
Contributor Author

sambhav-jain-16 commented Apr 28, 2023

How would this fix be any different, i.e. if consume() can't get 100 messages within the timeout, won't it still fail?

Yes, it will fail, but consumeAll is not failing due to a timeout here; rather, it fails because of its nature of storing the end offsets before consuming.

If it were timing out, the method would have thrown an exception.

@yashmayya
Contributor

Thanks @sambhav-jain-16

What I suspect is happening is that, when the method initially stores the end offsets of the partitions, the connector hasn't produced 100 records yet, and therefore the method doesn't consume enough even though messages are still being produced by the connector.

I'm not sure how this is possible given that we're waiting for MINIMUM_MESSAGES records to be committed first?

connectorHandle.expectedCommits(MINIMUM_MESSAGES);
// start a source connector
connect.configureConnector(CONNECTOR_NAME, props);
log.info("Waiting for records to be provided to worker by task");
// wait for the connector tasks to produce enough records
connectorHandle.awaitRecords(SOURCE_TASK_PRODUCE_TIMEOUT_MS);
log.info("Waiting for records to be committed to Kafka by worker");
// wait for the connector tasks to commit enough records
connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1));
(i.e. SourceTask::commitRecord is called MINIMUM_MESSAGES times).

Records are "committed" only after the producer transaction is committed successfully -

if (error == null) {
    try {
        // Commit the transaction
        // Blocks until all outstanding records have been sent and ack'd
        producer.commitTransaction();
    } catch (Throwable t) {
        log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t);
        flushError.compareAndSet(null, t);
    }
    transactionOpen = false;
}
error = flushError.get();
if (error != null) {
    recordCommitFailure(time.milliseconds() - started, null);
    offsetWriter.cancelFlush();
    throw maybeWrapProducerSendException(
            "Failed to flush offsets and/or records for task " + id,
            error
    );
}
transactionMetrics.commitTransaction();
long durationMillis = time.milliseconds() - started;
recordCommitSuccess(durationMillis);
log.debug("{} Finished commitOffsets successfully in {} ms", this, durationMillis);
// Synchronize in order to guarantee that writes on other threads are picked up by this one
synchronized (commitableRecords) {
    commitableRecords.forEach(this::commitTaskRecord);

@vamossagar12
Collaborator

Yeah, I agree with @yashmayya. Moreover, this

Yes, it will fail, but consumeAll is not failing due to a timeout here; rather, it fails because of its nature of storing the end offsets before consuming.

is not entirely correct, I think. I agree that what gets thrown is an AssertionError, but that's because the number of sourceRecords returned by consumeAll didn't meet the desired number of records within 60s. For starters, can you try increasing CONSUME_RECORDS_TIMEOUT_MS to 100s or so and see if it even works? Basically, we need to check whether the consumer is lagging or whether enough records are being produced. I think it would mostly be the former because, as Yash said, we are anyway waiting for 100 records to be committed. It's not an ideal fix, but let's first see if it works, and if needed we can dig deeper.

@sambhav-jain-16
Contributor Author

Hi @vamossagar12,

For starters, can you try increasing CONSUME_RECORDS_TIMEOUT_MS to 100s or so and see if it even works?

Actually, I tried that and it failed for me; that's why I started to look into the consumeAll method.

@sambhav-jain-16
Contributor Author

sambhav-jain-16 commented May 3, 2023

I think I get what is happening.
The consumer is polling the records here

ConsumerRecords<byte[], byte[]> recordBatch = consumer.poll(Duration.ofMillis(remainingTimeMs));

However, while getting the lastConsumedOffset

long lastConsumedOffset = consumer.position(topicPartition);

the value comes out higher than the number of records actually consumed in the last loop, and that's why it stops prematurely here.

we need to check whether the consumer is lagging

Even if the consumer is lagging, the method was designed to consume at least up to the end offsets before exiting, which is clearly not happening in this case.

IMO we should fix the consumeAll method itself in this PR.

@yashmayya @vamossagar12

@vamossagar12
Collaborator

Thanks @sambhav-jain-16. So, one thing I note is that the readEndOffsets method uses IsolationLevel.READ_UNCOMMITTED when reading the end offsets, while the test consumes with read_committed. Ideally, read_uncommitted should yield more records than read_committed, so this is a bit weird. Can you try setting readEndOffsets to use read_committed and see if that shows the same behaviour as you described above?

@C0urante C0urante added the connect and tests (Test fixes, including flaky tests) labels May 3, 2023
@sambhav-jain-16
Contributor Author

Hi @vamossagar12,
I can set that, but the end offsets are actually getting populated with the right values. The issue is with the way lastConsumedOffset is being used.

@sambhav-jain-16
Contributor Author

sambhav-jain-16 commented May 15, 2023

I was able to figure out why the position reported by consumer.position() is higher than the number of records actually consumed.

A transactional producer writes its commit markers into the user topic itself, as a "control" batch. When these markers are fetched, the consumer deliberately skips them and doesn't add them to the returned records list; however, the fetch offset is still moved forward past them so that further batches can be consumed.

// control records are not returned to the user
if (!currentBatch.isControlBatch()) {
    return record;
} else {
    // Increment the next fetch offset when we skip a control batch.
    nextFetchOffset = record.offset() + 1;
}

Now, the issue with the consumeAll method is that it exits when it reaches particular offsets rather than consuming a certain number of messages, which is what consume does. I think if our use case in this test is to consume a minimum number of messages rather than to reach certain offsets, we should use consume. However, if the intention of consumeAll is to do something along those lines (which it looks like from the Javadoc of the method), we should change the consumeAll method.
@C0urante WDYT?
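Here is a small, self-contained sketch demonstrating that behaviour outside of Connect (it assumes a broker at localhost:9092 and an existing single-partition topic named "demo-topic"; all names are made up for illustration):

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class ControlRecordOffsetDemo {
    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("demo-topic", 0); // assumed to exist with one partition

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "control-record-demo");
        try (KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer())) {
            producer.initTransactions();
            producer.beginTransaction();
            for (int i = 0; i < 3; i++) {
                producer.send(new ProducerRecord<>(tp.topic(), tp.partition(), null, new byte[]{(byte) i}));
            }
            // Committing writes a commit marker (a control record) into the partition at the next offset
            producer.commitTransaction();
        }

        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        try (KafkaConsumer<byte[], byte[]> consumer =
                new KafkaConsumer<>(consumerProps, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            consumer.assign(List.of(tp));
            consumer.seekToBeginning(List.of(tp));
            int returned = consumer.poll(Duration.ofSeconds(5)).count();
            long position = consumer.position(tp);
            // The marker is skipped by the fetcher but still advances the fetch position, so the
            // position ends up past the number of records actually handed back to the caller.
            System.out.printf("records returned = %d, consumer position = %d%n", returned, position);
        }
    }
}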

@sambhav-jain-16
Contributor Author

Hi @kirktrue
I have looked at the changes from your refactoring PR here. Although I couldn't find any changes to the business logic from before, can you confirm that there are no changes in the logic of the fetchRecords() method? TIA

@vamossagar12
Collaborator

Thanks @sambhav-jain-16, @kirktrue. Just to give more context: there are ITs in Connect for EOS which rely on the position() API to assert that some messages have been read, but they seem to be failing now. What the tests do is read the end offsets for the topic partitions and keep consuming messages; the moment the last consumed offset reaches the end offset, the test stops polling that partition. Basically this block of code:

Map<TopicPartition, Long> endOffsets = readEndOffsets(remainingTimeMs, admin, topicPartitions);
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = topicPartitions.stream()
        .collect(Collectors.toMap(
                Function.identity(),
                tp -> new ArrayList<>()
        ));
consumer.assign(topicPartitions);
while (!endOffsets.isEmpty()) {
    Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
    while (it.hasNext()) {
        Map.Entry<TopicPartition, Long> entry = it.next();
        TopicPartition topicPartition = entry.getKey();
        long endOffset = entry.getValue();
        long lastConsumedOffset = consumer.position(topicPartition);
        if (lastConsumedOffset >= endOffset) {
            // We've reached the end offset for the topic partition; can stop polling it now
            it.remove();
        } else {
            remainingTimeMs = endTimeMs - System.currentTimeMillis();
            if (remainingTimeMs <= 0) {
                throw new AssertionError("failed to read to end of topic(s) " + Arrays.asList(topics) + " within " + maxDurationMs + "ms");
            }
            // We haven't reached the end offset yet; need to keep polling
            ConsumerRecords<byte[], byte[]> recordBatch = consumer.poll(Duration.ofMillis(remainingTimeMs));
            recordBatch.partitions().forEach(tp -> records.get(tp)
                    .addAll(recordBatch.records(tp))
            );

In this case, the assertion fails because even though the last consumed offset zooms past the endOffset read up front, the number of records isn't what we expect it to be (even though we have already ensured that that many records were produced successfully). We can go ahead and look at other ways of fixing this test, but we wanted to understand whether there is something fundamental that has changed or that we are missing.

PS: this is the flaky test in question

@sambhav-jain-16 sambhav-jain-16 marked this pull request as ready for review May 16, 2023 05:29
@sambhav-jain-16
Contributor Author

Hi @C0urante @kirktrue
Can you PTAL at the above comments when available? TIA

@sambhav-jain-16
Contributor Author

Hi @kirktrue @C0urante
Can you PTAL at the above comments when available? TIA

@C0urante
Contributor

@sambhav-jain-16 thanks for looking into this. I'm prioritizing a few other KIP-related PRs at the moment but will try to make time for this sometime in the next couple of weeks.

@sambhav-jain-16
Contributor Author

Hi @C0urante, did you get a chance to take a look at it? TIA

@sudeshwasnik

sudeshwasnik commented Jun 14, 2023

Hi @sambhav-jain-16 @C0urante, here's what I think may be happening.
The test is written in a way that:

  1. It assumes/expects that when recordsToCommitLatch is decremented X times (in awaitCommits), there will be X records in the topic.
  2. The source connector is never stopped. It keeps running in the background, producing more records into the topic.

The test doesn't perform awaitCommits and assert the consumed records at the same time. If awaitCommits was checked at timestamp t1 and records were consumed at t2, then in the (t1, t2) gap more records may have been produced into the topic, so the assertion is not strong enough (we should try to stop the source connector at or before t1). IMO the (t1, t2) gap is a reason for flakiness, because assumption 1 is wrong in itself, but the test may still have passed some of the time.

Here's why assumption 1 is wrong and why we can't expect X records to be present in the topic when recordsToCommitLatch has been decremented X times.

Say r1, ..., rY were sent successfully by the producer in a transaction txn1 (not yet completed). Then all of r1, ..., rY are stored in commitableRecords from this line.
Now, if rY indicates that txn1 has to be aborted, txn1 is aborted and all records r1, ..., rY are dropped (essentially unreadable).
But we still return true (link) for shouldCommitTransactionForRecord, which means we still try to do commitTransaction. In commitTransaction we end up calling commitTaskRecord for each record in commitableRecords (r1, ..., rY), and for each such commitRecord the recordsToCommitLatch is eventually decremented.
So in the above case, [r1, ..., rY] were never actually produced (no consumer can read them, since they are from an aborted transaction), but this assertion will still hold.

Since assumption 1 is incorrect, we should change the test so that it doesn't expect every record that decrements recordsRemainingLatch to also have been produced.

IMO the flakiness is just due to the source connector running in the background, continuously producing records.
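To make the gap concrete, here's a toy, self-contained sketch of why counting commitRecord-style callbacks doesn't bound the number of records visible to a read_committed consumer (the names are illustrative and not taken from the Connect runtime):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;

public class LatchVsVisibleRecordsDemo {
    public static void main(String[] args) throws InterruptedException {
        int expected = 100;
        CountDownLatch recordsToCommit = new CountDownLatch(expected);
        int visible = 0;

        // Simulate transactions of 10 records each; roughly half of them get aborted.
        while (recordsToCommit.getCount() > 0) {
            boolean aborted = ThreadLocalRandom.current().nextBoolean();
            for (int i = 0; i < 10 && recordsToCommit.getCount() > 0; i++) {
                // A commitRecord-style callback fires for every record, aborted or not
                recordsToCommit.countDown();
                if (!aborted) {
                    visible++; // only records from committed transactions are readable
                }
            }
        }
        recordsToCommit.await();
        // The latch reached zero after 100 callbacks, but typically fewer records are visible
        // to a read_committed consumer, which is exactly the situation the flaky assertion hits.
        System.out.printf("callbacks = %d, visible records = %d%n", expected, visible);
    }
}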

@vamossagar12
Collaborator

Thanks @sudeshwasnik for the assessment. Continuing that line of thought, if you check the MonitorableSourceConnector#poll method, it appears to me that we decrement the recordsRemainingLatch by the number of records in the batch irrespective of whether those records are going to be part of a transaction that may later be aborted: https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java#L215-L216

Because of this, the awaitRecords method sees the latch decremented enough times to count down to 0 within the timeout.

However, as you also rightly pointed out, the recordsToCommitLatch is decremented whether the transaction is aborted or committed, which means awaitCommits also passes within the timeout.

Could this be what makes the test flaky, since the bound on recordsRemainingLatch doesn't seem strong enough to ensure that the actual number of records in the topic matches it? Would it help if we decremented the recordsRemainingLatch only for committed transactions?
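A rough sketch of that idea (purely hypothetical, not based on the actual MonitorableSourceConnector code): buffer the countdowns per transaction and only apply them once the transaction is known to have committed.

import java.util.concurrent.CountDownLatch;

// Hypothetical illustration of deferring latch countdowns until a transaction commits.
public class DeferredCommitLatch {
    private final CountDownLatch recordsToCommit;
    private int pendingInCurrentTransaction = 0;

    public DeferredCommitLatch(CountDownLatch recordsToCommit) {
        this.recordsToCommit = recordsToCommit;
    }

    // Called for every record handed to the framework in the current transaction
    public void recordSent() {
        pendingInCurrentTransaction++;
    }

    // Called when the current transaction commits: only now do its records count
    public void transactionCommitted() {
        for (int i = 0; i < pendingInCurrentTransaction; i++) {
            recordsToCommit.countDown();
        }
        pendingInCurrentTransaction = 0;
    }

    // Called when the current transaction aborts: its records never become visible,
    // so they should not count toward the latch
    public void transactionAborted() {
        pendingInCurrentTransaction = 0;
    }
}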

@sudeshwasnik

sudeshwasnik commented Jun 15, 2023

Hi @sagarrao12, a small correction to my comment earlier:

Since assumption 1 is incorrect, we should change the test so that it doesn't expect every record that decrements recordsToCommitLatch to also have been produced.

It seems that recordsRemainingLatch only waits until X records have been returned by the source task to the framework. Say MINIMUM_MESSAGES = 1000 but MESSAGES_PER_POLL is configured as 100; then we wait until the source task has delivered 1000 messages to the framework. This helps keep production time out of the awaitCommits assertion timeout (?)... WDYT?

Also, the reason this PR makes the test pass now is that it doesn't validate that X records are present in the topic when the awaitCommits countdown reaches X. It waits until X records have been produced into the topic, and by then there could have been many more commitRecord calls.
This assertion
assertTrue("Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + sourceRecords.count(), sourceRecords.count() >= MINIMUM_MESSAGES);

doesn't help now because the connector is running continuously, so there WILL eventually be MINIMUM_MESSAGES records produced.

@C0urante
Contributor

Thanks for the detailed analysis, everyone!

Root cause

I believe @sudeshwasnik's latest theory is correct: the Connect runtime invokes SourceTask::commitRecord even for records in aborted transactions, which causes ConnectorHandle::awaitCommits to return before the expected number of (non-aborted) records has been produced to Kafka.

Consume-all implementation

I haven't been able to find any issues with EmbeddedKafkaCluster::consumeAll. The use of the read_uncommitted isolation level for fetching end offsets and the read_committed isolation level for consuming is intentional, and mirrors logic we use elsewhere in the Connect runtime (see TopicAdmin::endOffsets, which is used by the KafkaBasedLog class when reading to the end of a topic, even if the log's consumer is configured with the read_committed isolation level). This ensures that, when reading to the end of a topic, we reach the end of any in-progress transactions on the topic.
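For reference, here is a minimal sketch of fetching end offsets under read_uncommitted with the admin client, in the same spirit (illustrative only; the class and method names are made up, and this is not the actual TopicAdmin or EmbeddedKafkaCluster code):

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;

public class EndOffsetsSketch {
    public static Map<TopicPartition, Long> endOffsets(Admin admin, Set<TopicPartition> partitions) throws Exception {
        Map<TopicPartition, OffsetSpec> request = partitions.stream()
                .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
        // read_uncommitted offsets include any in-progress transactions, so a read_committed
        // consumer only reaches them once those transactions have completed (committed or aborted)
        return admin.listOffsets(request, new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED))
                .all().get()
                .entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
    }
}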

Consume vs. consume all

With regards to the change in the PR--the concern with consuming a fixed number of records from the topic is that we can potentially see a gap in sequence numbers if the topic has multiple partitions, since we wouldn't be guaranteed to consume records in the same order they were produced (which is why I implemented and used EmbeddedKafkaCluster::consumeAll when writing these tests initially; you can find the discussion here).

Could we stick with using consumeAll and instead bump the number of expected records/commits? I drafted this change locally and it seemed to work well:

        // the connector aborts some transactions, which causes records that it has emitted (and for which
        // SourceTask::commitRecord has been invoked) to be invisible to consumers; we expect the task to
        // emit at most 233 records in total before 100 records have been emitted as part of one or more
        // committed transactions
        connectorHandle.expectedRecords(233);
        connectorHandle.expectedCommits(233);

(This would replace the existing code here.)

Transaction size logging

Also, as an aside--it was really helpful to know how many records were in each aborted/committed transaction while investigating this test. I tweaked MonitorableSourceConnector.MonitorableSourceTask::maybeDefineTransactionBoundary to provide this info; if you agree that it'd be useful, feel free to add it to this PR (regardless of which fix we use):

        private void maybeDefineTransactionBoundary(SourceRecord record) {
            if (context.transactionContext() == null || seqno != nextTransactionBoundary) {
                return;
            }
            long transactionSize = nextTransactionBoundary - priorTransactionBoundary;
            // If the transaction boundary ends on an even-numbered offset, abort it
            // Otherwise, commit
            boolean abort = nextTransactionBoundary % 2 == 0;
            calculateNextBoundary();
            if (abort) {
                log.info("Aborting transaction of {} records", transactionSize);
                context.transactionContext().abortTransaction(record);
            } else {
                log.info("Committing transaction of {} records", transactionSize);
                context.transactionContext().commitTransaction(record);
            }
        }
    }

And in fact, if we believe this would be useful for all connectors, we could even add this kind of logging to the ExactlyOnceWorkerSourceTask class. But that should be done in a separate PR.

@sambhav-jain-16
Contributor Author

Thanks @C0urante for the response.

I have applied the suggested changes. PTAL

And in fact, if we believe this would be useful for all connectors, we could even add this kind of logging to the ExactlyOnceWorkerSourceTask class. But that should be done in a separate PR.

I'll create a ticket for the same and attach it in this PR's comments.

Contributor

@C0urante C0urante left a comment


Thanks @sambhav-jain-16! LGTM

@C0urante C0urante merged commit d114d8e into apache:trunk Jul 12, 2023
1 check failed
C0urante pushed a commit that referenced this pull request Jul 12, 2023
Reviewers: Sagar Rao <sagarmeansocean@gmail.com>, Yash Mayya <yash.mayya@gmail.com>, 
Sudesh Wasnik <swasnik@confluent.io>, Chris Egerton <chrise@aiven.io>
C0urante pushed a commit that referenced this pull request Jul 12, 2023
Reviewers: Sagar Rao <sagarmeansocean@gmail.com>, Yash Mayya <yash.mayya@gmail.com>,
Sudesh Wasnik <swasnik@confluent.io>, Chris Egerton <chrise@aiven.io>
@sambhav-jain-16 sambhav-jain-16 deleted the KAFKA-14938 branch July 13, 2023 05:47
Cerchie pushed a commit to Cerchie/kafka that referenced this pull request Jul 25, 2023
Reviewers: Sagar Rao <sagarmeansocean@gmail.com>, Yash Mayya <yash.mayya@gmail.com>, 
Sudesh Wasnik <swasnik@confluent.io>, Chris Egerton <chrise@aiven.io>
Labels
connect, tests (Test fixes, including flaky tests)
Projects
None yet
6 participants