Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion acl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.7.1-SNAPSHOT</version>
<version>4.8.0-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-acl</artifactId>
<name>rocketmq-acl ${project.version}</name>
Expand Down
2 changes: 1 addition & 1 deletion broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.7.1-SNAPSHOT</version>
<version>4.8.0-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.rocketmq.broker.BrokerController;
Expand Down
2 changes: 1 addition & 1 deletion client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.7.1-SNAPSHOT</version>
<version>4.8.0-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
private long consumeTimeout = 15;

/**
* Maximum time to await message consuming when shutdown consumer, 0 indicates no await.
*/
private long awaitTerminationMillisWhenShutdown = 0;

/**
* Interface of asynchronous transfer data
*/
Expand Down Expand Up @@ -705,7 +710,7 @@ public void start() throws MQClientException {
*/
@Override
public void shutdown() {
this.defaultMQPushConsumerImpl.shutdown();
this.defaultMQPushConsumerImpl.shutdown(awaitTerminationMillisWhenShutdown);
if (null != traceDispatcher) {
traceDispatcher.shutdown();
}
Expand Down Expand Up @@ -886,6 +891,14 @@ public void setConsumeTimeout(final long consumeTimeout) {
this.consumeTimeout = consumeTimeout;
}

public long getAwaitTerminationMillisWhenShutdown() {
return awaitTerminationMillisWhenShutdown;
}

public void setAwaitTerminationMillisWhenShutdown(long awaitTerminationMillisWhenShutdown) {
this.awaitTerminationMillisWhenShutdown = awaitTerminationMillisWhenShutdown;
}

public TraceDispatcher getTraceDispatcher() {
return traceDispatcher;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.CMResult;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper;

Expand Down Expand Up @@ -92,9 +93,9 @@ public void run() {
}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}

public void shutdown() {
public void shutdown(long awaitTerminateMillis) {
this.scheduledExecutorService.shutdown();
this.consumeExecutor.shutdown();
ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS);
this.cleanExpireMsgExecutors.shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
Expand Down Expand Up @@ -96,10 +97,10 @@ public void run() {
}
}

public void shutdown() {
public void shutdown(long awaitTerminateMillis) {
this.stopped = true;
this.scheduledExecutorService.shutdown();
this.consumeExecutor.shutdown();
ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS);
if (MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
this.unlockAllMQ();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
public interface ConsumeMessageService {
void start();

void shutdown();
void shutdown(long awaitTerminateMillis);

void updateCorePoolSize(int corePoolSize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,12 +546,16 @@ private int getMaxReconsumeTimes() {
}
}

public synchronized void shutdown() {
public void shutdown() {
shutdown(0);
}

public synchronized void shutdown(long awaitTerminateMillis) {
switch (this.serviceState) {
case CREATE_JUST:
break;
case RUNNING:
this.consumeMessageService.shutdown();
this.consumeMessageService.shutdown(awaitTerminateMillis);
this.persistConsumerOffset();
this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
this.mQClientFactory.shutdown();
Expand Down Expand Up @@ -625,7 +629,7 @@ public synchronized void start() throws MQClientException {
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
Expand Down Expand Up @@ -256,6 +257,34 @@ public void testCheckConfig() {
}
}

@Test
public void testGracefulShutdown() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
pushConsumer.setAwaitTerminationMillisWhenShutdown(2000);
final AtomicBoolean messageConsumedFlag = new AtomicBoolean(false);
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
countDownLatch.countDown();
try {
Thread.sleep(1000);
messageConsumedFlag.set(true);
} catch (InterruptedException e) {
}

return null;
}
}));

PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
pullMessageService.executePullRequestImmediately(createPullRequest());
countDownLatch.await();

pushConsumer.shutdown();
assertThat(messageConsumedFlag.get()).isTrue();
}

private DefaultMQPushConsumer createPushConsumer() {
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
Expand Down
2 changes: 1 addition & 1 deletion common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.7.1-SNAPSHOT</version>
<version>4.8.0-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

public class MQVersion {

public static final int CURRENT_VERSION = Version.V4_7_0.ordinal();
public static final int CURRENT_VERSION = Version.V4_7_1.ordinal();

public static String getVersionDesc(int value) {
int length = Version.values().length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,19 @@ public static String withoutNamespace(String resourceWithNamespace) {
return resourceWithNamespace;
}

StringBuffer strBuffer = new StringBuffer();
StringBuilder stringBuilder = new StringBuilder();
if (isRetryTopic(resourceWithNamespace)) {
strBuffer.append(MixAll.RETRY_GROUP_TOPIC_PREFIX);
stringBuilder.append(MixAll.RETRY_GROUP_TOPIC_PREFIX);
}
if (isDLQTopic(resourceWithNamespace)) {
strBuffer.append(MixAll.DLQ_GROUP_TOPIC_PREFIX);
stringBuilder.append(MixAll.DLQ_GROUP_TOPIC_PREFIX);
}

String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resourceWithNamespace);
int index = resourceWithoutRetryAndDLQ.indexOf(NAMESPACE_SEPARATOR);
if (index > 0) {
String resourceWithoutNamespace = resourceWithoutRetryAndDLQ.substring(index + 1);
return strBuffer.append(resourceWithoutNamespace).toString();
return stringBuilder.append(resourceWithoutNamespace).toString();
}

return resourceWithNamespace;
Expand Down Expand Up @@ -91,17 +91,17 @@ public static String wrapNamespace(String namespace, String resourceWithOutNames
}

String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resourceWithOutNamespace);
StringBuffer strBuffer = new StringBuffer();
StringBuilder stringBuilder = new StringBuilder();

if (isRetryTopic(resourceWithOutNamespace)) {
strBuffer.append(MixAll.RETRY_GROUP_TOPIC_PREFIX);
stringBuilder.append(MixAll.RETRY_GROUP_TOPIC_PREFIX);
}

if (isDLQTopic(resourceWithOutNamespace)) {
strBuffer.append(MixAll.DLQ_GROUP_TOPIC_PREFIX);
stringBuilder.append(MixAll.DLQ_GROUP_TOPIC_PREFIX);
}

return strBuffer.append(namespace).append(NAMESPACE_SEPARATOR).append(resourceWithoutRetryAndDLQ).toString();
return stringBuilder.append(namespace).append(NAMESPACE_SEPARATOR).append(resourceWithoutRetryAndDLQ).toString();

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.junit.Test;

import static org.apache.rocketmq.common.message.MessageConst.PROPERTY_TRACE_SWITCH;
import static org.junit.Assert.*;

public class MessageTest {
@Test(expected = RuntimeException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.rocketmq.common.protocol.body;

import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Assert;
import org.junit.Test;

import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.within;


public class TopicRouteDataTest {
Expand Down
2 changes: 1 addition & 1 deletion distribution/bin/runbroker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib:${JAVA_HOME}/lib/ext"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
Expand Down
2 changes: 1 addition & 1 deletion distribution/bin/runserver.sh
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib:${JAVA_HOME}/lib/ext"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
Expand Down
2 changes: 1 addition & 1 deletion distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.7.1-SNAPSHOT</version>
<version>4.8.0-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-distribution</artifactId>
<name>rocketmq-distribution ${project.version}</name>
Expand Down
6 changes: 3 additions & 3 deletions example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.7.1-SNAPSHOT</version>
<version>4.8.0-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down Expand Up @@ -51,12 +51,12 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-openmessaging</artifactId>
<version>4.7.1-SNAPSHOT</version>
<version>4.8.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.7.1-SNAPSHOT</version>
<version>4.8.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
2 changes: 1 addition & 1 deletion filter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>4.7.1-SNAPSHOT</version>
<version>4.8.0-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion logappender/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.7.1-SNAPSHOT</version>
<version>4.8.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-logappender</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion logging/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.7.1-SNAPSHOT</version>
<version>4.8.0-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
Loading