Skip to content

Commit

Permalink
[ISSUE apache#8058]Support for upgrading metadata in json to rocksdb (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
LetLetMe committed May 21, 2024
1 parent 159a603 commit 0dd653f
Show file tree
Hide file tree
Showing 18 changed files with 1,075 additions and 174 deletions.
26 changes: 24 additions & 2 deletions .github/workflows/maven.yaml
Original file line number Diff line number Diff line change
@@ -1,34 +1,56 @@
name: Build and Run Tests by Maven

on:
pull_request:
types: [opened, reopened, synchronize]
push:
branches: [master, develop, bazel]

jobs:
java_build:
name: "maven-compile (${{ matrix.os }}, JDK-${{ matrix.jdk }})"
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
# see https://docs.github.com/en/actions/using-github-hosted-runners/about-github-hosted-runners#supported-runners-and-hardware-resources
os: [ubuntu-latest, windows-latest, macos-latest]
jdk: [8]

steps:
- name: Checkout
uses: actions/checkout@v2

- name: Set up JDK ${{ matrix.jdk }}
uses: actions/setup-java@v2
with:
java-version: ${{ matrix.jdk }}
distribution: "adopt"
cache: "maven"

- name: Build with Maven
run: mvn -B package --file pom.xml
- name: Upload JVM crash logs

- name: Run tests with increased memory and debug info
run: mvn test -X -Dparallel=none -DargLine="-Xmx1024m -XX:MaxPermSize=256m"

- name: Upload Auth JVM crash logs
if: failure()
uses: actions/upload-artifact@v4
with:
name: jvm-crash-logs
path: /Users/runner/work/rocketmq/rocketmq/auth/hs_err_pid*.log
retention-days: 1

- name: Check for broker JVM crash logs
if: failure()
run: |
echo "Checking for JVM crash logs..."
ls -al /Users/runner/work/rocketmq/rocketmq/broker/
- name: Upload broker JVM crash logs
if: failure()
uses: actions/upload-artifact@v4
with:
name: jvm-crash-logs
path: /Users/runner/work/rocketmq/rocketmq/broker/hs_err_pid*.log
retention-days: 1
Original file line number Diff line number Diff line change
Expand Up @@ -14,46 +14,66 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.config;
package org.apache.rocketmq.broker;

import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.rocksdb.FlushOptions;
import org.rocksdb.RocksIterator;
import org.rocksdb.Statistics;
import org.rocksdb.WriteBatch;

import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;

public class RocksDBConfigManager {
protected static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

protected volatile boolean isStop = false;
protected ConfigRocksDBStorage configRocksDBStorage = null;
public volatile boolean isStop = false;
public ConfigRocksDBStorage configRocksDBStorage = null;
private FlushOptions flushOptions = null;
private volatile long lastFlushMemTableMicroSecond = 0;

private final String filePath;
private final long memTableFlushInterval;
private DataVersion kvDataVersion = new DataVersion();


public RocksDBConfigManager(long memTableFlushInterval) {
public RocksDBConfigManager(String filePath, long memTableFlushInterval) {
this.filePath = filePath;
this.memTableFlushInterval = memTableFlushInterval;
}

public boolean load(String configFilePath, BiConsumer<byte[], byte[]> biConsumer) {
public boolean init() {
this.isStop = false;
this.configRocksDBStorage = new ConfigRocksDBStorage(configFilePath);
if (!this.configRocksDBStorage.start()) {
return false;
}
RocksIterator iterator = this.configRocksDBStorage.iterator();
this.configRocksDBStorage = new ConfigRocksDBStorage(filePath);
return this.configRocksDBStorage.start();
}
public final boolean loadDataVersion() {
String currDataVersionString = null;
try {
byte[] dataVersion = this.configRocksDBStorage.getKvDataVersion();
if (dataVersion != null && dataVersion.length > 0) {
currDataVersionString = new String(dataVersion, StandardCharsets.UTF_8);
}
kvDataVersion = StringUtils.isNotBlank(currDataVersionString) ? JSON.parseObject(currDataVersionString, DataVersion.class) : new DataVersion();
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public boolean loadData(BiConsumer<byte[], byte[]> biConsumer) {
try (RocksIterator iterator = this.configRocksDBStorage.iterator()) {
iterator.seekToFirst();
while (iterator.isValid()) {
biConsumer.accept(iterator.key(), iterator.value());
iterator.next();
}
} finally {
iterator.close();
}

this.flushOptions = new FlushOptions();
Expand Down Expand Up @@ -103,6 +123,20 @@ public void delete(final byte[] keyBytes) throws Exception {
this.configRocksDBStorage.delete(keyBytes);
}

public void updateKvDataVersion() throws Exception {
kvDataVersion.nextVersion();
this.configRocksDBStorage.updateKvDataVersion(JSON.toJSONString(kvDataVersion).getBytes(StandardCharsets.UTF_8));
}

public DataVersion getKvDataVersion() {
return kvDataVersion;
}

public void updateForbidden(String key, String value) throws Exception {
this.configRocksDBStorage.updateForbidden(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
}


public void batchPutWithWal(final WriteBatch batch) throws Exception {
this.configRocksDBStorage.batchPutWithWal(batch);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.concurrent.ConcurrentMap;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.config.RocksDBConfigManager;
import org.apache.rocketmq.broker.RocksDBConfigManager;
import org.apache.rocketmq.common.utils.DataConverter;
import org.rocksdb.WriteBatch;

Expand All @@ -31,14 +31,19 @@

public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {

protected RocksDBConfigManager rocksDBConfigManager;

public RocksDBConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
this.rocksDBConfigManager = new RocksDBConfigManager(brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
this.rocksDBConfigManager = new RocksDBConfigManager(configFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
}

@Override
public boolean load() {
return this.rocksDBConfigManager.load(configFilePath(), this::decode0);
if (!rocksDBConfigManager.init()) {
return false;
}
return this.rocksDBConfigManager.loadData(this::decodeOffset);
}

@Override
Expand All @@ -56,8 +61,7 @@ protected void removeConsumerOffset(String topicAtGroup) {
}
}

@Override
protected void decode0(final byte[] key, final byte[] body) {
protected void decodeOffset(final byte[] key, final byte[] body) {
String topicAtGroup = new String(key, DataConverter.CHARSET_UTF8);
RocksDBOffsetSerializeWrapper wrapper = JSON.parseObject(body, RocksDBOffsetSerializeWrapper.class);

Expand Down
Loading

0 comments on commit 0dd653f

Please sign in to comment.