Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@ public class PopConsumerRocksdbStore extends AbstractRocksDBStorage implements P
private WriteOptions writeOptions;
private WriteOptions deleteOptions;
protected ColumnFamilyHandle columnFamilyHandle;
private final long blockCacheSize;
private final long writeBufferSize;

public PopConsumerRocksdbStore(String filePath) {
public PopConsumerRocksdbStore(String filePath, long blockCacheSize, long writeBufferSize) {
super(filePath);
this.blockCacheSize = blockCacheSize;
this.writeBufferSize = writeBufferSize;
}

// https://www.cnblogs.com/renjc/p/rocksdb-class-db.html
Expand Down Expand Up @@ -83,8 +87,8 @@ protected boolean postLoad() {
initOptions();

// init column family here
ColumnFamilyOptions defaultOptions = RocksDBOptionsFactory.createPopCFOptions();
ColumnFamilyOptions popStateOptions = RocksDBOptionsFactory.createPopCFOptions();
ColumnFamilyOptions defaultOptions = RocksDBOptionsFactory.createPopCFOptions(blockCacheSize, writeBufferSize);
ColumnFamilyOptions popStateOptions = RocksDBOptionsFactory.createPopCFOptions(blockCacheSize, writeBufferSize);
this.cfOptions.add(defaultOptions);
this.cfOptions.add(popStateOptions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ public PopConsumerService(BrokerController brokerController) {
this.lastCleanupLockTime = new AtomicLong(System.currentTimeMillis());
this.consumerLockService = new PopConsumerLockService(TimeUnit.MINUTES.toMillis(2));
this.popConsumerStore = new PopConsumerRocksdbStore(Paths.get(
brokerController.getMessageStoreConfig().getStorePathRootDir(), ROCKSDB_DIRECTORY).toString());
brokerController.getMessageStoreConfig().getStorePathRootDir(), ROCKSDB_DIRECTORY).toString(),
brokerController.getMessageStoreConfig().getPopRocksdbBlockCacheSize(),
brokerController.getMessageStoreConfig().getPopRocksdbWriteBufferSize());
this.popConsumerCache = brokerConfig.isEnablePopBufferMerge() ? new PopConsumerCache(
brokerController, this.popConsumerStore, this.consumerLockService, this::revive) : null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.io.FileUtils;
import org.rocksdb.util.SizeUnit;
import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.tieredstore.util.MessageStoreUtil;
Expand Down Expand Up @@ -65,7 +66,8 @@ public static PopConsumerRecord getConsumerRecord() {
@Test
public void rocksdbStoreWriteDeleteTest() {
String filePath = getRandomStorePath();
PopConsumerKVStore consumerStore = new PopConsumerRocksdbStore(filePath);
PopConsumerKVStore consumerStore = new PopConsumerRocksdbStore(
filePath, 256 * SizeUnit.MB, 32 * SizeUnit.MB);
Assert.assertEquals(filePath, consumerStore.getFilePath());

consumerStore.start();
Expand Down Expand Up @@ -127,7 +129,8 @@ private long getDirectorySizeRecursive(File directory) {
@Ignore
@SuppressWarnings("ConstantValue")
public void tombstoneDeletionTest() throws IllegalAccessException, NoSuchFieldException {
PopConsumerRocksdbStore rocksdbStore = new PopConsumerRocksdbStore(getRandomStorePath());
PopConsumerRocksdbStore rocksdbStore = new PopConsumerRocksdbStore(
getRandomStorePath(), 256 * SizeUnit.MB, 32 * SizeUnit.MB);
rocksdbStore.start();

int iterCount = 1000 * 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,10 @@ public class MessageStoreConfig {

private String rocksdbCompressionType = CompressionType.LZ4_COMPRESSION.getLibraryName();

private long popRocksdbBlockCacheSize = 256 * SizeUnit.MB;

private long popRocksdbWriteBufferSize = 32 * SizeUnit.MB;

/**
* Flush RocksDB WAL frequency, aka, flush WAL every N write ops.
*/
Expand All @@ -531,6 +535,22 @@ public void setRocksdbCompressionType(String compressionType) {
this.rocksdbCompressionType = compressionType;
}

public long getPopRocksdbBlockCacheSize() {
return popRocksdbBlockCacheSize;
}

public void setPopRocksdbBlockCacheSize(long popRocksdbBlockCacheSize) {
this.popRocksdbBlockCacheSize = popRocksdbBlockCacheSize;
}

public long getPopRocksdbWriteBufferSize() {
return popRocksdbWriteBufferSize;
}

public void setPopRocksdbWriteBufferSize(long popRocksdbWriteBufferSize) {
this.popRocksdbWriteBufferSize = popRocksdbWriteBufferSize;
}

/**
* Spin number in the retreat strategy of spin lock
* Default is 1000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public static ColumnFamilyOptions createOffsetCFOptions() {
setInplaceUpdateSupport(true);
}

public static ColumnFamilyOptions createPopCFOptions() {
public static ColumnFamilyOptions createPopCFOptions(long blockCacheSize, long writeBufferSize) {
BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig()
.setFormatVersion(5)
.setIndexType(IndexType.kBinarySearch)
Expand All @@ -145,7 +145,7 @@ public static ColumnFamilyOptions createPopCFOptions() {
.setCacheIndexAndFilterBlocksWithHighPriority(true)
.setPinL0FilterAndIndexBlocksInCache(false)
.setPinTopLevelIndexAndFilter(true)
.setBlockCache(new LRUCache(1024 * SizeUnit.MB, 8, false))
.setBlockCache(new LRUCache(blockCacheSize, 8, false))
.setWholeKeyFiltering(true);

CompactionOptionsUniversal compactionOption = new CompactionOptionsUniversal()
Expand All @@ -160,7 +160,7 @@ public static ColumnFamilyOptions createPopCFOptions() {
//noinspection resource
return new ColumnFamilyOptions()
.setMaxWriteBufferNumber(4)
.setWriteBufferSize(128 * SizeUnit.MB)
.setWriteBufferSize(writeBufferSize)
.setMinWriteBufferNumberToMerge(1)
.setTableFormatConfig(blockBasedTableConfig)
.setMemTableConfig(new SkipListMemTableConfig())
Expand Down
Loading