Skip to content

Commit

Permalink
Merge pull request #4118 from ltamber/5.0.0-beta-ctopic
Browse files Browse the repository at this point in the history
[RIP 30] Support Compaction topic
  • Loading branch information
duhenglucky committed Jul 8, 2022
2 parents 9a9f445 + d1783ea commit ca94afc
Show file tree
Hide file tree
Showing 34 changed files with 3,126 additions and 330 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -13,3 +13,4 @@ devenv
.DS_Store
localbin
nohup.out
docker/
Expand Up @@ -18,13 +18,16 @@

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import io.netty.channel.ChannelHandlerContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.AbortProcessException;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.common.attribute.DeletePolicy;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.MQVersion;
Expand All @@ -48,6 +51,7 @@
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.DeletePolicyUtils;
import org.apache.rocketmq.common.utils.QueueTypeUtils;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
Expand Down Expand Up @@ -245,6 +249,16 @@ public RemotingCommand sendMessage(final ChannelHandlerContext ctx,
}

MessageAccessor.setProperties(msgInner, oriProps);

DeletePolicy deletePolicy = DeletePolicyUtils.getDeletePolicy(Optional.of(topicConfig));
if (Objects.equals(deletePolicy, DeletePolicy.COMPACTION)) {
if (StringUtils.isBlank(msgInner.getKeys())) {
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark("the message don't have message key");
return response;
}
}

msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
Expand Down
Expand Up @@ -26,22 +26,30 @@

public class TopicAttributes {
public static final EnumAttribute QUEUE_TYPE_ATTRIBUTE = new EnumAttribute(
"queue.type",
false,
newHashSet("BatchCQ", "SimpleCQ"),
"SimpleCQ"
"queue.type",
false,
newHashSet("BatchCQ", "SimpleCQ"),
"SimpleCQ"
);
public static final EnumAttribute DELETE_POLICY_ATTRIBUTE = new EnumAttribute(
"delete.policy",
false,
newHashSet("NORMAL", "COMPACTION"),
"NORMAL"
);
public static final EnumAttribute TOPIC_MESSAGE_TYPE_ATTRIBUTE = new EnumAttribute(
"message.type",
true,
TopicMessageType.topicMessageTypeSet(),
TopicMessageType.NORMAL.getValue()
);

public static final Map<String, Attribute> ALL;

static {
ALL = new HashMap<>();
ALL.put(QUEUE_TYPE_ATTRIBUTE.getName(), QUEUE_TYPE_ATTRIBUTE);
ALL.put(DELETE_POLICY_ATTRIBUTE.getName(), DELETE_POLICY_ATTRIBUTE);
ALL.put(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName(), TOPIC_MESSAGE_TYPE_ATTRIBUTE);
}
}
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.attribute;

