Skip to content

Commit

Permalink
[ISSUE apache#8058]Support for upgrading metadata in json to rocksdb (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
LetLetMe committed May 14, 2024
1 parent 159a603 commit bb7aff8
Show file tree
Hide file tree
Showing 18 changed files with 985 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,46 +14,66 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.config;
package org.apache.rocketmq.broker;

import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.rocksdb.FlushOptions;
import org.rocksdb.RocksIterator;
import org.rocksdb.Statistics;
import org.rocksdb.WriteBatch;

import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;

public class RocksDBConfigManager {
protected static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

protected volatile boolean isStop = false;
protected ConfigRocksDBStorage configRocksDBStorage = null;
public volatile boolean isStop = false;
public ConfigRocksDBStorage configRocksDBStorage = null;
private FlushOptions flushOptions = null;
private volatile long lastFlushMemTableMicroSecond = 0;

private final String filePath;
private final long memTableFlushInterval;
private DataVersion kvDataVersion = new DataVersion();


public RocksDBConfigManager(long memTableFlushInterval) {
public RocksDBConfigManager(String filePath, long memTableFlushInterval) {
this.filePath = filePath;
this.memTableFlushInterval = memTableFlushInterval;
}

public boolean load(String configFilePath, BiConsumer<byte[], byte[]> biConsumer) {
public boolean init() {
this.isStop = false;
this.configRocksDBStorage = new ConfigRocksDBStorage(configFilePath);
if (!this.configRocksDBStorage.start()) {
return false;
}
RocksIterator iterator = this.configRocksDBStorage.iterator();
this.configRocksDBStorage = new ConfigRocksDBStorage(filePath);
return this.configRocksDBStorage.start();
}
public final boolean loadDataVersion() {
String currDataVersionString = null;
try {
byte[] dataVersion = this.configRocksDBStorage.getKvDataVersion();
if (dataVersion != null && dataVersion.length > 0) {
currDataVersionString = new String(dataVersion, StandardCharsets.UTF_8);
}
kvDataVersion = StringUtils.isNotBlank(currDataVersionString) ? JSON.parseObject(currDataVersionString, DataVersion.class) : new DataVersion();
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public boolean loadData(BiConsumer<byte[], byte[]> biConsumer) {
try (RocksIterator iterator = this.configRocksDBStorage.iterator()) {
iterator.seekToFirst();
while (iterator.isValid()) {
biConsumer.accept(iterator.key(), iterator.value());
iterator.next();
}
} finally {
iterator.close();
}

this.flushOptions = new FlushOptions();
Expand Down Expand Up @@ -103,6 +123,20 @@ public void delete(final byte[] keyBytes) throws Exception {
this.configRocksDBStorage.delete(keyBytes);
}

public void updateKvDataVersion() throws Exception {
kvDataVersion.nextVersion();
this.configRocksDBStorage.updateKvDataVersion(JSON.toJSONString(kvDataVersion).getBytes(StandardCharsets.UTF_8));
}

public DataVersion getKvDataVersion() {
return kvDataVersion;
}

public void updateForbidden(String key, String value) throws Exception {
this.configRocksDBStorage.updateForbidden(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
}


public void batchPutWithWal(final WriteBatch batch) throws Exception {
this.configRocksDBStorage.batchPutWithWal(batch);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.concurrent.ConcurrentMap;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.config.RocksDBConfigManager;
import org.apache.rocketmq.broker.RocksDBConfigManager;
import org.apache.rocketmq.common.utils.DataConverter;
import org.rocksdb.WriteBatch;

Expand All @@ -31,14 +31,19 @@

public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {

protected RocksDBConfigManager rocksDBConfigManager;

public RocksDBConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
this.rocksDBConfigManager = new RocksDBConfigManager(brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
this.rocksDBConfigManager = new RocksDBConfigManager(configFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
}

@Override
public boolean load() {
return this.rocksDBConfigManager.load(configFilePath(), this::decode0);
if (!rocksDBConfigManager.init()) {
return false;
}
return this.rocksDBConfigManager.loadData(this::decodeOffset);
}

@Override
Expand All @@ -56,8 +61,7 @@ protected void removeConsumerOffset(String topicAtGroup) {
}
}

@Override
protected void decode0(final byte[] key, final byte[] body) {
protected void decodeOffset(final byte[] key, final byte[] body) {
String topicAtGroup = new String(key, DataConverter.CHARSET_UTF8);
RocksDBOffsetSerializeWrapper wrapper = JSON.parseObject(body, RocksDBOffsetSerializeWrapper.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,109 @@
*/
package org.apache.rocketmq.broker.subscription;

import java.io.File;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.config.RocksDBConfigManager;
import org.apache.rocketmq.broker.RocksDBConfigManager;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.rocksdb.RocksIterator;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import java.io.File;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;

public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager {

protected RocksDBConfigManager rocksDBConfigManager;

public RocksDBSubscriptionGroupManager(BrokerController brokerController) {
super(brokerController, false);
this.rocksDBConfigManager = new RocksDBConfigManager(brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
}

@Override
public boolean load() {
if (!this.rocksDBConfigManager.load(configFilePath(), this::decode0)) {
if (!rocksDBConfigManager.init()) {
return false;
}
if (!loadDataVersion() || !loadSubscriptionGroupAndForbidden()) {
return false;
}
this.init();
return true;
}

public boolean loadDataVersion() {
return this.rocksDBConfigManager.loadDataVersion();
}

public boolean loadForbidden(BiConsumer<byte[], byte[]> biConsumer) {
try (RocksIterator iterator = this.rocksDBConfigManager.configRocksDBStorage.forbiddenIterator()) {
iterator.seekToFirst();
while (iterator.isValid()) {
biConsumer.accept(iterator.key(), iterator.value());
iterator.next();
}
}
return true;
}

public boolean loadSubscriptionGroupAndForbidden() {
return this.rocksDBConfigManager.loadData(this::decodeSubscriptionGroup) &&
this.loadForbidden(this::decodeForbidden) &&
merge();
}


private boolean merge() {
if (!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) {
log.info("The switch is off, no merge operation is needed.");
return true;
}
if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) {
log.info("json file and json back file not exist, so skip merge");
return true;
}

if (!super.load()) {
log.error("load group and forbidden info from json file error, startup will exit");
return false;
}

final ConcurrentMap<String, SubscriptionGroupConfig> groupTable = this.getSubscriptionGroupTable();
final ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable = this.getForbiddenTable();
final DataVersion dataVersion = super.getDataVersion();
final DataVersion kvDataVersion = this.getDataVersion();
if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get()) {
for (Map.Entry<String, SubscriptionGroupConfig> entry : groupTable.entrySet()) {
putSubscriptionGroupConfig(entry.getValue());
log.info("import subscription config to rocksdb, group={}", entry.getValue());
}
for (Map.Entry<String, ConcurrentMap<String, Integer>> entry : forbiddenTable.entrySet()) {
try {
this.rocksDBConfigManager.updateForbidden(entry.getKey(), JSON.toJSONString(entry.getValue()));
log.info("import forbidden config to rocksdb, group={}", entry.getValue());
} catch (Exception e) {
log.error("import forbidden config to rocksdb failed, group={}", entry.getValue());
return false;
}
}
this.rocksDBConfigManager.getKvDataVersion().assignNewOne(dataVersion);
updateDataVersion();
}
log.info("finish marge subscription config from json file and merge to rocksdb");
this.persist();

return true;
}

@Override
public boolean stop() {
return this.rocksDBConfigManager.stop();
Expand Down Expand Up @@ -89,8 +166,8 @@ protected SubscriptionGroupConfig removeSubscriptionGroupConfig(String groupName
return subscriptionGroupConfig;
}

@Override
protected void decode0(byte[] key, byte[] body) {

protected void decodeSubscriptionGroup(byte[] key, byte[] body) {
String groupName = new String(key, DataConverter.CHARSET_UTF8);
SubscriptionGroupConfig subscriptionGroupConfig = JSON.parseObject(body, SubscriptionGroupConfig.class);

Expand All @@ -105,8 +182,63 @@ public synchronized void persist() {
}
}

@Override
public String configFilePath() {
public String rocksdbConfigFilePath() {
return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "subscriptionGroups" + File.separator;
}

@Override
public DataVersion getDataVersion() {
return rocksDBConfigManager.getKvDataVersion();
}

@Override
public void updateDataVersion() {
try {
rocksDBConfigManager.updateKvDataVersion();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

protected void decodeForbidden(byte[] key, byte[] body) {
String forbiddenGroupName = new String(key, DataConverter.CHARSET_UTF8);
JSONObject jsonObject = JSON.parseObject(new String(body, DataConverter.CHARSET_UTF8));
Set<Map.Entry<String, Object>> entries = jsonObject.entrySet();
ConcurrentMap<String, Integer> forbiddenGroup = new ConcurrentHashMap<>(entries.size());
for (Map.Entry<String, Object> entry : entries) {
forbiddenGroup.put(entry.getKey(), (Integer) entry.getValue());
}
this.getForbiddenTable().put(forbiddenGroupName, forbiddenGroup);
log.info("load forbidden,{} value {}", forbiddenGroupName, forbiddenGroup.toString());
}

@Override
public void updateForbidden(String group, String topic, int forbiddenIndex, boolean setOrClear) {
try {
super.updateForbidden(group, topic, forbiddenIndex, setOrClear);
this.rocksDBConfigManager.updateForbidden(group, JSON.toJSONString(this.getForbiddenTable().get(group)));
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public void setForbidden(String group, String topic, int forbiddenIndex) {
try {
super.setForbidden(group, topic, forbiddenIndex);
this.rocksDBConfigManager.updateForbidden(group, JSON.toJSONString(this.getForbiddenTable().get(group)));
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public void clearForbidden(String group, String topic, int forbiddenIndex) {
try {
super.clearForbidden(group, topic, forbiddenIndex);
this.rocksDBConfigManager.updateForbidden(group, JSON.toJSONString(this.getForbiddenTable().get(group)));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

0 comments on commit bb7aff8

Please sign in to comment.