diff --git a/common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java similarity index 63% rename from common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java rename to broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java index d1ec894685f..e140681971f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java @@ -14,46 +14,66 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.common.config; +package org.apache.rocketmq.broker; +import com.alibaba.fastjson.JSON; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.config.ConfigRocksDBStorage; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.protocol.DataVersion; import org.rocksdb.FlushOptions; import org.rocksdb.RocksIterator; import org.rocksdb.Statistics; import org.rocksdb.WriteBatch; +import java.nio.charset.StandardCharsets; import java.util.function.BiConsumer; public class RocksDBConfigManager { protected static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - - protected volatile boolean isStop = false; - protected ConfigRocksDBStorage configRocksDBStorage = null; + public volatile boolean isStop = false; + public ConfigRocksDBStorage configRocksDBStorage = null; private FlushOptions flushOptions = null; private volatile long lastFlushMemTableMicroSecond = 0; + + private final String filePath; private final long memTableFlushInterval; + private DataVersion kvDataVersion = new DataVersion(); + - public RocksDBConfigManager(long memTableFlushInterval) { + public RocksDBConfigManager(String filePath, long memTableFlushInterval) { + this.filePath = filePath; this.memTableFlushInterval = memTableFlushInterval; } - public boolean load(String configFilePath, BiConsumer biConsumer) { + public boolean init() { this.isStop = false; - this.configRocksDBStorage = new ConfigRocksDBStorage(configFilePath); - if (!this.configRocksDBStorage.start()) { - return false; - } - RocksIterator iterator = this.configRocksDBStorage.iterator(); + this.configRocksDBStorage = new ConfigRocksDBStorage(filePath); + return this.configRocksDBStorage.start(); + } + public final boolean loadDataVersion() { + String currDataVersionString = null; try { + byte[] dataVersion = this.configRocksDBStorage.getKvDataVersion(); + if (dataVersion != null && dataVersion.length > 0) { + currDataVersionString = new String(dataVersion, StandardCharsets.UTF_8); + } + kvDataVersion = StringUtils.isNotBlank(currDataVersionString) ? JSON.parseObject(currDataVersionString, DataVersion.class) : new DataVersion(); + return true; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public boolean loadData(BiConsumer biConsumer) { + try (RocksIterator iterator = this.configRocksDBStorage.iterator()) { iterator.seekToFirst(); while (iterator.isValid()) { biConsumer.accept(iterator.key(), iterator.value()); iterator.next(); } - } finally { - iterator.close(); } this.flushOptions = new FlushOptions(); @@ -103,6 +123,20 @@ public void delete(final byte[] keyBytes) throws Exception { this.configRocksDBStorage.delete(keyBytes); } + public void updateKvDataVersion() throws Exception { + kvDataVersion.nextVersion(); + this.configRocksDBStorage.updateKvDataVersion(JSON.toJSONString(kvDataVersion).getBytes(StandardCharsets.UTF_8)); + } + + public DataVersion getKvDataVersion() { + return kvDataVersion; + } + + public void updateForbidden(String key, String value) throws Exception { + this.configRocksDBStorage.updateForbidden(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8)); + } + + public void batchPutWithWal(final WriteBatch batch) throws Exception { this.configRocksDBStorage.batchPutWithWal(batch); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java index 05b53b0bcf2..de293fc4992 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java @@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.common.config.RocksDBConfigManager; +import org.apache.rocketmq.broker.RocksDBConfigManager; import org.apache.rocketmq.common.utils.DataConverter; import org.rocksdb.WriteBatch; @@ -31,14 +31,19 @@ public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager { + protected RocksDBConfigManager rocksDBConfigManager; + public RocksDBConsumerOffsetManager(BrokerController brokerController) { super(brokerController); - this.rocksDBConfigManager = new RocksDBConfigManager(brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs()); + this.rocksDBConfigManager = new RocksDBConfigManager(configFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs()); } @Override public boolean load() { - return this.rocksDBConfigManager.load(configFilePath(), this::decode0); + if (!rocksDBConfigManager.init()) { + return false; + } + return this.rocksDBConfigManager.loadData(this::decodeOffset); } @Override @@ -56,8 +61,7 @@ protected void removeConsumerOffset(String topicAtGroup) { } } - @Override - protected void decode0(final byte[] key, final byte[] body) { + protected void decodeOffset(final byte[] key, final byte[] body) { String topicAtGroup = new String(key, DataConverter.CHARSET_UTF8); RocksDBOffsetSerializeWrapper wrapper = JSON.parseObject(body, RocksDBOffsetSerializeWrapper.class); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java index e9a81a8d686..fe79d5e4320 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java @@ -16,32 +16,109 @@ */ package org.apache.rocketmq.broker.subscription; -import java.io.File; - +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.serializer.SerializerFeature; import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.common.config.RocksDBConfigManager; +import org.apache.rocketmq.broker.RocksDBConfigManager; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.utils.DataConverter; +import org.apache.rocketmq.remoting.protocol.DataVersion; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; +import org.rocksdb.RocksIterator; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.serializer.SerializerFeature; +import java.io.File; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.BiConsumer; public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager { + protected RocksDBConfigManager rocksDBConfigManager; + public RocksDBSubscriptionGroupManager(BrokerController brokerController) { super(brokerController, false); - this.rocksDBConfigManager = new RocksDBConfigManager(brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs()); + this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs()); } @Override public boolean load() { - if (!this.rocksDBConfigManager.load(configFilePath(), this::decode0)) { + if (!rocksDBConfigManager.init()) { + return false; + } + if (!loadDataVersion() || !loadSubscriptionGroupAndForbidden()) { return false; } this.init(); return true; } + public boolean loadDataVersion() { + return this.rocksDBConfigManager.loadDataVersion(); + } + + public boolean loadForbidden(BiConsumer biConsumer) { + try (RocksIterator iterator = this.rocksDBConfigManager.configRocksDBStorage.forbiddenIterator()) { + iterator.seekToFirst(); + while (iterator.isValid()) { + biConsumer.accept(iterator.key(), iterator.value()); + iterator.next(); + } + } + return true; + } + + public boolean loadSubscriptionGroupAndForbidden() { + return this.rocksDBConfigManager.loadData(this::decodeSubscriptionGroup) && + this.loadForbidden(this::decodeForbidden) && + merge(); + } + + + private boolean merge() { + if (!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) { + log.info("The switch is off, no merge operation is needed."); + return true; + } + if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) { + log.info("json file and json back file not exist, so skip merge"); + return true; + } + + if (!super.load()) { + log.error("load group and forbidden info from json file error, startup will exit"); + return false; + } + + final ConcurrentMap groupTable = this.getSubscriptionGroupTable(); + final ConcurrentMap> forbiddenTable = this.getForbiddenTable(); + final DataVersion dataVersion = super.getDataVersion(); + final DataVersion kvDataVersion = this.getDataVersion(); + if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get()) { + for (Map.Entry entry : groupTable.entrySet()) { + putSubscriptionGroupConfig(entry.getValue()); + log.info("import subscription config to rocksdb, group={}", entry.getValue()); + } + for (Map.Entry> entry : forbiddenTable.entrySet()) { + try { + this.rocksDBConfigManager.updateForbidden(entry.getKey(), JSON.toJSONString(entry.getValue())); + log.info("import forbidden config to rocksdb, group={}", entry.getValue()); + } catch (Exception e) { + log.error("import forbidden config to rocksdb failed, group={}", entry.getValue()); + return false; + } + } + this.rocksDBConfigManager.getKvDataVersion().assignNewOne(dataVersion); + updateDataVersion(); + } + log.info("finish marge subscription config from json file and merge to rocksdb"); + this.persist(); + + return true; + } + @Override public boolean stop() { return this.rocksDBConfigManager.stop(); @@ -89,8 +166,8 @@ protected SubscriptionGroupConfig removeSubscriptionGroupConfig(String groupName return subscriptionGroupConfig; } - @Override - protected void decode0(byte[] key, byte[] body) { + + protected void decodeSubscriptionGroup(byte[] key, byte[] body) { String groupName = new String(key, DataConverter.CHARSET_UTF8); SubscriptionGroupConfig subscriptionGroupConfig = JSON.parseObject(body, SubscriptionGroupConfig.class); @@ -105,8 +182,63 @@ public synchronized void persist() { } } - @Override - public String configFilePath() { + public String rocksdbConfigFilePath() { return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "subscriptionGroups" + File.separator; } + + @Override + public DataVersion getDataVersion() { + return rocksDBConfigManager.getKvDataVersion(); + } + + @Override + public void updateDataVersion() { + try { + rocksDBConfigManager.updateKvDataVersion(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + protected void decodeForbidden(byte[] key, byte[] body) { + String forbiddenGroupName = new String(key, DataConverter.CHARSET_UTF8); + JSONObject jsonObject = JSON.parseObject(new String(body, DataConverter.CHARSET_UTF8)); + Set> entries = jsonObject.entrySet(); + ConcurrentMap forbiddenGroup = new ConcurrentHashMap<>(entries.size()); + for (Map.Entry entry : entries) { + forbiddenGroup.put(entry.getKey(), (Integer) entry.getValue()); + } + this.getForbiddenTable().put(forbiddenGroupName, forbiddenGroup); + log.info("load forbidden,{} value {}", forbiddenGroupName, forbiddenGroup.toString()); + } + + @Override + public void updateForbidden(String group, String topic, int forbiddenIndex, boolean setOrClear) { + try { + super.updateForbidden(group, topic, forbiddenIndex, setOrClear); + this.rocksDBConfigManager.updateForbidden(group, JSON.toJSONString(this.getForbiddenTable().get(group))); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void setForbidden(String group, String topic, int forbiddenIndex) { + try { + super.setForbidden(group, topic, forbiddenIndex); + this.rocksDBConfigManager.updateForbidden(group, JSON.toJSONString(this.getForbiddenTable().get(group))); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void clearForbidden(String group, String topic, int forbiddenIndex) { + try { + super.clearForbidden(group, topic, forbiddenIndex); + this.rocksDBConfigManager.updateForbidden(group, JSON.toJSONString(this.getForbiddenTable().get(group))); + } catch (Exception e) { + throw new RuntimeException(e); + } + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java index e63b9305868..c66754ccb33 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java @@ -156,8 +156,7 @@ public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) log.info("create new subscription group, {}", config); } - long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; - dataVersion.nextVersion(stateMachineVersion); + updateDataVersion(); this.persist(); } @@ -214,7 +213,7 @@ public int getForbidden(String group, String topic) { return topicForbidden; } - private void updateForbiddenValue(String group, String topic, Integer forbidden) { + protected void updateForbiddenValue(String group, String topic, Integer forbidden) { if (forbidden == null || forbidden <= 0) { this.forbiddenTable.remove(group); log.info("clear group forbidden, {}@{} ", group, topic); @@ -233,8 +232,7 @@ private void updateForbiddenValue(String group, String topic, Integer forbidden) log.info("set group forbidden, {}@{} old: {} new: {}", group, topic, 0, forbidden); } - long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; - dataVersion.nextVersion(stateMachineVersion); + updateDataVersion(); this.persist(); } @@ -243,8 +241,7 @@ public void disableConsume(final String groupName) { SubscriptionGroupConfig old = getSubscriptionGroupConfig(groupName); if (old != null) { old.setConsumeEnable(false); - long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; - dataVersion.nextVersion(stateMachineVersion); + updateDataVersion(); } } @@ -261,8 +258,7 @@ public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) { if (null == preConfig) { log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString()); } - long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; - dataVersion.nextVersion(stateMachineVersion); + updateDataVersion(); this.persist(); } } @@ -331,8 +327,7 @@ public void deleteSubscriptionGroupConfig(final String groupName) { this.forbiddenTable.remove(groupName); if (old != null) { log.info("delete subscription group OK, subscription group:{}", old); - long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; - dataVersion.nextVersion(stateMachineVersion); + updateDataVersion(); this.persist(); } else { log.warn("delete subscription group failed, subscription groupName: {} not exist", groupName); @@ -369,4 +364,14 @@ private Map current(String groupName) { } } } + + public void setDataVersion(DataVersion dataVersion) { + this.dataVersion.assignNewOne(dataVersion); + } + + public void updateDataVersion() { + long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; + dataVersion.nextVersion(stateMachineVersion); + } + } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java index fddecf2d92a..f7f01ce5eff 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java @@ -16,39 +16,86 @@ */ package org.apache.rocketmq.broker.topic; -import java.io.File; - +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.RocksDBConfigManager; import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.common.config.RocksDBConfigManager; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.utils.DataConverter; +import org.apache.rocketmq.remoting.protocol.DataVersion; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.serializer.SerializerFeature; +import java.io.File; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; public class RocksDBTopicConfigManager extends TopicConfigManager { + protected RocksDBConfigManager rocksDBConfigManager; + public RocksDBTopicConfigManager(BrokerController brokerController) { super(brokerController, false); - this.rocksDBConfigManager = new RocksDBConfigManager(brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs()); + this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs()); } @Override public boolean load() { - if (!this.rocksDBConfigManager.load(configFilePath(), this::decode0)) { + if (!rocksDBConfigManager.init()) { + return false; + } + if (!loadDataVersion() || !loadTopicConfig()) { return false; } this.init(); return true; } + public boolean loadTopicConfig() { + return this.rocksDBConfigManager.loadData(this::decodeTopicConfig) && merge(); + } + + public boolean loadDataVersion() { + return this.rocksDBConfigManager.loadDataVersion(); + } + + private boolean merge() { + if (!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) { + log.info("The switch is off, no merge operation is needed."); + return true; + } + if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) { + log.info("json file and json back file not exist, so skip merge"); + return true; + } + + if (!super.load()) { + log.error("load topic config from json file error, startup will exit"); + return false; + } + + final ConcurrentMap topicConfigTable = this.getTopicConfigTable(); + final DataVersion dataVersion = super.getDataVersion(); + final DataVersion kvDataVersion = this.getDataVersion(); + if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get()) { + for (Map.Entry entry : topicConfigTable.entrySet()) { + putTopicConfig(entry.getValue()); + log.info("import topic config to rocksdb, topic={}", entry.getValue()); + } + this.rocksDBConfigManager.getKvDataVersion().assignNewOne(dataVersion); + updateDataVersion(); + } + log.info("finish read topic config from json file and merge to rocksdb"); + this.persist(); + return true; + } + + @Override public boolean stop() { return this.rocksDBConfigManager.stop(); } - @Override - protected void decode0(byte[] key, byte[] body) { + protected void decodeTopicConfig(byte[] key, byte[] body) { String topicName = new String(key, DataConverter.CHARSET_UTF8); TopicConfig topicConfig = JSON.parseObject(body, TopicConfig.class); @@ -56,11 +103,6 @@ protected void decode0(byte[] key, byte[] body) { log.info("load exist local topic, {}", topicConfig.toString()); } - @Override - public String configFilePath() { - return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "topics" + File.separator; - } - @Override protected TopicConfig putTopicConfig(TopicConfig topicConfig) { String topicName = topicConfig.getTopicName(); @@ -92,4 +134,23 @@ public synchronized void persist() { this.rocksDBConfigManager.flushWAL(); } } + + public String rocksdbConfigFilePath() { + return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "topics" + File.separator; + } + + + @Override + public DataVersion getDataVersion() { + return rocksDBConfigManager.getKvDataVersion(); + } + + @Override + public void updateDataVersion() { + try { + rocksDBConfigManager.updateKvDataVersion(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 511d29e12ad..7542b7bf734 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -277,8 +277,7 @@ public TopicConfig createTopicInSendMessageMethod(final String topic, final Stri putTopicConfig(topicConfig); - long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; - dataVersion.nextVersion(stateMachineVersion); + updateDataVersion(); createNew = true; @@ -321,8 +320,7 @@ public TopicConfig createTopicIfAbsent(TopicConfig topicConfig, boolean register } log.info("Create new topic [{}] config:[{}]", topicConfig.getTopicName(), topicConfig); putTopicConfig(topicConfig); - long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; - dataVersion.nextVersion(stateMachineVersion); + updateDataVersion(); createNew = true; this.persist(); } finally { @@ -381,8 +379,7 @@ public TopicConfig createTopicInSendMessageBackMethod( log.info("create new topic {}", topicConfig); putTopicConfig(topicConfig); createNew = true; - long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; - dataVersion.nextVersion(stateMachineVersion); + updateDataVersion(); this.persist(); } finally { this.topicConfigTableLock.unlock(); @@ -422,8 +419,7 @@ public TopicConfig createTopicOfTranCheckMaxTime(final int clientDefaultTopicQue log.info("create new topic {}", topicConfig); putTopicConfig(topicConfig); createNew = true; - long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; - dataVersion.nextVersion(stateMachineVersion); + updateDataVersion(); this.persist(); } finally { this.topicConfigTableLock.unlock(); @@ -456,8 +452,7 @@ public void updateTopicUnitFlag(final String topic, final boolean unit) { putTopicConfig(topicConfig); - long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; - dataVersion.nextVersion(stateMachineVersion); + updateDataVersion(); this.persist(); registerBrokerData(topicConfig); @@ -479,8 +474,7 @@ public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub) putTopicConfig(topicConfig); - long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; - dataVersion.nextVersion(stateMachineVersion); + updateDataVersion(); this.persist(); registerBrokerData(topicConfig); @@ -509,8 +503,7 @@ public void updateTopicConfig(final TopicConfig topicConfig) { log.info("create new topic [{}]", topicConfig); } - long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; - dataVersion.nextVersion(stateMachineVersion); + updateDataVersion(); this.persist(topicConfig.getTopicName(), topicConfig); } @@ -529,25 +522,8 @@ public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) { } } - // We don't have a mandatory rule to maintain the validity of order conf in NameServer, - // so we may overwrite the order field mistakenly. - // To avoid the above case, we comment the below codes, please use mqadmin API to update - // the order filed. - /*for (Map.Entry entry : this.topicConfigTable.entrySet()) { - String topic = entry.getKey(); - if (!orderTopics.contains(topic)) { - TopicConfig topicConfig = entry.getValue(); - if (topicConfig.isOrder()) { - topicConfig.setOrder(false); - isChange = true; - log.info("update order topic config, topic={}, order={}", topic, false); - } - } - }*/ - if (isChange) { - long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; - dataVersion.nextVersion(stateMachineVersion); + updateDataVersion(); this.persist(); } } @@ -571,8 +547,7 @@ public void deleteTopicConfig(final String topic) { TopicConfig old = removeTopicConfig(topic); if (old != null) { log.info("delete topic config OK, topic: {}", old); - long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; - dataVersion.nextVersion(stateMachineVersion); + updateDataVersion(); this.persist(); } else { log.warn("delete topic config failed, topic: {} not exists", topic); @@ -687,5 +662,11 @@ public boolean containsTopic(String topic) { return topicConfigTable.containsKey(topic); } + public void updateDataVersion() { + long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; + dataVersion.nextVersion(stateMachineVersion); + } + + } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java index 58b690c9a34..de82eca8f99 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java @@ -21,7 +21,6 @@ import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.After; import org.junit.Assert; @@ -108,6 +107,6 @@ public void testOffsetPersistInMemory() { } private boolean notToBeExecuted() { - return MixAll.isMac(); + return false; } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java new file mode 100644 index 00000000000..205e642843b --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.subscription; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.remoting.protocol.DataVersion; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class RocksdbGroupConfigTransferTest { + private final String basePath = Paths.get(System.getProperty("user.home"), + "unit-test-store", UUID.randomUUID().toString().substring(0, 16).toUpperCase()).toString(); + + private RocksDBSubscriptionGroupManager rocksDBSubscriptionGroupManager; + + private SubscriptionGroupManager jsonSubscriptionGroupManager; + @Mock + private BrokerController brokerController; + + @Mock + private DefaultMessageStore defaultMessageStore; + + @Before + public void init() { + if (notToBeExecuted()) { + return; + } + BrokerConfig brokerConfig = new BrokerConfig(); + Mockito.lenient().when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setStorePathRootDir(basePath); + messageStoreConfig.setTransferMetadataJsonToRocksdb(true); + Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); + Mockito.lenient().when(brokerController.getMessageStore()).thenReturn(defaultMessageStore); + when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L); + } + + @After + public void destroy() { + if (notToBeExecuted()) { + return; + } + Path pathToBeDeleted = Paths.get(basePath); + + try { + Files.walk(pathToBeDeleted) + .sorted(Comparator.reverseOrder()) + .forEach(path -> { + try { + Files.delete(path); + } catch (IOException e) { + // ignore + } + }); + } catch (IOException e) { + // ignore + } + if (rocksDBSubscriptionGroupManager != null) { + rocksDBSubscriptionGroupManager.stop(); + } + } + + + public void initRocksDBSubscriptionGroupManager() { + if (rocksDBSubscriptionGroupManager == null) { + rocksDBSubscriptionGroupManager = new RocksDBSubscriptionGroupManager(brokerController); + rocksDBSubscriptionGroupManager.load(); + } + } + + public void initJsonSubscriptionGroupManager() { + if (jsonSubscriptionGroupManager == null) { + jsonSubscriptionGroupManager = new SubscriptionGroupManager(brokerController); + jsonSubscriptionGroupManager.load(); + } + } + + @Test + public void theFirstTimeLoadJsonSubscriptionGroupManager() { + if (notToBeExecuted()) { + return; + } + initJsonSubscriptionGroupManager(); + DataVersion dataVersion = jsonSubscriptionGroupManager.getDataVersion(); + Assert.assertNotNull(dataVersion); + Assert.assertEquals(0L, dataVersion.getCounter().get()); + Assert.assertEquals(0L, dataVersion.getStateVersion()); + Assert.assertNotEquals(0, jsonSubscriptionGroupManager.getSubscriptionGroupTable().size()); + } + + @Test + public void theFirstTimeLoadRocksDBSubscriptionGroupManager() { + if (notToBeExecuted()) { + return; + } + initRocksDBSubscriptionGroupManager(); + DataVersion dataVersion = rocksDBSubscriptionGroupManager.getDataVersion(); + Assert.assertNotNull(dataVersion); + Assert.assertEquals(0L, dataVersion.getCounter().get()); + Assert.assertEquals(0L, dataVersion.getStateVersion()); + Assert.assertNotEquals(0, rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size()); + } + + + @Test + public void addGroupLoadJsonSubscriptionGroupManager() { + if (notToBeExecuted()) { + return; + } + initJsonSubscriptionGroupManager(); + int beforeSize = jsonSubscriptionGroupManager.getSubscriptionGroupTable().size(); + String groupName = "testAddGroupConfig-" + System.currentTimeMillis(); + + Map attributes = new HashMap<>(); + + SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); + subscriptionGroupConfig.setGroupName(groupName); + subscriptionGroupConfig.setAttributes(attributes); + DataVersion beforeDataVersion = jsonSubscriptionGroupManager.getDataVersion(); + long beforeDataVersionCounter = beforeDataVersion.getCounter().get(); + long beforeTimestamp = beforeDataVersion.getTimestamp(); + + jsonSubscriptionGroupManager.updateSubscriptionGroupConfig(subscriptionGroupConfig); + + int afterSize = jsonSubscriptionGroupManager.getSubscriptionGroupTable().size(); + DataVersion afterDataVersion = jsonSubscriptionGroupManager.getDataVersion(); + long afterDataVersionCounter = afterDataVersion.getCounter().get(); + long afterTimestamp = afterDataVersion.getTimestamp(); + + Assert.assertEquals(0, beforeDataVersionCounter); + Assert.assertEquals(1, afterDataVersionCounter); + Assert.assertEquals(1, afterSize - beforeSize); + Assert.assertTrue(afterTimestamp >= beforeTimestamp); + } + + @Test + public void addForbiddenGroupLoadJsonSubscriptionGroupManager() { + if (notToBeExecuted()) { + return; + } + initJsonSubscriptionGroupManager(); + int beforeSize = jsonSubscriptionGroupManager.getForbiddenTable().size(); + String groupName = "testAddGroupConfig-" + System.currentTimeMillis(); + + Map attributes = new HashMap<>(); + + SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); + subscriptionGroupConfig.setGroupName(groupName); + subscriptionGroupConfig.setAttributes(attributes); + DataVersion beforeDataVersion = jsonSubscriptionGroupManager.getDataVersion(); + long beforeDataVersionCounter = beforeDataVersion.getCounter().get(); + long beforeTimestamp = beforeDataVersion.getTimestamp(); + + jsonSubscriptionGroupManager.setForbidden(groupName, "topic", 0); + int afterSize = jsonSubscriptionGroupManager.getForbiddenTable().size(); + DataVersion afterDataVersion = jsonSubscriptionGroupManager.getDataVersion(); + long afterDataVersionCounter = afterDataVersion.getCounter().get(); + long afterTimestamp = afterDataVersion.getTimestamp(); + + Assert.assertEquals(1, afterDataVersionCounter - beforeDataVersionCounter); + Assert.assertEquals(1, afterSize - beforeSize); + Assert.assertTrue(afterTimestamp >= beforeTimestamp); + } + + @Test + public void addGroupLoadRocksdbSubscriptionGroupManager() { + if (notToBeExecuted()) { + return; + } + initRocksDBSubscriptionGroupManager(); + int beforeSize = rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size(); + String groupName = "testAddGroupConfig-" + System.currentTimeMillis(); + + Map attributes = new HashMap<>(); + + SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); + subscriptionGroupConfig.setGroupName(groupName); + subscriptionGroupConfig.setAttributes(attributes); + DataVersion beforeDataVersion = rocksDBSubscriptionGroupManager.getDataVersion(); + long beforeDataVersionCounter = beforeDataVersion.getCounter().get(); + long beforeTimestamp = beforeDataVersion.getTimestamp(); + + rocksDBSubscriptionGroupManager.updateSubscriptionGroupConfig(subscriptionGroupConfig); + int afterSize = rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size(); + DataVersion afterDataVersion = rocksDBSubscriptionGroupManager.getDataVersion(); + long afterDataVersionCounter = afterDataVersion.getCounter().get(); + long afterTimestamp = afterDataVersion.getTimestamp(); + Assert.assertEquals(1, afterDataVersionCounter); + Assert.assertEquals(0, beforeDataVersionCounter); + Assert.assertEquals(1, afterSize - beforeSize); + Assert.assertTrue(afterTimestamp >= beforeTimestamp); + } + + @Test + public void addForbiddenLoadRocksdbSubscriptionGroupManager() { + if (notToBeExecuted()) { + return; + } + initRocksDBSubscriptionGroupManager(); + int beforeSize = rocksDBSubscriptionGroupManager.getForbiddenTable().size(); + String groupName = "testAddGroupConfig-" + System.currentTimeMillis(); + + Map attributes = new HashMap<>(); + + SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); + subscriptionGroupConfig.setGroupName(groupName); + subscriptionGroupConfig.setAttributes(attributes); + DataVersion beforeDataVersion = rocksDBSubscriptionGroupManager.getDataVersion(); + long beforeDataVersionCounter = beforeDataVersion.getCounter().get(); + long beforeTimestamp = beforeDataVersion.getTimestamp(); + + rocksDBSubscriptionGroupManager.updateForbidden(groupName, "topic", 0, true); + + int afterSize = rocksDBSubscriptionGroupManager.getForbiddenTable().size(); + DataVersion afterDataVersion = rocksDBSubscriptionGroupManager.getDataVersion(); + long afterDataVersionCounter = afterDataVersion.getCounter().get(); + long afterTimestamp = afterDataVersion.getTimestamp(); + Assert.assertEquals(1, afterDataVersionCounter - beforeDataVersionCounter); + Assert.assertEquals(1, afterSize - beforeSize); + Assert.assertTrue(afterTimestamp >= beforeTimestamp); + Assert.assertNotEquals(0, rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size()); + } + + @Test + public void theSecondTimeLoadJsonSubscriptionGroupManager() { + if (notToBeExecuted()) { + return; + } + addGroupLoadJsonSubscriptionGroupManager(); + jsonSubscriptionGroupManager.stop(); + rocksDBSubscriptionGroupManager = null; + addForbiddenGroupLoadJsonSubscriptionGroupManager(); + jsonSubscriptionGroupManager.stop(); + rocksDBSubscriptionGroupManager = null; + jsonSubscriptionGroupManager = new SubscriptionGroupManager(brokerController); + jsonSubscriptionGroupManager.load(); + DataVersion dataVersion = jsonSubscriptionGroupManager.getDataVersion(); + Assert.assertNotNull(dataVersion); + Assert.assertEquals(2L, dataVersion.getCounter().get()); + Assert.assertEquals(0L, dataVersion.getStateVersion()); + Assert.assertNotEquals(0, jsonSubscriptionGroupManager.getSubscriptionGroupTable().size()); + Assert.assertNotEquals(0, jsonSubscriptionGroupManager.getForbiddenTable().size()); + Assert.assertNotEquals(0, jsonSubscriptionGroupManager.getSubscriptionGroupTable().size()); + } + + @Test + public void theSecondTimeLoadRocksdbTopicConfigManager() { + if (notToBeExecuted()) { + return; + } + addGroupLoadRocksdbSubscriptionGroupManager(); + rocksDBSubscriptionGroupManager.stop(); + rocksDBSubscriptionGroupManager = null; + addForbiddenLoadRocksdbSubscriptionGroupManager(); + rocksDBSubscriptionGroupManager.stop(); + rocksDBSubscriptionGroupManager = null; + rocksDBSubscriptionGroupManager = new RocksDBSubscriptionGroupManager(brokerController); + rocksDBSubscriptionGroupManager.load(); + DataVersion dataVersion = rocksDBSubscriptionGroupManager.getDataVersion(); + Assert.assertNotNull(dataVersion); + Assert.assertEquals(2L, dataVersion.getCounter().get()); + Assert.assertEquals(0L, dataVersion.getStateVersion()); + Assert.assertNotEquals(0, rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size()); + Assert.assertNotEquals(0, rocksDBSubscriptionGroupManager.getForbiddenTable().size()); + Assert.assertNotEquals(0, rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size()); + } + + + @Test + public void jsonUpgradeToRocksdb() { + if (notToBeExecuted()) { + return; + } + addGroupLoadJsonSubscriptionGroupManager(); + addForbiddenGroupLoadJsonSubscriptionGroupManager(); + initRocksDBSubscriptionGroupManager(); + DataVersion dataVersion = rocksDBSubscriptionGroupManager.getDataVersion(); + Assert.assertNotNull(dataVersion); + Assert.assertEquals(3L, dataVersion.getCounter().get()); + Assert.assertEquals(0L, dataVersion.getStateVersion()); + Assert.assertNotEquals(0, rocksDBSubscriptionGroupManager.getForbiddenTable().size()); + Assert.assertNotEquals(0, rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size()); + Assert.assertEquals(rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size(), jsonSubscriptionGroupManager.getSubscriptionGroupTable().size()); + Assert.assertEquals(rocksDBSubscriptionGroupManager.getForbiddenTable().size(), jsonSubscriptionGroupManager.getForbiddenTable().size()); + + rocksDBSubscriptionGroupManager.stop(); + rocksDBSubscriptionGroupManager = new RocksDBSubscriptionGroupManager(brokerController); + rocksDBSubscriptionGroupManager.load(); + dataVersion = rocksDBSubscriptionGroupManager.getDataVersion(); + Assert.assertEquals(3L, dataVersion.getCounter().get()); + Assert.assertEquals(0L, dataVersion.getStateVersion()); + Assert.assertNotEquals(0, rocksDBSubscriptionGroupManager.getForbiddenTable().size()); + Assert.assertNotEquals(0, rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size()); + Assert.assertEquals(rocksDBSubscriptionGroupManager.getSubscriptionGroupTable().size(), jsonSubscriptionGroupManager.getSubscriptionGroupTable().size()); + Assert.assertEquals(rocksDBSubscriptionGroupManager.getForbiddenTable().size(), jsonSubscriptionGroupManager.getForbiddenTable().size()); + } + + private boolean notToBeExecuted() { + return MixAll.isMac(); + } + +} diff --git a/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java index 3c829437cf1..cd05ef516f7 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java @@ -18,7 +18,11 @@ package org.apache.rocketmq.broker.subscription; import com.google.common.collect.ImmutableMap; + +import java.nio.file.Paths; import java.util.Map; +import java.util.UUID; + import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.SubscriptionGroupAttributes; @@ -30,17 +34,19 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class SubscriptionGroupManagerTest { private String group = "group"; + + private final String basePath = Paths.get(System.getProperty("user.home"), + "unit-test-store", UUID.randomUUID().toString().substring(0, 16).toUpperCase()).toString(); @Mock private BrokerController brokerControllerMock; private SubscriptionGroupManager subscriptionGroupManager; @@ -53,13 +59,14 @@ public void before() { false )); subscriptionGroupManager = spy(new SubscriptionGroupManager(brokerControllerMock)); - when(brokerControllerMock.getMessageStore()).thenReturn(null); - doNothing().when(subscriptionGroupManager).persist(); + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setStorePathRootDir(basePath); + Mockito.lenient().when(brokerControllerMock.getMessageStoreConfig()).thenReturn(messageStoreConfig); } @After public void destroy() { - if (MixAll.isMac()) { + if (notToBeExecuted()) { return; } if (subscriptionGroupManager != null) { @@ -69,18 +76,18 @@ public void destroy() { @Test public void testUpdateAndCreateSubscriptionGroupInRocksdb() { - if (MixAll.isMac()) { + if (notToBeExecuted()) { return; } - when(brokerControllerMock.getMessageStoreConfig()).thenReturn(new MessageStoreConfig()); - subscriptionGroupManager = spy(new RocksDBSubscriptionGroupManager(brokerControllerMock)); - subscriptionGroupManager.load(); group += System.currentTimeMillis(); updateSubscriptionGroupConfig(); } @Test public void updateSubscriptionGroupConfig() { + if (notToBeExecuted()) { + return; + } SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); subscriptionGroupConfig.setGroupName(group); Map attr = ImmutableMap.of("+test", "true"); @@ -99,4 +106,8 @@ public void updateSubscriptionGroupConfig() { assertThatThrownBy(() -> subscriptionGroupManager.updateSubscriptionGroupConfig(subscriptionGroupConfig1)) .isInstanceOf(RuntimeException.class).hasMessage("attempt to update an unchangeable attribute. key: test"); } + + private boolean notToBeExecuted() { + return MixAll.isMac(); + } } \ No newline at end of file diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java index ed71a3313a8..b0e0d057363 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java @@ -16,10 +16,13 @@ */ package org.apache.rocketmq.broker.topic; +import java.nio.file.Paths; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; + import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; @@ -39,6 +42,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; import static com.google.common.collect.Sets.newHashSet; @@ -47,6 +51,10 @@ @RunWith(MockitoJUnitRunner.class) public class RocksdbTopicConfigManagerTest { + + private final String basePath = Paths.get(System.getProperty("user.home"), + "unit-test-store", UUID.randomUUID().toString().substring(0, 16).toUpperCase()).toString(); + private RocksDBTopicConfigManager topicConfigManager; @Mock private BrokerController brokerController; @@ -62,9 +70,11 @@ public void init() { BrokerConfig brokerConfig = new BrokerConfig(); when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setStorePathRootDir(basePath); + messageStoreConfig.setTransferMetadataJsonToRocksdb(true); when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); - when(brokerController.getMessageStore()).thenReturn(defaultMessageStore); - when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L); + Mockito.lenient().when(brokerController.getMessageStore()).thenReturn(defaultMessageStore); + Mockito.lenient().when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L); topicConfigManager = new RocksDBTopicConfigManager(brokerController); topicConfigManager.load(); } @@ -197,7 +207,6 @@ public void testNormalAddKeyOnCreating() { TopicConfig existingTopicConfig = topicConfigManager.getTopicConfigTable().get(topic); Assert.assertEquals("enum-2", existingTopicConfig.getAttributes().get("enum.key")); Assert.assertEquals("16", existingTopicConfig.getAttributes().get("long.range.key")); - // assert file } @Test diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java new file mode 100644 index 00000000000..ddd6fa07d32 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.topic; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.remoting.protocol.DataVersion; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class RocksdbTopicConfigTransferTest { + + private final String basePath = Paths.get(System.getProperty("user.home"), + "unit-test-store", UUID.randomUUID().toString().substring(0, 16).toUpperCase()).toString(); + + private RocksDBTopicConfigManager rocksdbTopicConfigManager; + + private TopicConfigManager jsonTopicConfigManager; + @Mock + private BrokerController brokerController; + + @Mock + private DefaultMessageStore defaultMessageStore; + + @Before + public void init() { + if (notToBeExecuted()) { + return; + } + BrokerConfig brokerConfig = new BrokerConfig(); + when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setStorePathRootDir(basePath); + messageStoreConfig.setTransferMetadataJsonToRocksdb(true); + Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); + when(brokerController.getMessageStore()).thenReturn(defaultMessageStore); + when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L); + } + + @After + public void destroy() { + Path pathToBeDeleted = Paths.get(basePath); + + try { + Files.walk(pathToBeDeleted) + .sorted(Comparator.reverseOrder()) + .forEach(path -> { + try { + Files.delete(path); + } catch (IOException e) { + // ignore + } + }); + } catch (IOException e) { + // ignore + } + if (rocksdbTopicConfigManager != null) { + rocksdbTopicConfigManager.stop(); + } + } + + public void initRocksdbTopicConfigManager() { + if (rocksdbTopicConfigManager == null) { + rocksdbTopicConfigManager = new RocksDBTopicConfigManager(brokerController); + rocksdbTopicConfigManager.load(); + } + } + + public void initJsonTopicConfigManager() { + if (jsonTopicConfigManager == null) { + jsonTopicConfigManager = new TopicConfigManager(brokerController); + jsonTopicConfigManager.load(); + } + } + + @Test + public void theFirstTimeLoadJsonTopicConfigManager() { + if (notToBeExecuted()) { + return; + } + initJsonTopicConfigManager(); + DataVersion dataVersion = jsonTopicConfigManager.getDataVersion(); + Assert.assertNotNull(dataVersion); + Assert.assertEquals(0L, dataVersion.getCounter().get()); + Assert.assertEquals(0L, dataVersion.getStateVersion()); + Assert.assertNotEquals(0, jsonTopicConfigManager.getTopicConfigTable().size()); + } + + @Test + public void theFirstTimeLoadRocksdbTopicConfigManager() { + if (notToBeExecuted()) { + return; + } + initRocksdbTopicConfigManager(); + DataVersion dataVersion = rocksdbTopicConfigManager.getDataVersion(); + Assert.assertNotNull(dataVersion); + Assert.assertEquals(0L, dataVersion.getCounter().get()); + Assert.assertEquals(0L, dataVersion.getStateVersion()); + Assert.assertNotEquals(0, rocksdbTopicConfigManager.getTopicConfigTable().size()); + } + + + @Test + public void addTopicLoadJsonTopicConfigManager() { + if (notToBeExecuted()) { + return; + } + initJsonTopicConfigManager(); + String topicName = "testAddTopicConfig-" + System.currentTimeMillis(); + + Map attributes = new HashMap<>(); + + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(topicName); + topicConfig.setAttributes(attributes); + DataVersion beforeDataVersion = jsonTopicConfigManager.getDataVersion(); + long beforeDataVersionCounter = beforeDataVersion.getCounter().get(); + long beforeTimestamp = beforeDataVersion.getTimestamp(); + + jsonTopicConfigManager.updateTopicConfig(topicConfig); + + DataVersion afterDataVersion = jsonTopicConfigManager.getDataVersion(); + long afterDataVersionCounter = afterDataVersion.getCounter().get(); + long afterTimestamp = afterDataVersion.getTimestamp(); + + Assert.assertEquals(0, beforeDataVersionCounter); + Assert.assertEquals(1, afterDataVersionCounter); + Assert.assertTrue(afterTimestamp >= beforeTimestamp); + } + + @Test + public void addTopicLoadRocksdbTopicConfigManager() { + if (notToBeExecuted()) { + return; + } + initRocksdbTopicConfigManager(); + String topicName = "testAddTopicConfig-" + System.currentTimeMillis(); + + Map attributes = new HashMap<>(); + + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(topicName); + topicConfig.setAttributes(attributes); + DataVersion beforeDataVersion = rocksdbTopicConfigManager.getDataVersion(); + long beforeDataVersionCounter = beforeDataVersion.getCounter().get(); + long beforeTimestamp = beforeDataVersion.getTimestamp(); + + rocksdbTopicConfigManager.updateTopicConfig(topicConfig); + + DataVersion afterDataVersion = rocksdbTopicConfigManager.getDataVersion(); + long afterDataVersionCounter = afterDataVersion.getCounter().get(); + long afterTimestamp = afterDataVersion.getTimestamp(); + Assert.assertEquals(0, beforeDataVersionCounter); + Assert.assertEquals(1, afterDataVersionCounter); + Assert.assertTrue(afterTimestamp >= beforeTimestamp); + } + + @Test + public void theSecondTimeLoadJsonTopicConfigManager() { + if (notToBeExecuted()) { + return; + } + addTopicLoadJsonTopicConfigManager(); + jsonTopicConfigManager.stop(); + jsonTopicConfigManager = new TopicConfigManager(brokerController); + jsonTopicConfigManager.load(); + DataVersion dataVersion = jsonTopicConfigManager.getDataVersion(); + Assert.assertNotNull(dataVersion); + Assert.assertEquals(1L, dataVersion.getCounter().get()); + Assert.assertEquals(0L, dataVersion.getStateVersion()); + Assert.assertNotEquals(0, jsonTopicConfigManager.getTopicConfigTable().size()); + } + + @Test + public void theSecondTimeLoadRocksdbTopicConfigManager() { + if (notToBeExecuted()) { + return; + } + addTopicLoadRocksdbTopicConfigManager(); + rocksdbTopicConfigManager.stop(); + rocksdbTopicConfigManager = null; + rocksdbTopicConfigManager = new RocksDBTopicConfigManager(brokerController); + rocksdbTopicConfigManager.load(); + DataVersion dataVersion = rocksdbTopicConfigManager.getDataVersion(); + Assert.assertNotNull(dataVersion); + Assert.assertEquals(1L, dataVersion.getCounter().get()); + Assert.assertEquals(0L, dataVersion.getStateVersion()); + Assert.assertNotEquals(0, rocksdbTopicConfigManager.getTopicConfigTable().size()); + } + + @Test + public void jsonUpgradeToRocksdb() { + if (notToBeExecuted()) { + return; + } + addTopicLoadJsonTopicConfigManager(); + initRocksdbTopicConfigManager(); + DataVersion dataVersion = rocksdbTopicConfigManager.getDataVersion(); + Assert.assertNotNull(dataVersion); + Assert.assertEquals(2L, dataVersion.getCounter().get()); + Assert.assertEquals(0L, dataVersion.getStateVersion()); + Assert.assertNotEquals(0, rocksdbTopicConfigManager.getTopicConfigTable().size()); + Assert.assertEquals(rocksdbTopicConfigManager.getTopicConfigTable().size(), jsonTopicConfigManager.getTopicConfigTable().size()); + + rocksdbTopicConfigManager.stop(); + rocksdbTopicConfigManager = new RocksDBTopicConfigManager(brokerController); + rocksdbTopicConfigManager.load(); + dataVersion = rocksdbTopicConfigManager.getDataVersion(); + Assert.assertEquals(2L, dataVersion.getCounter().get()); + Assert.assertEquals(0L, dataVersion.getStateVersion()); + Assert.assertNotEquals(0, rocksdbTopicConfigManager.getTopicConfigTable().size()); + Assert.assertEquals(rocksdbTopicConfigManager.getTopicConfigTable().size(), rocksdbTopicConfigManager.getTopicConfigTable().size()); + } + + + private boolean notToBeExecuted() { + return MixAll.isMac(); + } + +} diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java index 6052a79d413..3fd1d14c3a2 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java @@ -16,10 +16,13 @@ */ package org.apache.rocketmq.broker.topic; +import java.nio.file.Paths; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; + import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.TopicAttributes; @@ -37,6 +40,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; import static com.google.common.collect.Sets.newHashSet; @@ -45,6 +49,9 @@ @RunWith(MockitoJUnitRunner.class) public class TopicConfigManagerTest { + + private final String basePath = Paths.get(System.getProperty("user.home"), + "unit-test-store", UUID.randomUUID().toString().substring(0, 16).toUpperCase()).toString(); private TopicConfigManager topicConfigManager; @Mock private BrokerController brokerController; @@ -57,8 +64,9 @@ public void init() { BrokerConfig brokerConfig = new BrokerConfig(); when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setStorePathRootDir(basePath); when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); - when(brokerController.getMessageStore()).thenReturn(defaultMessageStore); + Mockito.lenient().when(brokerController.getMessageStore()).thenReturn(defaultMessageStore); when(defaultMessageStore.getStateMachineVersion()).thenReturn(0L); topicConfigManager = new TopicConfigManager(brokerController); } diff --git a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java index 5e997596194..178b7528762 100644 --- a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java +++ b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java @@ -16,11 +16,9 @@ */ package org.apache.rocketmq.common; -import org.apache.rocketmq.common.config.RocksDBConfigManager; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import org.rocksdb.Statistics; import java.io.IOException; import java.util.Map; @@ -28,8 +26,6 @@ public abstract class ConfigManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); - protected RocksDBConfigManager rocksDBConfigManager; - public boolean load() { String fileName = null; try { @@ -89,10 +85,6 @@ public synchronized void persist() { } } - protected void decode0(final byte[] key, final byte[] body) { - - } - public boolean stop() { return true; } @@ -104,8 +96,4 @@ public boolean stop() { public abstract String encode(final boolean prettyFormat); public abstract void decode(final String jsonString); - - public Statistics getStatistics() { - return rocksDBConfigManager == null ? null : rocksDBConfigManager.getStatistics(); - } } diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java index 20319abba3d..6711d556ab2 100644 --- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java +++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java @@ -16,18 +16,7 @@ */ package org.apache.rocketmq.common.config; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - import com.google.common.collect.Maps; - import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.utils.DataConverter; @@ -51,6 +40,16 @@ import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + import static org.rocksdb.RocksDB.NOT_FOUND; public abstract class AbstractRocksDBStorage { diff --git a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java index b40f8046e84..f657d9cf2d2 100644 --- a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java +++ b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java @@ -18,6 +18,7 @@ import java.io.File; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; @@ -54,6 +55,15 @@ public class ConfigRocksDBStorage extends AbstractRocksDBStorage { + private static final byte[] KV_DATA_VERSION_COLUMN_FAMILY_NAME = "kvDataVersion".getBytes(StandardCharsets.UTF_8); + private static final byte[] KV_DATA_VERSION_KEY = "kvDataVersionKey".getBytes(StandardCharsets.UTF_8); + protected ColumnFamilyHandle kvDataVersionFamilyHandle; + + private static final byte[] FORBIDDEN_COLUMN_FAMILY_NAME = "forbidden".getBytes(StandardCharsets.UTF_8); + protected ColumnFamilyHandle forbiddenFamilyHandle; + + + public ConfigRocksDBStorage(final String dbPath) { super(); this.dbPath = dbPath; @@ -115,11 +125,15 @@ protected boolean postLoad() { ColumnFamilyOptions defaultOptions = createConfigOptions(); this.cfOptions.add(defaultOptions); cfDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultOptions)); - + cfDescriptors.add(new ColumnFamilyDescriptor(KV_DATA_VERSION_COLUMN_FAMILY_NAME, defaultOptions)); + cfDescriptors.add(new ColumnFamilyDescriptor(FORBIDDEN_COLUMN_FAMILY_NAME, defaultOptions)); final List cfHandles = new ArrayList(); open(cfDescriptors, cfHandles); this.defaultCFHandle = cfHandles.get(0); + this.kvDataVersionFamilyHandle = cfHandles.get(1); + this.forbiddenFamilyHandle = cfHandles.get(2); + } catch (final Exception e) { AbstractRocksDBStorage.LOGGER.error("postLoad Failed. {}", this.dbPath, e); return false; @@ -129,7 +143,8 @@ protected boolean postLoad() { @Override protected void preShutdown() { - + this.kvDataVersionFamilyHandle.close(); + this.forbiddenFamilyHandle.close(); } private ColumnFamilyOptions createConfigOptions() { @@ -225,6 +240,22 @@ public byte[] get(final byte[] keyBytes) throws Exception { return get(this.defaultCFHandle, this.totalOrderReadOptions, keyBytes); } + public void updateKvDataVersion(final byte[] valueBytes) throws Exception { + put(this.kvDataVersionFamilyHandle, this.ableWalWriteOptions, KV_DATA_VERSION_KEY, KV_DATA_VERSION_KEY.length, valueBytes, valueBytes.length); + } + + public byte[] getKvDataVersion() throws Exception { + return get(this.kvDataVersionFamilyHandle, this.totalOrderReadOptions, KV_DATA_VERSION_KEY); + } + + public void updateForbidden(final byte[] keyBytes, final byte[] valueBytes) throws Exception { + put(this.forbiddenFamilyHandle, this.ableWalWriteOptions, keyBytes, keyBytes.length, valueBytes, valueBytes.length); + } + + public byte[] getForbidden(final byte[] keyBytes) throws Exception { + return get(this.forbiddenFamilyHandle, this.totalOrderReadOptions, keyBytes); + } + public void delete(final byte[] keyBytes) throws Exception { delete(this.defaultCFHandle, this.ableWalWriteOptions, keyBytes); } @@ -246,6 +277,10 @@ public RocksIterator iterator() { return this.db.newIterator(this.defaultCFHandle, this.totalOrderReadOptions); } + public RocksIterator forbiddenIterator() { + return this.db.newIterator(this.forbiddenFamilyHandle, this.totalOrderReadOptions); + } + public void rangeDelete(final byte[] startKey, final byte[] endKey) throws RocksDBException { rangeDelete(this.defaultCFHandle, this.writeOptions, startKey, endKey); } diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 0ba02e4cb9d..21b8aa8dbbf 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -105,6 +105,9 @@ public class MessageStoreConfig { // default, defaultRocksDB @ImportantField private String storeType = StoreType.DEFAULT.getStoreType(); + + private boolean transferMetadataJsonToRocksdb = false; + // ConsumeQueue file size,default is 30W private int mappedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE; // enable consume queue ext @@ -1819,4 +1822,13 @@ public int getTopicQueueLockNum() { public void setTopicQueueLockNum(int topicQueueLockNum) { this.topicQueueLockNum = topicQueueLockNum; } + + public boolean isTransferMetadataJsonToRocksdb() { + return transferMetadataJsonToRocksdb; + } + + public void setTransferMetadataJsonToRocksdb(boolean transferMetadataJsonToRocksdb) { + this.transferMetadataJsonToRocksdb = transferMetadataJsonToRocksdb; + } + } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index f8b8ec248a8..79b2315f91a 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -92,6 +92,7 @@ import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgTraceByIdSubCommand; import org.apache.rocketmq.tools.command.message.SendMessageCommand; +import org.apache.rocketmq.tools.command.metadata.RocksDBConfigToJsonCommand; import org.apache.rocketmq.tools.command.namesrv.AddWritePermSubCommand; import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand; import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand; @@ -298,6 +299,7 @@ public static void initCommand() { initCommand(new GetAclSubCommand()); initCommand(new ListAclSubCommand()); initCommand(new CopyAclsSubCommand()); + initCommand(new RocksDBConfigToJsonCommand()); } private static void printHelp() { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java index b987ad873be..1d81287ac7d 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java @@ -14,18 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.rocketmq.tools.command.metadata; +import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.common.config.RocksDBConfigManager; +import org.apache.rocketmq.common.config.ConfigRocksDBStorage; import org.apache.rocketmq.common.utils.DataConverter; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.SubCommandException; +import org.rocksdb.RocksIterator; import java.io.File; import java.util.HashMap; @@ -47,8 +50,8 @@ public String commandDesc() { @Override public Options buildCommandlineOptions(Options options) { - Option pathOption = new Option("p", "path", true, - "Absolute path to the metadata directory"); + Option pathOption = new Option("p", "configPath", true, + "Absolute path to the metadata config directory"); pathOption.setRequired(true); options.addOption(pathOption); @@ -62,57 +65,50 @@ public Options buildCommandlineOptions(Options options) { @Override public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { - String path = commandLine.getOptionValue("path").trim(); + String path = commandLine.getOptionValue("configPath").trim(); if (StringUtils.isEmpty(path) || !new File(path).exists()) { System.out.print("Rocksdb path is invalid.\n"); return; } String configType = commandLine.getOptionValue("configType").trim().toLowerCase(); + if (!path.endsWith("/")) { + path += "/"; + } + path += configType; + + ConfigRocksDBStorage configRocksDBStorage = new ConfigRocksDBStorage(path, true); + configRocksDBStorage.start(); + RocksIterator iterator = configRocksDBStorage.iterator(); - final long memTableFlushInterval = 60 * 60 * 1000L; - RocksDBConfigManager kvConfigManager = new RocksDBConfigManager(memTableFlushInterval); try { - if (TOPICS_JSON_CONFIG.toLowerCase().equals(configType)) { - // for topics.json - final Map topicsJsonConfig = new HashMap<>(); - final Map topicConfigTable = new HashMap<>(); - boolean isLoad = kvConfigManager.load(path, (key, value) -> { - final String topic = new String(key, DataConverter.CHARSET_UTF8); - final String topicConfig = new String(value, DataConverter.CHARSET_UTF8); - final JSONObject jsonObject = JSONObject.parseObject(topicConfig); - topicConfigTable.put(topic, jsonObject); - }); + final Map configMap = new HashMap<>(); + final Map configTable = new HashMap<>(); + iterator.seekToFirst(); + while (iterator.isValid()) { + final byte[] key = iterator.key(); + final byte[] value = iterator.value(); + final String name = new String(key, DataConverter.CHARSET_UTF8); + final String config = new String(value, DataConverter.CHARSET_UTF8); + final JSONObject jsonObject = JSONObject.parseObject(config); + configTable.put(name, jsonObject); + iterator.next(); + } + byte[] kvDataVersion = configRocksDBStorage.getKvDataVersion(); + configMap.put("dataVersion", + JSONObject.parseObject(new String(kvDataVersion, DataConverter.CHARSET_UTF8))); - if (isLoad) { - topicsJsonConfig.put("topicConfigTable", (JSONObject) JSONObject.toJSON(topicConfigTable)); - final String topicsJsonStr = JSONObject.toJSONString(topicsJsonConfig, true); - System.out.print(topicsJsonStr + "\n"); - return; - } + if (TOPICS_JSON_CONFIG.toLowerCase().equals(configType)) { + configMap.put("topicConfigTable", JSON.parseObject(JSONObject.toJSONString(configTable))); } if (SUBSCRIPTION_GROUP_JSON_CONFIG.toLowerCase().equals(configType)) { - // for subscriptionGroup.json - final Map subscriptionGroupJsonConfig = new HashMap<>(); - final Map subscriptionGroupTable = new HashMap<>(); - boolean isLoad = kvConfigManager.load(path, (key, value) -> { - final String subscriptionGroup = new String(key, DataConverter.CHARSET_UTF8); - final String subscriptionGroupConfig = new String(value, DataConverter.CHARSET_UTF8); - final JSONObject jsonObject = JSONObject.parseObject(subscriptionGroupConfig); - subscriptionGroupTable.put(subscriptionGroup, jsonObject); - }); - - if (isLoad) { - subscriptionGroupJsonConfig.put("subscriptionGroupTable", - (JSONObject) JSONObject.toJSON(subscriptionGroupTable)); - final String subscriptionGroupJsonStr = JSONObject.toJSONString(subscriptionGroupJsonConfig, true); - System.out.print(subscriptionGroupJsonStr + "\n"); - return; - } + configMap.put("subscriptionGroupTable", JSON.parseObject(JSONObject.toJSONString(configTable))); } - System.out.print("Config type was not recognized, configType=" + configType + "\n"); + System.out.print(JSONObject.toJSONString(configMap, true) + "\n"); + } catch (Exception e) { + System.out.print("Error occurred while converting RocksDB kv config to json, " + "configType=" + configType + ", " + e.getMessage() + "\n"); } finally { - kvConfigManager.stop(); + configRocksDBStorage.shutdown(); } } -} +} \ No newline at end of file