Skip to content

Commit

Permalink
Merge branch 'develop' into add_clusterlist_cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
lindzh committed Aug 28, 2017
2 parents 2df4cfc + 4abfa4f commit a674878
Show file tree
Hide file tree
Showing 13 changed files with 273 additions and 366 deletions.
13 changes: 0 additions & 13 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,13 @@ matrix:
# On Linux, run with specific JDKs only.
- os: linux
env: CUSTOM_JDK="oraclejdk8"
- os: linux
env: CUSTOM_JDK="oraclejdk7"
- os: linux
env: CUSTOM_JDK="openjdk7"

before_install:
- echo 'MAVEN_OPTS="$MAVEN_OPTS -Xmx1024m -XX:MaxPermSize=512m -XX:+BytecodeVerificationLocal"' >> ~/.mavenrc
- cat ~/.mavenrc
- if [ "$TRAVIS_OS_NAME" == "osx" ]; then export JAVA_HOME=$(/usr/libexec/java_home); fi
- if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; fi

#os:
# - linux
# - osx
#jdk:
# - oraclejdk8
# - oraclejdk7
# - openjdk7


script:
- travis_retry mvn -B clean apache-rat:check
- travis_retry mvn -B package jacoco:report coveralls:report
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ It offers a variety of features:
----------

