KAFKA-4034: Avoid unnecessary consumer coordinator lookup #1720

Closed · wants to merge 4 commits
@@ -997,9 +997,6 @@ public ConsumerRecords<K, V> poll(long timeout) {
* @return The fetched records (may be empty)
*/
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
// TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
coordinator.ensureCoordinatorReady();

// ensure we have partitions assigned if we expect to
if (subscriptions.partitionsAutoAssigned())
coordinator.ensurePartitionAssignment();
@@ -1429,11 +1426,22 @@ private void close(boolean swallowException) {
* defined
*/
private void updateFetchPositions(Set<TopicPartition> partitions) {
// refresh commits for all assigned partitions
coordinator.refreshCommittedOffsetsIfNeeded();
// lookup any positions for partitions which are awaiting reset (which may be the
// case if the user called seekToBeginning or seekToEnd). We do this check first to
// avoid an unnecessary lookup of committed offsets (which typically occurs when
// the user is manually assigning partitions and managing their own offsets).
fetcher.resetOffsetsIfNeeded(partitions);

if (!subscriptions.hasAllFetchPositions()) {
Contributor:
It seems that both calls to this function are already covered by this check (subscriptions.position(partition) == null has a similar effect), but I think there is no harm in adding it here just in case.

// if we still don't have offsets for all partitions, then we should either seek
// to the last committed position or reset using the auto reset policy

// then do any offset lookups in case some positions are not known
fetcher.updateFetchPositions(partitions);
// first refresh commits for all assigned partitions
coordinator.refreshCommittedOffsetsIfNeeded();

// then do any offset lookups in case some positions are not known
fetcher.updateFetchPositions(partitions);
}
}
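
The reviewer's note above about subscriptions.position(partition) can be illustrated with a small standalone sketch: hasAllFetchPositions() is understood to reduce to a per-partition null-position check, which is why the extra guard is redundant but harmless. The helper name and Map-based signature below are hypothetical, not the actual SubscriptionState API.

import java.util.Map;
import org.apache.kafka.common.TopicPartition;

public class FetchPositionGuardSketch {
    // Hypothetical stand-in for subscriptions.hasAllFetchPositions(): true only if
    // every assigned partition already has a (non-null) fetch position.
    static boolean hasAllFetchPositions(Map<TopicPartition, Long> positionByPartition) {
        for (Long position : positionByPartition.values()) {
            if (position == null)   // same condition as subscriptions.position(partition) == null
                return false;
        }
        return true;
    }
}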

/*
@@ -22,6 +22,10 @@ public class RetriableCommitFailedException extends RetriableException {

private static final long serialVersionUID = 1L;

public RetriableCommitFailedException(Throwable t) {
super("Offset commit failed with a retriable exception. You should retry committing offsets.", t);
}

public RetriableCommitFailedException(String message) {
super(message);
Contributor:
These exceptions are public classes, so I don't think we can remove the existing constructors.

Contributor (Author):
Sure. I'll bring them back.

}
@@ -98,6 +98,8 @@ public abstract class AbstractCoordinator implements Closeable {
protected String protocol;
protected int generation;

private RequestFuture<Void> findCoordinatorFuture = null;

/**
* Initialize the coordination manager.
*/
@@ -175,7 +177,7 @@ protected abstract void onJoinComplete(int generation,
*/
public void ensureCoordinatorReady() {
while (coordinatorUnknown()) {
RequestFuture<Void> future = sendGroupCoordinatorRequest();
RequestFuture<Void> future = lookupCoordinator();
client.poll(future);

if (future.failed()) {
@@ -189,8 +191,25 @@ public void ensureCoordinatorReady() {
coordinatorDead();
time.sleep(retryBackoffMs);
}
}
}

protected RequestFuture<Void> lookupCoordinator() {
if (findCoordinatorFuture == null) {
findCoordinatorFuture = sendGroupCoordinatorRequest();
findCoordinatorFuture.addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
findCoordinatorFuture = null;
}

@Override
public void onFailure(RuntimeException e) {
findCoordinatorFuture = null;
}
});
}
return findCoordinatorFuture;
}

/**
@@ -205,6 +224,10 @@ protected boolean needRejoin() {
* Ensure that the group is active (i.e. joined and synced)
*/
public void ensureActiveGroup() {
// always ensure that the coordinator is ready, because we may have been disconnected
// when sending heartbeats, which does not necessarily require us to rejoin the group.
ensureCoordinatorReady();

if (!needRejoin())
return;

@@ -366,7 +366,36 @@ public void close() {
}


public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
if (!coordinatorUnknown()) {
doCommitOffsetsAsync(offsets, callback);
} else {
// we don't know the current coordinator, so try to find it and then send the commit
// or fail (we don't want recursive retries which can cause offset commits to arrive
// out of order). Note that there may be multiple offset commits chained to the same
// coordinator lookup request. This is fine because the listeners will be invoked in
// the same order that they were added. Note also that AbstractCoordinator prevents
// multiple concurrent coordinator lookup requests.
lookupCoordinator().addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
doCommitOffsetsAsync(offsets, callback);
}

@Override
public void onFailure(RuntimeException e) {
callback.onComplete(offsets, new RetriableCommitFailedException(e));
}
});
}

// ensure the commit has a chance to be transmitted (without blocking on its completion).
// Note that commits are treated as heartbeats by the coordinator, so there is no need to
// explicitly allow heartbeats through delayed task execution.
client.pollNoWakeup();
}
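
For context, here is a minimal caller-side sketch of how this failure path surfaces through the public API: a retriable failure of either the coordinator lookup or the commit itself reaches the user callback as a RetriableCommitFailedException. The class and method names in the sketch are illustrative, not part of the patch.

import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.common.TopicPartition;

public class AsyncCommitCallbackSketch {
    static void commitWithRetryAwareness(KafkaConsumer<String, String> consumer,
                                         Map<TopicPartition, OffsetAndMetadata> offsets) {
        consumer.commitAsync(offsets, new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> committed, Exception exception) {
                if (exception instanceof RetriableCommitFailedException) {
                    // transient failure (e.g. the coordinator lookup failed); safe to retry the commit later
                } else if (exception != null) {
                    // non-retriable failure; surface or log it
                }
            }
        });
    }
}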

private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
this.subscriptions.needRefreshCommits();
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
@@ -381,17 +410,12 @@ public void onSuccess(Void value) {
@Override
public void onFailure(RuntimeException e) {
if (e instanceof RetriableException) {
cb.onComplete(offsets, new RetriableCommitFailedException("Commit offsets failed with retriable exception. You should retry committing offsets.", e));
cb.onComplete(offsets, new RetriableCommitFailedException(e));
} else {
cb.onComplete(offsets, e);
}
}
});

// ensure the commit has a chance to be transmitted (without blocking on its completion).
// Note that commits are treated as heartbeats by the coordinator, so there is no need to
// explicitly allow heartbeats through delayed task execution.
client.pollNoWakeup();
}

/**
@@ -154,6 +154,18 @@ public void onFailure(RuntimeException e) {
}
}

/**
* Lookup and set offsets for any partitions which are awaiting an explicit reset.
* @param partitions the partitions to reset
*/
public void resetOffsetsIfNeeded(Set<TopicPartition> partitions) {
for (TopicPartition tp : partitions) {
// TODO: If there are several offsets to reset, we could submit offset requests in parallel
if (subscriptions.isAssigned(tp) && subscriptions.isOffsetResetNeeded(tp))
resetOffset(tp);
}
}

/**
* Update the fetch positions for the provided partitions.
* @param partitions the partitions to update positions for
@@ -165,7 +177,6 @@ public void updateFetchPositions(Set<TopicPartition> partitions) {
if (!subscriptions.isAssigned(tp) || subscriptions.isFetchable(tp))
continue;

// TODO: If there are several offsets to reset, we could submit offset requests in parallel
if (subscriptions.isOffsetResetNeeded(tp)) {
resetOffset(tp);
} else if (subscriptions.committed(tp) == null) {
@@ -49,7 +49,7 @@ public class SubscriptionState {

private enum SubscriptionType {
NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED
};
}

/* the type of subscription */
private SubscriptionType subscriptionType;
@@ -43,6 +43,7 @@
import org.apache.kafka.common.requests.GroupCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
@@ -89,7 +90,7 @@ public void testConstructorClose() throws Exception {
final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
try {
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
} catch (KafkaException e) {
assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
@@ -454,6 +455,38 @@ public boolean matches(ClientRequest request) {
assertTrue(heartbeatReceived.get());
}

@Test
public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() {
String topic = "topic";
final TopicPartition partition = new TopicPartition(topic, 0);
int sessionTimeoutMs = 3000;
int heartbeatIntervalMs = 2000;
int autoCommitIntervalMs = 1000;

Time time = new MockTime();
MockClient client = new MockClient(time);
Cluster cluster = TestUtils.singletonCluster(topic, 1);
Node node = cluster.nodes().get(0);
client.setNode(node);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
PartitionAssignor assignor = new RoundRobinAssignor();

final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
consumer.assign(Arrays.asList(partition));
consumer.seekToBeginning(Arrays.asList(partition));

// there shouldn't be any need to lookup the coordinator or fetch committed offsets.
// we just lookup the starting position and send the record fetch.
client.prepareResponse(listOffsetsResponse(Collections.singletonMap(partition, 50L), Errors.NONE.code()));
client.prepareResponse(fetchResponse(partition, 50L, 5));

ConsumerRecords<String, String> records = consumer.poll(0);
assertEquals(5, records.count());
assertEquals(55L, consumer.position(partition));
}
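
For reference, the user-facing pattern this test exercises looks roughly like the following: with this change, a consumer that manually assigns partitions and seeks explicitly never needs to look up the group coordinator. The bootstrap address, topic name, and deserializer settings below are illustrative assumptions.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ManualAssignSeekSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("topic", 0);
            // manual assignment plus an explicit seek: poll() only looks up the starting
            // offset and fetches records, without contacting the group coordinator
            consumer.assign(Collections.singletonList(partition));
            consumer.seekToBeginning(Collections.singletonList(partition));
            ConsumerRecords<String, String> records = consumer.poll(0);
            System.out.println("fetched " + records.count() + " records");
        }
    }
}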

@Test
public void testCommitsFetchedDuringAssign() {
String topic = "topic";
Expand Down Expand Up @@ -669,6 +702,15 @@ private Struct offsetResponse(Map<TopicPartition, Long> offsets, short error) {
return new OffsetFetchResponse(partitionData).toStruct();
}

private Struct listOffsetsResponse(Map<TopicPartition, Long> offsets, short error) {
Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>();
for (Map.Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) {
partitionData.put(partitionOffset.getKey(), new ListOffsetResponse.PartitionData(error,
Collections.singletonList(partitionOffset.getValue())));
}
return new ListOffsetResponse(partitionData).toStruct();
}

private Struct fetchResponse(TopicPartition tp, long fetchOffset, int count) {
MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
for (int i = 0; i < count; i++)
@@ -336,13 +336,15 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
}

@Test
def testConsumeWithNoGroupAccess(): Unit = {
def testSimpleConsumeWithOffsetLookupAndNoGroupAccess(): Unit = {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
removeAllAcls()

addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
try {
// note this still depends on group access because we haven't set offsets explicitly, which means
// they will first be fetched from the consumer coordinator (which requires group access)
this.consumers.head.assign(List(tp).asJava)
consumeRecords(this.consumers.head)
Assert.fail("should have thrown exception")
@@ -351,6 +353,20 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
}
}

@Test
def testSimpleConsumeWithExplicitSeekAndNoGroupAccess(): Unit = {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
removeAllAcls()

addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)

// in this case, we do an explicit seek, so there should be no need to query the coordinator at all
this.consumers.head.assign(List(tp).asJava)
this.consumers.head.seekToBeginning(List(tp).asJava)
consumeRecords(this.consumers.head)
}

@Test
def testConsumeWithNoTopicAccess() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)