Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
c37adb5
fix query message by time and fix get queue offset by time
lindzh Jul 6, 2017
2c24b67
fix consumer queue get offset by time high low
lindzh Jul 11, 2017
a8835c4
add search offset by time return last position if needed
lindzh Aug 9, 2017
f2f2fba
add search offset by time last one
lindzh Aug 10, 2017
d1f2b9d
fix time offset
lindzh Aug 11, 2017
1ae0693
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Aug 11, 2017
8302bb8
Merge branch 'develop' of github.com:apache/incubator-rocketmq into f…
lindzh Aug 11, 2017
f0e243c
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Aug 11, 2017
1810be4
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Aug 14, 2017
e3988d3
Merge branch 'develop' into fix_query_message_by_time
lindzh Aug 14, 2017
53dcd8d
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Aug 22, 2017
4abfa4f
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Aug 28, 2017
d576e38
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Aug 30, 2017
bb446c4
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Sep 21, 2017
11d40c2
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Sep 25, 2017
ae5c41e
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Oct 11, 2017
89dbf04
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Oct 12, 2017
caa68a9
Merge branch 'develop' into fix_query_message_by_time
lindzh Oct 12, 2017
9ebcd6b
fix query offset by time and add comment for isGetTimeLast
lindzh Oct 12, 2017
bc1c880
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Oct 25, 2017
48d93e8
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Dec 7, 2017
962dfbf
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Dec 13, 2017
844f0a9
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Dec 14, 2017
e35ac69
Merge branch 'develop' into fix_query_message_by_time
lindzh Dec 14, 2017
fb51089
add isGetlast JavaDoc and fix queryConsumeQueueOffsetByTime
lindzh Dec 14, 2017
5567377
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Dec 18, 2017
c1f503c
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Dec 20, 2017
ccbc4b4
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Jan 16, 2018
171799e
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Jan 23, 2018
03911db
fix last get
lindzh Feb 27, 2018
ddb1cc7
Merge branch 'develop' into fix_query_message_by_time
lindzh Feb 27, 2018
124cdad
Merge branch 'develop' into fix_query_message_by_time
Mar 26, 2018
d17a094
fix method and parameter name
Mar 26, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,6 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.FileRegion;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
Expand All @@ -34,8 +28,7 @@
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.constant.OffsetConstant;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageQueueForC;
import org.apache.rocketmq.common.protocol.RequestCode;
Expand All @@ -47,12 +40,21 @@
import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.SelectMappedBufferResult;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;

public class Broker2Client {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
Expand Down Expand Up @@ -151,7 +153,8 @@ public RemotingCommand resetOffset(String topic, String group, long timeStamp, b

timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
} else {
timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp,
OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET);
}

if (timeStampOffset < 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQue
}

@Override
public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
return next.getOffsetInQueueByTime(topic, queueId, timestamp);
public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, int getLastOrFirstOffset) {
return next.getOffsetInQueueByTime(topic, queueId, timestamp, getLastOrFirstOffset);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.OffsetConstant;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageDecoder;
Expand Down Expand Up @@ -364,8 +365,13 @@ private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx,
final SearchOffsetRequestHeader requestHeader =
(SearchOffsetRequestHeader) request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class);

int getLastOrFirstOffset = OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET;
if (requestHeader.getGetLastOrFirstOffset() != null) {
getLastOrFirstOffset = requestHeader.getGetLastOrFirstOffset();
}

long offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(),
requestHeader.getTimestamp());
requestHeader.getTimestamp(), getLastOrFirstOffset);

responseHeader.setOffset(offset);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti
return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp);
}

@Override
public long searchOffset(MessageQueue mq, long timestamp, int getLastOrFirstOffset) throws MQClientException {
return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp, getLastOrFirstOffset);
}

@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPullConsumerImpl.maxOffset(mq);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,11 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti
return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp);
}

@Override
public long searchOffset(MessageQueue mq, long timestamp, int getLastOrFirstOffset) throws MQClientException {
return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp, getLastOrFirstOffset);
}

@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPushConsumerImpl.maxOffset(mq);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,6 @@ void sendMessageBack(final MessageExt msg, final int delayLevel, final String br
* @return queue set
*/
Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) throws MQClientException;

long searchOffset(final MessageQueue mq, final long timestamp, final int getLastOrFirstOffset) throws MQClientException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.OffsetConstant;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
Expand Down Expand Up @@ -167,6 +168,10 @@ public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClie
}

public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.doSearchOffset(mq, timestamp, OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET);
}

public long doSearchOffset(MessageQueue mq, long timestamp, int getLastOrFirstOffset) throws MQClientException {
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
Expand All @@ -176,7 +181,7 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti
if (brokerAddr != null) {
try {
return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp,
timeoutMillis);
getLastOrFirstOffset, timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,14 +668,14 @@ public MessageExt viewMessage(final String addr, final long phyoffset, final lon

throw new MQBrokerException(response.getCode(), response.getRemark());
}

public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp,
final long timeoutMillis)

public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, final int getLastOrFirstOffset, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
requestHeader.setTimestamp(timestamp);
requestHeader.setGetLastOrFirstOffset(getLastOrFirstOffset);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader);

RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,6 @@
*/
package org.apache.rocketmq.client.impl.consumer;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
Expand All @@ -45,10 +38,10 @@
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.OffsetConstant;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
Expand All @@ -59,10 +52,19 @@
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

public class DefaultMQPullConsumerImpl implements MQConsumerInner {
private final InternalLogger log = ClientLogger.getLog();
private final DefaultMQPullConsumer defaultMQPullConsumer;
Expand Down Expand Up @@ -471,8 +473,12 @@ public MessageExt queryMessageByUniqKey(String topic, String uniqKey)
}

public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.searchOffset(mq, timestamp, OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET);
}

public long searchOffset(final MessageQueue mq, final long timestamp, final int getLastOrFirstOffset) throws MQClientException {
this.makeSureStateOK();
return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
return this.mQClientFactory.getMQAdminImpl().doSearchOffset(mq, timestamp, getLastOrFirstOffset);
}

public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.OffsetConstant;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
Expand Down Expand Up @@ -956,7 +957,11 @@ public void resetOffsetByTimeStamp(long timeStamp)
}

public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
return this.searchOffset(mq, timestamp, OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET);
}

public long searchOffset(MessageQueue mq, long timestamp, int getLastOrFirstOffset) throws MQClientException {
return this.mQClientFactory.getMQAdminImpl().doSearchOffset(mq, timestamp, getLastOrFirstOffset);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.common.constant;

public class OffsetConstant {

public static final int SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET = 1;

public static final int SEARCH_OFFSET_BYTIME_RETURN_RETURN_LAST_OFFSET = 2;

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class SearchOffsetRequestHeader implements CommandCustomHeader {
@CFNotNull
private Long timestamp;

private Integer getLastOrFirstOffset;

@Override
public void checkFields() throws RemotingCommandException {

Expand Down Expand Up @@ -61,4 +63,11 @@ public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}

public Integer getGetLastOrFirstOffset() {
return getLastOrFirstOffset;
}

public void setGetLastOrFirstOffset(Integer getLastOrFirstOffset) {
this.getLastOrFirstOffset = getLastOrFirstOffset;
}
}
Loading