Skip to content

Commit

Permalink
[ROCKETMQ-265] fix consume queue's data maybe repeat bug
Browse files Browse the repository at this point in the history
Author: 傅冲 <yubao.fyb@alibaba-inc.com>
Author: fuyou001 <fuyou001@gmail.com>

Closes #146 from fuyou001/ROCKETMQ-265.
  • Loading branch information
fuyou001 authored and zhouxinyu committed Sep 5, 2017
1 parent 6a97d28 commit 368e7c8
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,13 @@ private boolean putMessagePositionInfo(final long offset, final int size, final

if (cqOffset != 0) {
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

if (expectLogicOffset < currentLogicOffset) {
log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
return true;
}

if (expectLogicOffset != currentLogicOffset) {
LOG_ERROR.warn(
"[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,21 @@

package org.apache.rocketmq.store;

import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Test;

import java.io.File;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.Test;

public class ConsumeQueueTest {

Expand Down Expand Up @@ -131,6 +130,65 @@ protected void putMsg(DefaultMessageStore master) throws Exception {
}
}

protected void deleteDirectory(String rootPath) {
File file = new File(rootPath);
deleteFile(file);
}

protected void deleteFile(File file) {
File[] subFiles = file.listFiles();
if (subFiles != null) {
for (File sub : subFiles) {
deleteFile(sub);
}
}

file.delete();
}

@Test
public void testPutMessagePositionInfo_buildCQRepeatedly() throws Exception {
DefaultMessageStore messageStore = null;
try {

messageStore = gen();

int totalMessages = 10;

for (int i = 0; i < totalMessages; i++) {
putMsg(messageStore);
}
Thread.sleep(5);

ConsumeQueue cq = messageStore.getConsumeQueueTable().get(topic).get(queueId);
Method method = cq.getClass().getDeclaredMethod("putMessagePositionInfo", long.class, int.class, long.class, long.class);

assertThat(method).isNotNull();

method.setAccessible(true);

SelectMappedBufferResult result = messageStore.getCommitLog().getData(0);
assertThat(result != null).isTrue();

DispatchRequest dispatchRequest = messageStore.getCommitLog().checkMessageAndReturnSize(result.getByteBuffer(), false, false);

assertThat(cq).isNotNull();

Object dispatchResult = method.invoke(cq, dispatchRequest.getCommitLogOffset(),
dispatchRequest.getMsgSize(), dispatchRequest.getTagsCode(), dispatchRequest.getConsumeQueueOffset());

assertThat(Boolean.parseBoolean(dispatchResult.toString())).isTrue();

} finally {
if (messageStore != null) {
messageStore.shutdown();
messageStore.destroy();
}
deleteDirectory(storePath);
}

}

@Test
public void testConsumeQueueWithExtendData() {
DefaultMessageStore master = null;
Expand Down

0 comments on commit 368e7c8

Please sign in to comment.