KAFKA-4034; Avoid unnecessary consumer coordinator lookup
Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #1720 from hachikuji/KAFKA-4034
hachikuji authored and ijuma committed Aug 12, 2016
1 parent fc55f80 commit be36b32
Showing 8 changed files with 147 additions and 19 deletions.
KafkaConsumer.java
@@ -997,9 +997,6 @@ public ConsumerRecords<K, V> poll(long timeout) {
      * @return The fetched records (may be empty)
      */
     private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
-        // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
-        coordinator.ensureCoordinatorReady();
-
         // ensure we have partitions assigned if we expect to
         if (subscriptions.partitionsAutoAssigned())
             coordinator.ensurePartitionAssignment();
@@ -1429,11 +1426,22 @@ private void close(boolean swallowException) {
      * defined
      */
     private void updateFetchPositions(Set<TopicPartition> partitions) {
-        // refresh commits for all assigned partitions
-        coordinator.refreshCommittedOffsetsIfNeeded();
+        // lookup any positions for partitions which are awaiting reset (which may be the
+        // case if the user called seekToBeginning or seekToEnd). We do this check first to
+        // avoid an unnecessary lookup of committed offsets (which typically occurs when
+        // the user is manually assigning partitions and managing their own offsets).
+        fetcher.resetOffsetsIfNeeded(partitions);

-        // then do any offset lookups in case some positions are not known
-        fetcher.updateFetchPositions(partitions);
+        if (!subscriptions.hasAllFetchPositions()) {
+            // if we still don't have offsets for all partitions, then we should either seek
+            // to the last committed position or reset using the auto reset policy
+
+            // first refresh commits for all assigned partitions
+            coordinator.refreshCommittedOffsetsIfNeeded();
+
+            // then do any offset lookups in case some positions are not known
+            fetcher.updateFetchPositions(partitions);
+        }
     }

     /*
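The reordering in updateFetchPositions is what makes the KafkaConsumer.java change user-visible: a consumer that manually assigns partitions and seeks explicitly never needs the group coordinator. A rough usage sketch (broker address, topic name, and class name are illustrative; error handling omitted) — it mirrors the new verifyNoCoordinatorLookupForManualAssignmentWithSeek test below:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// With manual assignment plus an explicit seek, poll() resolves the starting
// position with a ListOffset request to the partition leader; after this patch
// it no longer issues a GroupCoordinator lookup or a committed-offset fetch.
public class ManualAssignmentSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative address
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition partition = new TopicPartition("topic", 0);
        consumer.assign(Arrays.asList(partition));          // manual assignment, no group membership
        consumer.seekToBeginning(Arrays.asList(partition)); // marks the partition as awaiting reset

        ConsumerRecords<String, String> records = consumer.poll(100);
        System.out.println("fetched " + records.count() + " records");
        consumer.close();
    }
}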
RetriableCommitFailedException.java
@@ -22,6 +22,10 @@ public class RetriableCommitFailedException extends RetriableException {

     private static final long serialVersionUID = 1L;

+    public RetriableCommitFailedException(Throwable t) {
+        super("Offset commit failed with a retriable exception. You should retry committing offsets.", t);
+    }
+
     public RetriableCommitFailedException(String message) {
         super(message);
     }
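The new Throwable constructor lets ConsumerCoordinator wrap a retriable cause without repeating the message (see the ConsumerCoordinator.java change below). As the message suggests, callers can simply retry; a minimal sketch of an application-level callback that does so (the single-retry policy and class name are illustrative, not part of this change):

import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.common.TopicPartition;

// Retry an async offset commit once when the failure is retriable. Commit
// callbacks are invoked from the consumer's polling thread, so calling
// commitAsync again from inside onComplete is safe.
public class CommitRetrySketch {
    public static void commitWithRetry(final KafkaConsumer<?, ?> consumer,
                                       final Map<TopicPartition, OffsetAndMetadata> offsets) {
        consumer.commitAsync(offsets, new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> committed, Exception exception) {
                if (exception instanceof RetriableCommitFailedException)
                    consumer.commitAsync(committed, null); // retriable: try once more
                else if (exception != null)
                    exception.printStackTrace();           // non-retriable: surface the failure
            }
        });
    }
}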
AbstractCoordinator.java
@@ -98,6 +98,8 @@ public abstract class AbstractCoordinator implements Closeable {
     protected String protocol;
     protected int generation;

+    private RequestFuture<Void> findCoordinatorFuture = null;
+
     /**
      * Initialize the coordination manager.
      */
@@ -175,7 +177,7 @@ protected abstract void onJoinComplete(int generation,
      */
     public void ensureCoordinatorReady() {
         while (coordinatorUnknown()) {
-            RequestFuture<Void> future = sendGroupCoordinatorRequest();
+            RequestFuture<Void> future = lookupCoordinator();
             client.poll(future);

             if (future.failed()) {
@@ -189,8 +191,25 @@ public void ensureCoordinatorReady() {
                 coordinatorDead();
                 time.sleep(retryBackoffMs);
             }
         }
     }

+    protected RequestFuture<Void> lookupCoordinator() {
+        if (findCoordinatorFuture == null) {
+            findCoordinatorFuture = sendGroupCoordinatorRequest();
+            findCoordinatorFuture.addListener(new RequestFutureListener<Void>() {
+                @Override
+                public void onSuccess(Void value) {
+                    findCoordinatorFuture = null;
+                }
+
+                @Override
+                public void onFailure(RuntimeException e) {
+                    findCoordinatorFuture = null;
+                }
+            });
+        }
+        return findCoordinatorFuture;
+    }
+
     /**
@@ -205,6 +224,10 @@ protected boolean needRejoin() {
      * Ensure that the group is active (i.e. joined and synced)
      */
     public void ensureActiveGroup() {
+        // always ensure that the coordinator is ready, because we may have been disconnected
+        // when sending heartbeats; a disconnect does not necessarily require us to rejoin the group.
+        ensureCoordinatorReady();
+
         if (!needRejoin())
             return;

ConsumerCoordinator.java
@@ -366,7 +366,36 @@ public void close() {
     }


-    public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
+    public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
+        if (!coordinatorUnknown()) {
+            doCommitOffsetsAsync(offsets, callback);
+        } else {
+            // we don't know the current coordinator, so try to find it and then send the commit
+            // or fail (we don't want recursive retries which can cause offset commits to arrive
+            // out of order). Note that there may be multiple offset commits chained to the same
+            // coordinator lookup request. This is fine because the listeners will be invoked in
+            // the same order that they were added. Note also that AbstractCoordinator prevents
+            // multiple concurrent coordinator lookup requests.
+            lookupCoordinator().addListener(new RequestFutureListener<Void>() {
+                @Override
+                public void onSuccess(Void value) {
+                    doCommitOffsetsAsync(offsets, callback);
+                }
+
+                @Override
+                public void onFailure(RuntimeException e) {
+                    callback.onComplete(offsets, new RetriableCommitFailedException(e));
+                }
+            });
+        }
+
+        // ensure the commit has a chance to be transmitted (without blocking on its completion).
+        // Note that commits are treated as heartbeats by the coordinator, so there is no need to
+        // explicitly allow heartbeats through delayed task execution.
+        client.pollNoWakeup();
+    }
+
+    private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
         this.subscriptions.needRefreshCommits();
         RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
         final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
@@ -381,17 +410,12 @@ public void onSuccess(Void value) {
             @Override
             public void onFailure(RuntimeException e) {
                 if (e instanceof RetriableException) {
-                    cb.onComplete(offsets, new RetriableCommitFailedException("Commit offsets failed with retriable exception. You should retry committing offsets.", e));
+                    cb.onComplete(offsets, new RetriableCommitFailedException(e));
                 } else {
                     cb.onComplete(offsets, e);
                 }
             }
         });
-
-        // ensure the commit has a chance to be transmitted (without blocking on its completion).
-        // Note that commits are treated as heartbeats by the coordinator, so there is no need to
-        // explicitly allow heartbeats through delayed task execution.
-        client.pollNoWakeup();
     }

     /**
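The comment in commitOffsetsAsync above leans on two properties of lookupCoordinator(): concurrent callers share one in-flight FindCoordinator future, and listeners fire in the order they were added, so chained commits stay ordered. A standalone sketch of the memoized-future half of that pattern, using CompletableFuture in place of Kafka's internal RequestFuture (names and placeholder body are illustrative):

import java.util.concurrent.CompletableFuture;

// Memoized in-flight lookup, as in AbstractCoordinator.lookupCoordinator().
// Assumes a single calling thread, like the consumer itself.
public class LookupFutureSketch {
    private CompletableFuture<Void> findCoordinatorFuture = null;

    public CompletableFuture<Void> lookupCoordinator() {
        if (findCoordinatorFuture == null) {
            CompletableFuture<Void> future = sendLookupRequest();
            findCoordinatorFuture = future;
            // clear the cached future when the request completes (success or
            // failure) so the next lookup starts fresh; until then, every
            // caller piggybacks on this single in-flight future
            future.whenComplete((result, error) -> findCoordinatorFuture = null);
            return future;
        }
        return findCoordinatorFuture;
    }

    private CompletableFuture<Void> sendLookupRequest() {
        return new CompletableFuture<>(); // placeholder for the real FindCoordinator request
    }
}

Note that CompletableFuture does not guarantee dependent actions run in registration order, whereas Kafka's RequestFuture invokes listeners in the order they were added — the property the out-of-order-commit comment depends on.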
Fetcher.java
@@ -154,6 +154,18 @@ public void onFailure(RuntimeException e) {
         }
     }

+    /**
+     * Lookup and set offsets for any partitions which are awaiting an explicit reset.
+     * @param partitions the partitions to reset
+     */
+    public void resetOffsetsIfNeeded(Set<TopicPartition> partitions) {
+        for (TopicPartition tp : partitions) {
+            // TODO: If there are several offsets to reset, we could submit offset requests in parallel
+            if (subscriptions.isAssigned(tp) && subscriptions.isOffsetResetNeeded(tp))
+                resetOffset(tp);
+        }
+    }
+
     /**
      * Update the fetch positions for the provided partitions.
      * @param partitions the partitions to update positions for
@@ -165,7 +177,6 @@ public void updateFetchPositions(Set<TopicPartition> partitions) {
             if (!subscriptions.isAssigned(tp) || subscriptions.isFetchable(tp))
                 continue;

-            // TODO: If there are several offsets to reset, we could submit offset requests in parallel
             if (subscriptions.isOffsetResetNeeded(tp)) {
                 resetOffset(tp);
             } else if (subscriptions.committed(tp) == null) {
SubscriptionState.java
@@ -49,7 +49,7 @@ public class SubscriptionState {

     private enum SubscriptionType {
         NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED
-    };
+    }

     /* the type of subscription */
     private SubscriptionType subscriptionType;
KafkaConsumerTest.java
@@ -43,6 +43,7 @@
 import org.apache.kafka.common.requests.GroupCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.ListOffsetResponse;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
@@ -89,7 +90,7 @@ public void testConstructorClose() throws Exception {
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
         final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
         try {
-            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
+            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                     props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
         } catch (KafkaException e) {
             assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
@@ -454,6 +455,38 @@ public boolean matches(ClientRequest request) {
         assertTrue(heartbeatReceived.get());
     }

+    @Test
+    public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() {
+        String topic = "topic";
+        final TopicPartition partition = new TopicPartition(topic, 0);
+        int sessionTimeoutMs = 3000;
+        int heartbeatIntervalMs = 2000;
+        int autoCommitIntervalMs = 1000;
+
+        Time time = new MockTime();
+        MockClient client = new MockClient(time);
+        Cluster cluster = TestUtils.singletonCluster(topic, 1);
+        Node node = cluster.nodes().get(0);
+        client.setNode(node);
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        metadata.update(cluster, time.milliseconds());
+        PartitionAssignor assignor = new RoundRobinAssignor();
+
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+                sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
+        consumer.assign(Arrays.asList(partition));
+        consumer.seekToBeginning(Arrays.asList(partition));
+
+        // there shouldn't be any need to lookup the coordinator or fetch committed offsets.
+        // we just lookup the starting position and send the record fetch.
+        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(partition, 50L), Errors.NONE.code()));
+        client.prepareResponse(fetchResponse(partition, 50L, 5));
+
+        ConsumerRecords<String, String> records = consumer.poll(0);
+        assertEquals(5, records.count());
+        assertEquals(55L, consumer.position(partition));
+    }
+
     @Test
     public void testCommitsFetchedDuringAssign() {
         String topic = "topic";
@@ -669,6 +702,15 @@ private Struct offsetResponse(Map<TopicPartition, Long> offsets, short error) {
         return new OffsetFetchResponse(partitionData).toStruct();
     }

+    private Struct listOffsetsResponse(Map<TopicPartition, Long> offsets, short error) {
+        Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>();
+        for (Map.Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) {
+            partitionData.put(partitionOffset.getKey(), new ListOffsetResponse.PartitionData(error,
+                    Collections.singletonList(partitionOffset.getValue())));
+        }
+        return new ListOffsetResponse(partitionData).toStruct();
+    }
+
     private Struct fetchResponse(TopicPartition tp, long fetchOffset, int count) {
         MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
         for (int i = 0; i < count; i++)
AuthorizerIntegrationTest.scala
@@ -340,13 +340,15 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }

   @Test
-  def testConsumeWithNoGroupAccess(): Unit = {
+  def testSimpleConsumeWithOffsetLookupAndNoGroupAccess(): Unit = {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()

     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
     try {
+      // note this still depends on group access because we haven't set offsets explicitly, which means
+      // they will first be fetched from the consumer coordinator (which requires group access)
       this.consumers.head.assign(List(tp).asJava)
       consumeRecords(this.consumers.head)
       Assert.fail("should have thrown exception")
@@ -355,6 +357,20 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     }
   }

+  @Test
+  def testSimpleConsumeWithExplicitSeekAndNoGroupAccess(): Unit = {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    sendRecords(1, tp)
+    removeAllAcls()
+
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+
+    // in this case, we do an explicit seek, so there should be no need to query the coordinator at all
+    this.consumers.head.assign(List(tp).asJava)
+    this.consumers.head.seekToBeginning(List(tp).asJava)
+    consumeRecords(this.consumers.head)
+  }
+
   @Test
   def testConsumeWithNoTopicAccess() {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
