Skip to content

Commit

Permalink
Fix conflict,merge develop (#2906)
Browse files Browse the repository at this point in the history
* [ISSUE #1233] Fix CVE-2011-1473

* fix Multiple instances in the same application share MQClientInstance

* [ISSUE #2748] Fix deleteSubscriptionGroup not remove consumer offset

* [ISSUE #2745] Changed the support time of the request/reply feature to 4.6.0.

Co-authored-by: von gosling <vongosling@apache.org>

* [ISSUE #2729] Replace with Math.min method call

* [ISSUE #2801]Fix NamesrvAddr connot set in Producer

* [ISSUE 2800] optimize: the spelling of topicSynFlag

Co-authored-by: ph3636 <tianxingguang@kanzhun.com>

* [ISSUE #2803] Fix the endpoint cannot get instanceId without http (#2804)

* fix the endpoint cannot get instanceId without http

* fix the endpoint cannot get instanceId without http

* add unit test

* add unit test

* add unit test

Co-authored-by: panzhi33 <wb-pz502261@alibaba-inc.com>

* fix messageArrivingListener NPE

* [ISSUE #2538]Optimize log output when message trace saving fails

* [ISSUE #2811] Fix the wrong topic was consumed in the DefaultMessageStoreTest test program

* [ISSUE #2821] Overriding the ServiceThread#shutdown in HAClient class

* [ISSUE #2805] remove redundant package imports

* [ISSUE #2833] Support trace for TranscationProducer (#2834)

* [ISSUE #2732] Fix message loss problem when rebalance with LitePullConsumer (#2832)

* [ISSUE #2732] Fix message loss problem when rebalance with LitePullConsumer

* Fix message loss problem when rebalance with LitePullConsumer, update 2

* [ISSUE #2846]fix -E might not port to other systems

* fix some nonconformity after checkstyle

* Support OpenTracing(#2861)

* [ISSUE #2872] remove log files created by integration test when mvn clean

* [ISSUE #2872] move log files created by integration test to target dir

* Change log level to debug: "Half offset {} has been committed/rolled back"

* Fix unit test stability

Bump mockito-core to 3.10.0, remove powermock dependency, suppress useless logging

* [ISSUE #2898] Resolve rocketmq-example project failed during checkstyle execution (#2899)

Co-authored-by: SSpirits <shadowyspirits@outlook.com>
Co-authored-by: panzhi33 <wb-pz502261@alibaba-inc.com>
Co-authored-by: panzhi <panzhi33@qq.com>
Co-authored-by: ArronHuang <41609451+ArronHuang@users.noreply.github.com>
Co-authored-by: von gosling <vongosling@apache.org>
Co-authored-by: drgnchan <40224023+drgnchan@users.noreply.github.com>
Co-authored-by: zhangjidi2016 <zhangjidi@cmss.chinamobile.com>
Co-authored-by: ph3636 <38041490+ph3636@users.noreply.github.com>
Co-authored-by: ph3636 <tianxingguang@kanzhun.com>
Co-authored-by: BurningCN <1015773611@qq.com>
Co-authored-by: francis lee <francislee.cn@outlook.com>
Co-authored-by: 灼华 <43363120+BurningCN@users.noreply.github.com>
Co-authored-by: yuz10 <845238369@qq.com>
Co-authored-by: huangli <areyouok@gmail.com>
Co-authored-by: chenrl <raymond2366@outlook.com>
Co-authored-by: ayanamist <ayanamist@gmail.com>
Co-authored-by: zhangjidi2016 <1017543663@qq.com>
  • Loading branch information
18 people committed May 19, 2021
1 parent 7d32c02 commit ac9aaa4
Show file tree
Hide file tree
Showing 63 changed files with 2,041 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,4 +232,20 @@ public void cloneOffset(final String srcGroup, final String destGroup, final Str
}
}

public void removeOffset(final String group) {
Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
String topicAtGroup = next.getKey();
if (topicAtGroup.contains(group)) {
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
if (arrays.length == 2 && group.equals(arrays[1])) {
it.remove();
log.warn("clean group offset {}", topicAtGroup);
}
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,10 @@ private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx,

this.brokerController.getSubscriptionGroupManager().deleteSubscriptionGroupConfig(requestHeader.getGroupName());

if (requestHeader.isRemoveOffset()) {
this.brokerController.getConsumerOffsetManager().removeOffset(requestHeader.getGroupName());
}

if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) {
this.brokerController.getBrokerStatsManager().onGroupDeleted(requestHeader.getGroupName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,7 @@ public TopicConfig createTopicInSendMessageMethod(final String topic, final Stri
if (PermName.isInherited(defaultTopicConfig.getPerm())) {
topicConfig = new TopicConfig(topic);

int queueNums =
clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig
.getWriteQueueNums() : clientDefaultTopicQueueNums;
int queueNums = Math.min(clientDefaultTopicQueueNums, defaultTopicConfig.getWriteQueueNums());

if (queueNums < 0) {
queueNums = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void check(long transactionTimeout, int transactionCheckMax,
break;
}
if (removeMap.containsKey(i)) {
log.info("Half offset {} has been committed/rolled back", i);
log.debug("Half offset {} has been committed/rolled back", i);
Long removedOpOffset = removeMap.remove(i);
doneOpOffset.add(removedOpOffset);
} else {
Expand Down
12 changes: 12 additions & 0 deletions client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>io.opentracing</groupId>
<artifactId>opentracing-api</artifactId>
<version>0.33.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.opentracing</groupId>
<artifactId>opentracing-mock</artifactId>
<version>0.33.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void setInstanceName(String instanceName) {

public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = String.valueOf(UtilAll.getPid());
this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
}
}

Expand Down Expand Up @@ -178,8 +178,8 @@ public ClientConfig cloneClientConfig() {
}

public String getNamesrvAddr() {
if (StringUtils.isNotEmpty(namesrvAddr) && NameServerAddressUtils.NAMESRV_ENDPOINT_PATTERN.matcher(namesrvAddr.trim()).matches()) {
return namesrvAddr.substring(NameServerAddressUtils.ENDPOINT_PREFIX.length());
if (StringUtils.isNotEmpty(namesrvAddr) && NameServerAddressUtils.validateInstanceEndpoint(namesrvAddr.trim())) {
return NameServerAddressUtils.getNameSrvAddrFromNamesrvEndpoint(namesrvAddr);
}
return namesrvAddr;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.hook;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.Message;

public class EndTransactionContext {
private String producerGroup;
private Message message;
private String brokerAddr;
private String msgId;
private String transactionId;
private LocalTransactionState transactionState;
private boolean fromTransactionCheck;

public String getProducerGroup() {
return producerGroup;
}

public void setProducerGroup(String producerGroup) {
this.producerGroup = producerGroup;
}

public Message getMessage() {
return message;
}

public void setMessage(Message message) {
this.message = message;
}

public String getBrokerAddr() {
return brokerAddr;
}

public void setBrokerAddr(String brokerAddr) {
this.brokerAddr = brokerAddr;
}

public String getMsgId() {
return msgId;
}

public void setMsgId(String msgId) {
this.msgId = msgId;
}

public String getTransactionId() {
return transactionId;
}

public void setTransactionId(String transactionId) {
this.transactionId = transactionId;
}

public LocalTransactionState getTransactionState() {
return transactionState;
}

public void setTransactionState(LocalTransactionState transactionState) {
this.transactionState = transactionState;
}

public boolean isFromTransactionCheck() {
return fromTransactionCheck;
}

public void setFromTransactionCheck(boolean fromTransactionCheck) {
this.fromTransactionCheck = fromTransactionCheck;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.hook;

public interface EndTransactionHook {
String hookName();

void endTransaction(final EndTransactionContext context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1721,10 +1721,11 @@ public void deleteTopicInNameServer(final String addr, final String topic, final
throw new MQClientException(response.getCode(), response.getRemark());
}

public void deleteSubscriptionGroup(final String addr, final String groupName, final long timeoutMillis)
public void deleteSubscriptionGroup(final String addr, final String groupName, final boolean removeOffset, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
DeleteSubscriptionGroupRequestHeader requestHeader = new DeleteSubscriptionGroupRequestHeader();
requestHeader.setGroupName(groupName);
requestHeader.setRemoveOffset(removeOffset);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, requestHeader);

RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,12 @@ public long getPullOffset(MessageQueue messageQueue) {
return -1;
}

public void updatePullOffset(MessageQueue messageQueue, long offset) {
public void updatePullOffset(MessageQueue messageQueue, long offset, ProcessQueue processQueue) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) {
if (messageQueueState.getProcessQueue() != processQueue) {
return;
}
messageQueueState.setPullOffset(offset);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,9 +618,9 @@ public synchronized void commitAll() {
}
}

private void updatePullOffset(MessageQueue messageQueue, long nextPullOffset) {
private void updatePullOffset(MessageQueue messageQueue, long nextPullOffset, ProcessQueue processQueue) {
if (assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
assignedMessageQueue.updatePullOffset(messageQueue, nextPullOffset);
assignedMessageQueue.updatePullOffset(messageQueue, nextPullOffset, processQueue);
}
}

Expand Down Expand Up @@ -746,6 +746,9 @@ public void run() {
}

long offset = nextPullOffset(messageQueue);
if (this.isCancelled() || processQueue.isDropped()) {
return;
}
long pullDelayTimeMills = 0;
try {
SubscriptionData subscriptionData;
Expand All @@ -758,7 +761,9 @@ public void run() {
}

PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());

if (this.isCancelled() || processQueue.isDropped()) {
return;
}
switch (pullResult.getPullStatus()) {
case FOUND:
final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
Expand All @@ -775,7 +780,7 @@ public void run() {
default:
break;
}
updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue);
} catch (Throwable e) {
pullDelayTimeMills = pullTimeDelayMillsWhenException;
log.error("An error occurred in pull message process.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.hook.CheckForbiddenContext;
import org.apache.rocketmq.client.hook.CheckForbiddenHook;
import org.apache.rocketmq.client.hook.EndTransactionContext;
import org.apache.rocketmq.client.hook.EndTransactionHook;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.impl.CommunicationMode;
Expand Down Expand Up @@ -101,6 +103,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
new ConcurrentHashMap<String, TopicPublishInfo>();
private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final ArrayList<EndTransactionHook> endTransactionHookList = new ArrayList<EndTransactionHook>();
private final RPCHook rpcHook;
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
private final ExecutorService defaultAsyncSenderExecutor;
Expand Down Expand Up @@ -171,6 +174,11 @@ public void registerSendMessageHook(final SendMessageHook hook) {
log.info("register sendMessage Hook, {}", hook.hookName());
}

public void registerEndTransactionHook(final EndTransactionHook hook) {
this.endTransactionHookList.add(hook);
log.info("register endTransaction Hook, {}", hook.hookName());
}

public void start() throws MQClientException {
this.start(true);
}
Expand Down Expand Up @@ -386,6 +394,7 @@ private void processTransactionState(
if (exception != null) {
remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
}
doExecuteEndTransactionHook(msg, uniqueKey, brokerAddr, localTransactionState, true);

try {
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
Expand Down Expand Up @@ -967,6 +976,36 @@ public void executeSendMessageHookAfter(final SendMessageContext context) {
}
}

public boolean hasEndTransactionHook() {
return !this.endTransactionHookList.isEmpty();
}

public void executeEndTransactionHook(final EndTransactionContext context) {
if (!this.endTransactionHookList.isEmpty()) {
for (EndTransactionHook hook : this.endTransactionHookList) {
try {
hook.endTransaction(context);
} catch (Throwable e) {
log.warn("failed to executeEndTransactionHook", e);
}
}
}
}

public void doExecuteEndTransactionHook(Message msg, String msgId, String brokerAddr, LocalTransactionState state,
boolean fromTransactionCheck) {
if (hasEndTransactionHook()) {
EndTransactionContext context = new EndTransactionContext();
context.setProducerGroup(defaultMQProducer.getProducerGroup());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMsgId(msgId);
context.setTransactionId(msg.getTransactionId());
context.setTransactionState(state);
context.setFromTransactionCheck(fromTransactionCheck);
executeEndTransactionHook(context);
}
}
/**
* DEFAULT ONEWAY -------------------------------------------------------
*/
Expand Down Expand Up @@ -1266,7 +1305,7 @@ public TransactionSendResult sendMessageInTransaction(final Message msg,
}

try {
this.endTransaction(sendResult, localTransactionState, localException);
this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
Expand All @@ -1290,6 +1329,7 @@ public SendResult send(
}

public void endTransaction(
final Message msg,
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
Expand Down Expand Up @@ -1318,6 +1358,7 @@ public void endTransaction(
break;
}

doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
Expand Down
Loading

0 comments on commit ac9aaa4

Please sign in to comment.