diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java index 65b444e6e4ba..f579add4971a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java @@ -151,7 +151,7 @@ public RemotingCommand resetOffset(String topic, String group, long timeStamp, b timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i); } else { - timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp); + timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp, false); } if (timeStampOffset < 0) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java index 690f70bfc88d..44546b30db02 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java @@ -107,8 +107,8 @@ public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQue } @Override - public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) { - return next.getOffsetInQueueByTime(topic, queueId, timestamp); + public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, boolean isGetTimeLast) { + return next.getOffsetInQueueByTime(topic, queueId, timestamp, isGetTimeLast); } @Override diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 71fdda931ff2..a0645772222c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -360,8 +360,10 @@ private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx, Remot final SearchOffsetRequestHeader requestHeader = (SearchOffsetRequestHeader) request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class); + boolean isGetTimeLast = requestHeader.getGetTimelast() != null ? requestHeader.getGetTimelast() : false; + long offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(), - requestHeader.getTimestamp()); + requestHeader.getTimestamp(), isGetTimeLast); responseHeader.setOffset(offset); diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java index f5016f82f51c..aadb7f06f3b2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java @@ -117,6 +117,11 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp); } + @Override + public long searchOffset(MessageQueue mq, long timestamp, boolean getTimeLast) throws MQClientException { + return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp, getTimeLast); + } + @Override public long maxOffset(MessageQueue mq) throws MQClientException { return this.defaultMQPullConsumerImpl.maxOffset(mq); diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 9c9b59ef0762..309f1ba6560c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -264,6 +264,11 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp); } + @Override + public long searchOffset(MessageQueue mq, long timestamp, boolean getTimeLast) throws MQClientException { + return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp, getTimeLast); + } + @Override public long maxOffset(MessageQueue mq) throws MQClientException { return this.defaultMQPushConsumerImpl.maxOffset(mq); diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java index 03ee4d96654a..8b3f6fc80f9b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java @@ -64,4 +64,6 @@ void sendMessageBack(final MessageExt msg, final int delayLevel, final String br * @throws MQClientException */ Set fetchSubscribeMessageQueues(final String topic) throws MQClientException; + + long searchOffset(final MessageQueue mq, final long timestamp, final boolean getTimeLast) throws MQClientException; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index 983e5157ea94..7f8459d2af44 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -167,6 +167,10 @@ public Set fetchSubscribeMessageQueues(String topic) throws MQClie } public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { + return this.doSearchOffset(mq,timestamp,false); + } + + public long doSearchOffset(MessageQueue mq, long timestamp, boolean getTimeLast) throws MQClientException { String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); @@ -176,7 +180,7 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti if (brokerAddr != null) { try { return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp, - timeoutMillis); + getTimeLast, timeoutMillis); } catch (Exception e) { throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 4244bddcf2f2..6059c26601f9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -668,12 +668,13 @@ public MessageExt viewMessage(final String addr, final long phyoffset, final lon throw new MQBrokerException(response.getCode(), response.getRemark()); } - public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, final long timeoutMillis) + public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, final boolean getTimeLast, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader(); requestHeader.setTopic(topic); requestHeader.setQueueId(queueId); requestHeader.setTimestamp(timestamp); + requestHeader.setGetTimelast(getTimeLast); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 35ee16fe3b3a..f211098b06b5 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -471,8 +471,12 @@ public MessageExt queryMessageByUniqKey(String topic, String uniqKey) } public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { + return this.searchOffset(mq, timestamp, false); + } + + public long searchOffset(final MessageQueue mq, final long timestamp, final boolean getTimeLast) throws MQClientException { this.makeSureStateOK(); - return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); + return this.mQClientFactory.getMQAdminImpl().doSearchOffset(mq, timestamp, getTimeLast); } public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 9bf34be8cdf3..d8ed7637ddb6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -908,7 +908,11 @@ public void resetOffsetByTimeStamp(long timeStamp) } public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { - return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); + return this.searchOffset(mq, timestamp, false); + } + + public long searchOffset(MessageQueue mq, long timestamp, boolean getTimeLast) throws MQClientException { + return this.mQClientFactory.getMQAdminImpl().doSearchOffset(mq, timestamp, getTimeLast); } @Override diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java index 113e46f671c9..225e4a78e7b0 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java @@ -32,6 +32,8 @@ public class SearchOffsetRequestHeader implements CommandCustomHeader { @CFNotNull private Long timestamp; + private Boolean getTimelast = false; + @Override public void checkFields() throws RemotingCommandException { // TODO Auto-generated method stub @@ -62,4 +64,11 @@ public void setTimestamp(Long timestamp) { this.timestamp = timestamp; } + public Boolean getGetTimelast() { + return getTimelast; + } + + public void setGetTimelast(Boolean getTimelast) { + this.getTimelast = getTimelast; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 275334c05f3e..7748e31b2a9a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -151,7 +151,7 @@ public void recover() { } } - public long getOffsetInQueueByTime(final long timestamp) { + public long getOffsetInQueueByTime(final long timestamp, boolean isGetTimeLast) { MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp); if (mappedFile != null) { long offset = 0; @@ -195,8 +195,7 @@ public long getOffsetInQueueByTime(final long timestamp) { } if (targetOffset != -1) { - - offset = targetOffset; + offset = fixCandidateTime(mappedFile,byteBuffer,targetOffset,isGetTimeLast,minPhysicOffset,timestamp); } else { if (leftIndexValue == -1) { @@ -205,9 +204,13 @@ public long getOffsetInQueueByTime(final long timestamp) { offset = leftOffset; } else { - offset = - Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp - - rightIndexValue) ? rightOffset : leftOffset; + if (isGetTimeLast) { + long fixedTimestamp = getStoreTimeStamp(byteBuffer, leftOffset); + offset = fixCandidateTime(mappedFile, byteBuffer, leftOffset, true, minPhysicOffset, fixedTimestamp); + } else { + long fixedTimestamp = getStoreTimeStamp(byteBuffer, rightOffset); + offset = fixCandidateTime(mappedFile, byteBuffer, rightOffset, false, minPhysicOffset, fixedTimestamp); + } } } @@ -220,6 +223,45 @@ public long getOffsetInQueueByTime(final long timestamp) { return 0; } + private long getStoreTimeStamp(ByteBuffer byteBuffer, int targetOffset) { + byteBuffer.position(targetOffset); + long phyOffset = byteBuffer.getLong(); + int size = byteBuffer.getInt(); + return this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size); + } + + private int fixCandidateTime(MappedFile mappedFile, ByteBuffer byteBuffer, int targetOffset, boolean isGetTimeLast, long minPhysicOffset, long timestamp) { + int candidateOffset = targetOffset; + int fixedLow = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0; + int fixedOffset = candidateOffset; + while (fixedOffset >= fixedLow) { + byteBuffer.position(fixedOffset); + long phyOffset = byteBuffer.getLong(); + int size = byteBuffer.getInt(); + if (phyOffset < minPhysicOffset) { + break; + } + if (!isGetTimeLast) { + long storeTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size); + if (storeTime < timestamp) { + break; + } else { + candidateOffset = fixedOffset; + fixedOffset = candidateOffset - CQ_STORE_UNIT_SIZE; + } + } else { + long storeTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size); + if (storeTime > timestamp) { + break; + } else { + candidateOffset = fixedOffset; + fixedOffset = candidateOffset + CQ_STORE_UNIT_SIZE; + } + } + } + return candidateOffset; + } + public void truncateDirtyLogicFiles(long phyOffet) { int logicFileSize = this.mappedFileSize; diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index b5bac3f721f6..7e7c8c2cd06c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -621,10 +621,10 @@ public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQue return 0; } - public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) { + public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, boolean isGetTimeLast) { ConsumeQueue logic = this.findConsumeQueue(topic, queueId); if (logic != null) { - return logic.getOffsetInQueueByTime(timestamp); + return logic.getOffsetInQueueByTime(timestamp, isGetTimeLast); } return 0; @@ -807,10 +807,11 @@ public void executeDeleteFilesManually() { public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) { QueryMessageResult queryMessageResult = new QueryMessageResult(); - long lastQueryMsgTime = end; + long fixedBeginTime = begin - 1001; + long lastQueryMsgTime = end + 1; for (int i = 0; i < 3; i++) { - QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime); + QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, fixedBeginTime, lastQueryMsgTime); if (queryOffsetResult.getPhyOffsets().isEmpty()) { break; } @@ -825,33 +826,28 @@ public QueryMessageResult queryMessage(String topic, String key, int maxNum, lon try { - boolean match = true; - MessageExt msg = this.lookMessageByOffset(offset); - if (0 == m) { - lastQueryMsgTime = msg.getStoreTimestamp(); - } + SelectMappedBufferResult result = this.commitLog.getData(offset, false); + if (result != null) { + int size = result.getByteBuffer().getInt(0); + result.getByteBuffer().limit(size); + result.setSize(size); + long storeTime = getMessageStoreTimestamp(result); + + boolean keyMatch = true; + // check key match,now ignore check key duplicate + if (keyMatch) { + if (storeTime >= begin && storeTime <= end) { + queryMessageResult.addMessage(result); + } + } else { + log.warn("queryMessage hash duplicate, {} {}", topic, key); + } -// String[] keyArray = msg.getKeys().split(MessageConst.KEY_SEPARATOR); -// if (topic.equals(msg.getTopic())) { -// for (String k : keyArray) { -// if (k.equals(key)) { -// match = true; -// break; -// } -// } -// } - - if (match) { - SelectMappedBufferResult result = this.commitLog.getData(offset, false); - if (result != null) { - int size = result.getByteBuffer().getInt(0); - result.getByteBuffer().limit(size); - result.setSize(size); - queryMessageResult.addMessage(result); + if (0 == m) { + lastQueryMsgTime = storeTime; } - } else { - log.warn("queryMessage hash duplicate, {} {}", topic, key); } + } catch (Exception e) { log.error("queryMessage exception", e); } @@ -861,7 +857,7 @@ public QueryMessageResult queryMessage(String topic, String key, int maxNum, lon break; } - if (lastQueryMsgTime < begin) { + if (lastQueryMsgTime < fixedBeginTime) { break; } } @@ -869,6 +865,10 @@ public QueryMessageResult queryMessage(String topic, String key, int maxNum, lon return queryMessageResult; } + private long getMessageStoreTimestamp(SelectMappedBufferResult message) { + return message.getByteBuffer().getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION); + } + @Override public void updateHaMasterAddress(String newAddr) { this.haService.updateMasterAddress(newAddr); @@ -1413,6 +1413,7 @@ class CommitLogDispatcherBuildIndex implements CommitLogDispatcher { @Override public void dispatch(DispatchRequest request) { + if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) { DefaultMessageStore.this.indexService.buildIndex(request); } diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 55572ce10f46..cff7480ab228 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -110,7 +110,7 @@ GetMessageResult getMessage(final String group, final String topic, final int qu * @param timestamp Timestamp to look up. * @return physical offset which matches. */ - long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp); + long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp,boolean isGetTimeLast); /** * Look up the message by given commit log offset. diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index 75f1de94128c..5d765b81915e 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -20,16 +20,25 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.List; import java.util.Map; +import java.util.Random; +import java.util.TreeMap; import java.util.concurrent.atomic.AtomicInteger; + import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.junit.Before; import org.junit.Test; +import org.junit.Assert; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertTrue; + public class DefaultMessageStoreTest { private final String StoreMessage = "Once, there was a chance for me!"; @@ -59,7 +68,7 @@ public void testWriteAndRead() throws Exception { MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig()); boolean load = master.load(); - assertTrue(load); + Assert.assertTrue(load); master.start(); try { @@ -103,7 +112,7 @@ public void testGroupCommit() throws Exception { messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig()); boolean load = master.load(); - assertTrue(load); + Assert.assertTrue(load); master.start(); try { @@ -129,4 +138,157 @@ public void arriving(String topic, int queueId, long logicOffset, long tagsCode, byte[] filterBitMap, Map properties) { } } + + @Test + public void testQueryByTime() throws Exception { + int totalMsgs = 100; + int randomIndex = new Random().nextInt(10) + 40; + QUEUE_TOTAL = 8; + String topic = "TimeTopic"; + String keys = "testQueryByTime"; + long now = System.currentTimeMillis(); + MessageBody = StoreMessage.getBytes(); + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 16); + messageStoreConfig.setMaxHashSlotNum(100); + messageStoreConfig.setMaxIndexNum(100 * 10); + messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024); + messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); + MessageStore master = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("test"), new MyMessageArrivingListener(), new BrokerConfig()); + boolean load = master.load(); + Assert.assertTrue(load); + + master.start(); + try { + for (int i = 0; i < totalMsgs; i++) { + MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner(); + messageExtBrokerInner.setBody(("time:" + System.currentTimeMillis() + " index:" + i).getBytes()); + messageExtBrokerInner.setTopic(topic); + messageExtBrokerInner.setKeys(keys); + messageExtBrokerInner.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL); + messageExtBrokerInner.setBornTimestamp(System.currentTimeMillis()); + messageExtBrokerInner.setStoreHost(StoreHost); + messageExtBrokerInner.setBornHost(BornHost); + messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(messageExtBrokerInner.getProperties())); + PutMessageResult putMessageResult = master.putMessage(messageExtBrokerInner); + if (i == randomIndex) { + now = putMessageResult.getAppendMessageResult().getStoreTimestamp(); + } + } + Thread.sleep(2000L); + long end = System.currentTimeMillis(); + QueryMessageResult result = master.queryMessage(topic, keys, totalMsgs, now, end); + for (ByteBuffer byteBuffer : result.getMessageBufferList()) { + MessageExt messageExt = MessageDecoder.decode(byteBuffer); + } + int bufferTotalSize = result.getMessageBufferList().size(); + result.release(); + Assert.assertTrue(totalMsgs - randomIndex - 1 <= bufferTotalSize); + } finally { + master.shutdown(); + master.destroy(); + } + } + + @Test + public void testQueueOffsetByTime() throws Exception { + long totalMsgs = 100; + QUEUE_TOTAL = 1; + String topic = "TimeTopic"; + String keys = "testQueryByTime"; + String consumerGroup = "testQueryByTime"; + MessageBody = StoreMessage.getBytes(); + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 16); + messageStoreConfig.setMaxHashSlotNum(100); + messageStoreConfig.setMaxIndexNum(100 * 10); + messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024); + messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); + DefaultMessageStore master = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("test"), new MyMessageArrivingListener(), new BrokerConfig()); + boolean load = master.load(); + Assert.assertTrue(load); + + TreeMap sameTimeCountCache = new TreeMap<>(); + TreeMap sameTimeResultCache = new TreeMap<>(); + long hlTime = System.currentTimeMillis(); + long hlCount = 0; + long endTime = System.currentTimeMillis(); + int endCount1 = 0; + int endCount2 = 0; + long start = 0; + master.start(); + try { + for (long i = 0; i < totalMsgs; i++) { + if (i == totalMsgs - 20) { + Thread.sleep(1000); + hlTime = System.currentTimeMillis(); + Thread.sleep(500); + } + if (i == totalMsgs - 10) { + endTime = System.currentTimeMillis(); + } + MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner(); + messageExtBrokerInner.setBody(("time:" + System.currentTimeMillis() + " index:" + i).getBytes()); + messageExtBrokerInner.setTopic(topic); + messageExtBrokerInner.setKeys(keys); + messageExtBrokerInner.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL); + messageExtBrokerInner.setBornTimestamp(System.currentTimeMillis()); + messageExtBrokerInner.setStoreHost(StoreHost); + messageExtBrokerInner.setBornHost(BornHost); + messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(messageExtBrokerInner.getProperties())); + PutMessageResult putMessageResult = master.putMessage(messageExtBrokerInner); + long storeTimestamp = putMessageResult.getAppendMessageResult().getStoreTimestamp(); + AtomicInteger count = sameTimeCountCache.get(storeTimestamp); + if (count == null) { + count = new AtomicInteger(0); + sameTimeCountCache.put(storeTimestamp, count); + } + count.incrementAndGet(); + } + Thread.sleep(2000L); + + Map.Entry timeCount = sameTimeCountCache.lastEntry(); + start = timeCount.getKey(); + long offsetInQueueByTime = master.getOffsetInQueueByTime(topic, 0, start, false); + GetMessageResult testQueryByTime = master.getMessage(consumerGroup, topic, 0, offsetInQueueByTime, 20, null); + + List messageBufferList = testQueryByTime.getMessageBufferList(); + for (ByteBuffer byteBuffer : messageBufferList) { + MessageExt messageExt = MessageDecoder.decode(byteBuffer); + AtomicInteger cc = sameTimeResultCache.get(messageExt.getStoreTimestamp()); + if (cc == null) { + cc = new AtomicInteger(0); + sameTimeResultCache.put(messageExt.getStoreTimestamp(), cc); + } + cc.incrementAndGet(); + } + testQueryByTime.release(); + + long hlOffset = master.getOffsetInQueueByTime(topic, 0, hlTime, false); + GetMessageResult hlResult = master.getMessage(consumerGroup, topic, 0, hlOffset, 20, null); + hlCount = hlResult.getMessageCount(); + hlResult.release(); + + long endOffset1 = master.getOffsetInQueueByTime(topic, 0, endTime, false); + GetMessageResult endResult1 = master.getMessage(consumerGroup, topic, 0, endOffset1, 20, null); + endCount1 = endResult1.getMessageCount(); + endResult1.release(); + + long endOffset2 = master.getOffsetInQueueByTime(topic, 0, endTime, true); + GetMessageResult endResult2 = master.getMessage(consumerGroup, topic, 0, endOffset2, 20, null); + endCount2 = endResult2.getMessageCount(); + endResult2.release(); + + } finally { + master.shutdown(); + master.destroy(); + } + + Assert.assertTrue(start > 0); + AtomicInteger cc = sameTimeCountCache.get(start); + AtomicInteger result = sameTimeResultCache.get(start); + Assert.assertEquals(cc.get(), result.get()); + Assert.assertTrue(19 == hlCount || hlCount == 20); + Assert.assertTrue(endCount1 >= endCount2); + } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 409ea3322f21..2b473fbbd005 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -103,6 +103,11 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti return defaultMQAdminExtImpl.searchOffset(mq, timestamp); } + @Override + public long searchOffset(MessageQueue mq, long timestamp,boolean getTimeLast) throws MQClientException { + return defaultMQAdminExtImpl.searchOffset(mq, timestamp, getTimeLast); + } + @Override public long maxOffset(MessageQueue mq) throws MQClientException { return defaultMQAdminExtImpl.maxOffset(mq); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 157ae21eba3b..f4b9ba2b0106 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -490,7 +490,7 @@ private RollbackStats resetOffsetConsumeOffset(String brokerAddr, String consume resetOffset = this.mqClientInstance.getMQClientAPIImpl().getMaxOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timeoutMillis); } else { resetOffset = - this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timestamp, + this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timestamp, false, timeoutMillis); } @@ -903,6 +903,11 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp); } + @Override + public long searchOffset(MessageQueue mq, long timestamp,boolean getTimeLast) throws MQClientException { + return this.mqClientInstance.getMQAdminImpl().doSearchOffset(mq, timestamp, getTimeLast); + } + @Override public long maxOffset(MessageQueue mq) throws MQClientException { return this.mqClientInstance.getMQAdminImpl().maxOffset(mq); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 82add92b2d60..71c54e84cb49 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -263,4 +263,6 @@ QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr, final String topic, final int queueId, final long index, final int count, final String consumerGroup) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException; + + long searchOffset(final MessageQueue mq, final long timestamp, final boolean getTimeLast) throws MQClientException; } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java index ac51267161c3..128a44bd6587 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java @@ -188,7 +188,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t if (commandLine.hasOption('e')) { String timestampStr = commandLine.getOptionValue('e').trim(); long timeValue = timestampFormat(timestampStr); - maxOffset = consumer.searchOffset(mq, timeValue); + maxOffset = consumer.searchOffset(mq, timeValue, true); } final Map tagCalmap = new HashMap(); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java index ac485199c04c..d2df755eb434 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java @@ -132,7 +132,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t if (commandLine.hasOption('e')) { String timestampStr = commandLine.getOptionValue('e').trim(); long timeValue = timestampFormat(timestampStr); - maxOffset = consumer.searchOffset(mq, timeValue); + maxOffset = consumer.searchOffset(mq, timeValue, true); } System.out.printf("minOffset=" + minOffset + ", maxOffset=" + maxOffset + ", " + mq);