public enum DeletePolicy {
NORMAL,
COMPACTION
}
Expand Up @@ -24,6 +24,7 @@
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -34,12 +35,14 @@
public class MessageDecoder {
// public final static int MSG_ID_LENGTH = 8 + 8;

public final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");
public final static Charset CHARSET_UTF8 = StandardCharsets.UTF_8;
public final static int MESSAGE_MAGIC_CODE_POSTION = 4;
public final static int MESSAGE_FLAG_POSTION = 16;
public final static int MESSAGE_PHYSIC_OFFSET_POSTION = 28;
public final static int MESSAGE_STORE_TIMESTAMP_POSITION = 56;
public final static int MESSAGE_MAGIC_CODE = -626843481;
// End of file empty MAGIC CODE cbd43194
public final static int BLANK_MAGIC_CODE = -875286124;
public static final char NAME_VALUE_SEPARATOR = 1;
public static final char PROPERTY_SEPARATOR = 2;
public static final int PHY_POS_POSITION = 4 + 4 + 4 + 4 + 4 + 8;
Expand Down
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.utils;

import org.apache.rocketmq.common.TopicAttributes;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.attribute.DeletePolicy;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;

public class DeletePolicyUtils {
public static boolean isCompaction(Optional<TopicConfig> topicConfig) {
return Objects.equals(DeletePolicy.COMPACTION, getDeletePolicy(topicConfig));
}

public static DeletePolicy getDeletePolicy(Optional<TopicConfig> topicConfig) {
if (!topicConfig.isPresent()) {
return DeletePolicy.valueOf(TopicAttributes.DELETE_POLICY_ATTRIBUTE.getDefaultValue());
}

String attributeName = TopicAttributes.DELETE_POLICY_ATTRIBUTE.getName();

Map<String, String> attributes = topicConfig.get().getAttributes();
if (attributes == null || attributes.size() == 0) {
return DeletePolicy.valueOf(TopicAttributes.DELETE_POLICY_ATTRIBUTE.getDefaultValue());
}

if (attributes.containsKey(attributeName)) {
return DeletePolicy.valueOf(attributes.get(attributeName));
} else {
return DeletePolicy.valueOf(TopicAttributes.DELETE_POLICY_ATTRIBUTE.getDefaultValue());
}
}
}
47 changes: 47 additions & 0 deletions docs/cn/Example_Compaction_Topic_cn.md
@@ -0,0 +1,47 @@
# Compaction Topic

## 使用方式
### 创建compaction topic
```shell
$ bin/mqadmin updateTopic -w 8 -r 8 -a +delete.policy=COMPACTION -n localhost:9876 -t ctopic -c DefaultCluster
create topic to 127.0.0.1:10911 success.
TopicConfig [topicName=ctopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={+delete.policy=COMPACTION}]
```
### 生产数据
与普通消息一样
```java
DefaultMQProducer producer = new DefaultMQProducer("CompactionTestGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();

Message msg = new Message(topic, "tags", "keys", "bodys"getBytes(StandardCharsets.UTF_8));
SendResult sendResult = producer.send(msg);

System.out.printf("%s%n", sendResult);
```
### 消费数据
消费offset与compaction之前保持不变,如果指定offset消费,当指定的offset不存在时,返回后面最近的一条数据
在compaction场景下,大部分消费都是从0开始消费完整的数据
```java
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("compactionTestGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.setPullThreadNums(4);
consumer.start();

Collection<MessageQueue> messageQueueList = consumer.fetchMessageQueues("ctopic");
consumer.assign(messageQueueList);
messageQueueList.forEach(mq -> {
try {
consumer.seekToBegin(mq);
} catch (MQClientException e) {
e.printStackTrace();
}
});

while (true) {
List<MessageExt> msgList = consumer.poll(1000);
if (msgList != null) {
msgList.forEach(msg -> System.out.println(msg.toString()));
}
}
```
49 changes: 49 additions & 0 deletions docs/en/Example_Compaction_Topic.md
@@ -0,0 +1,49 @@
# Compaction Topic

## use example
### create compaction topic
```shell
$ bin/mqadmin updateTopic -w 8 -r 8 -a +delete.policy=COMPACTION -n localhost:9876 -t ctopic -c DefaultCluster
create topic to 127.0.0.1:10911 success.
TopicConfig [topicName=ctopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={+delete.policy=COMPACTION}]
```

### produce message
the same with ordinary message
```java
DefaultMQProducer producer = new DefaultMQProducer("CompactionTestGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();

Message msg = new Message(topic, "tags", "keys", "bodys"getBytes(StandardCharsets.UTF_8));
SendResult sendResult = producer.send(msg);

System.out.printf("%s%n", sendResult);
```
### consume message
the message offset remains unchanged after compaction. If the consumer specified offset does not exist, return the most recent message after the offset.

In the compaction scenario, most consumption was started from the beginning of the queue.
```java
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("compactionTestGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.setPullThreadNums(4);
consumer.start();

Collection<MessageQueue> messageQueueList = consumer.fetchMessageQueues("ctopic");
consumer.assign(messageQueueList);
messageQueueList.forEach(mq -> {
try {
consumer.seekToBegin(mq);
} catch (MQClientException e) {
e.printStackTrace();
}
});

while (true) {
List<MessageExt> msgList = consumer.poll(1000);
if (msgList != null) {
msgList.forEach(msg -> System.out.println(msg.toString()));
}
}
```
Expand Up @@ -54,6 +54,13 @@ public AppendMessageResult(AppendMessageStatus status, long wroteOffset, int wro
this.pagecacheRT = pagecacheRT;
}

public AppendMessageResult(AppendMessageStatus status, long wroteOffset, int wroteBytes, long storeTimestamp) {
this.status = status;
this.wroteOffset = wroteOffset;
this.wroteBytes = wroteBytes;
this.storeTimestamp = storeTimestamp;
}

public AppendMessageResult(AppendMessageStatus status, long wroteOffset, int wroteBytes, Supplier<String> msgIdSupplier,
long storeTimestamp, long logicsOffset, long pagecacheRT) {
this.status = status;
Expand Down

0 comments on commit ca94afc

Please sign in to comment.