Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-2978: consumer stops fetching when consumed and fetch positions get out of sync #666

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1071,10 +1071,10 @@ public long position(TopicPartition partition) {
try {
if (!this.subscriptions.isAssigned(partition))
throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
Long offset = this.subscriptions.consumed(partition);
Long offset = this.subscriptions.position(partition);
if (offset == null) {
updateFetchPositions(Collections.singleton(partition));
offset = this.subscriptions.consumed(partition);
offset = this.subscriptions.position(partition);
}
return offset;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public ConsumerRecords<K, V> poll(long timeout) {
if (!subscriptions.isPaused(entry.getKey())) {
List<ConsumerRecord<K, V>> recs = entry.getValue();
if (!recs.isEmpty())
this.subscriptions.consumed(entry.getKey(), recs.get(recs.size() - 1).offset() + 1);
this.subscriptions.position(entry.getKey(), recs.get(recs.size() - 1).offset() + 1);
}
}

Expand Down Expand Up @@ -229,10 +229,10 @@ public long position(TopicPartition partition) {
ensureNotClosed();
if (!this.subscriptions.isAssigned(partition))
throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
Long offset = this.subscriptions.consumed(partition);
Long offset = this.subscriptions.position(partition);
if (offset == null) {
updateFetchPosition(partition);
offset = this.subscriptions.consumed(partition);
offset = this.subscriptions.position(partition);
}
return offset;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,9 @@ private void throwIfOffsetOutOfRange() throws OffsetOutOfRangeException {
log.debug("Ignoring fetched records for {} since it is no longer fetchable", entry.getKey());
continue;
}
Long consumed = subscriptions.consumed(entry.getKey());
// ignore partition if its consumed offset != offset in fetchResponse, e.g. after seek()
if (consumed != null && entry.getValue().equals(consumed))
Long position = subscriptions.position(entry.getKey());
// ignore partition if the current position != the offset in fetchResponse, e.g. after seek()
if (position != null && entry.getValue().equals(position))
currentOutOfRangePartitions.put(entry.getKey(), entry.getValue());
}
this.offsetOutOfRangePartitions.clear();
Expand Down Expand Up @@ -401,18 +401,15 @@ public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {

// note that the consumed position should always be available
// as long as the partition is still assigned
long consumed = subscriptions.consumed(part.partition);
long position = subscriptions.position(part.partition);
if (!subscriptions.isFetchable(part.partition)) {
// this can happen when a partition consumption paused before fetched records are returned to the consumer's poll call
// this can happen when a partition is paused before fetched records are returned to the consumer's poll call
log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", part.partition);

// we also need to reset the fetch positions to pretend we did not fetch
// this partition in the previous request at all
subscriptions.fetched(part.partition, consumed);
} else if (part.fetchOffset == consumed) {
} else if (part.fetchOffset == position) {
long nextOffset = part.records.get(part.records.size() - 1).offset() + 1;

log.trace("Returning fetched records for assigned partition {} and update consumed position to {}", part.partition, nextOffset);
log.debug("Returning fetched records at offset {} for assigned partition {} and update " +
"position to {}", position, part.partition, nextOffset);

List<ConsumerRecord<K, V>> records = drained.get(part.partition);
if (records == null) {
Expand All @@ -421,11 +418,13 @@ public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
} else {
records.addAll(part.records);
}
subscriptions.consumed(part.partition, nextOffset);

subscriptions.position(part.partition, nextOffset);
} else {
// these records aren't next in line based on the last consumed position, ignore them
// they must be from an obsolete request
log.debug("Ignoring fetched records for {} at offset {}", part.partition, part.fetchOffset);
log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
part.partition, part.fetchOffset, position);
}
}
this.records.clear();
Expand Down Expand Up @@ -513,11 +512,9 @@ private Map<Node, FetchRequest> createFetchRequests(Cluster cluster) {
fetchable.put(node, fetch);
}

long fetched = this.subscriptions.fetched(partition);
long consumed = this.subscriptions.consumed(partition);
// Only fetch data for partitions whose previously fetched data has been consumed
if (consumed == fetched)
fetch.put(partition, new FetchRequest.PartitionData(fetched, this.fetchSize));
long position = this.subscriptions.position(partition);
fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));
log.debug("Added fetch request for partition {} at offset {}", partition, position);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we might want to consider dropping some of these log.debugs to log.trace. Some of the logs in error conditions make sense at debug, but logging every fetch request and response at debug might make changing from info to debug a bit overwhelming.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trace level is fine with me.

}
}

