Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ public static void main(String[] args) {

public static BrokerController start(BrokerController controller) {
try {

controller.start();

String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
+ controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();

Expand Down Expand Up @@ -242,7 +244,7 @@ private static void properties2SystemEnv(Properties properties) {
System.setProperty("rocketmq.namesrv.domain.subgroup", rmqAddressServerSubGroup);
}

public static Options buildCommandlineOptions(final Options options) {
private static Options buildCommandlineOptions(final Options options) {
Option opt = new Option("c", "configFile", true, "Broker config properties file");
opt.setRequired(false);
options.addOption(opt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit
// by tags code.
if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {

if (tagsCode == null || tagsCode < 0L) {
if (tagsCode == null) {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

public class BrokerStartupTest {

private String storePathRootDir = ".";

@Test
public void testProperties2SystemEnv() throws NoSuchMethodException, InvocationTargetException,
IllegalAccessException {
Expand All @@ -36,5 +38,4 @@ public void testProperties2SystemEnv() throws NoSuchMethodException, InvocationT
method.invoke(null, properties);
Assert.assertEquals("value", System.getProperty("rocketmq.namesrv.domain"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.store.CommitLogDispatcher;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
Expand Down Expand Up @@ -77,24 +79,17 @@ public class MessageStoreWithFilterTest {
try {
StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
} catch (UnknownHostException e) {
e.printStackTrace();
}
try {
BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
} catch (UnknownHostException e) {
e.printStackTrace();
}
}

@Before
public void init() {
public void init() throws Exception {
filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic);
try {
master = gen(filterManager);
} catch (Exception e) {
e.printStackTrace();
assertThat(true).isFalse();
}
master = gen(filterManager);
}

@After
Expand All @@ -107,7 +102,7 @@ public void destroy() {
public MessageExtBrokerInner buildMessage() {
MessageExtBrokerInner msg = new MessageExtBrokerInner();
msg.setTopic(topic);
msg.setTags("TAG1");
msg.setTags(System.currentTimeMillis() + "TAG");
msg.setKeys("Hello");
msg.setBody(msgBody);
msg.setKeys(String.valueOf(System.currentTimeMillis()));
Expand All @@ -125,7 +120,7 @@ public MessageExtBrokerInner buildMessage() {
}

public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize,
boolean enableCqExt, int cqExtFileSize) {
boolean enableCqExt, int cqExtFileSize) {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setMapedFileSizeCommitLog(commitLogFileSize);
messageStoreConfig.setMapedFileSizeConsumeQueue(cqFileSize);
Expand Down Expand Up @@ -155,9 +150,7 @@ protected DefaultMessageStore gen(ConsumerFilterManager filterManager) throws Ex
new MessageArrivingListener() {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
// System.out.println(String.format("Msg coming: %s, %d, %d, %d",
// topic, queueId, logicOffset, tagsCode));
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
}
}
, brokerConfig);
Expand All @@ -166,8 +159,6 @@ public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
@Override
public void dispatch(DispatchRequest request) {
try {
// System.out.println(String.format("offset:%d, bitMap:%s", request.getCommitLogOffset(),
// BitsArray.create(request.getBitMap()).toString()));
} catch (Throwable e) {
e.printStackTrace();
}
Expand All @@ -183,7 +174,7 @@ public void dispatch(DispatchRequest request) {
}

protected List<MessageExtBrokerInner> putMsg(DefaultMessageStore master, int topicCount,
int msgCountPerTopic) throws Exception {
int msgCountPerTopic) throws Exception {
List<MessageExtBrokerInner> msgs = new ArrayList<MessageExtBrokerInner>();
for (int i = 0; i < topicCount; i++) {
String realTopic = topic + i;
Expand Down Expand Up @@ -229,22 +220,10 @@ protected List<MessageExtBrokerInner> filtered(List<MessageExtBrokerInner> msgs,
}

@Test
public void testGetMessage_withFilterBitMapAndConsumerChanged() {
List<MessageExtBrokerInner> msgs = null;
try {
msgs = putMsg(master, topicCount, msgPerTopic);
} catch (Exception e) {
e.printStackTrace();
assertThat(true).isFalse();
}
public void testGetMessage_withFilterBitMapAndConsumerChanged() throws Exception {
List<MessageExtBrokerInner> msgs = putMsg(master, topicCount, msgPerTopic);

// sleep to wait for consume queue has been constructed.
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
assertThat(true).isFalse();
}
Thread.sleep(200);

// reset consumer;
String topic = "topic" + 0;
Expand Down Expand Up @@ -303,16 +282,10 @@ public void testGetMessage_withFilterBitMapAndConsumerChanged() {
}

@Test
public void testGetMessage_withFilterBitMap() {
List<MessageExtBrokerInner> msgs = null;
try {
msgs = putMsg(master, topicCount, msgPerTopic);
// sleep to wait for consume queue has been constructed.
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
assertThat(true).isFalse();
}
public void testGetMessage_withFilterBitMap() throws Exception {
List<MessageExtBrokerInner> msgs = putMsg(master, topicCount, msgPerTopic);

Thread.sleep(100);

for (int i = 0; i < topicCount; i++) {
String realTopic = topic + i;
Expand Down Expand Up @@ -369,4 +342,32 @@ public void testGetMessage_withFilterBitMap() {
}
}
}

@Test
public void testGetMessage_withFilter_checkTagsCode() throws Exception {
putMsg(master, topicCount, msgPerTopic);

Thread.sleep(200);

for (int i = 0; i < topicCount; i++) {
String realTopic = topic + i;

GetMessageResult getMessageResult = master.getMessage("test", realTopic, queueId, 0, 10000,
new MessageFilter() {
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
if (tagsCode != null && tagsCode <= ConsumeQueueExt.MAX_ADDR) {
return false;
}
return true;
}

@Override
public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
return true;
}
});
assertThat(getMessageResult.getMessageCount()).isEqualTo(msgPerTopic);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.NettySystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.srvutil.ShutdownHookThread;
Expand All @@ -49,15 +48,6 @@ public static void main(String[] args) {

public static NamesrvController main0(String[] args) {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
NettySystemConfig.socketSndbufSize = 4096;
}

if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
NettySystemConfig.socketRcvbufSize = 4096;
}

try {
//PackageConflictDetect.detectFastjson();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,13 @@ private boolean putMessagePositionInfo(final long offset, final int size, final

if (cqOffset != 0) {
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

if (expectLogicOffset < currentLogicOffset) {
log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
return true;
}

if (expectLogicOffset != currentLogicOffset) {
LOG_ERROR.warn(
"[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
Expand Down Expand Up @@ -569,6 +576,6 @@ protected boolean isExtWriteEnable() {
* Check {@code tagsCode} is address of extend file or tags code.
*/
public boolean isExtAddr(long tagsCode) {
return isExtReadEnable() && this.consumeQueueExt.isExtAddr(tagsCode);
return ConsumeQueueExt.isExtAddr(tagsCode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public ConsumeQueueExt(final String topic,
* Just test {@code address} is less than 0.
* </p>
*/
public boolean isExtAddr(final long address) {
public static boolean isExtAddr(final long address) {
return address <= MAX_ADDR;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileLock;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -104,6 +106,10 @@ public class DefaultMessageStore implements MessageStore {

private final LinkedList<CommitLogDispatcher> dispatcherList;

private RandomAccessFile lockFile;

private FileLock lock;

public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
this.messageArrivingListener = messageArrivingListener;
Expand Down Expand Up @@ -138,6 +144,10 @@ public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final Br
this.dispatcherList = new LinkedList<>();
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());

File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
MappedFile.ensureDirOK(file.getParent());
lockFile = new RandomAccessFile(file, "rw");
}

public void truncateDirtyLogicFiles(long phyOffset) {
Expand Down Expand Up @@ -196,6 +206,15 @@ public boolean load() {
* @throws Exception
*/
public void start() throws Exception {

lock = lockFile.getChannel().tryLock(0, 1, false);
if (lock == null || lock.isShared() || !lock.isValid()) {
throw new RuntimeException("Lock failed,MQ already started");
}

lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
lockFile.getChannel().force(true);

this.flushConsumeQueueService.start();
this.commitLog.start();
this.storeStatsService.start();
Expand Down Expand Up @@ -254,6 +273,14 @@ public void shutdown() {
}

this.transientStorePool.destroy();

if (lockFile != null && lock != null) {
try {
lock.release();
lockFile.close();
} catch (IOException e) {
}
}
}

public void destroy() {
Expand Down Expand Up @@ -487,7 +514,7 @@ public GetMessageResult getMessage(final String group, final String topic, final
break;
}

boolean extRet = false;
boolean extRet = false, isTagsCodeLegal = true;
if (consumeQueue.isExtAddr(tagsCode)) {
extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
if (extRet) {
Expand All @@ -496,11 +523,12 @@ public GetMessageResult getMessage(final String group, final String topic, final
// can't find ext content.Client will filter messages by tag also.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
tagsCode, offsetPy, sizePy, topic, group);
isTagsCodeLegal = false;
}
}

if (messageFilter != null
&& !messageFilter.isMatchedByConsumeQueue(tagsCode, extRet ? cqExtUnit : null)) {
&& !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public static String getAbortFile(final String rootDir) {
return rootDir + File.separator + "abort";
}

public static String getLockFile(final String rootDir) {
return rootDir + File.separator + "lock";
}

public static String getDelayOffsetStorePath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "delayOffset.json";
}
Expand Down
Loading