## Contributing
We always welcome new contributions, whether for trivial cleanups, big new features or other material rewards, more details see [here](CONTRIBUTING.md)
We always welcome new contributions, whether for trivial cleanups, big new features or other material rewards, more details see [here](http://rocketmq.apache.org/docs/how-to-contribute/)

----------
## License
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,14 @@ public class BrokerControllerTest {
*/
@Test
public void testBrokerRestart() throws Exception {
for (int i = 0; i < 2; i++) {
BrokerController brokerController = new BrokerController(
new BrokerConfig(),
new NettyServerConfig(),
new NettyClientConfig(),
new MessageStoreConfig());
assertThat(brokerController.initialize());
brokerController.start();
brokerController.shutdown();
}
BrokerController brokerController = new BrokerController(
new BrokerConfig(),
new NettyServerConfig(),
new NettyClientConfig(),
new MessageStoreConfig());
assertThat(brokerController.initialize());
brokerController.start();
brokerController.shutdown();
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
Expand Down Expand Up @@ -63,6 +65,14 @@ public class MessageStoreWithFilterTest {

private static SocketAddress StoreHost;

private DefaultMessageStore master;

private ConsumerFilterManager filterManager;

private int topicCount = 3;

private int msgPerTopic = 30;

static {
try {
StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
Expand All @@ -76,6 +86,24 @@ public class MessageStoreWithFilterTest {
}
}

@Before
public void init() {
filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic);
try {
master = gen(filterManager);
} catch (Exception e) {
e.printStackTrace();
assertThat(true).isFalse();
}
}

@After
public void destroy() {
master.shutdown();
master.destroy();
UtilAll.deleteFile(new File(storePath));
}

public MessageExtBrokerInner buildMessage() {
MessageExtBrokerInner msg = new MessageExtBrokerInner();
msg.setTopic(topic);
Expand Down Expand Up @@ -202,177 +230,143 @@ protected List<MessageExtBrokerInner> filtered(List<MessageExtBrokerInner> msgs,

@Test
public void testGetMessage_withFilterBitMapAndConsumerChanged() {
int topicCount = 10, msgPerTopic = 10;
ConsumerFilterManager filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic);

DefaultMessageStore master = null;
List<MessageExtBrokerInner> msgs = null;
try {
master = gen(filterManager);
msgs = putMsg(master, topicCount, msgPerTopic);
} catch (Exception e) {
e.printStackTrace();
assertThat(true).isFalse();
}

// sleep to wait for consume queue has been constructed.
try {
List<MessageExtBrokerInner> msgs = null;
try {
msgs = putMsg(master, topicCount, msgPerTopic);
} catch (Exception e) {
e.printStackTrace();
assertThat(true).isFalse();
}

// sleep to wait for consume queue has been constructed.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
assertThat(true).isFalse();
}
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
assertThat(true).isFalse();
}

// reset consumer;
String topic = "topic" + 0;
String resetGroup = "CID_" + 2;
String normalGroup = "CID_" + 3;
// reset consumer;
String topic = "topic" + 0;
String resetGroup = "CID_" + 2;
String normalGroup = "CID_" + 3;

{
// reset CID_2@topic0 to get all messages.
SubscriptionData resetSubData = new SubscriptionData();
resetSubData.setExpressionType(ExpressionType.SQL92);
resetSubData.setTopic(topic);
resetSubData.setClassFilterMode(false);
resetSubData.setSubString("a is not null OR a is null");
{
// reset CID_2@topic0 to get all messages.
SubscriptionData resetSubData = new SubscriptionData();
resetSubData.setExpressionType(ExpressionType.SQL92);
resetSubData.setTopic(topic);
resetSubData.setClassFilterMode(false);
resetSubData.setSubString("a is not null OR a is null");

ConsumerFilterData resetFilterData = ConsumerFilterManager.build(topic,
resetGroup, resetSubData.getSubString(), resetSubData.getExpressionType(),
System.currentTimeMillis());
ConsumerFilterData resetFilterData = ConsumerFilterManager.build(topic,
resetGroup, resetSubData.getSubString(), resetSubData.getExpressionType(),
System.currentTimeMillis());

GetMessageResult resetGetResult = master.getMessage(resetGroup, topic, queueId, 0, 1000,
new ExpressionMessageFilter(resetSubData, resetFilterData, filterManager));
GetMessageResult resetGetResult = master.getMessage(resetGroup, topic, queueId, 0, 1000,
new ExpressionMessageFilter(resetSubData, resetFilterData, filterManager));

try {
assertThat(resetGetResult).isNotNull();
try {
assertThat(resetGetResult).isNotNull();

List<MessageExtBrokerInner> filteredMsgs = filtered(msgs, resetFilterData);
List<MessageExtBrokerInner> filteredMsgs = filtered(msgs, resetFilterData);

assertThat(resetGetResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size());
} finally {
resetGetResult.release();
}
assertThat(resetGetResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size());
} finally {
resetGetResult.release();
}
}

{
ConsumerFilterData normalFilterData = filterManager.get(topic, normalGroup);
assertThat(normalFilterData).isNotNull();
assertThat(normalFilterData.getBornTime()).isLessThan(System.currentTimeMillis());
{
ConsumerFilterData normalFilterData = filterManager.get(topic, normalGroup);
assertThat(normalFilterData).isNotNull();
assertThat(normalFilterData.getBornTime()).isLessThan(System.currentTimeMillis());

SubscriptionData normalSubData = new SubscriptionData();
normalSubData.setExpressionType(normalFilterData.getExpressionType());
normalSubData.setTopic(topic);
normalSubData.setClassFilterMode(false);
normalSubData.setSubString(normalFilterData.getExpression());
SubscriptionData normalSubData = new SubscriptionData();
normalSubData.setExpressionType(normalFilterData.getExpressionType());
normalSubData.setTopic(topic);
normalSubData.setClassFilterMode(false);
normalSubData.setSubString(normalFilterData.getExpression());

List<MessageExtBrokerInner> filteredMsgs = filtered(msgs, normalFilterData);
List<MessageExtBrokerInner> filteredMsgs = filtered(msgs, normalFilterData);

GetMessageResult normalGetResult = master.getMessage(normalGroup, topic, queueId, 0, 1000,
new ExpressionMessageFilter(normalSubData, normalFilterData, filterManager));
GetMessageResult normalGetResult = master.getMessage(normalGroup, topic, queueId, 0, 1000,
new ExpressionMessageFilter(normalSubData, normalFilterData, filterManager));

try {
assertThat(normalGetResult).isNotNull();
assertThat(normalGetResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size());
} finally {
normalGetResult.release();
}
try {
assertThat(normalGetResult).isNotNull();
assertThat(normalGetResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size());
} finally {
normalGetResult.release();
}
} finally {
master.shutdown();
master.destroy();
UtilAll.deleteFile(new File(storePath));
}
}

@Test
public void testGetMessage_withFilterBitMap() {
int topicCount = 10, msgPerTopic = 500;
ConsumerFilterManager filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic);

DefaultMessageStore master = null;
List<MessageExtBrokerInner> msgs = null;
try {
master = gen(filterManager);
msgs = putMsg(master, topicCount, msgPerTopic);
// sleep to wait for consume queue has been constructed.
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
assertThat(true).isFalse();
}

try {
List<MessageExtBrokerInner> msgs = null;
try {
msgs = putMsg(master, topicCount, msgPerTopic);
// sleep to wait for consume queue has been constructed.
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
assertThat(true).isFalse();
}
for (int i = 0; i < topicCount; i++) {
String realTopic = topic + i;

for (int i = 0; i < topicCount; i++) {
String realTopic = topic + i;

for (int j = 0; j < msgPerTopic; j++) {
String group = "CID_" + j;

ConsumerFilterData filterData = filterManager.get(realTopic, group);
assertThat(filterData).isNotNull();

List<MessageExtBrokerInner> filteredMsgs = filtered(msgs, filterData);

SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setExpressionType(filterData.getExpressionType());
subscriptionData.setTopic(filterData.getTopic());
subscriptionData.setClassFilterMode(false);
subscriptionData.setSubString(filterData.getExpression());

GetMessageResult getMessageResult = master.getMessage(group, realTopic, queueId, 0, 10000,
new ExpressionMessageFilter(subscriptionData, filterData, filterManager));
String assertMsg = group + "-" + realTopic;
try {
assertThat(getMessageResult).isNotNull();
assertThat(GetMessageStatus.FOUND).isEqualTo(getMessageResult.getStatus());
assertThat(getMessageResult.getMessageBufferList()).isNotNull().isNotEmpty();
assertThat(getMessageResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size());

for (ByteBuffer buffer : getMessageResult.getMessageBufferList()) {
MessageExt messageExt = MessageDecoder.decode(buffer.slice(), false);
assertThat(messageExt).isNotNull();

Object evlRet = null;
try {
evlRet = filterData.getCompiledExpression().evaluate(new MessageEvaluationContext(messageExt.getProperties()));
} catch (Exception e) {
e.printStackTrace();
assertThat(true).isFalse();
}
for (int j = 0; j < msgPerTopic; j++) {
String group = "CID_" + j;

assertThat(evlRet).isNotNull().isEqualTo(Boolean.TRUE);
ConsumerFilterData filterData = filterManager.get(realTopic, group);
assertThat(filterData).isNotNull();

// check
boolean find = false;
for (MessageExtBrokerInner messageExtBrokerInner : filteredMsgs) {
if (messageExtBrokerInner.getMsgId().equals(messageExt.getMsgId())) {
find = true;
}
List<MessageExtBrokerInner> filteredMsgs = filtered(msgs, filterData);

SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setExpressionType(filterData.getExpressionType());
subscriptionData.setTopic(filterData.getTopic());
subscriptionData.setClassFilterMode(false);
subscriptionData.setSubString(filterData.getExpression());

GetMessageResult getMessageResult = master.getMessage(group, realTopic, queueId, 0, 10000,
new ExpressionMessageFilter(subscriptionData, filterData, filterManager));
String assertMsg = group + "-" + realTopic;
try {
assertThat(getMessageResult).isNotNull();
assertThat(GetMessageStatus.FOUND).isEqualTo(getMessageResult.getStatus());
assertThat(getMessageResult.getMessageBufferList()).isNotNull().isNotEmpty();
assertThat(getMessageResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size());

for (ByteBuffer buffer : getMessageResult.getMessageBufferList()) {
MessageExt messageExt = MessageDecoder.decode(buffer.slice(), false);
assertThat(messageExt).isNotNull();

Object evlRet = null;
try {
evlRet = filterData.getCompiledExpression().evaluate(new MessageEvaluationContext(messageExt.getProperties()));
} catch (Exception e) {
e.printStackTrace();
assertThat(true).isFalse();
}

assertThat(evlRet).isNotNull().isEqualTo(Boolean.TRUE);

// check
boolean find = false;
for (MessageExtBrokerInner messageExtBrokerInner : filteredMsgs) {
if (messageExtBrokerInner.getMsgId().equals(messageExt.getMsgId())) {
find = true;
}
assertThat(find).isTrue();
}
} finally {
getMessageResult.release();
assertThat(find).isTrue();
}
} finally {
getMessageResult.release();
}
}
} finally {
master.shutdown();
master.destroy();
UtilAll.deleteFile(new File(storePath));
}
}
}
1 change: 0 additions & 1 deletion broker/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
<configuration>

<appender name="DefaultAppender" class="ch.qos.logback.core.ConsoleAppender">
<append>true</append>
<encoder>
<pattern>%d{yyy-MM-dd HH\:mm\:ss,GMT+8} %p %t - %m%n</pattern>
<charset class="java.nio.charset.Charset">UTF-8</charset>
Expand Down
Loading

0 comments on commit a674878

Please sign in to comment.