Skip to content

[KAFKA-7132] [WIP] Consider adding a faster form of rebalance #5340

Closed
ConcurrencyPractitioner wants to merge 93 commits into
apache:trunkfrom
ConcurrencyPractitioner:KIP-333
Closed

[KAFKA-7132] [WIP] Consider adding a faster form of rebalance #5340
ConcurrencyPractitioner wants to merge 93 commits into
apache:trunkfrom
ConcurrencyPractitioner:KIP-333

Conversation

@ConcurrencyPractitioner
Copy link
Copy Markdown
Contributor

@ConcurrencyPractitioner ConcurrencyPractitioner commented Jul 6, 2018

Currently, when a consumer falls out of a consumer group, it will restart processing from the last checkpointed offset. However, this design could result in a lag which some users could not afford to let happen. For example, lets say a consumer crashed at offset 100, with the last checkpointed offset being at 70. When it recovers at a later offset (say, 120), it will be behind by an offset range of 50 (120 - 70). This is because the consumer restarted at 70, forcing it to reprocess old data. To avoid this from happening, one option would be to allow the current consumer to start processing not from the last checkpointed offset (which is 70 in the example), but from 120 where it recovers. Meanwhile, a new KafkaConsumer will be instantiated and start reading from offset 70 in concurrency with the old process, and will be terminated once it reaches 120. In this manner, a considerable amount of lag can be avoided, particularly since the old consumer could proceed as if nothing had happened.

Here is the design doc for the pull request:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@ConcurrencyPractitioner
Copy link
Copy Markdown
Contributor Author

cc @becketqin @hachikuji

private final ConsumerInterceptors<?, ?> interceptors;
private final boolean excludeInternalTopics;
private final AtomicInteger pendingAsyncCommits;
private List<RebalanceConsumerCoordinator> rebalanceCoordinators = null;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RebalanceConsumerCoordinator -> ConsumerRebalancingCoordinator

//fetch start offset
//fetch end offset (from which the consumer will start polling)
//create new RebalanceConsumer
rebalanceInProgress.compareAndSet(true, false);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the return value be checked ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure about the current design right now ... I will need to check.

}

public void setValue(final boolean useMultithreadRebalancing) {
if (this.useMultithreadRebalancing == null) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if this.useMultithreadRebalancing and useMultithreadRebalancing carry different values (one being true and the other being false) ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of this setup is to make this.useMultithreadRebalancing effectively final. Once set to a non-null value, it can no longer be changed.

} else if (heartbeat.pollTimeoutExpired(now)) {
// the poll timeout has expired, which means that the foreground thread has stalled
// in between calls to poll(), so we explicitly leave the group.
rebalanceInProgress.compareAndSet(false, true);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This flag is better set in LeaveGroupResponseHandler where we know whether the LeaveGroup request succeeded or not

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have a point, I will make the change. :)

true,
new ApiVersions(),
throttleTimeSensor,
time, true, new ApiVersions(), throttleTimeSensor,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to keep the parameters aligned (having same indentation)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NPath Complexity is a boon. Checkstyle believes KafkaConsumer's constructor contains too many lines. I did this to make the method shorter.

consumerThread = new Thread(rebalanceConsumer);
}
if (coordinator.isRebalancing(false)) {
System.out.println("Starting thread");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use log object if this is to be kept

null,
new HashMap<>(),
new HashMap<>());
consumerThread = new Thread(rebalanceConsumer);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consumerThread -> rebalancingConsumerThread

private RebalanceKafkaConsumer.RequestResult pollForResults(final long timeoutMs, final long now) {
long elapsed = time.milliseconds() - now;
boolean condition = remainingTimeAtLeastZero(timeoutMs, elapsed) != 0;
while (result == null && condition) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since condition is just a comparison, you can put the comparison here directly

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I did this to make it shorter. (Wanted to make it more readable)

while (result == null && condition) {
try {
Thread.sleep(retryBackoffMs);
} catch (InterruptedException exc) { }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Restore interrupt status by calling Thread.currentThread().interrupt();

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, sure I will do that.

return result;
}

private ConsumerRecords<K, V> mergeRecords(final ConsumerRecords<K, V> records1, final ConsumerRecords<K, V> records2) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be moved to ConsumerRecords class

}

private ConsumerRecords<K, V> mergeRecords(final ConsumerRecords<K, V> records1, final ConsumerRecords<K, V> records2) {
final HashMap<TopicPartition, List<ConsumerRecord<K, V>>> map = new HashMap<>();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: when records2 is empty, you can return immediately.

return new ConsumerRecords<>(map);
}

private ConsumerRecords<K, V> processRecords(final long timeoutMs,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add javadoc for the parameters

result == null ? (ConsumerRecords<K, V>) pollForResults(timeoutMs, checkRebalanceStart).value
: (ConsumerRecords<K, V>) result.value;
if (offsetLagRecords == null) {
return this.interceptors.onConsume(new ConsumerRecords<>(records));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: extract the call to this.interceptors.onConsume(new ConsumerRecords<>(records)) above line 1258 - its result would always be used

* @return true if the secondary consumer thread created is alive.
* false if not alive or has null value
*/
public boolean childConsumerIsAlive() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rebalanceConsumerIsAlive

final long hashCode2 = childConsumerMetadata.hashCode();

rebalanceConsumer.setOptionalInputArgument(childConsumerMetadata, hashCode1, hashCode2);
rebalanceConsumer.sendRequest(null,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if only one of this call and the commitOffsetsAsync call on line 1642 succeeds ?

final Map<TopicPartition, Long> offsets1 = fetcher.endOffsets(partitions, timeout.toMillis());
final Map<TopicPartition, Long> offsets2 = (Map<TopicPartition, Long>) pollForResults(timeout.toMillis(), time.milliseconds()).value;
final Map<TopicPartition, Long> result = new HashMap<>();
for (final TopicPartition partition : partitions) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See if you can refactor this code which is similar to what beginningOffsets has (apart from the condition between pos1 and pos2)

import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.PriorityBlockingQueue;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why switch data structure ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to the current policy I have set up in the KIP, this part is adapted to accomadate for OffsetCommitCallback. Currently, we split offsets being committed into two, so now we have two CommitCallbacks. If both of them are called separately, then we have two calls to the callback. We cannot let that happen to ensure that Kafka's behavior remains the same. Therefore, we need both OffsetCommitCompletions in PriorityBlockingQueue before calling OffsetCommitCallback as one call instead of two. The Hashcodes are used to compare two OffsetCommitCompletions.

addMetadataListener();
}

// method will automatically set to false upon retrieving value
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value is assigned instead of false, right ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, need to update.

completions.get(completions.size() - 1);
boolean containsMatch =
!completions.isEmpty() && previous.hashCode == completion.hashCode;
if (completion.hashCode == 0) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would hashCode be 0 ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default. When we are not splitting the offsets between the two consumers for committing, then we are setting the hashCode to zero.

completions.add(completion);
}
}
for (OffsetCommitCompletion completion : completions) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using java.util.Collections.addAll()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addAll() is not thread safe for PriorityBlockingQueue. Unlike add(). This way we could make it more thread safe.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants