From c37adb566f7c0ee9920def4e9cfd02579e6ce4cf Mon Sep 17 00:00:00 2001 From: lindzh Date: Thu, 6 Jul 2017 14:44:22 +0800 Subject: [PATCH 1/9] fix query message by time and fix get queue offset by time --- .../apache/rocketmq/store/ConsumeQueue.java | 22 ++- .../rocketmq/store/DefaultMessageStore.java | 55 ++++---- .../store/DefaultMessageStoreTest.java | 131 ++++++++++++++++++ 3 files changed, 179 insertions(+), 29 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index d03ff0f32eb..8d8e5b508f2 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -195,8 +195,26 @@ public long getOffsetInQueueByTime(final long timestamp) { } if (targetOffset != -1) { - - offset = targetOffset; + // Fix messages same store time,get best candidate offset + int candidateOffset = targetOffset; + int fixedLow = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0; + int fixedOffset = candidateOffset; + while (fixedOffset >= fixedLow) { + byteBuffer.position(fixedOffset); + long phyOffset = byteBuffer.getLong(); + int size = byteBuffer.getInt(); + if (phyOffset < minPhysicOffset) { + break; + } + long storeTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size); + if (storeTime < timestamp) { + break; + } else { + candidateOffset = fixedOffset; + fixedOffset = candidateOffset - CQ_STORE_UNIT_SIZE; + } + } + offset = candidateOffset; } else { if (leftIndexValue == -1) { diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 931edc76567..def0c77c372 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -806,10 +806,11 @@ public void executeDeleteFilesManually() { public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) { QueryMessageResult queryMessageResult = new QueryMessageResult(); - long lastQueryMsgTime = end; + long fixedBeginTime = begin - 1001; + long lastQueryMsgTime = end + 1; for (int i = 0; i < 3; i++) { - QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime); + QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, fixedBeginTime, lastQueryMsgTime); if (queryOffsetResult.getPhyOffsets().isEmpty()) { break; } @@ -824,33 +825,28 @@ public QueryMessageResult queryMessage(String topic, String key, int maxNum, lon try { - boolean match = true; - MessageExt msg = this.lookMessageByOffset(offset); - if (0 == m) { - lastQueryMsgTime = msg.getStoreTimestamp(); - } + SelectMappedBufferResult result = this.commitLog.getData(offset, false); + if (result != null) { + int size = result.getByteBuffer().getInt(0); + result.getByteBuffer().limit(size); + result.setSize(size); + long storeTime = getMessageStoreTimestamp(result); + + boolean keyMatch = true; + // check key match,now ignore check key duplicate + if (keyMatch) { + if (storeTime >= begin && storeTime <= end) { + queryMessageResult.addMessage(result); + } + } else { + log.warn("queryMessage hash duplicate, {} {}", topic, key); + } -// String[] keyArray = msg.getKeys().split(MessageConst.KEY_SEPARATOR); -// if (topic.equals(msg.getTopic())) { -// for (String k : keyArray) { -// if (k.equals(key)) { -// match = true; -// break; -// } -// } -// } - - if (match) { - SelectMappedBufferResult result = this.commitLog.getData(offset, false); - if (result != null) { - int size = result.getByteBuffer().getInt(0); - result.getByteBuffer().limit(size); - result.setSize(size); - queryMessageResult.addMessage(result); + if (0 == m) { + lastQueryMsgTime = storeTime; } - } else { - log.warn("queryMessage hash duplicate, {} {}", topic, key); } + } catch (Exception e) { log.error("queryMessage exception", e); } @@ -860,7 +856,7 @@ public QueryMessageResult queryMessage(String topic, String key, int maxNum, lon break; } - if (lastQueryMsgTime < begin) { + if (lastQueryMsgTime < fixedBeginTime) { break; } } @@ -868,6 +864,10 @@ public QueryMessageResult queryMessage(String topic, String key, int maxNum, lon return queryMessageResult; } + private long getMessageStoreTimestamp(SelectMappedBufferResult message) { + return message.getByteBuffer().getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION); + } + @Override public void updateHaMasterAddress(String newAddr) { this.haService.updateMasterAddress(newAddr); @@ -1412,6 +1412,7 @@ class CommitLogDispatcherBuildIndex implements CommitLogDispatcher { @Override public void dispatch(DispatchRequest request) { + if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) { DefaultMessageStore.this.indexService.buildIndex(request); } diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index 75f1de94128..f4ea7d5d6a8 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -20,11 +20,20 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.List; import java.util.Map; +import java.util.Random; +import java.util.TreeMap; import java.util.concurrent.atomic.AtomicInteger; + +import junit.framework.Assert; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.junit.Before; import org.junit.Test; @@ -129,4 +138,126 @@ public void arriving(String topic, int queueId, long logicOffset, long tagsCode, byte[] filterBitMap, Map properties) { } } + + @Test + public void testQueryByTime() throws Exception { + int totalMsgs = 100; + int randomIndex = new Random().nextInt(10) + 40; + QUEUE_TOTAL = 8; + String topic = "TimeTopic"; + String keys = "testQueryByTime"; + long now = System.currentTimeMillis(); + MessageBody = StoreMessage.getBytes(); + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 16); + messageStoreConfig.setMaxHashSlotNum(100); + messageStoreConfig.setMaxIndexNum(100 * 10); + messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024); + messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); + MessageStore master = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("test"), new MyMessageArrivingListener(), new BrokerConfig()); + boolean load = master.load(); + assertTrue(load); + + master.start(); + try { + for (int i = 0; i < totalMsgs; i++) { + MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner(); + messageExtBrokerInner.setBody(("time:" + System.currentTimeMillis() + " index:" + i).getBytes()); + messageExtBrokerInner.setTopic(topic); + messageExtBrokerInner.setKeys(keys); + messageExtBrokerInner.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL); + messageExtBrokerInner.setBornTimestamp(System.currentTimeMillis()); + messageExtBrokerInner.setStoreHost(StoreHost); + messageExtBrokerInner.setBornHost(BornHost); + messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(messageExtBrokerInner.getProperties())); + PutMessageResult putMessageResult = master.putMessage(messageExtBrokerInner); + if (i == randomIndex) { + now = putMessageResult.getAppendMessageResult().getStoreTimestamp(); + } + } + Thread.sleep(2000L); + long end = System.currentTimeMillis(); + QueryMessageResult result = master.queryMessage(topic, keys, totalMsgs, now, end); + for (ByteBuffer byteBuffer : result.getMessageBufferList()) { + MessageExt messageExt = MessageDecoder.decode(byteBuffer); + } + int bufferTotalSize = result.getMessageBufferList().size(); + result.release(); + Assert.assertTrue(totalMsgs - randomIndex - 1 <= bufferTotalSize); + } finally { + master.shutdown(); + master.destroy(); + } + } + + @Test + public void testQueueOffsetByTime() throws Exception { + long totalMsgs = 200; + QUEUE_TOTAL = 1; + String topic = "TimeTopic"; + String keys = "testQueryByTime"; + String consumerGroup = "testQueryByTime"; + MessageBody = StoreMessage.getBytes(); + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 16); + messageStoreConfig.setMaxHashSlotNum(100); + messageStoreConfig.setMaxIndexNum(100 * 10); + messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024); + messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); + MessageStore master = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("test"), new MyMessageArrivingListener(), new BrokerConfig()); + boolean load = master.load(); + assertTrue(load); + + TreeMap sameTimeCountCache = new TreeMap<>(); + TreeMap sameTimeResultCache = new TreeMap<>(); + long start = 0; + master.start(); + try { + for (long i = 0; i < totalMsgs; i++) { + MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner(); + messageExtBrokerInner.setBody(("time:" + System.currentTimeMillis() + " index:" + i).getBytes()); + messageExtBrokerInner.setTopic(topic); + messageExtBrokerInner.setKeys(keys); + messageExtBrokerInner.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL); + messageExtBrokerInner.setBornTimestamp(System.currentTimeMillis()); + messageExtBrokerInner.setStoreHost(StoreHost); + messageExtBrokerInner.setBornHost(BornHost); + messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(messageExtBrokerInner.getProperties())); + PutMessageResult putMessageResult = master.putMessage(messageExtBrokerInner); + long storeTimestamp = putMessageResult.getAppendMessageResult().getStoreTimestamp(); + AtomicInteger count = sameTimeCountCache.get(storeTimestamp); + if (count == null) { + count = new AtomicInteger(0); + sameTimeCountCache.put(storeTimestamp, count); + } + count.incrementAndGet(); + } + Thread.sleep(2000L); + + Map.Entry timeCount = sameTimeCountCache.lastEntry(); + start = timeCount.getKey(); + long offsetInQueueByTime = master.getOffsetInQueueByTime(topic, 0, start); + GetMessageResult testQueryByTime = master.getMessage(consumerGroup, topic, 0, offsetInQueueByTime, 20, null); + + List messageBufferList = testQueryByTime.getMessageBufferList(); + for (ByteBuffer byteBuffer : messageBufferList) { + MessageExt messageExt = MessageDecoder.decode(byteBuffer); + AtomicInteger cc = sameTimeResultCache.get(messageExt.getStoreTimestamp()); + if (cc == null) { + cc = new AtomicInteger(0); + sameTimeResultCache.put(messageExt.getStoreTimestamp(), cc); + } + cc.incrementAndGet(); + } + testQueryByTime.release(); + } finally { + master.shutdown(); + master.destroy(); + } + + Assert.assertTrue(start > 0); + AtomicInteger cc = sameTimeCountCache.get(start); + AtomicInteger result = sameTimeResultCache.get(start); + Assert.assertEquals(cc.get(), result.get()); + } } From 2c24b6777b2f0d0a85e1e8b0b2f64b1c00d184ef Mon Sep 17 00:00:00 2001 From: lindzh Date: Tue, 11 Jul 2017 11:21:43 +0800 Subject: [PATCH 2/9] fix consumer queue get offset by time high low --- .../org/apache/rocketmq/store/ConsumeQueue.java | 2 +- .../rocketmq/store/DefaultMessageStoreTest.java | 13 +++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 8d8e5b508f2..5431a0230b5 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -225,7 +225,7 @@ public long getOffsetInQueueByTime(final long timestamp) { } else { offset = Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp - - rightIndexValue) ? rightOffset : leftOffset; + - rightIndexValue) ? rightOffset - CQ_STORE_UNIT_SIZE : leftOffset + CQ_STORE_UNIT_SIZE; } } diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index f4ea7d5d6a8..7fe49f2be9a 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -210,10 +210,17 @@ public void testQueueOffsetByTime() throws Exception { TreeMap sameTimeCountCache = new TreeMap<>(); TreeMap sameTimeResultCache = new TreeMap<>(); + long hlTime = System.currentTimeMillis(); + long hlCount = 0; long start = 0; master.start(); try { for (long i = 0; i < totalMsgs; i++) { + if (i == totalMsgs - 20) { + Thread.sleep(1000); + hlTime = System.currentTimeMillis(); + Thread.sleep(500); + } MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner(); messageExtBrokerInner.setBody(("time:" + System.currentTimeMillis() + " index:" + i).getBytes()); messageExtBrokerInner.setTopic(topic); @@ -250,6 +257,11 @@ public void testQueueOffsetByTime() throws Exception { cc.incrementAndGet(); } testQueryByTime.release(); + + long hlOffset = master.getOffsetInQueueByTime(topic, 0, hlTime); + GetMessageResult hlResult = master.getMessage(consumerGroup, topic, 0, hlOffset, 20, null); + hlCount = hlResult.getMessageCount(); + hlResult.release(); } finally { master.shutdown(); master.destroy(); @@ -259,5 +271,6 @@ public void testQueueOffsetByTime() throws Exception { AtomicInteger cc = sameTimeCountCache.get(start); AtomicInteger result = sameTimeResultCache.get(start); Assert.assertEquals(cc.get(), result.get()); + Assert.assertTrue(19 == hlCount || hlCount == 20); } } From a8835c401df884ec2a4006331770794db0aee8be Mon Sep 17 00:00:00 2001 From: lindzh Date: Wed, 9 Aug 2017 21:08:44 +0800 Subject: [PATCH 3/9] add search offset by time return last position if needed --- .../broker/client/net/Broker2Client.java | 2 +- .../plugin/AbstractPluginMessageStore.java | 4 +- .../processor/AdminBrokerProcessor.java | 4 +- .../header/SearchOffsetRequestHeader.java | 9 +++++ .../apache/rocketmq/store/ConsumeQueue.java | 22 ++++++++--- .../rocketmq/store/DefaultMessageStore.java | 4 +- .../apache/rocketmq/store/MessageStore.java | 2 +- .../store/DefaultMessageStoreTest.java | 38 ++++++++++++++----- 8 files changed, 62 insertions(+), 23 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java index 863da627e7d..513963a2c40 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java @@ -151,7 +151,7 @@ public RemotingCommand resetOffset(String topic, String group, long timeStamp, b timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i); } else { - timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp); + timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp, false); } if (timeStampOffset < 0) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java index 690f70bfc88..44546b30db0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java @@ -107,8 +107,8 @@ public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQue } @Override - public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) { - return next.getOffsetInQueueByTime(topic, queueId, timestamp); + public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, boolean isGetTimeLast) { + return next.getOffsetInQueueByTime(topic, queueId, timestamp, isGetTimeLast); } @Override diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index f59d2952e1b..226a165e22d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -360,8 +360,10 @@ private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx, Remot final SearchOffsetRequestHeader requestHeader = (SearchOffsetRequestHeader) request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class); + boolean isGetTimeLast = requestHeader.getGetTimelast() != null ? requestHeader.getGetTimelast() : false; + long offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(), - requestHeader.getTimestamp()); + requestHeader.getTimestamp(), isGetTimeLast); responseHeader.setOffset(offset); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java index 113e46f671c..225e4a78e7b 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java @@ -32,6 +32,8 @@ public class SearchOffsetRequestHeader implements CommandCustomHeader { @CFNotNull private Long timestamp; + private Boolean getTimelast = false; + @Override public void checkFields() throws RemotingCommandException { // TODO Auto-generated method stub @@ -62,4 +64,11 @@ public void setTimestamp(Long timestamp) { this.timestamp = timestamp; } + public Boolean getGetTimelast() { + return getTimelast; + } + + public void setGetTimelast(Boolean getTimelast) { + this.getTimelast = getTimelast; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 5431a0230b5..1385db351a5 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -151,7 +151,7 @@ public void recover() { } } - public long getOffsetInQueueByTime(final long timestamp) { + public long getOffsetInQueueByTime(final long timestamp, boolean isGetTimeLast) { MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp); if (mappedFile != null) { long offset = 0; @@ -206,12 +206,22 @@ public long getOffsetInQueueByTime(final long timestamp) { if (phyOffset < minPhysicOffset) { break; } - long storeTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size); - if (storeTime < timestamp) { - break; + if (!isGetTimeLast) { + long storeTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size); + if (storeTime < timestamp) { + break; + } else { + candidateOffset = fixedOffset; + fixedOffset = candidateOffset - CQ_STORE_UNIT_SIZE; + } } else { - candidateOffset = fixedOffset; - fixedOffset = candidateOffset - CQ_STORE_UNIT_SIZE; + long storeTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size); + if (storeTime > timestamp) { + break; + } else { + candidateOffset = fixedOffset; + fixedOffset = candidateOffset + CQ_STORE_UNIT_SIZE; + } } } offset = candidateOffset; diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index def0c77c372..b27edd7f3f5 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -620,10 +620,10 @@ public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQue return 0; } - public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) { + public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, boolean isGetTimeLast) { ConsumeQueue logic = this.findConsumeQueue(topic, queueId); if (logic != null) { - return logic.getOffsetInQueueByTime(timestamp); + return logic.getOffsetInQueueByTime(timestamp, isGetTimeLast); } return 0; diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 55572ce10f4..cff7480ab22 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -110,7 +110,7 @@ GetMessageResult getMessage(final String group, final String topic, final int qu * @param timestamp Timestamp to look up. * @return physical offset which matches. */ - long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp); + long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp,boolean isGetTimeLast); /** * Look up the message by given commit log offset. diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index 7fe49f2be9a..5d765b81915 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -27,7 +27,6 @@ import java.util.TreeMap; import java.util.concurrent.atomic.AtomicInteger; -import junit.framework.Assert; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; @@ -36,9 +35,10 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.junit.Before; import org.junit.Test; +import org.junit.Assert; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertTrue; + public class DefaultMessageStoreTest { private final String StoreMessage = "Once, there was a chance for me!"; @@ -68,7 +68,7 @@ public void testWriteAndRead() throws Exception { MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig()); boolean load = master.load(); - assertTrue(load); + Assert.assertTrue(load); master.start(); try { @@ -112,7 +112,7 @@ public void testGroupCommit() throws Exception { messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig()); boolean load = master.load(); - assertTrue(load); + Assert.assertTrue(load); master.start(); try { @@ -156,7 +156,7 @@ public void testQueryByTime() throws Exception { messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); MessageStore master = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("test"), new MyMessageArrivingListener(), new BrokerConfig()); boolean load = master.load(); - assertTrue(load); + Assert.assertTrue(load); master.start(); try { @@ -192,7 +192,7 @@ public void testQueryByTime() throws Exception { @Test public void testQueueOffsetByTime() throws Exception { - long totalMsgs = 200; + long totalMsgs = 100; QUEUE_TOTAL = 1; String topic = "TimeTopic"; String keys = "testQueryByTime"; @@ -204,14 +204,17 @@ public void testQueueOffsetByTime() throws Exception { messageStoreConfig.setMaxIndexNum(100 * 10); messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024); messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); - MessageStore master = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("test"), new MyMessageArrivingListener(), new BrokerConfig()); + DefaultMessageStore master = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("test"), new MyMessageArrivingListener(), new BrokerConfig()); boolean load = master.load(); - assertTrue(load); + Assert.assertTrue(load); TreeMap sameTimeCountCache = new TreeMap<>(); TreeMap sameTimeResultCache = new TreeMap<>(); long hlTime = System.currentTimeMillis(); long hlCount = 0; + long endTime = System.currentTimeMillis(); + int endCount1 = 0; + int endCount2 = 0; long start = 0; master.start(); try { @@ -221,6 +224,9 @@ public void testQueueOffsetByTime() throws Exception { hlTime = System.currentTimeMillis(); Thread.sleep(500); } + if (i == totalMsgs - 10) { + endTime = System.currentTimeMillis(); + } MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner(); messageExtBrokerInner.setBody(("time:" + System.currentTimeMillis() + " index:" + i).getBytes()); messageExtBrokerInner.setTopic(topic); @@ -243,7 +249,7 @@ public void testQueueOffsetByTime() throws Exception { Map.Entry timeCount = sameTimeCountCache.lastEntry(); start = timeCount.getKey(); - long offsetInQueueByTime = master.getOffsetInQueueByTime(topic, 0, start); + long offsetInQueueByTime = master.getOffsetInQueueByTime(topic, 0, start, false); GetMessageResult testQueryByTime = master.getMessage(consumerGroup, topic, 0, offsetInQueueByTime, 20, null); List messageBufferList = testQueryByTime.getMessageBufferList(); @@ -258,10 +264,21 @@ public void testQueueOffsetByTime() throws Exception { } testQueryByTime.release(); - long hlOffset = master.getOffsetInQueueByTime(topic, 0, hlTime); + long hlOffset = master.getOffsetInQueueByTime(topic, 0, hlTime, false); GetMessageResult hlResult = master.getMessage(consumerGroup, topic, 0, hlOffset, 20, null); hlCount = hlResult.getMessageCount(); hlResult.release(); + + long endOffset1 = master.getOffsetInQueueByTime(topic, 0, endTime, false); + GetMessageResult endResult1 = master.getMessage(consumerGroup, topic, 0, endOffset1, 20, null); + endCount1 = endResult1.getMessageCount(); + endResult1.release(); + + long endOffset2 = master.getOffsetInQueueByTime(topic, 0, endTime, true); + GetMessageResult endResult2 = master.getMessage(consumerGroup, topic, 0, endOffset2, 20, null); + endCount2 = endResult2.getMessageCount(); + endResult2.release(); + } finally { master.shutdown(); master.destroy(); @@ -272,5 +289,6 @@ public void testQueueOffsetByTime() throws Exception { AtomicInteger result = sameTimeResultCache.get(start); Assert.assertEquals(cc.get(), result.get()); Assert.assertTrue(19 == hlCount || hlCount == 20); + Assert.assertTrue(endCount1 >= endCount2); } } From f2f2fba2432534e0fddc316f078e6f3257668c2c Mon Sep 17 00:00:00 2001 From: lindzh Date: Thu, 10 Aug 2017 10:40:46 +0800 Subject: [PATCH 4/9] add search offset by time last one --- .../rocketmq/client/consumer/DefaultMQPullConsumer.java | 5 +++++ .../rocketmq/client/consumer/DefaultMQPushConsumer.java | 5 +++++ .../org/apache/rocketmq/client/consumer/MQConsumer.java | 2 ++ .../java/org/apache/rocketmq/client/impl/MQAdminImpl.java | 6 +++++- .../org/apache/rocketmq/client/impl/MQClientAPIImpl.java | 3 ++- .../client/impl/consumer/DefaultMQPullConsumerImpl.java | 6 +++++- .../client/impl/consumer/DefaultMQPushConsumerImpl.java | 6 +++++- .../org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java | 5 +++++ .../apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java | 7 ++++++- .../java/org/apache/rocketmq/tools/admin/MQAdminExt.java | 2 ++ .../tools/command/message/PrintMessageByQueueCommand.java | 2 +- .../tools/command/message/PrintMessageSubCommand.java | 2 +- 12 files changed, 44 insertions(+), 7 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java index f5016f82f51..aadb7f06f3b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java @@ -117,6 +117,11 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp); } + @Override + public long searchOffset(MessageQueue mq, long timestamp, boolean getTimeLast) throws MQClientException { + return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp, getTimeLast); + } + @Override public long maxOffset(MessageQueue mq) throws MQClientException { return this.defaultMQPullConsumerImpl.maxOffset(mq); diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 9c9b59ef076..309f1ba6560 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -264,6 +264,11 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp); } + @Override + public long searchOffset(MessageQueue mq, long timestamp, boolean getTimeLast) throws MQClientException { + return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp, getTimeLast); + } + @Override public long maxOffset(MessageQueue mq) throws MQClientException { return this.defaultMQPushConsumerImpl.maxOffset(mq); diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java index 03ee4d96654..8b3f6fc80f9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java @@ -64,4 +64,6 @@ void sendMessageBack(final MessageExt msg, final int delayLevel, final String br * @throws MQClientException */ Set fetchSubscribeMessageQueues(final String topic) throws MQClientException; + + long searchOffset(final MessageQueue mq, final long timestamp, final boolean getTimeLast) throws MQClientException; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index 983e5157ea9..7f8459d2af4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -167,6 +167,10 @@ public Set fetchSubscribeMessageQueues(String topic) throws MQClie } public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { + return this.doSearchOffset(mq,timestamp,false); + } + + public long doSearchOffset(MessageQueue mq, long timestamp, boolean getTimeLast) throws MQClientException { String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); @@ -176,7 +180,7 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti if (brokerAddr != null) { try { return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp, - timeoutMillis); + getTimeLast, timeoutMillis); } catch (Exception e) { throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 4244bddcf2f..6059c26601f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -668,12 +668,13 @@ public MessageExt viewMessage(final String addr, final long phyoffset, final lon throw new MQBrokerException(response.getCode(), response.getRemark()); } - public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, final long timeoutMillis) + public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, final boolean getTimeLast, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader(); requestHeader.setTopic(topic); requestHeader.setQueueId(queueId); requestHeader.setTimestamp(timestamp); + requestHeader.setGetTimelast(getTimeLast); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 7d43b372e41..48e1936d679 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -471,8 +471,12 @@ public MessageExt queryMessageByUniqKey(String topic, String uniqKey) } public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { + return this.searchOffset(mq, timestamp, false); + } + + public long searchOffset(final MessageQueue mq, final long timestamp, final boolean getTimeLast) throws MQClientException { this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); + return this.mQClientFactory.getMQAdminImpl().doSearchOffset(mq, timestamp, getTimeLast); } public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 8767964244b..f4325b1d026 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -908,7 +908,11 @@ public void resetOffsetByTimeStamp(long timeStamp) } public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { - return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); + return this.searchOffset(mq, timestamp, false); + } + + public long searchOffset(MessageQueue mq, long timestamp, boolean getTimeLast) throws MQClientException { + return this.mQClientFactory.getMQAdminImpl().doSearchOffset(mq, timestamp, getTimeLast); } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 409ea3322f2..2b473fbbd00 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -103,6 +103,11 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti return defaultMQAdminExtImpl.searchOffset(mq, timestamp); } + @Override + public long searchOffset(MessageQueue mq, long timestamp,boolean getTimeLast) throws MQClientException { + return defaultMQAdminExtImpl.searchOffset(mq, timestamp, getTimeLast); + } + @Override public long maxOffset(MessageQueue mq) throws MQClientException { return defaultMQAdminExtImpl.maxOffset(mq); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 157ae21eba3..f4b9ba2b010 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -490,7 +490,7 @@ private RollbackStats resetOffsetConsumeOffset(String brokerAddr, String consume resetOffset = this.mqClientInstance.getMQClientAPIImpl().getMaxOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timeoutMillis); } else { resetOffset = - this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timestamp, + this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timestamp, false, timeoutMillis); } @@ -903,6 +903,11 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp); } + @Override + public long searchOffset(MessageQueue mq, long timestamp,boolean getTimeLast) throws MQClientException { + return this.mqClientInstance.getMQAdminImpl().doSearchOffset(mq, timestamp, getTimeLast); + } + @Override public long maxOffset(MessageQueue mq) throws MQClientException { return this.mqClientInstance.getMQAdminImpl().maxOffset(mq); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 82add92b2d6..71c54e84cb4 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -263,4 +263,6 @@ QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr, final String topic, final int queueId, final long index, final int count, final String consumerGroup) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException; + + long searchOffset(final MessageQueue mq, final long timestamp, final boolean getTimeLast) throws MQClientException; } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java index ac51267161c..128a44bd658 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java @@ -188,7 +188,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t if (commandLine.hasOption('e')) { String timestampStr = commandLine.getOptionValue('e').trim(); long timeValue = timestampFormat(timestampStr); - maxOffset = consumer.searchOffset(mq, timeValue); + maxOffset = consumer.searchOffset(mq, timeValue, true); } final Map tagCalmap = new HashMap(); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java index ac485199c04..d2df755eb43 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java @@ -132,7 +132,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t if (commandLine.hasOption('e')) { String timestampStr = commandLine.getOptionValue('e').trim(); long timeValue = timestampFormat(timestampStr); - maxOffset = consumer.searchOffset(mq, timeValue); + maxOffset = consumer.searchOffset(mq, timeValue, true); } System.out.printf("minOffset=" + minOffset + ", maxOffset=" + maxOffset + ", " + mq); From d1f2b9d6bcd7324774a0cc75ee577a1726ff9505 Mon Sep 17 00:00:00 2001 From: lindzh Date: Fri, 11 Aug 2017 15:05:01 +0800 Subject: [PATCH 5/9] fix time offset --- .../apache/rocketmq/store/ConsumeQueue.java | 80 +++++++++++-------- 1 file changed, 47 insertions(+), 33 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 1385db351a5..b849ba053ce 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -195,36 +195,7 @@ public long getOffsetInQueueByTime(final long timestamp, boolean isGetTimeLast) } if (targetOffset != -1) { - // Fix messages same store time,get best candidate offset - int candidateOffset = targetOffset; - int fixedLow = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0; - int fixedOffset = candidateOffset; - while (fixedOffset >= fixedLow) { - byteBuffer.position(fixedOffset); - long phyOffset = byteBuffer.getLong(); - int size = byteBuffer.getInt(); - if (phyOffset < minPhysicOffset) { - break; - } - if (!isGetTimeLast) { - long storeTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size); - if (storeTime < timestamp) { - break; - } else { - candidateOffset = fixedOffset; - fixedOffset = candidateOffset - CQ_STORE_UNIT_SIZE; - } - } else { - long storeTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size); - if (storeTime > timestamp) { - break; - } else { - candidateOffset = fixedOffset; - fixedOffset = candidateOffset + CQ_STORE_UNIT_SIZE; - } - } - } - offset = candidateOffset; + offset = fixCandidateTime(mappedFile,byteBuffer,targetOffset,isGetTimeLast,minPhysicOffset,timestamp); } else { if (leftIndexValue == -1) { @@ -233,9 +204,13 @@ public long getOffsetInQueueByTime(final long timestamp, boolean isGetTimeLast) offset = leftOffset; } else { - offset = - Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp - - rightIndexValue) ? rightOffset - CQ_STORE_UNIT_SIZE : leftOffset + CQ_STORE_UNIT_SIZE; + if (isGetTimeLast) { + long fixedTimestamp = getStoreTimeStamp(byteBuffer, leftOffset); + offset = fixCandidateTime(mappedFile, byteBuffer, leftOffset, true, minPhysicOffset, fixedTimestamp); + } else { + long fixedTimestamp = getStoreTimeStamp(byteBuffer, rightOffset); + offset = fixCandidateTime(mappedFile, byteBuffer, rightOffset, false, minPhysicOffset, fixedTimestamp); + } } } @@ -248,6 +223,45 @@ public long getOffsetInQueueByTime(final long timestamp, boolean isGetTimeLast) return 0; } + private long getStoreTimeStamp(ByteBuffer byteBuffer, int targetOffset) { + byteBuffer.position(targetOffset); + long phyOffset = byteBuffer.getLong(); + int size = byteBuffer.getInt(); + return this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size); + } + + private int fixCandidateTime(MappedFile mappedFile, ByteBuffer byteBuffer, int targetOffset, boolean isGetTimeLast, long minPhysicOffset, long timestamp) { + int candidateOffset = targetOffset; + int fixedLow = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0; + int fixedOffset = candidateOffset; + while (fixedOffset >= fixedLow) { + byteBuffer.position(fixedOffset); + long phyOffset = byteBuffer.getLong(); + int size = byteBuffer.getInt(); + if (phyOffset < minPhysicOffset) { + break; + } + if (!isGetTimeLast) { + long storeTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size); + if (storeTime < timestamp) { + break; + } else { + candidateOffset = fixedOffset; + fixedOffset = candidateOffset - CQ_STORE_UNIT_SIZE; + } + } else { + long storeTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size); + if (storeTime > timestamp) { + break; + } else { + candidateOffset = fixedOffset; + fixedOffset = candidateOffset + CQ_STORE_UNIT_SIZE; + } + } + } + return candidateOffset; + } + public void truncateDirtyLogicFiles(long phyOffet) { int logicFileSize = this.mappedFileSize; From 9ebcd6b11f4b3b41b2b0808d8dc74dacf7f56c86 Mon Sep 17 00:00:00 2001 From: lindzh Date: Thu, 12 Oct 2017 20:27:01 +0800 Subject: [PATCH 6/9] fix query offset by time and add comment for isGetTimeLast --- .../apache/rocketmq/store/ConsumeQueue.java | 6 +- .../rocketmq/store/DefaultMessageStore.java | 4 + .../store/DefaultMessageStoreTest.java | 192 ++++++++---------- 3 files changed, 86 insertions(+), 116 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 54fa1a33621..1e3fbd21241 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -205,11 +205,9 @@ public long getOffsetInQueueByTime(final long timestamp, boolean isGetTimeLast) offset = leftOffset; } else { if (isGetTimeLast) { - long fixedTimestamp = getStoreTimeStamp(byteBuffer, leftOffset); - offset = fixCandidateTime(mappedFile, byteBuffer, leftOffset, true, minPhysicOffset, fixedTimestamp); + offset = leftOffset; } else { - long fixedTimestamp = getStoreTimeStamp(byteBuffer, rightOffset); - offset = fixCandidateTime(mappedFile, byteBuffer, rightOffset, false, minPhysicOffset, fixedTimestamp); + offset = rightOffset; } } } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 92548c6d654..35cc2da4558 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -637,6 +637,10 @@ public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQue return 0; } + /** + * As there is many msgs in one timestamp,so add isGetTimeLast parameter to specify. + * if isGetTimeLast is true,return the last offset of this timestamp,else return the first offset. + */ public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, boolean isGetTimeLast) { ConsumeQueue logic = this.findConsumeQueue(topic, queueId); if (logic != null) { diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index d7999682b5c..d0dcdd03ecd 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -212,46 +212,30 @@ public void testQueryByTime() throws Exception { String keys = "testQueryByTime"; long now = System.currentTimeMillis(); MessageBody = StoreMessage.getBytes(); - MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); - messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 16); - messageStoreConfig.setMaxHashSlotNum(100); - messageStoreConfig.setMaxIndexNum(100 * 10); - messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024); - messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); - MessageStore master = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("test"), new MyMessageArrivingListener(), new BrokerConfig()); - boolean load = master.load(); - Assert.assertTrue(load); - - master.start(); - try { - for (int i = 0; i < totalMsgs; i++) { - MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner(); - messageExtBrokerInner.setBody(("time:" + System.currentTimeMillis() + " index:" + i).getBytes()); - messageExtBrokerInner.setTopic(topic); - messageExtBrokerInner.setKeys(keys); - messageExtBrokerInner.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL); - messageExtBrokerInner.setBornTimestamp(System.currentTimeMillis()); - messageExtBrokerInner.setStoreHost(StoreHost); - messageExtBrokerInner.setBornHost(BornHost); - messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(messageExtBrokerInner.getProperties())); - PutMessageResult putMessageResult = master.putMessage(messageExtBrokerInner); - if (i == randomIndex) { - now = putMessageResult.getAppendMessageResult().getStoreTimestamp(); - } - } - Thread.sleep(2000L); - long end = System.currentTimeMillis(); - QueryMessageResult result = master.queryMessage(topic, keys, totalMsgs, now, end); - for (ByteBuffer byteBuffer : result.getMessageBufferList()) { - MessageExt messageExt = MessageDecoder.decode(byteBuffer); + for (int i = 0; i < totalMsgs; i++) { + MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner(); + messageExtBrokerInner.setBody(("time:" + System.currentTimeMillis() + " index:" + i).getBytes()); + messageExtBrokerInner.setTopic(topic); + messageExtBrokerInner.setKeys(keys); + messageExtBrokerInner.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL); + messageExtBrokerInner.setBornTimestamp(System.currentTimeMillis()); + messageExtBrokerInner.setStoreHost(StoreHost); + messageExtBrokerInner.setBornHost(BornHost); + messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(messageExtBrokerInner.getProperties())); + PutMessageResult putMessageResult = messageStore.putMessage(messageExtBrokerInner); + if (i == randomIndex) { + now = putMessageResult.getAppendMessageResult().getStoreTimestamp(); } - int bufferTotalSize = result.getMessageBufferList().size(); - result.release(); - Assert.assertTrue(totalMsgs - randomIndex - 1 <= bufferTotalSize); - } finally { - master.shutdown(); - master.destroy(); } + Thread.sleep(2000L); + long end = System.currentTimeMillis(); + QueryMessageResult result = messageStore.queryMessage(topic, keys, totalMsgs, now, end); + for (ByteBuffer byteBuffer : result.getMessageBufferList()) { + MessageExt messageExt = MessageDecoder.decode(byteBuffer); + } + int bufferTotalSize = result.getMessageBufferList().size(); + result.release(); + Assert.assertTrue(totalMsgs - randomIndex - 1 <= bufferTotalSize); } @Test @@ -262,15 +246,6 @@ public void testQueueOffsetByTime() throws Exception { String keys = "testQueryByTime"; String consumerGroup = "testQueryByTime"; MessageBody = StoreMessage.getBytes(); - MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); - messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 16); - messageStoreConfig.setMaxHashSlotNum(100); - messageStoreConfig.setMaxIndexNum(100 * 10); - messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024); - messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); - DefaultMessageStore master = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("test"), new MyMessageArrivingListener(), new BrokerConfig()); - boolean load = master.load(); - Assert.assertTrue(load); TreeMap sameTimeCountCache = new TreeMap<>(); TreeMap sameTimeResultCache = new TreeMap<>(); @@ -280,74 +255,67 @@ public void testQueueOffsetByTime() throws Exception { int endCount1 = 0; int endCount2 = 0; long start = 0; - master.start(); - try { - for (long i = 0; i < totalMsgs; i++) { - if (i == totalMsgs - 20) { - Thread.sleep(1000); - hlTime = System.currentTimeMillis(); - Thread.sleep(500); - } - if (i == totalMsgs - 10) { - endTime = System.currentTimeMillis(); - } - MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner(); - messageExtBrokerInner.setBody(("time:" + System.currentTimeMillis() + " index:" + i).getBytes()); - messageExtBrokerInner.setTopic(topic); - messageExtBrokerInner.setKeys(keys); - messageExtBrokerInner.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL); - messageExtBrokerInner.setBornTimestamp(System.currentTimeMillis()); - messageExtBrokerInner.setStoreHost(StoreHost); - messageExtBrokerInner.setBornHost(BornHost); - messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(messageExtBrokerInner.getProperties())); - PutMessageResult putMessageResult = master.putMessage(messageExtBrokerInner); - long storeTimestamp = putMessageResult.getAppendMessageResult().getStoreTimestamp(); - AtomicInteger count = sameTimeCountCache.get(storeTimestamp); - if (count == null) { - count = new AtomicInteger(0); - sameTimeCountCache.put(storeTimestamp, count); - } - count.incrementAndGet(); + + for (long i = 0; i < totalMsgs; i++) { + if (i == totalMsgs - 20) { + Thread.sleep(1000); + hlTime = System.currentTimeMillis(); + Thread.sleep(500); } - Thread.sleep(2000L); - - Map.Entry timeCount = sameTimeCountCache.lastEntry(); - start = timeCount.getKey(); - long offsetInQueueByTime = master.getOffsetInQueueByTime(topic, 0, start, false); - GetMessageResult testQueryByTime = master.getMessage(consumerGroup, topic, 0, offsetInQueueByTime, 20, null); - - List messageBufferList = testQueryByTime.getMessageBufferList(); - for (ByteBuffer byteBuffer : messageBufferList) { - MessageExt messageExt = MessageDecoder.decode(byteBuffer); - AtomicInteger cc = sameTimeResultCache.get(messageExt.getStoreTimestamp()); - if (cc == null) { - cc = new AtomicInteger(0); - sameTimeResultCache.put(messageExt.getStoreTimestamp(), cc); - } - cc.incrementAndGet(); + if (i == totalMsgs - 10) { + endTime = System.currentTimeMillis(); } - testQueryByTime.release(); - - long hlOffset = master.getOffsetInQueueByTime(topic, 0, hlTime, false); - GetMessageResult hlResult = master.getMessage(consumerGroup, topic, 0, hlOffset, 20, null); - hlCount = hlResult.getMessageCount(); - hlResult.release(); - - long endOffset1 = master.getOffsetInQueueByTime(topic, 0, endTime, false); - GetMessageResult endResult1 = master.getMessage(consumerGroup, topic, 0, endOffset1, 20, null); - endCount1 = endResult1.getMessageCount(); - endResult1.release(); - - long endOffset2 = master.getOffsetInQueueByTime(topic, 0, endTime, true); - GetMessageResult endResult2 = master.getMessage(consumerGroup, topic, 0, endOffset2, 20, null); - endCount2 = endResult2.getMessageCount(); - endResult2.release(); - - } finally { - master.shutdown(); - master.destroy(); + MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner(); + messageExtBrokerInner.setBody(("time:" + System.currentTimeMillis() + " index:" + i).getBytes()); + messageExtBrokerInner.setTopic(topic); + messageExtBrokerInner.setKeys(keys); + messageExtBrokerInner.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL); + messageExtBrokerInner.setBornTimestamp(System.currentTimeMillis()); + messageExtBrokerInner.setStoreHost(StoreHost); + messageExtBrokerInner.setBornHost(BornHost); + messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(messageExtBrokerInner.getProperties())); + PutMessageResult putMessageResult = messageStore.putMessage(messageExtBrokerInner); + long storeTimestamp = putMessageResult.getAppendMessageResult().getStoreTimestamp(); + AtomicInteger count = sameTimeCountCache.get(storeTimestamp); + if (count == null) { + count = new AtomicInteger(0); + sameTimeCountCache.put(storeTimestamp, count); + } + count.incrementAndGet(); } - + Thread.sleep(2000L); + + Map.Entry timeCount = sameTimeCountCache.lastEntry(); + start = timeCount.getKey(); + long offsetInQueueByTime = messageStore.getOffsetInQueueByTime(topic, 0, start, false); + GetMessageResult testQueryByTime = messageStore.getMessage(consumerGroup, topic, 0, offsetInQueueByTime, 20, null); + + List messageBufferList = testQueryByTime.getMessageBufferList(); + for (ByteBuffer byteBuffer : messageBufferList) { + MessageExt messageExt = MessageDecoder.decode(byteBuffer); + AtomicInteger cc = sameTimeResultCache.get(messageExt.getStoreTimestamp()); + if (cc == null) { + cc = new AtomicInteger(0); + sameTimeResultCache.put(messageExt.getStoreTimestamp(), cc); + } + cc.incrementAndGet(); + } + testQueryByTime.release(); + + long hlOffset = messageStore.getOffsetInQueueByTime(topic, 0, hlTime, false); + GetMessageResult hlResult = messageStore.getMessage(consumerGroup, topic, 0, hlOffset, 20, null); + hlCount = hlResult.getMessageCount(); + hlResult.release(); + + long endOffset1 = messageStore.getOffsetInQueueByTime(topic, 0, endTime, false); + GetMessageResult endResult1 = messageStore.getMessage(consumerGroup, topic, 0, endOffset1, 20, null); + endCount1 = endResult1.getMessageCount(); + endResult1.release(); + + long endOffset2 = messageStore.getOffsetInQueueByTime(topic, 0, endTime, true); + GetMessageResult endResult2 = messageStore.getMessage(consumerGroup, topic, 0, endOffset2, 20, null); + endCount2 = endResult2.getMessageCount(); + endResult2.release(); Assert.assertTrue(start > 0); AtomicInteger cc = sameTimeCountCache.get(start); AtomicInteger result = sameTimeResultCache.get(start); From fb510891527c61f7fe25655c13e47ef754668174 Mon Sep 17 00:00:00 2001 From: lindzh Date: Thu, 14 Dec 2017 19:59:08 +0800 Subject: [PATCH 7/9] add isGetlast JavaDoc and fix queryConsumeQueueOffsetByTime --- .../apache/rocketmq/store/ConsumeQueue.java | 44 +++++++++++++++++-- .../apache/rocketmq/store/MessageStore.java | 1 + 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 1e3fbd21241..98a33a2855f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -151,8 +151,47 @@ public void recover() { } } + private MappedFile getConsumeQueueMappedFileByStoreTime(long messageStoreTime) { + long commitLogMinOffset = this.defaultMessageStore.getMinPhyOffset(); + List mappedFiles = this.mappedFileQueue.getMappedFiles(); + for (MappedFile mappedFile : mappedFiles) { + SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0); + if (sbr != null) { + try { + ByteBuffer byteBuffer = sbr.getByteBuffer(); + int max = byteBuffer.limit() - CQ_STORE_UNIT_SIZE; + int min = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0; + byteBuffer.position(min); + long minPhyOffset = byteBuffer.getLong(); + int minSize = byteBuffer.getInt(); + + byteBuffer.position(max); + long maxPhyOffset = byteBuffer.getLong(); + if (maxPhyOffset < commitLogMinOffset) { + continue; + } + int maxSize = byteBuffer.getInt(); + + long minStoreTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(minPhyOffset, minSize); + if (messageStoreTime < minStoreTime) { + return null; + } + + long maxStoreTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(maxPhyOffset, maxSize); + if (maxStoreTime < messageStoreTime) { + continue; + } + return mappedFile; + } finally { + sbr.release(); + } + } + } + return null; + } + public long getOffsetInQueueByTime(final long timestamp, boolean isGetTimeLast) { - MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp); + MappedFile mappedFile = getConsumeQueueMappedFileByStoreTime(timestamp); if (mappedFile != null) { long offset = 0; int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0; @@ -176,8 +215,7 @@ public long getOffsetInQueueByTime(final long timestamp, boolean isGetTimeLast) continue; } - long storeTime = - this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size); + long storeTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size); if (storeTime < 0) { return 0; } else if (storeTime == timestamp) { diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index d65f202616b..9f65de07578 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -116,6 +116,7 @@ GetMessageResult getMessage(final String group, final String topic, final int qu * @param topic Topic of the message. * @param queueId Queue ID. * @param timestamp Timestamp to look up. + * @param isGetTimeLast If there is many messages at this given timestamp , return the first offset if isGetTimeLast=false,else return last offset * @return physical offset which matches. */ long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp,boolean isGetTimeLast); From 03911db8d819957548d85110a29ee01c0fbbf8a9 Mon Sep 17 00:00:00 2001 From: lindzh Date: Tue, 27 Feb 2018 11:46:24 +0800 Subject: [PATCH 8/9] fix last get --- .../apache/rocketmq/store/ConsumeQueue.java | 51 ++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 98a33a2855f..faf5d362607 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -190,13 +190,60 @@ private MappedFile getConsumeQueueMappedFileByStoreTime(long messageStoreTime) { return null; } + + private MappedFile getLastConsumeQueueMappedFileByStoreTime(long messageStoreTime) { + long commitLogMinOffset = this.defaultMessageStore.getMinPhyOffset(); + List mappedFiles = this.mappedFileQueue.getMappedFiles(); + MappedFile lastFile = null; + for (MappedFile mappedFile : mappedFiles) { + SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0); + if (sbr != null) { + try { + ByteBuffer byteBuffer = sbr.getByteBuffer(); + int max = byteBuffer.limit() - CQ_STORE_UNIT_SIZE; + int min = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0; + byteBuffer.position(min); + long minPhyOffset = byteBuffer.getLong(); + int minSize = byteBuffer.getInt(); + + byteBuffer.position(max); + long maxPhyOffset = byteBuffer.getLong(); + if (maxPhyOffset < commitLogMinOffset) { + continue; + } + int maxSize = byteBuffer.getInt(); + + long minStoreTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(minPhyOffset, minSize); + if (messageStoreTime < minStoreTime) { + return lastFile; + } + + long maxStoreTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(maxPhyOffset, maxSize); + if (maxStoreTime < messageStoreTime) { + continue; + } + lastFile = mappedFile; + } finally { + sbr.release(); + } + } + } + return lastFile; + } + public long getOffsetInQueueByTime(final long timestamp, boolean isGetTimeLast) { - MappedFile mappedFile = getConsumeQueueMappedFileByStoreTime(timestamp); + MappedFile mappedFile = null; + if (isGetTimeLast) { + mappedFile = getLastConsumeQueueMappedFileByStoreTime(timestamp); + } else { + mappedFile = getConsumeQueueMappedFileByStoreTime(timestamp); + } if (mappedFile != null) { long offset = 0; int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0; int high = 0; int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1; + long lastTime = 0; long leftIndexValue = -1L, rightIndexValue = -1L; long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset(); SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0); @@ -216,6 +263,7 @@ public long getOffsetInQueueByTime(final long timestamp, boolean isGetTimeLast) } long storeTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size); + lastTime = storeTime; if (storeTime < 0) { return 0; } else if (storeTime == timestamp) { @@ -248,6 +296,7 @@ public long getOffsetInQueueByTime(final long timestamp, boolean isGetTimeLast) offset = rightOffset; } } + offset = fixCandidateTime(mappedFile,byteBuffer,(int)offset,!isGetTimeLast,minPhysicOffset,lastTime); } return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE; From d17a0943b4eda1d9b939eec774f1b91902162297 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=B2=81=E8=88=AC?= Date: Mon, 26 Mar 2018 17:50:41 +0800 Subject: [PATCH 9/9] fix method and parameter name --- .../broker/client/net/Broker2Client.java | 21 +++--- .../plugin/AbstractPluginMessageStore.java | 4 +- .../processor/AdminBrokerProcessor.java | 8 +- .../consumer/DefaultMQPullConsumer.java | 4 +- .../consumer/DefaultMQPushConsumer.java | 4 +- .../rocketmq/client/consumer/MQConsumer.java | 2 +- .../rocketmq/client/impl/MQAdminImpl.java | 7 +- .../rocketmq/client/impl/MQClientAPIImpl.java | 4 +- .../consumer/DefaultMQPullConsumerImpl.java | 24 +++--- .../consumer/DefaultMQPushConsumerImpl.java | 7 +- .../common/constant/OffsetConstant.java | 26 +++++++ .../header/SearchOffsetRequestHeader.java | 10 +-- .../apache/rocketmq/store/ConsumeQueue.java | 73 +++++-------------- .../rocketmq/store/DefaultMessageStore.java | 8 +- .../apache/rocketmq/store/MessageStore.java | 4 +- .../store/DefaultMessageStoreTest.java | 9 ++- .../tools/admin/DefaultMQAdminExt.java | 4 +- .../tools/admin/DefaultMQAdminExtImpl.java | 9 ++- .../rocketmq/tools/admin/MQAdminExt.java | 2 +- .../message/PrintMessageByQueueCommand.java | 18 +++-- .../message/PrintMessageSubCommand.java | 3 +- 21 files changed, 125 insertions(+), 126 deletions(-) create mode 100644 common/src/main/java/org/apache/rocketmq/common/constant/OffsetConstant.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java index ea9d2e03747..1aa2ad0ff76 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java @@ -20,12 +20,6 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.FileRegion; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; @@ -34,8 +28,7 @@ import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.common.constant.OffsetConstant; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueueForC; import org.apache.rocketmq.common.protocol.RequestCode; @@ -47,12 +40,21 @@ import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader; import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader; import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.SelectMappedBufferResult; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentMap; + public class Broker2Client { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; @@ -151,7 +153,8 @@ public RemotingCommand resetOffset(String topic, String group, long timeStamp, b timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i); } else { - timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp, false); + timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp, + OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET); } if (timeStampOffset < 0) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java index 4658e6fd772..6cddfc59a22 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java @@ -107,8 +107,8 @@ public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQue } @Override - public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, boolean isGetTimeLast) { - return next.getOffsetInQueueByTime(topic, queueId, timestamp, isGetTimeLast); + public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, int getLastOrFirstOffset) { + return next.getOffsetInQueueByTime(topic, queueId, timestamp, getLastOrFirstOffset); } @Override diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 98d1b758f3f..e30183f4eb1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -44,6 +44,7 @@ import org.apache.rocketmq.common.admin.TopicOffset; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.constant.OffsetConstant; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.message.MessageDecoder; @@ -364,10 +365,13 @@ private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx, final SearchOffsetRequestHeader requestHeader = (SearchOffsetRequestHeader) request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class); - boolean isGetTimeLast = requestHeader.getGetTimelast() != null ? requestHeader.getGetTimelast() : false; + int getLastOrFirstOffset = OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET; + if (requestHeader.getGetLastOrFirstOffset() != null) { + getLastOrFirstOffset = requestHeader.getGetLastOrFirstOffset(); + } long offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(), - requestHeader.getTimestamp(), isGetTimeLast); + requestHeader.getTimestamp(), getLastOrFirstOffset); responseHeader.setOffset(offset); diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java index 73233abb73b..0febd53e069 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java @@ -118,8 +118,8 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti } @Override - public long searchOffset(MessageQueue mq, long timestamp, boolean getTimeLast) throws MQClientException { - return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp, getTimeLast); + public long searchOffset(MessageQueue mq, long timestamp, int getLastOrFirstOffset) throws MQClientException { + return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp, getLastOrFirstOffset); } @Override diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 842d3098a55..8f2345c44aa 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -301,8 +301,8 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti } @Override - public long searchOffset(MessageQueue mq, long timestamp, boolean getTimeLast) throws MQClientException { - return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp, getTimeLast); + public long searchOffset(MessageQueue mq, long timestamp, int getLastOrFirstOffset) throws MQClientException { + return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp, getLastOrFirstOffset); } @Override diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java index 8cbb168d738..ec88e63ad0f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java @@ -49,5 +49,5 @@ void sendMessageBack(final MessageExt msg, final int delayLevel, final String br */ Set fetchSubscribeMessageQueues(final String topic) throws MQClientException; - long searchOffset(final MessageQueue mq, final long timestamp, final boolean getTimeLast) throws MQClientException; + long searchOffset(final MessageQueue mq, final long timestamp, final int getLastOrFirstOffset) throws MQClientException; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index 4955d824ae1..fc00f1a1ae4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -34,6 +34,7 @@ import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.constant.OffsetConstant; import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageClientIDSetter; @@ -167,10 +168,10 @@ public Set fetchSubscribeMessageQueues(String topic) throws MQClie } public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { - return this.doSearchOffset(mq,timestamp,false); + return this.doSearchOffset(mq, timestamp, OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET); } - public long doSearchOffset(MessageQueue mq, long timestamp, boolean getTimeLast) throws MQClientException { + public long doSearchOffset(MessageQueue mq, long timestamp, int getLastOrFirstOffset) throws MQClientException { String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); @@ -180,7 +181,7 @@ public long doSearchOffset(MessageQueue mq, long timestamp, boolean getTimeLast) if (brokerAddr != null) { try { return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp, - getTimeLast, timeoutMillis); + getLastOrFirstOffset, timeoutMillis); } catch (Exception e) { throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 4901d0ea424..580783bcf51 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -669,13 +669,13 @@ public MessageExt viewMessage(final String addr, final long phyoffset, final lon throw new MQBrokerException(response.getCode(), response.getRemark()); } - public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, final boolean getTimeLast, final long timeoutMillis) + public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, final int getLastOrFirstOffset, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader(); requestHeader.setTopic(topic); requestHeader.setQueueId(queueId); requestHeader.setTimestamp(timestamp); - requestHeader.setGetTimelast(getTimeLast); + requestHeader.setGetLastOrFirstOffset(getLastOrFirstOffset); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 57456706ce7..b1c3f604074 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -16,13 +16,6 @@ */ package org.apache.rocketmq.client.impl.consumer; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; @@ -45,10 +38,10 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.constant.OffsetConstant; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.filter.FilterAPI; import org.apache.rocketmq.common.help.FAQUrl; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; @@ -59,10 +52,19 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.sysflag.PullSysFlag; +import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; + public class DefaultMQPullConsumerImpl implements MQConsumerInner { private final InternalLogger log = ClientLogger.getLog(); private final DefaultMQPullConsumer defaultMQPullConsumer; @@ -471,12 +473,12 @@ public MessageExt queryMessageByUniqKey(String topic, String uniqKey) } public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { - return this.searchOffset(mq, timestamp, false); + return this.searchOffset(mq, timestamp, OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET); } - public long searchOffset(final MessageQueue mq, final long timestamp, final boolean getTimeLast) throws MQClientException { + public long searchOffset(final MessageQueue mq, final long timestamp, final int getLastOrFirstOffset) throws MQClientException { this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().doSearchOffset(mq, timestamp, getTimeLast); + return this.mQClientFactory.getMQAdminImpl().doSearchOffset(mq, timestamp, getLastOrFirstOffset); } public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 998c9193a94..dc2ba66db1c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -53,6 +53,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.constant.OffsetConstant; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.filter.FilterAPI; import org.apache.rocketmq.common.help.FAQUrl; @@ -956,11 +957,11 @@ public void resetOffsetByTimeStamp(long timeStamp) } public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { - return this.searchOffset(mq, timestamp, false); + return this.searchOffset(mq, timestamp, OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET); } - public long searchOffset(MessageQueue mq, long timestamp, boolean getTimeLast) throws MQClientException { - return this.mQClientFactory.getMQAdminImpl().doSearchOffset(mq, timestamp, getTimeLast); + public long searchOffset(MessageQueue mq, long timestamp, int getLastOrFirstOffset) throws MQClientException { + return this.mQClientFactory.getMQAdminImpl().doSearchOffset(mq, timestamp, getLastOrFirstOffset); } @Override diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/OffsetConstant.java b/common/src/main/java/org/apache/rocketmq/common/constant/OffsetConstant.java new file mode 100644 index 00000000000..0b0e1100a70 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/constant/OffsetConstant.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.constant; + +public class OffsetConstant { + + public static final int SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET = 1; + + public static final int SEARCH_OFFSET_BYTIME_RETURN_RETURN_LAST_OFFSET = 2; + +} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java index f78adc6f300..21807f1a8d6 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java @@ -32,7 +32,7 @@ public class SearchOffsetRequestHeader implements CommandCustomHeader { @CFNotNull private Long timestamp; - private Boolean getTimelast = false; + private Integer getLastOrFirstOffset; @Override public void checkFields() throws RemotingCommandException { @@ -63,11 +63,11 @@ public void setTimestamp(Long timestamp) { this.timestamp = timestamp; } - public Boolean getGetTimelast() { - return getTimelast; + public Integer getGetLastOrFirstOffset() { + return getLastOrFirstOffset; } - public void setGetTimelast(Boolean getTimelast) { - this.getTimelast = getTimelast; + public void setGetLastOrFirstOffset(Integer getLastOrFirstOffset) { + this.getLastOrFirstOffset = getLastOrFirstOffset; } } diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index a69a333ca95..8a6aa4e3c5c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -20,6 +20,7 @@ import java.nio.ByteBuffer; import java.util.List; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.constant.OffsetConstant; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.store.config.StorePathConfigHelper; @@ -151,9 +152,10 @@ public void recover() { } } - private MappedFile getConsumeQueueMappedFileByStoreTime(long messageStoreTime) { + private MappedFile getConsumeQueueMappedFileByStoreTime(long messageStoreTime,int getLastOrFirstOffset) { long commitLogMinOffset = this.defaultMessageStore.getMinPhyOffset(); List mappedFiles = this.mappedFileQueue.getMappedFiles(); + MappedFile resultFile = null; for (MappedFile mappedFile : mappedFiles) { SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0); if (sbr != null) { @@ -174,70 +176,29 @@ private MappedFile getConsumeQueueMappedFileByStoreTime(long messageStoreTime) { long minStoreTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(minPhyOffset, minSize); if (messageStoreTime < minStoreTime) { - return null; + return resultFile; } long maxStoreTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(maxPhyOffset, maxSize); if (maxStoreTime < messageStoreTime) { continue; } - return mappedFile; - } finally { - sbr.release(); - } - } - } - return null; - } - - - private MappedFile getLastConsumeQueueMappedFileByStoreTime(long messageStoreTime) { - long commitLogMinOffset = this.defaultMessageStore.getMinPhyOffset(); - List mappedFiles = this.mappedFileQueue.getMappedFiles(); - MappedFile lastFile = null; - for (MappedFile mappedFile : mappedFiles) { - SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0); - if (sbr != null) { - try { - ByteBuffer byteBuffer = sbr.getByteBuffer(); - int max = byteBuffer.limit() - CQ_STORE_UNIT_SIZE; - int min = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0; - byteBuffer.position(min); - long minPhyOffset = byteBuffer.getLong(); - int minSize = byteBuffer.getInt(); - - byteBuffer.position(max); - long maxPhyOffset = byteBuffer.getLong(); - if (maxPhyOffset < commitLogMinOffset) { - continue; - } - int maxSize = byteBuffer.getInt(); - - long minStoreTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(minPhyOffset, minSize); - if (messageStoreTime < minStoreTime) { - return lastFile; - } - - long maxStoreTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(maxPhyOffset, maxSize); - if (maxStoreTime < messageStoreTime) { - continue; + if (getLastOrFirstOffset == OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_LAST_OFFSET) { + resultFile = mappedFile; + } else { + resultFile = mappedFile; + break; } - lastFile = mappedFile; } finally { sbr.release(); } } } - return lastFile; + return resultFile; } - public long getOffsetInQueueByTime(final long timestamp, boolean isGetTimeLast) { - MappedFile mappedFile = null; - if (isGetTimeLast) { - mappedFile = getLastConsumeQueueMappedFileByStoreTime(timestamp); - } else { - mappedFile = getConsumeQueueMappedFileByStoreTime(timestamp); - } + public long getOffsetInQueueByTime(final long timestamp, int getLastOrFirstOffset) { + MappedFile mappedFile = getConsumeQueueMappedFileByStoreTime(timestamp, getLastOrFirstOffset); if (mappedFile != null) { long offset = 0; int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0; @@ -281,7 +242,7 @@ public long getOffsetInQueueByTime(final long timestamp, boolean isGetTimeLast) } if (targetOffset != -1) { - offset = fixCandidateTime(mappedFile,byteBuffer,targetOffset,isGetTimeLast,minPhysicOffset,timestamp); + offset = getFixedFirstOrLastOffsetByTime(mappedFile,byteBuffer,targetOffset,getLastOrFirstOffset,minPhysicOffset,timestamp); } else { if (leftIndexValue == -1) { @@ -290,13 +251,13 @@ public long getOffsetInQueueByTime(final long timestamp, boolean isGetTimeLast) offset = leftOffset; } else { - if (isGetTimeLast) { + if (getLastOrFirstOffset == OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_LAST_OFFSET) { offset = leftOffset; } else { offset = rightOffset; } } - offset = fixCandidateTime(mappedFile,byteBuffer,(int)offset,!isGetTimeLast,minPhysicOffset,lastTime); + offset = getFixedFirstOrLastOffsetByTime(mappedFile,byteBuffer,(int)offset,getLastOrFirstOffset,minPhysicOffset,lastTime); } return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE; @@ -315,7 +276,7 @@ private long getStoreTimeStamp(ByteBuffer byteBuffer, int targetOffset) { return this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size); } - private int fixCandidateTime(MappedFile mappedFile, ByteBuffer byteBuffer, int targetOffset, boolean isGetTimeLast, long minPhysicOffset, long timestamp) { + private int getFixedFirstOrLastOffsetByTime(MappedFile mappedFile, ByteBuffer byteBuffer, int targetOffset, int getLastOrFirstOffset, long minPhysicOffset, long timestamp) { int candidateOffset = targetOffset; int fixedLow = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0; int fixedOffset = candidateOffset; @@ -326,7 +287,7 @@ private int fixCandidateTime(MappedFile mappedFile, ByteBuffer byteBuffer, int t if (phyOffset < minPhysicOffset) { break; } - if (!isGetTimeLast) { + if (getLastOrFirstOffset == OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET) { long storeTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size); if (storeTime < timestamp) { break; diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 1c211b58166..43610082035 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -640,14 +640,10 @@ public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQue return 0; } - /** - * As there is many msgs in one timestamp,so add isGetTimeLast parameter to specify. - * if isGetTimeLast is true,return the last offset of this timestamp,else return the first offset. - */ - public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, boolean isGetTimeLast) { + public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, int getLastOrFirstOffset) { ConsumeQueue logic = this.findConsumeQueue(topic, queueId); if (logic != null) { - return logic.getOffsetInQueueByTime(timestamp, isGetTimeLast); + return logic.getOffsetInQueueByTime(timestamp, getLastOrFirstOffset); } return 0; diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 9f65de07578..529c379cbe7 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -116,10 +116,10 @@ GetMessageResult getMessage(final String group, final String topic, final int qu * @param topic Topic of the message. * @param queueId Queue ID. * @param timestamp Timestamp to look up. - * @param isGetTimeLast If there is many messages at this given timestamp , return the first offset if isGetTimeLast=false,else return last offset + * @param getLastOrFirstOffset If there is many messages at this given timestamp , return the first offset if getLastOrFirstOffset=1,else return last offset * @return physical offset which matches. */ - long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp,boolean isGetTimeLast); + long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp, int getLastOrFirstOffset); /** * Look up the message by given commit log offset. diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index d0dcdd03ecd..7ea5f7a546f 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.constant.OffsetConstant; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.store.config.FlushDiskType; @@ -287,7 +288,7 @@ public void testQueueOffsetByTime() throws Exception { Map.Entry timeCount = sameTimeCountCache.lastEntry(); start = timeCount.getKey(); - long offsetInQueueByTime = messageStore.getOffsetInQueueByTime(topic, 0, start, false); + long offsetInQueueByTime = messageStore.getOffsetInQueueByTime(topic, 0, start, OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET); GetMessageResult testQueryByTime = messageStore.getMessage(consumerGroup, topic, 0, offsetInQueueByTime, 20, null); List messageBufferList = testQueryByTime.getMessageBufferList(); @@ -302,17 +303,17 @@ public void testQueueOffsetByTime() throws Exception { } testQueryByTime.release(); - long hlOffset = messageStore.getOffsetInQueueByTime(topic, 0, hlTime, false); + long hlOffset = messageStore.getOffsetInQueueByTime(topic, 0, hlTime, OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET); GetMessageResult hlResult = messageStore.getMessage(consumerGroup, topic, 0, hlOffset, 20, null); hlCount = hlResult.getMessageCount(); hlResult.release(); - long endOffset1 = messageStore.getOffsetInQueueByTime(topic, 0, endTime, false); + long endOffset1 = messageStore.getOffsetInQueueByTime(topic, 0, endTime, OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET); GetMessageResult endResult1 = messageStore.getMessage(consumerGroup, topic, 0, endOffset1, 20, null); endCount1 = endResult1.getMessageCount(); endResult1.release(); - long endOffset2 = messageStore.getOffsetInQueueByTime(topic, 0, endTime, true); + long endOffset2 = messageStore.getOffsetInQueueByTime(topic, 0, endTime, OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET); GetMessageResult endResult2 = messageStore.getMessage(consumerGroup, topic, 0, endOffset2, 20, null); endCount2 = endResult2.getMessageCount(); endResult2.release(); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index f9a5ace5c0e..127c2108466 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -104,8 +104,8 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti } @Override - public long searchOffset(MessageQueue mq, long timestamp,boolean getTimeLast) throws MQClientException { - return defaultMQAdminExtImpl.searchOffset(mq, timestamp, getTimeLast); + public long searchOffset(MessageQueue mq, long timestamp,int getLastOrFirstOffset) throws MQClientException { + return defaultMQAdminExtImpl.searchOffset(mq, timestamp, getLastOrFirstOffset); } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 9ce8651f768..1dd1890508c 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -46,6 +46,7 @@ import org.apache.rocketmq.common.admin.RollbackStats; import org.apache.rocketmq.common.admin.TopicOffset; import org.apache.rocketmq.common.admin.TopicStatsTable; +import org.apache.rocketmq.common.constant.OffsetConstant; import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageClientExt; @@ -508,8 +509,8 @@ private RollbackStats resetOffsetConsumeOffset(String brokerAddr, String consume resetOffset = this.mqClientInstance.getMQClientAPIImpl().getMaxOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timeoutMillis); } else { resetOffset = - this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timestamp, false, - timeoutMillis); + this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timestamp, + OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET, timeoutMillis); } RollbackStats rollbackStats = new RollbackStats(); @@ -938,8 +939,8 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti } @Override - public long searchOffset(MessageQueue mq, long timestamp,boolean getTimeLast) throws MQClientException { - return this.mqClientInstance.getMQAdminImpl().doSearchOffset(mq, timestamp, getTimeLast); + public long searchOffset(MessageQueue mq, long timestamp,int getLastOrFirstOffset) throws MQClientException { + return this.mqClientInstance.getMQAdminImpl().doSearchOffset(mq, timestamp, getLastOrFirstOffset); } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 2526886b7dd..e817ea16d7d 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -260,5 +260,5 @@ QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr, final long index, final int count, final String consumerGroup) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException; - long searchOffset(final MessageQueue mq, final long timestamp, final boolean getTimeLast) throws MQClientException; + long searchOffset(final MessageQueue mq, final long timestamp, final int getLastOrFirstOffset) throws MQClientException; } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java index b3ac71e3190..3743e9db72d 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java @@ -17,13 +17,6 @@ package org.apache.rocketmq.tools.command.message; -import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; @@ -32,12 +25,21 @@ import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.constant.OffsetConstant; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.SubCommandException; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + public class PrintMessageByQueueCommand implements SubCommand { public static long timestampFormat(final String value) { @@ -190,7 +192,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t if (commandLine.hasOption('e')) { String timestampStr = commandLine.getOptionValue('e').trim(); long timeValue = timestampFormat(timestampStr); - maxOffset = consumer.searchOffset(mq, timeValue, true); + maxOffset = consumer.searchOffset(mq, timeValue, OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_LAST_OFFSET); } final Map tagCalmap = new HashMap(); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java index 6ffad472efb..5a169c35949 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java @@ -26,6 +26,7 @@ import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.constant.OffsetConstant; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.RPCHook; @@ -131,7 +132,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t if (commandLine.hasOption('e')) { String timestampStr = commandLine.getOptionValue('e').trim(); long timeValue = timestampFormat(timestampStr); - maxOffset = consumer.searchOffset(mq, timeValue, true); + maxOffset = consumer.searchOffset(mq, timeValue, OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_LAST_OFFSET); } System.out.printf("minOffset=%s, maxOffset=%s, %s", minOffset, maxOffset, mq);