Skip to content

Commit

Permalink
KAFKA-2978: consumer stops fetching when consumed and fetch positions…
Browse files Browse the repository at this point in the history
… get out of sync

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Michal Turek, Ismael Juma, Guozhang Wang

Closes #666 from hachikuji/KAFKA-2978
  • Loading branch information
hachikuji authored and guozhangwang committed Dec 14, 2015
1 parent 3fed579 commit e08b922
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1071,10 +1071,10 @@ public long position(TopicPartition partition) {
try {
if (!this.subscriptions.isAssigned(partition))
throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
Long offset = this.subscriptions.consumed(partition);
Long offset = this.subscriptions.position(partition);
if (offset == null) {
updateFetchPositions(Collections.singleton(partition));
offset = this.subscriptions.consumed(partition);
offset = this.subscriptions.position(partition);
}
return offset;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public ConsumerRecords<K, V> poll(long timeout) {
if (!subscriptions.isPaused(entry.getKey())) {
List<ConsumerRecord<K, V>> recs = entry.getValue();
if (!recs.isEmpty())
this.subscriptions.consumed(entry.getKey(), recs.get(recs.size() - 1).offset() + 1);
this.subscriptions.position(entry.getKey(), recs.get(recs.size() - 1).offset() + 1);
}
}

Expand Down Expand Up @@ -229,10 +229,10 @@ public long position(TopicPartition partition) {
ensureNotClosed();
if (!this.subscriptions.isAssigned(partition))
throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
Long offset = this.subscriptions.consumed(partition);
Long offset = this.subscriptions.position(partition);
if (offset == null) {
updateFetchPosition(partition);
offset = this.subscriptions.consumed(partition);
offset = this.subscriptions.position(partition);
}
return offset;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,9 @@ private void throwIfOffsetOutOfRange() throws OffsetOutOfRangeException {
log.debug("Ignoring fetched records for {} since it is no longer fetchable", entry.getKey());
continue;
}
Long consumed = subscriptions.consumed(entry.getKey());
// ignore partition if its consumed offset != offset in fetchResponse, e.g. after seek()
if (consumed != null && entry.getValue().equals(consumed))
Long position = subscriptions.position(entry.getKey());
// ignore partition if the current position != the offset in fetchResponse, e.g. after seek()
if (position != null && entry.getValue().equals(position))
currentOutOfRangePartitions.put(entry.getKey(), entry.getValue());
}
this.offsetOutOfRangePartitions.clear();
Expand Down Expand Up @@ -401,18 +401,15 @@ public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {

// note that the consumed position should always be available
// as long as the partition is still assigned
long consumed = subscriptions.consumed(part.partition);
long position = subscriptions.position(part.partition);
if (!subscriptions.isFetchable(part.partition)) {
// this can happen when a partition consumption paused before fetched records are returned to the consumer's poll call
// this can happen when a partition is paused before fetched records are returned to the consumer's poll call
log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", part.partition);

// we also need to reset the fetch positions to pretend we did not fetch
// this partition in the previous request at all
subscriptions.fetched(part.partition, consumed);
} else if (part.fetchOffset == consumed) {
} else if (part.fetchOffset == position) {
long nextOffset = part.records.get(part.records.size() - 1).offset() + 1;

log.trace("Returning fetched records for assigned partition {} and update consumed position to {}", part.partition, nextOffset);
log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
"position to {}", position, part.partition, nextOffset);

List<ConsumerRecord<K, V>> records = drained.get(part.partition);
if (records == null) {
Expand All @@ -421,11 +418,13 @@ public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
} else {
records.addAll(part.records);
}
subscriptions.consumed(part.partition, nextOffset);

subscriptions.position(part.partition, nextOffset);
} else {
// these records aren't next in line based on the last consumed position, ignore them
// they must be from an obsolete request
log.debug("Ignoring fetched records for {} at offset {}", part.partition, part.fetchOffset);
log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
part.partition, part.fetchOffset, position);
}
}
this.records.clear();
Expand Down Expand Up @@ -513,11 +512,9 @@ private Map<Node, FetchRequest> createFetchRequests(Cluster cluster) {
fetchable.put(node, fetch);
}

long fetched = this.subscriptions.fetched(partition);
long consumed = this.subscriptions.consumed(partition);
// Only fetch data for partitions whose previously fetched data has been consumed
if (consumed == fetched)
fetch.put(partition, new FetchRequest.PartitionData(fetched, this.fetchSize));
long position = this.subscriptions.position(partition);
fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));
log.trace("Added fetch request for partition {} at offset {}", partition, position);
}
}

