diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java index f949b75a81c..0fbe8010db6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java @@ -131,10 +131,20 @@ public long readOffset(final MessageQueue mq, final ReadOffsetType type) { @Override public void persistAll(Set mqs) { - if (null == mqs || mqs.isEmpty()) + if (null == mqs || mqs.isEmpty()) { return; + } + OffsetSerializeWrapper offsetSerializeWrapper = null; + try { + offsetSerializeWrapper = readLocalOffset(); + } catch (MQClientException e) { + log.error("readLocalOffset exception", e); + return; + } - OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper(); + if (offsetSerializeWrapper == null) { + offsetSerializeWrapper = new OffsetSerializeWrapper(); + } for (Map.Entry entry : this.offsetTable.entrySet()) { if (mqs.contains(entry.getKey())) { AtomicLong offset = entry.getValue(); @@ -154,11 +164,40 @@ public void persistAll(Set mqs) { @Override public void persist(MessageQueue mq) { + if (mq == null) { + return; + } + AtomicLong offset = this.offsetTable.get(mq); + if (offset != null) { + OffsetSerializeWrapper offsetSerializeWrapper = null; + try { + offsetSerializeWrapper = readLocalOffset(); + } catch (MQClientException e) { + log.error("readLocalOffset exception", e); + return; + } + if (offsetSerializeWrapper == null) { + offsetSerializeWrapper = new OffsetSerializeWrapper(); + } + offsetSerializeWrapper.getOffsetTable().put(mq, offset); + String jsonString = offsetSerializeWrapper.toJson(true); + if (jsonString != null) { + try { + MixAll.string2File(jsonString, this.storePath); + } catch (IOException e) { + log.error("persist consumer offset exception, " + this.storePath, e); + } + } + } } @Override public void removeOffset(MessageQueue mq) { - + if (mq != null) { + this.offsetTable.remove(mq); + log.info("remove unnecessary messageQueue offset. group={}, mq={}, offsetTableSize={}", this.groupName, mq, + offsetTable.size()); + } } @Override diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java index a705b30fc3b..e3f2cc070da 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.client.consumer.store; import java.io.File; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Map; @@ -85,4 +86,48 @@ public void testCloneOffset() throws Exception { assertThat(cloneOffsetTable.size()).isEqualTo(1); assertThat(cloneOffsetTable.get(messageQueue)).isEqualTo(1024); } + + @Test + public void testPersist() throws Exception { + OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, group); + + MessageQueue messageQueue0 = new MessageQueue(topic, brokerName, 0); + offsetStore.updateOffset(messageQueue0, 1024, false); + offsetStore.persist(messageQueue0); + assertThat(offsetStore.readOffset(messageQueue0, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024); + + MessageQueue messageQueue1 = new MessageQueue(topic, brokerName, 1); + assertThat(offsetStore.readOffset(messageQueue1, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-1); + } + + @Test + public void testPersistAll() throws Exception { + OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, group); + + MessageQueue messageQueue0 = new MessageQueue(topic, brokerName, 0); + offsetStore.updateOffset(messageQueue0, 1024, false); + offsetStore.persistAll(new HashSet(Collections.singletonList(messageQueue0))); + assertThat(offsetStore.readOffset(messageQueue0, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024); + + MessageQueue messageQueue1 = new MessageQueue(topic, brokerName, 1); + MessageQueue messageQueue2 = new MessageQueue(topic, brokerName, 2); + offsetStore.updateOffset(messageQueue1, 1025, false); + offsetStore.updateOffset(messageQueue2, 1026, false); + offsetStore.persistAll(new HashSet(Arrays.asList(messageQueue1, messageQueue2))); + + assertThat(offsetStore.readOffset(messageQueue0, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024); + assertThat(offsetStore.readOffset(messageQueue1, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1025); + assertThat(offsetStore.readOffset(messageQueue2, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1026); + } + + @Test + public void testRemoveOffset() throws Exception { + OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, group); + MessageQueue messageQueue = new MessageQueue(topic, brokerName, 0); + offsetStore.updateOffset(messageQueue, 1024, false); + assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024); + + offsetStore.removeOffset(messageQueue); + assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(-1); + } } \ No newline at end of file