Skip to content

Commit

Permalink
apache#135 fix IllegalArgumentException for KafkaConsumer#offsetsForT…
Browse files Browse the repository at this point in the history
…imes (apache#137)

 Fix IllegalArgumentException for  KafkaConsumer#offsetsForTimes under kafka versions of 2.1.0 , 2.2.0 and 2.3.0
  • Loading branch information
aloyszhang committed Apr 9, 2020
1 parent 9d79d6a commit d70dde9
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@
@Slf4j
@Getter
public class KafkaRequestHandler extends KafkaCommandDecoder {
public static final long DEFAULT_TIMESTAMP = 0L;

private final PulsarService pulsarService;
private final KafkaServiceConfiguration kafkaConfig;
Expand Down Expand Up @@ -694,7 +695,7 @@ protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch,
} else {
partitionData.complete(new ListOffsetResponse.PartitionData(
Errors.NONE,
RecordBatch.NO_TIMESTAMP,
DEFAULT_TIMESTAMP,
MessageIdUtils
.getOffset(position.getLedgerId(), entryId == -1 ? 0 : entryId)));
}
Expand All @@ -715,7 +716,7 @@ protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch,
} else {
partitionData.complete(new ListOffsetResponse.PartitionData(
Errors.NONE,
RecordBatch.NO_TIMESTAMP,
DEFAULT_TIMESTAMP,
MessageIdUtils.getOffset(position.getLedgerId(), position.getEntryId())));
}

Expand Down Expand Up @@ -764,7 +765,7 @@ public void findEntryComplete(Position position, Object ctx) {
} else {
partitionData.complete(new ListOffsetResponse.PartitionData(
Errors.NONE,
RecordBatch.NO_TIMESTAMP,
DEFAULT_TIMESTAMP,
MessageIdUtils.getOffset(finalPosition.getLedgerId(), finalPosition.getEntryId())));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.streamnative.pulsar.handlers.kop;

import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP;
import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -283,7 +282,7 @@ public void testReadUncommittedConsumerListOffsetEarliestOffsetEquals() throws E
ListOffsetResponse listOffsetResponse = (ListOffsetResponse) response;
assertEquals(listOffsetResponse.responseData().get(tp).error, Errors.NONE);
assertEquals(listOffsetResponse.responseData().get(tp).offset, Long.valueOf(limitOffset));
assertEquals(listOffsetResponse.responseData().get(tp).timestamp, Long.valueOf(NO_TIMESTAMP));
assertEquals(listOffsetResponse.responseData().get(tp).timestamp, Long.valueOf(0));
}

// these 2 test cases test Read Commit / UnCommit.
Expand Down Expand Up @@ -352,7 +351,7 @@ public void testConsumerListOffsetLatest() throws Exception {
ListOffsetResponse listOffsetResponse = (ListOffsetResponse) response;
assertEquals(listOffsetResponse.responseData().get(tp).error, Errors.NONE);
assertEquals(listOffsetResponse.responseData().get(tp).offset, Long.valueOf(limitOffset));
assertEquals(listOffsetResponse.responseData().get(tp).timestamp, Long.valueOf(NO_TIMESTAMP));
assertEquals(listOffsetResponse.responseData().get(tp).timestamp, Long.valueOf(0));
}

/// Add test for FetchRequest
Expand Down

0 comments on commit d70dde9

Please sign in to comment.