Skip to content

Commit

Permalink
[RIP-21] submodule common & client & remoting
Browse files Browse the repository at this point in the history
  • Loading branch information
ayanamist committed Jul 3, 2021
1 parent efe598d commit b4c099c
Show file tree
Hide file tree
Showing 62 changed files with 3,229 additions and 177 deletions.
17 changes: 17 additions & 0 deletions client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@
<artifactId>rocketmq-client</artifactId>
<name>rocketmq-client ${project.version}</name>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>6</source>
<target>6</target>
</configuration>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
Expand Down Expand Up @@ -73,5 +86,9 @@
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ public class MQBrokerException extends Exception {
private final String errorMessage;
private final String brokerAddr;

MQBrokerException() {
this.responseCode = 0;
this.errorMessage = null;
this.brokerAddr = null;
}

public MQBrokerException(int responseCode, String errorMessage) {
super(FAQUrl.attachDefaultURL("CODE: " + UtilAll.responseCode2String(responseCode) + " DESC: "
+ errorMessage));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.exception;

public class MQRedirectException extends MQBrokerException {
private static final StackTraceElement[] UNASSIGNED_STACK = new StackTraceElement[0];

private final byte[] body;

public MQRedirectException(byte[] responseBody) {
this.body = responseBody;
}

// This exception class is used as a flow control item, so stack trace is useless and performance killer.
@Override public synchronized Throwable fillInStackTrace() {
this.setStackTrace(UNASSIGNED_STACK);
return this;
}

public byte[] getBody() {
return body;
}
}
180 changes: 153 additions & 27 deletions client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,47 +16,52 @@
*/
package org.apache.rocketmq.client.impl;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.MQRedirectException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryMessageResponseHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class MQAdminImpl {

private final InternalLogger log = ClientLogger.getLog();
Expand Down Expand Up @@ -182,6 +187,10 @@ public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClie
}

public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
LogicalQueueRouteData logicalQueueRouteData = searchLogicalQueueRouteByTimestamp(mq, timestamp);
if (logicalQueueRouteData != null) {
mq = logicalQueueRouteData.getMessageQueue();
}
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
Expand All @@ -190,8 +199,9 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti

if (brokerAddr != null) {
try {
return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp,
long offset = this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp,
timeoutMillis);
return correctLogicalQueueOffset(offset, logicalQueueRouteData);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
}
Expand All @@ -201,24 +211,50 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti
}

public long maxOffset(MessageQueue mq) throws MQClientException {
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
return this.maxOffset(mq, true);
}

if (brokerAddr != null) {
try {
return this.mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
public long maxOffset(MessageQueue mq, boolean committed) throws MQClientException {
final MessageQueue origMq = mq;
String topic = mq.getTopic();
LogicalQueueRouteData previousQueueRouteData = null;
for (int i = 0; i < 5; i++) {
LogicalQueueRouteData maxQueueRouteData = this.searchLogicalQueueRouteByOffset(origMq, Long.MAX_VALUE);
if (maxQueueRouteData != null) {
if (previousQueueRouteData != null && Objects.equal(previousQueueRouteData.getMessageQueue(), maxQueueRouteData.getMessageQueue())) {
throw new MQClientException("Topic route info not latest", null);
}
previousQueueRouteData = maxQueueRouteData;
mq = maxQueueRouteData.getMessageQueue();
}
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
}

throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
if (brokerAddr != null) {
try {
long offset = this.mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, topic, mq.getQueueId(), committed, maxQueueRouteData != null, timeoutMillis);
return correctLogicalQueueOffset(offset, maxQueueRouteData);
} catch (MQRedirectException e) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, false, null, Collections.singleton(mq.getQueueId()));
continue;
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
}
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
throw new MQClientException("Redirect exceed max times", null);
}

public long minOffset(MessageQueue mq) throws MQClientException {
LogicalQueueRouteData minQueueRouteData = searchLogicalQueueRouteByOffset(mq, 0L);
if (minQueueRouteData != null) {
mq = minQueueRouteData.getMessageQueue();
}

String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
Expand All @@ -227,7 +263,8 @@ public long minOffset(MessageQueue mq) throws MQClientException {

if (brokerAddr != null) {
try {
return this.mQClientFactory.getMQClientAPIImpl().getMinOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis);
long offset = this.mQClientFactory.getMQClientAPIImpl().getMinOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis);
return correctLogicalQueueOffset(offset, minQueueRouteData);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
}
Expand All @@ -236,7 +273,29 @@ public long minOffset(MessageQueue mq) throws MQClientException {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

private List<LogicalQueueRouteData> queryLogicalQueueRouteData(MessageQueue mq) {
if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(mq.getBrokerName())) {
TopicRouteData topicRouteData = this.mQClientFactory.queryTopicRouteData(mq.getTopic());
if (topicRouteData == null) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
topicRouteData = this.mQClientFactory.queryTopicRouteData(mq.getTopic());
}
if (topicRouteData != null) {
LogicalQueuesInfo logicalQueuesInfo = topicRouteData.getLogicalQueuesInfo();
if (logicalQueuesInfo != null) {
return logicalQueuesInfo.get(mq.getQueueId());
}
}
}
return null;
}

public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
LogicalQueueRouteData minQueueRouteData = searchLogicalQueueRouteByOffset(mq, 0L);
if (minQueueRouteData != null) {
mq = minQueueRouteData.getMessageQueue();
}

String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
Expand Down Expand Up @@ -445,4 +504,71 @@ public void operationComplete(ResponseFuture responseFuture) {

throw new MQClientException(ResponseCode.TOPIC_NOT_EXIST, "The topic[" + topic + "] not matched route info");
}

private static long correctLogicalQueueOffset(long offset, LogicalQueueRouteData logicalQueueRouteData) {
if (logicalQueueRouteData == null) {
return offset;
}
return logicalQueueRouteData.toLogicalQueueOffset(offset);
}

private LogicalQueueRouteData searchLogicalQueueRouteByTimestamp(MessageQueue mq, long timestamp) {
List<LogicalQueueRouteData> queueRouteDataList = this.queryLogicalQueueRouteData(mq);
if (queueRouteDataList == null) {
return null;
}
LogicalQueueRouteData logicalQueueRouteData = null;
for (LogicalQueueRouteData el : queueRouteDataList) {
if (!el.isReadable()) {
continue;
}
if (logicalQueueRouteData == null && el.getFirstMsgTimeMillis() < 0) {
logicalQueueRouteData = el;
} else if (el.getFirstMsgTimeMillis() >= 0) {
if (el.getFirstMsgTimeMillis() <= timestamp && el.getLastMsgTimeMillis() >= timestamp) {
logicalQueueRouteData = el;
break;
}
}
}
if (logicalQueueRouteData == null) {
logicalQueueRouteData = queueRouteDataList.get(queueRouteDataList.size() - 1);
}
return logicalQueueRouteData;
}

private LogicalQueueRouteData searchLogicalQueueRouteByOffset(MessageQueue mq, long offset) {
List<LogicalQueueRouteData> queueRouteDataList = this.queryLogicalQueueRouteData(mq);
if (queueRouteDataList == null) {
return null;
}
{
List<LogicalQueueRouteData> list = Lists.newArrayListWithCapacity(queueRouteDataList.size());
for (LogicalQueueRouteData queueRouteData : queueRouteDataList) {
if (LogicalQueueRouteData.READABLE_PREDICT.apply(queueRouteData)) {
list.add(queueRouteData);
}
}
queueRouteDataList = list;
}
if (queueRouteDataList.isEmpty()) {
return null;
}
if (offset <= 0) {
// min
return Collections.min(queueRouteDataList);
} else if (offset == Long.MAX_VALUE) {
// max
return Collections.max(queueRouteDataList);
}
Collections.sort(queueRouteDataList);
LogicalQueueRouteData searchKey = new LogicalQueueRouteData();
searchKey.setLogicalQueueDelta(offset);
int idx = Collections.binarySearch(queueRouteDataList, searchKey);
if (idx < 0) {
idx = -idx - 1;
idx -= 1;
}
return queueRouteDataList.get(idx);
}
}
Loading

0 comments on commit b4c099c

Please sign in to comment.