Skip to content

Commit

Permalink
[ISSUE #7740] Optimize LocalFileOffsetStore (#7745)
Browse files Browse the repository at this point in the history
* Fix LocalFileOffsetStore persistAll and persist

* Fix LocalFileOffsetStore removeOffset

* Add test case
  • Loading branch information
redlsz committed Jan 29, 2024
1 parent f9f5465 commit 99bb415
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,20 @@ public long readOffset(final MessageQueue mq, final ReadOffsetType type) {

@Override
public void persistAll(Set<MessageQueue> mqs) {
if (null == mqs || mqs.isEmpty())
if (null == mqs || mqs.isEmpty()) {
return;
}
OffsetSerializeWrapper offsetSerializeWrapper = null;
try {
offsetSerializeWrapper = readLocalOffset();
} catch (MQClientException e) {
log.error("readLocalOffset exception", e);
return;
}

OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
if (offsetSerializeWrapper == null) {
offsetSerializeWrapper = new OffsetSerializeWrapper();
}
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
if (mqs.contains(entry.getKey())) {
AtomicLong offset = entry.getValue();
Expand All @@ -154,11 +164,40 @@ public void persistAll(Set<MessageQueue> mqs) {

@Override
public void persist(MessageQueue mq) {
if (mq == null) {
return;
}
AtomicLong offset = this.offsetTable.get(mq);
if (offset != null) {
OffsetSerializeWrapper offsetSerializeWrapper = null;
try {
offsetSerializeWrapper = readLocalOffset();
} catch (MQClientException e) {
log.error("readLocalOffset exception", e);
return;
}
if (offsetSerializeWrapper == null) {
offsetSerializeWrapper = new OffsetSerializeWrapper();
}
offsetSerializeWrapper.getOffsetTable().put(mq, offset);
String jsonString = offsetSerializeWrapper.toJson(true);
if (jsonString != null) {
try {
MixAll.string2File(jsonString, this.storePath);
} catch (IOException e) {
log.error("persist consumer offset exception, " + this.storePath, e);
}
}
}
}

@Override
public void removeOffset(MessageQueue mq) {

if (mq != null) {
this.offsetTable.remove(mq);
log.info("remove unnecessary messageQueue offset. group={}, mq={}, offsetTableSize={}", this.groupName, mq,
offsetTable.size());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.rocketmq.client.consumer.store;

import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -85,4 +86,48 @@ public void testCloneOffset() throws Exception {
assertThat(cloneOffsetTable.size()).isEqualTo(1);
assertThat(cloneOffsetTable.get(messageQueue)).isEqualTo(1024);
}

@Test
public void testPersist() throws Exception {
OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, group);

MessageQueue messageQueue0 = new MessageQueue(topic, brokerName, 0);
offsetStore.updateOffset(messageQueue0, 1024, false);
offsetStore.persist(messageQueue0);
assertThat(offsetStore.readOffset(messageQueue0, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024);

MessageQueue messageQueue1 = new MessageQueue(topic, brokerName, 1);
assertThat(offsetStore.readOffset(messageQueue1, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-1);
}

@Test
public void testPersistAll() throws Exception {
OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, group);

MessageQueue messageQueue0 = new MessageQueue(topic, brokerName, 0);
offsetStore.updateOffset(messageQueue0, 1024, false);
offsetStore.persistAll(new HashSet<MessageQueue>(Collections.singletonList(messageQueue0)));
assertThat(offsetStore.readOffset(messageQueue0, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024);

MessageQueue messageQueue1 = new MessageQueue(topic, brokerName, 1);
MessageQueue messageQueue2 = new MessageQueue(topic, brokerName, 2);
offsetStore.updateOffset(messageQueue1, 1025, false);
offsetStore.updateOffset(messageQueue2, 1026, false);
offsetStore.persistAll(new HashSet<MessageQueue>(Arrays.asList(messageQueue1, messageQueue2)));

assertThat(offsetStore.readOffset(messageQueue0, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024);
assertThat(offsetStore.readOffset(messageQueue1, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1025);
assertThat(offsetStore.readOffset(messageQueue2, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1026);
}

@Test
public void testRemoveOffset() throws Exception {
OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, group);
MessageQueue messageQueue = new MessageQueue(topic, brokerName, 0);
offsetStore.updateOffset(messageQueue, 1024, false);
assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024);

offsetStore.removeOffset(messageQueue);
assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(-1);
}
}

0 comments on commit 99bb415

Please sign in to comment.