Skip to content

Commit

Permalink
Merge d1f2b9d into 421a22c
Browse files Browse the repository at this point in the history
  • Loading branch information
lindzh committed Aug 11, 2017
2 parents 421a22c + d1f2b9d commit fb4dc12
Show file tree
Hide file tree
Showing 20 changed files with 303 additions and 50 deletions.
Expand Up @@ -151,7 +151,7 @@ public RemotingCommand resetOffset(String topic, String group, long timeStamp, b

timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
} else {
timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp, false);
}

if (timeStampOffset < 0) {
Expand Down
Expand Up @@ -107,8 +107,8 @@ public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQue
}

@Override
public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
return next.getOffsetInQueueByTime(topic, queueId, timestamp);
public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, boolean isGetTimeLast) {
return next.getOffsetInQueueByTime(topic, queueId, timestamp, isGetTimeLast);
}

@Override
Expand Down
Expand Up @@ -360,8 +360,10 @@ private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx, Remot
final SearchOffsetRequestHeader requestHeader =
(SearchOffsetRequestHeader) request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class);

boolean isGetTimeLast = requestHeader.getGetTimelast() != null ? requestHeader.getGetTimelast() : false;

long offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(),
requestHeader.getTimestamp());
requestHeader.getTimestamp(), isGetTimeLast);

responseHeader.setOffset(offset);

Expand Down
Expand Up @@ -117,6 +117,11 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti
return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp);
}

@Override
public long searchOffset(MessageQueue mq, long timestamp, boolean getTimeLast) throws MQClientException {
return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp, getTimeLast);
}

@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPullConsumerImpl.maxOffset(mq);
Expand Down
Expand Up @@ -264,6 +264,11 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti
return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp);
}

@Override
public long searchOffset(MessageQueue mq, long timestamp, boolean getTimeLast) throws MQClientException {
return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp, getTimeLast);
}

@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPushConsumerImpl.maxOffset(mq);
Expand Down
Expand Up @@ -64,4 +64,6 @@ void sendMessageBack(final MessageExt msg, final int delayLevel, final String br
* @throws MQClientException
*/
Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) throws MQClientException;

long searchOffset(final MessageQueue mq, final long timestamp, final boolean getTimeLast) throws MQClientException;
}
Expand Up @@ -167,6 +167,10 @@ public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClie
}

public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.doSearchOffset(mq,timestamp,false);
}

public long doSearchOffset(MessageQueue mq, long timestamp, boolean getTimeLast) throws MQClientException {
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
Expand All @@ -176,7 +180,7 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti
if (brokerAddr != null) {
try {
return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp,
timeoutMillis);
getTimeLast, timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
}
Expand Down
Expand Up @@ -668,12 +668,13 @@ public MessageExt viewMessage(final String addr, final long phyoffset, final lon
throw new MQBrokerException(response.getCode(), response.getRemark());
}

public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, final long timeoutMillis)
public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, final boolean getTimeLast, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
requestHeader.setTimestamp(timestamp);
requestHeader.setGetTimelast(getTimeLast);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader);

RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
Expand Down
Expand Up @@ -471,8 +471,12 @@ public MessageExt queryMessageByUniqKey(String topic, String uniqKey)
}

public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.searchOffset(mq, timestamp, false);
}

public long searchOffset(final MessageQueue mq, final long timestamp, final boolean getTimeLast) throws MQClientException {
this.makeSureStateOK();
return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
return this.mQClientFactory.getMQAdminImpl().doSearchOffset(mq, timestamp, getTimeLast);
}

public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
Expand Down
Expand Up @@ -908,7 +908,11 @@ public void resetOffsetByTimeStamp(long timeStamp)
}

public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
return this.searchOffset(mq, timestamp, false);
}

public long searchOffset(MessageQueue mq, long timestamp, boolean getTimeLast) throws MQClientException {
return this.mQClientFactory.getMQAdminImpl().doSearchOffset(mq, timestamp, getTimeLast);
}

@Override
Expand Down
Expand Up @@ -32,6 +32,8 @@ public class SearchOffsetRequestHeader implements CommandCustomHeader {
@CFNotNull
private Long timestamp;

private Boolean getTimelast = false;

@Override
public void checkFields() throws RemotingCommandException {
// TODO Auto-generated method stub
Expand Down Expand Up @@ -62,4 +64,11 @@ public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}

public Boolean getGetTimelast() {
return getTimelast;
}

public void setGetTimelast(Boolean getTimelast) {
this.getTimelast = getTimelast;
}
}
54 changes: 48 additions & 6 deletions store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
Expand Up @@ -151,7 +151,7 @@ public void recover() {
}
}

