Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RIP-21] Logic Queue submodule common & client #3127

Merged
merged 1 commit into from
Jul 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@
<artifactId>rocketmq-client</artifactId>
<name>rocketmq-client ${project.version}</name>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>6</source>
<target>6</target>
</configuration>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
Expand Down Expand Up @@ -73,5 +86,9 @@
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ public class MQBrokerException extends Exception {
private final String errorMessage;
private final String brokerAddr;

MQBrokerException() {
this.responseCode = 0;
this.errorMessage = null;
this.brokerAddr = null;
}

public MQBrokerException(int responseCode, String errorMessage) {
super(FAQUrl.attachDefaultURL("CODE: " + UtilAll.responseCode2String(responseCode) + " DESC: "
+ errorMessage));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.exception;

public class MQRedirectException extends MQBrokerException {
private static final StackTraceElement[] UNASSIGNED_STACK = new StackTraceElement[0];

private final byte[] body;

public MQRedirectException(byte[] responseBody) {
this.body = responseBody;
}

// This exception class is used as a flow control item, so stack trace is useless and performance killer.
@Override public synchronized Throwable fillInStackTrace() {
this.setStackTrace(UNASSIGNED_STACK);
return this;
}

public byte[] getBody() {
return body;
}
}
180 changes: 153 additions & 27 deletions client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,47 +16,52 @@
*/
package org.apache.rocketmq.client.impl;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.MQRedirectException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryMessageResponseHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class MQAdminImpl {

private final InternalLogger log = ClientLogger.getLog();
Expand Down Expand Up @@ -182,6 +187,10 @@ public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClie
}

public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
LogicalQueueRouteData logicalQueueRouteData = searchLogicalQueueRouteByTimestamp(mq, timestamp);
if (logicalQueueRouteData != null) {
mq = logicalQueueRouteData.getMessageQueue();
}
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
Expand All @@ -190,8 +199,9 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti

if (brokerAddr != null) {
try {
return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp,
long offset = this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp,
timeoutMillis);
return correctLogicalQueueOffset(offset, logicalQueueRouteData);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
}
Expand All @@ -201,24 +211,50 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti
}

public long maxOffset(MessageQueue mq) throws MQClientException {
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
return this.maxOffset(mq, true);
}

if (brokerAddr != null) {
try {
return this.mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
public long maxOffset(MessageQueue mq, boolean committed) throws MQClientException {
final MessageQueue origMq = mq;
String topic = mq.getTopic();
LogicalQueueRouteData previousQueueRouteData = null;
for (int i = 0; i < 5; i++) {
LogicalQueueRouteData maxQueueRouteData = this.searchLogicalQueueRouteByOffset(origMq, Long.MAX_VALUE);
if (maxQueueRouteData != null) {
if (previousQueueRouteData != null && Objects.equal(previousQueueRouteData.getMessageQueue(), maxQueueRouteData.getMessageQueue())) {
throw new MQClientException("Topic route info not latest", null);
}
previousQueueRouteData = maxQueueRouteData;
mq = maxQueueRouteData.getMessageQueue();
}
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
}

throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
if (brokerAddr != null) {
try {
long offset = this.mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, topic, mq.getQueueId(), committed, maxQueueRouteData != null, timeoutMillis);
return correctLogicalQueueOffset(offset, maxQueueRouteData);
} catch (MQRedirectException e) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, false, null, Collections.singleton(mq.getQueueId()));
continue;
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
}
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
throw new MQClientException("Redirect exceed max times", null);
}

public long minOffset(MessageQueue mq) throws MQClientException {
LogicalQueueRouteData minQueueRouteData = searchLogicalQueueRouteByOffset(mq, 0L);
if (minQueueRouteData != null) {
mq = minQueueRouteData.getMessageQueue();
}

String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
Expand All @@ -227,7 +263,8 @@ public long minOffset(MessageQueue mq) throws MQClientException {

if (brokerAddr != null) {
try {
return this.mQClientFactory.getMQClientAPIImpl().getMinOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis);
long offset = this.mQClientFactory.getMQClientAPIImpl().getMinOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis);
return correctLogicalQueueOffset(offset, minQueueRouteData);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
}
Expand All @@ -236,7 +273,29 @@ public long minOffset(MessageQueue mq) throws MQClientException {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

private List<LogicalQueueRouteData> queryLogicalQueueRouteData(MessageQueue mq) {
if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(mq.getBrokerName())) {
TopicRouteData topicRouteData = this.mQClientFactory.queryTopicRouteData(mq.getTopic());
if (topicRouteData == null) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
topicRouteData = this.mQClientFactory.queryTopicRouteData(mq.getTopic());
}
if (topicRouteData != null) {
LogicalQueuesInfo logicalQueuesInfo = topicRouteData.getLogicalQueuesInfo();
if (logicalQueuesInfo != null) {
return logicalQueuesInfo.get(mq.getQueueId());
}
}
}
return null;
}

public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
LogicalQueueRouteData minQueueRouteData = searchLogicalQueueRouteByOffset(mq, 0L);
if (minQueueRouteData != null) {
mq = minQueueRouteData.getMessageQueue();
}

String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
Expand Down Expand Up @@ -445,4 +504,71 @@ public void operationComplete(ResponseFuture responseFuture) {

throw new MQClientException(ResponseCode.TOPIC_NOT_EXIST, "The topic[" + topic + "] not matched route info");
}

private static long correctLogicalQueueOffset(long offset, LogicalQueueRouteData logicalQueueRouteData) {
if (logicalQueueRouteData == null) {
return offset;
}
return logicalQueueRouteData.toLogicalQueueOffset(offset);
}

private LogicalQueueRouteData searchLogicalQueueRouteByTimestamp(MessageQueue mq, long timestamp) {
List<LogicalQueueRouteData> queueRouteDataList = this.queryLogicalQueueRouteData(mq);
if (queueRouteDataList == null) {
return null;
}
LogicalQueueRouteData logicalQueueRouteData = null;
for (LogicalQueueRouteData el : queueRouteDataList) {
if (!el.isReadable()) {
continue;
}
if (logicalQueueRouteData == null && el.getFirstMsgTimeMillis() < 0) {
logicalQueueRouteData = el;
} else if (el.getFirstMsgTimeMillis() >= 0) {
if (el.getFirstMsgTimeMillis() <= timestamp && el.getLastMsgTimeMillis() >= timestamp) {
logicalQueueRouteData = el;
break;
}
}
}
if (logicalQueueRouteData == null) {
logicalQueueRouteData = queueRouteDataList.get(queueRouteDataList.size() - 1);
}
return logicalQueueRouteData;
}

private LogicalQueueRouteData searchLogicalQueueRouteByOffset(MessageQueue mq, long offset) {
List<LogicalQueueRouteData> queueRouteDataList = this.queryLogicalQueueRouteData(mq);
if (queueRouteDataList == null) {
return null;
}
{
List<LogicalQueueRouteData> list = Lists.newArrayListWithCapacity(queueRouteDataList.size());
for (LogicalQueueRouteData queueRouteData : queueRouteDataList) {
if (LogicalQueueRouteData.READABLE_PREDICT.apply(queueRouteData)) {
list.add(queueRouteData);
}
}
queueRouteDataList = list;
}
if (queueRouteDataList.isEmpty()) {
return null;
}
if (offset <= 0) {
// min
return Collections.min(queueRouteDataList);
} else if (offset == Long.MAX_VALUE) {
// max
return Collections.max(queueRouteDataList);
}
Collections.sort(queueRouteDataList);
LogicalQueueRouteData searchKey = new LogicalQueueRouteData();
searchKey.setLogicalQueueDelta(offset);
int idx = Collections.binarySearch(queueRouteDataList, searchKey);
if (idx < 0) {
idx = -idx - 1;
idx -= 1;
}
return queueRouteDataList.get(idx);
}
}
Loading