Skip to content

Commit

Permalink
[rocksdb] add rocksdb properties (#5588)
Browse files Browse the repository at this point in the history
* [rocksdb] add rocksdb properties

* [rocksdb] spotless apply
  • Loading branch information
cigarl committed Apr 21, 2022
1 parent 254ca05 commit 7af14c9
Show file tree
Hide file tree
Showing 7 changed files with 305 additions and 18 deletions.
86 changes: 86 additions & 0 deletions server/src/assembly/resources/conf/schema-rocksdb.properties
@@ -0,0 +1,86 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

####################
### RocksDB Configuration
####################

# This configuration takes effect only when the schema engine mode is Rocksdb_based.
# The mode is configured in the 'iotdb-engine.properties'(schema_engine_mode=Rocksdb_based).

####################
### Cache Configuration
####################
# A proper cache size can speed up metadata queries. You can configure the cache size as required.

# Datatype: long
# LRU block cache size
# Block cache is where RocksDB caches data in memory for reads.
# The block cache stores uncompressed blocks. The default is 20 GB.
# block_cache_size=21474836480

# Datatype: long
# LRU block cache size
# Block cache is where RocksDB caches data in memory for reads.
# The block cache stores compressed blocks. The default is 10 GB.
# block_cache_compressed_size=10737418240

####################
### Professional Configuration
####################
# The following configuration items may serve professional personnel.
# If there is no special requirement, you can use the default value.

# Datatype: long
# The size of each block. The default is 4 KB.
# block_size=4096

# Datatype: int
# The block size deviation. The default is 5.
# block_size_deviation=5

# Datatype: long
# This is the maximum write buffer size used throughout the database.
# It represents the amount of data to build up in memory (backed by an unsorted log on disk)
# before converting to a sorted on-disk file. The default is 64 KB.
# write_buffer_size=65536

# Datatype: long
# This option applies to the whole database only.
# max_total_wal_size=65536

# Datatype: int
# The maximum number of background jobs; this includes both compaction tasks and flush tasks.
# The default is 10.
# max_background_job_num=10

# Datatype: double
# The Bloom filter uses this number of bits per key, which determines the filter's
# false positive rate. The default is 64.
# bloom_filter_policy=64

# Datatype: int
# The interval between restart points in a block. The default is 10.
# block_restart_interval=10

# Datatype: int
# The maximum number of memtables, both active and immutable.
# If the active memtable fills up and the total number of memtables is larger
# than max_write_buffer_num, RocksDB stalls further writes.
# max_write_buffer_num=6
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConfLoader;
import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaRegion;
import org.apache.iotdb.db.metadata.storagegroup.IStorageGroupSchemaManager;
import org.apache.iotdb.db.metadata.storagegroup.StorageGroupSchemaManager;
Expand All @@ -50,9 +51,11 @@ public class SchemaEngine {

private Map<SchemaRegionId, ISchemaRegion> schemaRegionMap;
private SchemaEngineMode schemaRegionStoredMode;
private RSchemaConfLoader rSchemaConfLoader;
private static final Logger logger = LoggerFactory.getLogger(SchemaEngine.class);

private static class SchemaEngineManagerHolder {

private static final SchemaEngine INSTANCE = new SchemaEngine();

private SchemaEngineManagerHolder() {}
Expand Down Expand Up @@ -147,7 +150,9 @@ public synchronized void createSchemaRegion(
new SchemaRegionSchemaFileImpl(storageGroup, schemaRegionId, storageGroupMNode);
break;
case Rocksdb_based:
schemaRegion = new RSchemaRegion(storageGroup, schemaRegionId, storageGroupMNode);
schemaRegion =
new RSchemaRegion(
storageGroup, schemaRegionId, storageGroupMNode, loadRocksdbConfFile());
break;
default:
throw new UnsupportedOperationException(
Expand All @@ -162,4 +167,11 @@ public void deleteSchemaRegion(SchemaRegionId schemaRegionId) throws MetadataExc
schemaRegionMap.get(schemaRegionId).deleteSchemaRegion();
schemaRegionMap.remove(schemaRegionId);
}

private RSchemaConfLoader loadRocksdbConfFile() {
if (rSchemaConfLoader == null) {
rSchemaConfLoader = new RSchemaConfLoader();
}
return rSchemaConfLoader;
}
}
@@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.metadata.schemaregion.rocksdb;

import org.apache.iotdb.commons.conf.IoTDBConstant;

import org.rocksdb.util.SizeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class RSchemaConfLoader {

private int maxBackgroundJobs = 10;
private int blockSizeDeviation = 5;
private int blockRestartInterval = 10;
private int maxWriteBufferNumber = 6;

private double bloomFilterPolicy = 64;

private long blockSize = 4 * SizeUnit.KB;
private long writeBufferSize = 64 * SizeUnit.KB;
private long maxTotalWalSize = 64 * SizeUnit.KB;
private long blockCache = 20L * 1024 * 1024 * 1024;
private long blockCacheCompressed = 10L * 1024 * 1024 * 1024;

private static final String ROCKSDB_CONFIG_FILE_NAME = "schema-rocksdb.properties";
private static final Logger logger = LoggerFactory.getLogger(RSchemaConfLoader.class);

public RSchemaConfLoader() {
loadProperties();
}

private void loadProperties() {
String iotdbHomePath = System.getProperty(IoTDBConstant.IOTDB_HOME, null);
String rocksdbConfigPath =
iotdbHomePath + File.separatorChar + "conf" + File.separatorChar + ROCKSDB_CONFIG_FILE_NAME;
try (InputStream in = new BufferedInputStream(new FileInputStream(rocksdbConfigPath))) {
Properties properties = new Properties();
properties.load(in);
setBlockCache(
Long.parseLong(properties.getProperty("block_cache_size", Long.toString(blockCache))));
setBlockCacheCompressed(
Long.parseLong(
properties.getProperty(
"block_cache_compressed_size", Long.toString(blockCacheCompressed))));
setBlockSize(Long.parseLong(properties.getProperty("block_size", Long.toString(blockSize))));
setWriteBufferSize(
Long.parseLong(
properties.getProperty("write_buffer_size", Long.toString(writeBufferSize))));
setMaxTotalWalSize(
Long.parseLong(
properties.getProperty("max_total_wal_size", Long.toString(maxTotalWalSize))));
setMaxBackgroundJobs(
Integer.parseInt(
properties.getProperty(
"max_background_job_num", Integer.toString(maxBackgroundJobs))));
setBlockSizeDeviation(
Integer.parseInt(
properties.getProperty(
"block_size_deviation", Integer.toString(blockSizeDeviation))));
setBlockRestartInterval(
Integer.parseInt(
properties.getProperty(
"block_restart_interval", Integer.toString(blockRestartInterval))));
setMaxWriteBufferNumber(
Integer.parseInt(
properties.getProperty(
"max_write_buffer_num", Integer.toString(maxWriteBufferNumber))));
setBloomFilterPolicy(
Double.parseDouble(
properties.getProperty("bloom_filter_policy", Double.toString(bloomFilterPolicy))));
} catch (FileNotFoundException e) {
logger.warn("Fail to find rocksdb config file {}", rocksdbConfigPath, e);
} catch (IOException e) {
logger.warn("Cannot load rocksdb config file, use default configuration", e);
}
}

public long getBlockCache() {
return blockCache;
}

private void setBlockCache(long blockCache) {
this.blockCache = blockCache;
}

public long getBlockCacheCompressed() {
return blockCacheCompressed;
}

private void setBlockCacheCompressed(long blockCacheCompressed) {
this.blockCacheCompressed = blockCacheCompressed;
}

public long getWriteBufferSize() {
return writeBufferSize;
}

private void setWriteBufferSize(long writeBufferSize) {
this.writeBufferSize = writeBufferSize;
}

public long getMaxTotalWalSize() {
return maxTotalWalSize;
}

private void setMaxTotalWalSize(long maxTotalWalSize) {
this.maxTotalWalSize = maxTotalWalSize;
}

public int getMaxBackgroundJobs() {
return maxBackgroundJobs;
}

private void setMaxBackgroundJobs(int maxBackgroundJobs) {
this.maxBackgroundJobs = maxBackgroundJobs;
}

public double getBloomFilterPolicy() {
return bloomFilterPolicy;
}

private void setBloomFilterPolicy(double bloomFilterPolicy) {
this.bloomFilterPolicy = bloomFilterPolicy;
}

public int getBlockSizeDeviation() {
return blockSizeDeviation;
}

private void setBlockSizeDeviation(int blockSizeDeviation) {
this.blockSizeDeviation = blockSizeDeviation;
}

public int getBlockRestartInterval() {
return blockRestartInterval;
}

private void setBlockRestartInterval(int blockRestartInterval) {
this.blockRestartInterval = blockRestartInterval;
}

public int getMaxWriteBufferNumber() {
return maxWriteBufferNumber;
}

private void setMaxWriteBufferNumber(int maxWriteBufferNumber) {
this.maxWriteBufferNumber = maxWriteBufferNumber;
}

public long getBlockSize() {
return blockSize;
}

private void setBlockSize(long blockSize) {
this.blockSize = blockSize;
}
}
Expand Up @@ -49,7 +49,6 @@
import org.rocksdb.Statistics;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.rocksdb.util.SizeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -92,11 +91,10 @@ public class RSchemaReadWriteHandler {

public static final String ROCKSDB_PATH = config.getSystemDir() + File.separator + ROCKSDB_FOLDER;

private static final long BLOCK_CACHE = 20L * 1024 * 1024 * 1024;
private static final long BLOCK_CACHE_COMPRESSED = 10L * 1024 * 1024 * 1024;

private RocksDB rocksDB;

private RSchemaConfLoader rSchemaConfLoader;

ConcurrentMap<String, ColumnFamilyHandle> columnFamilyHandleMap = new ConcurrentHashMap<>();
List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
Expand All @@ -105,7 +103,9 @@ public class RSchemaReadWriteHandler {
RocksDB.loadLibrary();
}

public RSchemaReadWriteHandler(String path) throws RocksDBException {
public RSchemaReadWriteHandler(String path, RSchemaConfLoader schemaConfLoader)
throws RocksDBException {
this.rSchemaConfLoader = schemaConfLoader;
initReadWriteHandler(path);
}

Expand All @@ -121,23 +121,24 @@ private void initReadWriteHandler(String path) throws RocksDBException {
options
.setCreateIfMissing(true)
.setAllowMmapReads(true)
.setWriteBufferSize(64 * SizeUnit.KB)
.setMaxWriteBufferNumber(6)
.setMaxBackgroundJobs(10)
.setWriteBufferSize(rSchemaConfLoader.getWriteBufferSize())
.setMaxWriteBufferNumber(rSchemaConfLoader.getMaxWriteBufferNumber())
.setMaxBackgroundJobs(rSchemaConfLoader.getMaxBackgroundJobs())
.setStatistics(new Statistics())
.setLogger(rocksDBLogger);

final Filter bloomFilter = new BloomFilter(64);
final Filter bloomFilter = new BloomFilter(rSchemaConfLoader.getBloomFilterPolicy());

final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
Cache cache = new LRUCache(BLOCK_CACHE, 6);
Cache cache = new LRUCache(rSchemaConfLoader.getBlockCache(), 6);
tableOptions
.setBlockCache(cache)
.setFilterPolicy(bloomFilter)
.setBlockSizeDeviation(5)
.setBlockRestartInterval(10)
.setBlockSizeDeviation(rSchemaConfLoader.getBlockSizeDeviation())
.setBlockSize(rSchemaConfLoader.getBlockSize())
.setBlockRestartInterval(rSchemaConfLoader.getBlockRestartInterval())
.setCacheIndexAndFilterBlocks(true)
.setBlockCacheCompressed(new LRUCache(BLOCK_CACHE_COMPRESSED, 6));
.setBlockCacheCompressed(new LRUCache(rSchemaConfLoader.getBlockCacheCompressed(), 6));

options.setTableFormatConfig(tableOptions);

Expand Down
Expand Up @@ -160,13 +160,16 @@ public RSchemaRegion() throws MetadataException {
}

public RSchemaRegion(
PartialPath storageGroup, SchemaRegionId schemaRegionId, IStorageGroupMNode storageGroupMNode)
PartialPath storageGroup,
SchemaRegionId schemaRegionId,
IStorageGroupMNode storageGroupMNode,
RSchemaConfLoader rSchemaConfLoader)
throws MetadataException {
this.schemaRegionId = schemaRegionId;
storageGroupFullPath = storageGroup.getFullPath();
init(storageGroupMNode);
try {
readWriteHandler = new RSchemaReadWriteHandler(schemaRegionDirPath);
readWriteHandler = new RSchemaReadWriteHandler(schemaRegionDirPath, rSchemaConfLoader);
} catch (RocksDBException e) {
logger.error("create RocksDBReadWriteHandler fail", e);
throw new MetadataException(e);
Expand Down

0 comments on commit 7af14c9

Please sign in to comment.