public long getOffsetInQueueByTime(final long timestamp) {
public long getOffsetInQueueByTime(final long timestamp, boolean isGetTimeLast) {
MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);
if (mappedFile != null) {
long offset = 0;
Expand Down Expand Up @@ -195,8 +195,7 @@ public long getOffsetInQueueByTime(final long timestamp) {
}

if (targetOffset != -1) {

offset = targetOffset;
offset = fixCandidateTime(mappedFile,byteBuffer,targetOffset,isGetTimeLast,minPhysicOffset,timestamp);
} else {
if (leftIndexValue == -1) {

Expand All @@ -205,9 +204,13 @@ public long getOffsetInQueueByTime(final long timestamp) {

offset = leftOffset;
} else {
offset =
Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp
- rightIndexValue) ? rightOffset : leftOffset;
if (isGetTimeLast) {
long fixedTimestamp = getStoreTimeStamp(byteBuffer, leftOffset);
offset = fixCandidateTime(mappedFile, byteBuffer, leftOffset, true, minPhysicOffset, fixedTimestamp);
} else {
long fixedTimestamp = getStoreTimeStamp(byteBuffer, rightOffset);
offset = fixCandidateTime(mappedFile, byteBuffer, rightOffset, false, minPhysicOffset, fixedTimestamp);
}
}
}

Expand All @@ -220,6 +223,45 @@ public long getOffsetInQueueByTime(final long timestamp) {
return 0;
}

private long getStoreTimeStamp(ByteBuffer byteBuffer, int targetOffset) {
byteBuffer.position(targetOffset);
long phyOffset = byteBuffer.getLong();
int size = byteBuffer.getInt();
return this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
}

private int fixCandidateTime(MappedFile mappedFile, ByteBuffer byteBuffer, int targetOffset, boolean isGetTimeLast, long minPhysicOffset, long timestamp) {
int candidateOffset = targetOffset;
int fixedLow = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;
int fixedOffset = candidateOffset;
while (fixedOffset >= fixedLow) {
byteBuffer.position(fixedOffset);
long phyOffset = byteBuffer.getLong();
int size = byteBuffer.getInt();
if (phyOffset < minPhysicOffset) {
break;
}
if (!isGetTimeLast) {
long storeTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
if (storeTime < timestamp) {
break;
} else {
candidateOffset = fixedOffset;
fixedOffset = candidateOffset - CQ_STORE_UNIT_SIZE;
}
} else {
long storeTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
if (storeTime > timestamp) {
break;
} else {
candidateOffset = fixedOffset;
fixedOffset = candidateOffset + CQ_STORE_UNIT_SIZE;
}
}
}
return candidateOffset;
}

public void truncateDirtyLogicFiles(long phyOffet) {

int logicFileSize = this.mappedFileSize;
Expand Down
Expand Up @@ -621,10 +621,10 @@ public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQue
return 0;
}

public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, boolean isGetTimeLast) {
ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
return logic.getOffsetInQueueByTime(timestamp);
return logic.getOffsetInQueueByTime(timestamp, isGetTimeLast);
}

return 0;
Expand Down Expand Up @@ -807,10 +807,11 @@ public void executeDeleteFilesManually() {
public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {
QueryMessageResult queryMessageResult = new QueryMessageResult();

long lastQueryMsgTime = end;
long fixedBeginTime = begin - 1001;
long lastQueryMsgTime = end + 1;

for (int i = 0; i < 3; i++) {
QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, fixedBeginTime, lastQueryMsgTime);
if (queryOffsetResult.getPhyOffsets().isEmpty()) {
break;
}
Expand All @@ -825,33 +826,28 @@ public QueryMessageResult queryMessage(String topic, String key, int maxNum, lon

try {

boolean match = true;
MessageExt msg = this.lookMessageByOffset(offset);
if (0 == m) {
lastQueryMsgTime = msg.getStoreTimestamp();
}
SelectMappedBufferResult result = this.commitLog.getData(offset, false);
if (result != null) {
int size = result.getByteBuffer().getInt(0);
result.getByteBuffer().limit(size);
result.setSize(size);
long storeTime = getMessageStoreTimestamp(result);

boolean keyMatch = true;
// check key match,now ignore check key duplicate
if (keyMatch) {
if (storeTime >= begin && storeTime <= end) {
queryMessageResult.addMessage(result);
}
} else {
log.warn("queryMessage hash duplicate, {} {}", topic, key);
}

// String[] keyArray = msg.getKeys().split(MessageConst.KEY_SEPARATOR);
// if (topic.equals(msg.getTopic())) {
// for (String k : keyArray) {
// if (k.equals(key)) {
// match = true;
// break;
// }
// }
// }

if (match) {
SelectMappedBufferResult result = this.commitLog.getData(offset, false);
if (result != null) {
int size = result.getByteBuffer().getInt(0);
result.getByteBuffer().limit(size);
result.setSize(size);
queryMessageResult.addMessage(result);
if (0 == m) {
lastQueryMsgTime = storeTime;
}
} else {
log.warn("queryMessage hash duplicate, {} {}", topic, key);
}

} catch (Exception e) {
log.error("queryMessage exception", e);
}
Expand All @@ -861,14 +857,18 @@ public QueryMessageResult queryMessage(String topic, String key, int maxNum, lon
break;
}

if (lastQueryMsgTime < begin) {
if (lastQueryMsgTime < fixedBeginTime) {
break;
}
}

return queryMessageResult;
}

private long getMessageStoreTimestamp(SelectMappedBufferResult message) {
return message.getByteBuffer().getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
}

@Override
public void updateHaMasterAddress(String newAddr) {
this.haService.updateMasterAddress(newAddr);
Expand Down Expand Up @@ -1413,6 +1413,7 @@ class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

@Override
public void dispatch(DispatchRequest request) {

if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
DefaultMessageStore.this.indexService.buildIndex(request);
}
Expand Down
Expand Up @@ -110,7 +110,7 @@ GetMessageResult getMessage(final String group, final String topic, final int qu
* @param timestamp Timestamp to look up.
* @return physical offset which matches.
*/
long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp);
long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp,boolean isGetTimeLast);

/**
* Look up the message by given commit log offset.
Expand Down

0 comments on commit fb4dc12

Please sign in to comment.