Expand Down Expand Up @@ -550,29 +547,25 @@ private void handleFetchResponse(ClientResponse resp, FetchRequest request) {

// we are interested in this fetch only if the beginning offset matches the
// current consumed position
Long consumed = subscriptions.consumed(tp);
if (consumed == null) {
continue;
} else if (consumed != fetchOffset) {
// the fetched position has gotten out of sync with the consumed position
// (which might happen when a rebalance occurs with a fetch in-flight),
// so we need to reset the fetch position so the next fetch is right
subscriptions.fetched(tp, consumed);
Long position = subscriptions.position(tp);
if (position == null || position != fetchOffset) {
log.debug("Discarding fetch response for partition {} since its offset {} does not match " +
"the expected offset {}", tp, fetchOffset, position);
continue;
}

int bytes = 0;
ByteBuffer buffer = partition.recordSet;
MemoryRecords records = MemoryRecords.readableRecords(buffer);
List<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K, V>>();
List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
for (LogEntry logEntry : records) {
parsed.add(parseRecord(tp, logEntry));
bytes += logEntry.size();
}

if (!parsed.isEmpty()) {
log.trace("Adding fetched record for partition {} with offset {} to buffered record list", tp, position);
ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
this.subscriptions.fetched(tp, record.offset() + 1);
this.records.add(new PartitionRecords<>(fetchOffset, tp, parsed));
this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
} else if (buffer.limit() > 0) {
Expand All @@ -594,7 +587,7 @@ private void handleFetchResponse(ClientResponse resp, FetchRequest request) {
subscriptions.needOffsetReset(tp);
else
this.offsetOutOfRangePartitions.put(tp, fetchOffset);
log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp));
log.info("Fetch offset {} is out of range, resetting offset", fetchOffset);
} else if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) {
log.warn("Not authorized to read from topic {}.", tp.topic());
unauthorizedTopics.add(tp.topic());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,6 @@ public Set<String> groupSubscription() {
return this.groupSubscription;
}

public Long fetched(TopicPartition tp) {
return assignedState(tp).fetched;
}

public void fetched(TopicPartition tp, long offset) {
assignedState(tp).fetched(offset);
}

private TopicPartitionState assignedState(TopicPartition tp) {
TopicPartitionState state = this.assignment.get(tp);
if (state == null)
Expand Down Expand Up @@ -270,20 +262,20 @@ public boolean partitionsAutoAssigned() {
return !this.subscription.isEmpty();
}

public void consumed(TopicPartition tp, long offset) {
assignedState(tp).consumed(offset);
public void position(TopicPartition tp, long offset) {
assignedState(tp).position(offset);
}

public Long consumed(TopicPartition tp) {
return assignedState(tp).consumed;
public Long position(TopicPartition tp) {
return assignedState(tp).position;
}

public Map<TopicPartition, OffsetAndMetadata> allConsumed() {
Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet()) {
TopicPartitionState state = entry.getValue();
if (state.hasValidPosition)
allConsumed.put(entry.getKey(), new OffsetAndMetadata(state.consumed));
allConsumed.put(entry.getKey(), new OffsetAndMetadata(state.position));
}
return allConsumed;
}
Expand Down Expand Up @@ -356,8 +348,7 @@ public ConsumerRebalanceListener listener() {
}

private static class TopicPartitionState {
private Long consumed; // offset exposed to the user
private Long fetched; // current fetch position
private Long position;
private OffsetAndMetadata committed; // last committed position

private boolean hasValidPosition; // whether we have valid consumed and fetched positions
Expand All @@ -367,8 +358,7 @@ private static class TopicPartitionState {

public TopicPartitionState() {
this.paused = false;
this.consumed = null;
this.fetched = null;
this.position = null;
this.committed = null;
this.awaitingReset = false;
this.hasValidPosition = false;
Expand All @@ -378,29 +368,21 @@ public TopicPartitionState() {
private void awaitReset(OffsetResetStrategy strategy) {
this.awaitingReset = true;
this.resetStrategy = strategy;
this.consumed = null;
this.fetched = null;
this.position = null;
this.hasValidPosition = false;
}

private void seek(long offset) {
this.consumed = offset;
this.fetched = offset;
this.position = offset;
this.awaitingReset = false;
this.resetStrategy = null;
this.hasValidPosition = true;
}

private void fetched(long offset) {
private void position(long offset) {
if (!hasValidPosition)
throw new IllegalStateException("Cannot update fetch position without valid consumed/fetched positions");
this.fetched = offset;
}

private void consumed(long offset) {
if (!hasValidPosition)
throw new IllegalStateException("Cannot update consumed position without valid consumed/fetched positions");
this.consumed = offset;
this.position = offset;
}

private void committed(OffsetAndMetadata offset) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ public void testFetchNormal() {
consumerClient.poll(0);
records = fetcher.fetchedRecords().get(tp);
assertEquals(3, records.size());
assertEquals(4L, (long) subscriptions.fetched(tp)); // this is the next fetching position
assertEquals(4L, (long) subscriptions.consumed(tp));
assertEquals(4L, (long) subscriptions.position(tp)); // this is the next fetching position
long offset = 1;
for (ConsumerRecord<byte[], byte[]> record : records) {
assertEquals(offset, record.offset());
Expand Down Expand Up @@ -149,8 +148,7 @@ public void testFetchNonContinuousRecords() {
consumerClient.poll(0);
consumerRecords = fetcher.fetchedRecords().get(tp);
assertEquals(3, consumerRecords.size());
assertEquals(31L, (long) subscriptions.fetched(tp)); // this is the next fetching position
assertEquals(31L, (long) subscriptions.consumed(tp));
assertEquals(31L, (long) subscriptions.position(tp)); // this is the next fetching position

assertEquals(15L, consumerRecords.get(0).offset());
assertEquals(20L, consumerRecords.get(1).offset());
Expand Down Expand Up @@ -267,8 +265,7 @@ public void testFetchOffsetOutOfRange() {
consumerClient.poll(0);
assertTrue(subscriptions.isOffsetResetNeeded(tp));
assertEquals(0, fetcher.fetchedRecords().size());
assertEquals(null, subscriptions.fetched(tp));
assertEquals(null, subscriptions.consumed(tp));
assertEquals(null, subscriptions.position(tp));
}

@Test
Expand Down Expand Up @@ -316,8 +313,7 @@ public void testFetchDisconnected() {
// disconnects should have no affect on subscription state
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
assertEquals(0, (long) subscriptions.fetched(tp));
assertEquals(0, (long) subscriptions.consumed(tp));
assertEquals(0, (long) subscriptions.position(tp));
}

@Test
Expand All @@ -329,8 +325,7 @@ public void testUpdateFetchPositionToCommitted() {

fetcher.updateFetchPositions(Collections.singleton(tp));
assertTrue(subscriptions.isFetchable(tp));
assertEquals(5, (long) subscriptions.fetched(tp));
assertEquals(5, (long) subscriptions.consumed(tp));
assertEquals(5, (long) subscriptions.position(tp));
}

@Test
Expand All @@ -343,8 +338,7 @@ public void testUpdateFetchPositionResetToDefaultOffset() {
fetcher.updateFetchPositions(Collections.singleton(tp));
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
assertEquals(5, (long) subscriptions.fetched(tp));
assertEquals(5, (long) subscriptions.consumed(tp));
assertEquals(5, (long) subscriptions.position(tp));
}

@Test
Expand All @@ -357,8 +351,7 @@ public void testUpdateFetchPositionResetToLatestOffset() {
fetcher.updateFetchPositions(Collections.singleton(tp));
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
assertEquals(5, (long) subscriptions.fetched(tp));
assertEquals(5, (long) subscriptions.consumed(tp));
assertEquals(5, (long) subscriptions.position(tp));
}

@Test
Expand All @@ -371,8 +364,7 @@ public void testUpdateFetchPositionResetToEarliestOffset() {
fetcher.updateFetchPositions(Collections.singleton(tp));
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
assertEquals(5, (long) subscriptions.fetched(tp));
assertEquals(5, (long) subscriptions.consumed(tp));
assertEquals(5, (long) subscriptions.position(tp));
}

@Test
Expand All @@ -390,8 +382,7 @@ public void testUpdateFetchPositionDisconnect() {
fetcher.updateFetchPositions(Collections.singleton(tp));
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
assertEquals(5, (long) subscriptions.fetched(tp));
assertEquals(5, (long) subscriptions.consumed(tp));
assertEquals(5, (long) subscriptions.position(tp));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,11 @@ public void partitionAssignment() {
public void partitionReset() {
state.assignFromUser(Arrays.asList(tp0));
state.seek(tp0, 5);
assertEquals(5L, (long) state.fetched(tp0));
assertEquals(5L, (long) state.consumed(tp0));
assertEquals(5L, (long) state.position(tp0));
state.needOffsetReset(tp0);
assertFalse(state.isFetchable(tp0));
assertTrue(state.isOffsetResetNeeded(tp0));
assertEquals(null, state.fetched(tp0));
assertEquals(null, state.consumed(tp0));
assertEquals(null, state.position(tp0));

// seek should clear the reset and make the partition fetchable
state.seek(tp0, 0);
Expand Down Expand Up @@ -114,33 +112,20 @@ public void commitOffsetMetadata() {
}

@Test(expected = IllegalStateException.class)
public void invalidConsumedPositionUpdate() {
public void invalidPositionUpdate() {
state.subscribe(Arrays.asList(topic), rebalanceListener);
state.assignFromSubscribed(asList(tp0));
state.consumed(tp0, 0);
state.position(tp0, 0);
}

@Test(expected = IllegalStateException.class)
public void invalidFetchPositionUpdate() {
state.subscribe(Arrays.asList(topic), rebalanceListener);
state.assignFromSubscribed(asList(tp0));
state.fetched(tp0, 0);
public void cantChangePositionForNonAssignedPartition() {
state.position(tp0, 1);
}

@Test(expected = IllegalStateException.class)
public void cantChangeFetchPositionForNonAssignedPartition() {
state.fetched(tp0, 1);
}

@Test(expected = IllegalStateException.class)
public void cantChangeConsumedPositionForNonAssignedPartition() {
state.consumed(tp0, 1);
}

public void assertAllPositions(TopicPartition tp, Long offset) {
assertEquals(offset.longValue(), state.committed(tp).offset());
assertEquals(offset, state.fetched(tp));
assertEquals(offset, state.consumed(tp));
assertEquals(offset, state.position(tp));
}

@Test(expected = IllegalStateException.class)
Expand Down

0 comments on commit e08b922

Please sign in to comment.