Skip to content

Commit

Permalink
KAFKA-3960 - Committed offset not set after first assign
Browse files Browse the repository at this point in the history
  • Loading branch information
13h3r committed Jul 22, 2016
1 parent 7a70c1a commit 9b20085
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 0 deletions.
Expand Up @@ -170,6 +170,7 @@ public void assignFromUser(Collection<TopicPartition> partitions) {
this.assignment.keySet().retainAll(this.userAssignment);

this.needsPartitionAssignment = false;
this.needsFetchCommittedOffsets = true;
}

/**
Expand Down
Expand Up @@ -45,6 +45,7 @@
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
Expand Down Expand Up @@ -453,6 +454,55 @@ public boolean matches(ClientRequest request) {
assertTrue(heartbeatReceived.get());
}

@Test
public void testCommitsFetchedDuringAssign() {
String topic = "topic";
final TopicPartition partition1 = new TopicPartition(topic, 0);
final TopicPartition partition2 = new TopicPartition(topic, 1);

long offset1 = 10000;
long offset2 = 20000;

int sessionTimeoutMs = 3000;
int heartbeatIntervalMs = 2000;
int autoCommitIntervalMs = 1000;

Time time = new MockTime();
MockClient client = new MockClient(time);
Cluster cluster = TestUtils.singletonCluster(topic, 1);
Node node = cluster.nodes().get(0);
client.setNode(node);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
metadata.update(cluster, time.milliseconds());
PartitionAssignor assignor = new RoundRobinAssignor();

final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
consumer.assign(Arrays.asList(partition1));

// lookup coordinator
client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());

// fetch offset for one topic
client.prepareResponseFrom(
offsetResponse(Collections.singletonMap(partition1, offset1), Errors.NONE.code()),
coordinator);

assertEquals(offset1, consumer.committed(partition1).offset());

consumer.assign(Arrays.asList(partition1, partition2));

// fetch offset for two topics
Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(partition1, offset1);
offsets.put(partition2, offset2);
client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE.code()), coordinator);

assertEquals(offset1, consumer.committed(partition1).offset());
assertEquals(offset2, consumer.committed(partition2).offset());
}

@Test
public void testAutoCommitSentBeforePositionUpdate() {
String topic = "topic";
Expand Down Expand Up @@ -611,6 +661,14 @@ private Struct syncGroupResponse(List<TopicPartition> partitions, short error) {
return new SyncGroupResponse(error, buf).toStruct();
}

private Struct offsetResponse(Map<TopicPartition, Long> offsets, short error) {
Map<TopicPartition, OffsetFetchResponse.PartitionData> partitionData = new HashMap<>();
for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
partitionData.put(entry.getKey(), new OffsetFetchResponse.PartitionData(entry.getValue(), "", error));
}
return new OffsetFetchResponse(partitionData).toStruct();
}

private Struct fetchResponse(TopicPartition tp, long fetchOffset, int count) {
MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
for (int i = 0; i < count; i++)
Expand Down
Expand Up @@ -46,6 +46,8 @@ public void partitionAssignment() {
state.assignFromUser(Arrays.asList(tp0));
assertEquals(Collections.singleton(tp0), state.assignedPartitions());
assertFalse(state.partitionAssignmentNeeded());
assertFalse(state.hasAllFetchPositions());
assertTrue(state.refreshCommitsNeeded());
state.committed(tp0, new OffsetAndMetadata(1));
state.seek(tp0, 1);
assertTrue(state.isFetchable(tp0));
Expand Down

0 comments on commit 9b20085

Please sign in to comment.