Expand Down Expand Up @@ -550,29 +547,25 @@ private void handleFetchResponse(ClientResponse resp, FetchRequest request) {

// we are interested in this fetch only if the beginning offset matches the
// current consumed position
Long consumed = subscriptions.consumed(tp);
if (consumed == null) {
continue;
} else if (consumed != fetchOffset) {
// the fetched position has gotten out of sync with the consumed position
// (which might happen when a rebalance occurs with a fetch in-flight),
// so we need to reset the fetch position so the next fetch is right
subscriptions.fetched(tp, consumed);
Long position = subscriptions.position(tp);
if (position == null || position != fetchOffset) {
log.debug("Discarding fetch response for partition {} since its offset {} does not match " +
"the expected offset {}", tp, fetchOffset, position);
continue;
}

int bytes = 0;
ByteBuffer buffer = partition.recordSet;
MemoryRecords records = MemoryRecords.readableRecords(buffer);
List<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K, V>>();
List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
for (LogEntry logEntry : records) {
parsed.add(parseRecord(tp, logEntry));
bytes += logEntry.size();
}

if (!parsed.isEmpty()) {
log.debug("Adding fetched record for partition {} with offset {} to buffered record list", tp, position);
ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
this.subscriptions.fetched(tp, record.offset() + 1);
this.records.add(new PartitionRecords<>(fetchOffset, tp, parsed));
this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
} else if (buffer.limit() > 0) {
Expand All @@ -594,7 +587,7 @@ private void handleFetchResponse(ClientResponse resp, FetchRequest request) {
subscriptions.needOffsetReset(tp);
else
this.offsetOutOfRangePartitions.put(tp, fetchOffset);
log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp));
log.info("Fetch offset {} is out of range, resetting offset", fetchOffset);
} else if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) {
log.warn("Not authorized to read from topic {}.", tp.topic());
unauthorizedTopics.add(tp.topic());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,6 @@ public Set<String> groupSubscription() {
return this.groupSubscription;
}

public Long fetched(TopicPartition tp) {
return assignedState(tp).fetched;
}

public void fetched(TopicPartition tp, long offset) {
assignedState(tp).fetched(offset);
}

private TopicPartitionState assignedState(TopicPartition tp) {
TopicPartitionState state = this.assignment.get(tp);
if (state == null)
Expand Down Expand Up @@ -270,20 +262,20 @@ public boolean partitionsAutoAssigned() {
return !this.subscription.isEmpty();
}

public void consumed(TopicPartition tp, long offset) {
assignedState(tp).consumed(offset);
public void position(TopicPartition tp, long offset) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, it seems like we could have also solved this problem by fixing the places where the fetched and consumed get into inconsistent states. I like that we've simplified things, but I'm wondering if this also means we're ruling out (or making more complicated) possible improvements (e.g., prefetching larger chunks of data by allowing fetch requests even when consumed < fetched) or if such changes would require effectively reverting the bulk of this patch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this is the main question with this simplification. However, keep in mind that we cannot send more than one fetch request at a time because we don't know the following offset to fetch from. So to get any advantage from allowing the fetched position to get farther than a single fetch ahead of the consumed position, we would need to initiate fetches after the last fetch returned from the server, but before the results were returned to the user. I don't see a lot of opportunity for optimization here, but I could be missing something. I think instead the way for consumers to tune the amount of data fetched is with fetch.min.bytes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, I was trying to figure out under what conditions it would be useful to allow for multiple fetch requests while you haven't asked for the data back. I can think of two cases:

  1. Traffic is very bursty but you still want low latency, so you might prefer a smaller fetch.min.bytes, but also to be able to buffer up data so that when it's coming in fast, you'll have records already fetched and ready to process.
  2. Processing speed is variable (or messages vary significantly in size), so 1 fetch with a fixed max bytes setting won't necessarily keep you fed with enough data.

Both are probably fairly niche and you probably need to hit an extreme case to warrant adjusting consumer settings much, but I don't think we have a way to control it at the moment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These kind of cases are probably best left to be handled in user code. The user can collect the messages in their own batch until they have enough data to process. Until we have a really compelling use case, I think I prefer the simpler approach in this patch since maintaining consumed and fetched has proven error-prone.

There is one unfortunate side effect of this change which doesn't appear to impact current code, but should be mentioned anyway. If you call initFetches while you have records available in the internal buffer, you will end up sending an unnecessary fetch. I'm checking if there's a simple way to fix this problem.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another thing to consider is memory limit on the consumer (we currently do not have such management as we did in producer, but it is tracked in KAFKA-2045). I agree that pre-fetch would be helpful with bursty network, or more generally speaking you would probably want to get some data in each selector.poll() even if there is data buffered for all partitions, as long as buffered data does not exceed the memory limit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's also worth saying that if we need to bring back this distinction in order to implement prefetching improvements, it's straightforward to bring it back. I'm also in favour of simplifying for now.

assignedState(tp).position(offset);
}

public Long consumed(TopicPartition tp) {
return assignedState(tp).consumed;
public Long position(TopicPartition tp) {
return assignedState(tp).position;
}

public Map<TopicPartition, OffsetAndMetadata> allConsumed() {
Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet()) {
TopicPartitionState state = entry.getValue();
if (state.hasValidPosition)
allConsumed.put(entry.getKey(), new OffsetAndMetadata(state.consumed));
allConsumed.put(entry.getKey(), new OffsetAndMetadata(state.position));
}
return allConsumed;
}
Expand Down Expand Up @@ -356,8 +348,7 @@ public ConsumerRebalanceListener listener() {
}

private static class TopicPartitionState {
private Long consumed; // offset exposed to the user
private Long fetched; // current fetch position
private Long position;
private OffsetAndMetadata committed; // last committed position

private boolean hasValidPosition; // whether we have valid consumed and fetched positions
Expand All @@ -367,8 +358,7 @@ private static class TopicPartitionState {

public TopicPartitionState() {
this.paused = false;
this.consumed = null;
this.fetched = null;
this.position = null;
this.committed = null;
this.awaitingReset = false;
this.hasValidPosition = false;
Expand All @@ -378,29 +368,21 @@ public TopicPartitionState() {
private void awaitReset(OffsetResetStrategy strategy) {
this.awaitingReset = true;
this.resetStrategy = strategy;
this.consumed = null;
this.fetched = null;
this.position = null;
this.hasValidPosition = false;
}

private void seek(long offset) {
this.consumed = offset;
this.fetched = offset;
this.position = offset;
this.awaitingReset = false;
this.resetStrategy = null;
this.hasValidPosition = true;
}

private void fetched(long offset) {
private void position(long offset) {
if (!hasValidPosition)
throw new IllegalStateException("Cannot update fetch position without valid consumed/fetched positions");
this.fetched = offset;
}

private void consumed(long offset) {
if (!hasValidPosition)
throw new IllegalStateException("Cannot update consumed position without valid consumed/fetched positions");
this.consumed = offset;
this.position = offset;
}

private void committed(OffsetAndMetadata offset) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ public void testFetchNormal() {
consumerClient.poll(0);
records = fetcher.fetchedRecords().get(tp);
assertEquals(3, records.size());
assertEquals(4L, (long) subscriptions.fetched(tp)); // this is the next fetching position
assertEquals(4L, (long) subscriptions.consumed(tp));
assertEquals(4L, (long) subscriptions.position(tp)); // this is the next fetching position
long offset = 1;
for (ConsumerRecord<byte[], byte[]> record : records) {
assertEquals(offset, record.offset());
Expand Down Expand Up @@ -149,8 +148,7 @@ public void testFetchNonContinuousRecords() {
consumerClient.poll(0);
consumerRecords = fetcher.fetchedRecords().get(tp);
assertEquals(3, consumerRecords.size());
assertEquals(31L, (long) subscriptions.fetched(tp)); // this is the next fetching position
assertEquals(31L, (long) subscriptions.consumed(tp));
assertEquals(31L, (long) subscriptions.position(tp)); // this is the next fetching position

assertEquals(15L, consumerRecords.get(0).offset());
assertEquals(20L, consumerRecords.get(1).offset());
Expand Down Expand Up @@ -267,8 +265,7 @@ public void testFetchOffsetOutOfRange() {
consumerClient.poll(0);
assertTrue(subscriptions.isOffsetResetNeeded(tp));
assertEquals(0, fetcher.fetchedRecords().size());
assertEquals(null, subscriptions.fetched(tp));
assertEquals(null, subscriptions.consumed(tp));
assertEquals(null, subscriptions.position(tp));
}

@Test
Expand Down Expand Up @@ -316,8 +313,7 @@ public void testFetchDisconnected() {
// disconnects should have no affect on subscription state
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
assertEquals(0, (long) subscriptions.fetched(tp));
assertEquals(0, (long) subscriptions.consumed(tp));
assertEquals(0, (long) subscriptions.position(tp));
}

@Test
Expand All @@ -329,8 +325,7 @@ public void testUpdateFetchPositionToCommitted() {

fetcher.updateFetchPositions(Collections.singleton(tp));
assertTrue(subscriptions.isFetchable(tp));
assertEquals(5, (long) subscriptions.fetched(tp));
assertEquals(5, (long) subscriptions.consumed(tp));
assertEquals(5, (long) subscriptions.position(tp));
}

@Test
Expand All @@ -343,8 +338,7 @@ public void testUpdateFetchPositionResetToDefaultOffset() {
fetcher.updateFetchPositions(Collections.singleton(tp));
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
assertEquals(5, (long) subscriptions.fetched(tp));
assertEquals(5, (long) subscriptions.consumed(tp));
assertEquals(5, (long) subscriptions.position(tp));
}

@Test
Expand All @@ -357,8 +351,7 @@ public void testUpdateFetchPositionResetToLatestOffset() {
fetcher.updateFetchPositions(Collections.singleton(tp));
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
assertEquals(5, (long) subscriptions.fetched(tp));
assertEquals(5, (long) subscriptions.consumed(tp));
assertEquals(5, (long) subscriptions.position(tp));
}

@Test
Expand All @@ -371,8 +364,7 @@ public void testUpdateFetchPositionResetToEarliestOffset() {
fetcher.updateFetchPositions(Collections.singleton(tp));
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
assertEquals(5, (long) subscriptions.fetched(tp));
assertEquals(5, (long) subscriptions.consumed(tp));
assertEquals(5, (long) subscriptions.position(tp));
}

@Test
Expand All @@ -390,8 +382,7 @@ public void testUpdateFetchPositionDisconnect() {
fetcher.updateFetchPositions(Collections.singleton(tp));
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
assertEquals(5, (long) subscriptions.fetched(tp));
assertEquals(5, (long) subscriptions.consumed(tp));
assertEquals(5, (long) subscriptions.position(tp));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,11 @@ public void partitionAssignment() {
public void partitionReset() {
state.assignFromUser(Arrays.asList(tp0));
state.seek(tp0, 5);
assertEquals(5L, (long) state.fetched(tp0));
assertEquals(5L, (long) state.consumed(tp0));
assertEquals(5L, (long) state.position(tp0));
state.needOffsetReset(tp0);
assertFalse(state.isFetchable(tp0));
assertTrue(state.isOffsetResetNeeded(tp0));
assertEquals(null, state.fetched(tp0));
assertEquals(null, state.consumed(tp0));
assertEquals(null, state.position(tp0));

// seek should clear the reset and make the partition fetchable
state.seek(tp0, 0);
Expand Down Expand Up @@ -114,33 +112,20 @@ public void commitOffsetMetadata() {
}

@Test(expected = IllegalStateException.class)
public void invalidConsumedPositionUpdate() {
public void invalidPositionUpdate() {
state.subscribe(Arrays.asList(topic), rebalanceListener);
state.assignFromSubscribed(asList(tp0));
state.consumed(tp0, 0);
state.position(tp0, 0);
}

@Test(expected = IllegalStateException.class)
public void invalidFetchPositionUpdate() {
state.subscribe(Arrays.asList(topic), rebalanceListener);
state.assignFromSubscribed(asList(tp0));
state.fetched(tp0, 0);
public void cantChangePositionForNonAssignedPartition() {
state.position(tp0, 1);
}

@Test(expected = IllegalStateException.class)
public void cantChangeFetchPositionForNonAssignedPartition() {
state.fetched(tp0, 1);
}

@Test(expected = IllegalStateException.class)
public void cantChangeConsumedPositionForNonAssignedPartition() {
state.consumed(tp0, 1);
}

public void assertAllPositions(TopicPartition tp, Long offset) {
assertEquals(offset.longValue(), state.committed(tp).offset());
assertEquals(offset, state.fetched(tp));
assertEquals(offset, state.consumed(tp));
assertEquals(offset, state.position(tp));
}

@Test(expected = IllegalStateException.class)
Expand Down