Skip to content

Commit

Permalink
merge: #12630 #12645
Browse files Browse the repository at this point in the history
12630: [Backport 8.0]: Introduce experimental SST partitioning r=remcowesterhoud a=Zelldon

## Description
Backports  #12483
<!-- Please explain the changes you made here. -->

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #12033



12645: [Backport 8.0]: Restore blacklist metric r=remcowesterhoud a=Zelldon

## Description

Backports #12606
<!-- Please explain the changes you made here. -->

The PR https://github.com/camunda/zeebe/pull/12306/files wasn't backported to 8.0, which caused some conflicts. I had to add the onRecovered method and call it in the ZeebeDbState.

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #8263



Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
Co-authored-by: Christopher Kujawa (Zell) <zelldon91@googlemail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and Zelldon committed May 3, 2023
3 parents f16d221 + cd66e83 + 1b89679 commit fd2076a
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public final class RocksdbCfg implements ConfigurationEntry {
private int ioRateBytesPerSecond = RocksDbConfiguration.DEFAULT_IO_RATE_BYTES_PER_SECOND;
private boolean disableWal = RocksDbConfiguration.DEFAULT_WAL_DISABLED;

private boolean enableSstPartitioning = RocksDbConfiguration.DEFAULT_SST_PARTITIONING_ENABLED;

@Override
public void init(final BrokerCfg globalConfig, final String brokerBase) {
if (columnFamilyOptions == null) {
Expand Down Expand Up @@ -110,6 +112,14 @@ public void setDisableWal(final boolean disableWal) {
this.disableWal = disableWal;
}

public boolean isEnableSstPartitioning() {
return enableSstPartitioning;
}

public void setEnableSstPartitioning(final boolean enableSstPartitioning) {
this.enableSstPartitioning = enableSstPartitioning;
}

public RocksDbConfiguration createRocksDbConfiguration() {
return new RocksDbConfiguration()
.setColumnFamilyOptions(columnFamilyOptions)
Expand All @@ -119,7 +129,8 @@ public RocksDbConfiguration createRocksDbConfiguration() {
.setMinWriteBufferNumberToMerge(minWriteBufferNumberToMerge)
.setStatisticsEnabled(enableStatistics)
.setIoRateBytesPerSecond(ioRateBytesPerSecond)
.setWalDisabled(disableWal);
.setWalDisabled(disableWal)
.setSstPartitioningEnabled(enableSstPartitioning);
}

@Override
Expand All @@ -141,6 +152,8 @@ public String toString() {
+ ioRateBytesPerSecond
+ ", disableWal="
+ disableWal
+ ", enableSstPartitioning="
+ enableSstPartitioning
+ '}';
}

Expand Down
8 changes: 8 additions & 0 deletions dist/src/main/config/broker.standalone.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,14 @@
# This setting can also be set using the environment variable ZEEBE_BROKER_EXPERIMENTAL_ROCKSDB_DISABLEWAL
# disableWal: false

# Configures if the RocksDB SST files should be partitioned based on some virtual column families.
# By default RocksDB will not partition the SST files, which might have influence on the compacting of certain key ranges.
# Enabling this option gives RocksDB some good hints how to improve compaction and reduce the write amplification.
# Benchmarks have show impressive results allowing to sustain performance on larger states, but it is not yet 100% clear what implications it else has except
# increasing the file count of runtime and snapshots.
# This setting can also be set using the environment variable ZEEBE_BROKER_EXPERIMENTAL_ROCKSDB_ENABLESSTPARTITIONING
# enableSstPartitioning: false

# consistencyChecks:
# Configures if the basic operations on RocksDB, such as inserting or deleting key-value pairs, should check preconditions,
# for example that a key does not already exist when inserting.
Expand Down
8 changes: 8 additions & 0 deletions dist/src/main/config/broker.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,14 @@
# This setting can also be set using the environment variable ZEEBE_BROKER_EXPERIMENTAL_ROCKSDB_DISABLEWAL
# disableWal: false

# Configures if the RocksDB SST files should be partitioned based on some virtual column families.
# By default RocksDB will not partition the SST files, which might have influence on the compacting of certain key ranges.
# Enabling this option gives RocksDB some good hints how to improve compaction and reduce the write amplification.
# Benchmarks have show impressive results allowing to sustain performance on larger states, but it is not yet 100% clear what implications it else has except
# increasing the file count of runtime and snapshots.
# This setting can also be set using the environment variable ZEEBE_BROKER_EXPERIMENTAL_ROCKSDB_ENABLESSTPARTITIONING
# enableSstPartitioning: false

# consistencyChecks:
# Configures if the basic operations on RocksDB, such as inserting or deleting key-value pairs, should check preconditions,
# for example that a key does not already exist when inserting.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
*/
package io.camunda.zeebe.engine.metrics;

import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;

public final class BlacklistMetrics {

private static final Counter BLACKLISTED_INSTANCES_COUNTER =
Counter.build()
private static final Gauge BLACKLISTED_INSTANCES_COUNTER =
Gauge.build()
.namespace("zeebe")
.name("blacklisted_instances_total")
.help("Number of blacklisted instances")
Expand All @@ -28,4 +28,8 @@ public BlacklistMetrics(final int partitionId) {
public void countBlacklistedInstance() {
BLACKLISTED_INSTANCES_COUNTER.labels(partitionIdLabel).inc();
}

public void setBlacklistInstanceCounter(final int counter) {
BLACKLISTED_INSTANCES_COUNTER.labels(partitionIdLabel).set(counter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public ZeebeDbState(
public void onRecovered(final ReadonlyProcessingContext context) {
messageSubscriptionState.onRecovered(context);
processMessageSubscriptionState.onRecovered(context);
blackListState.onRecovered(context);
}

@Override
Expand Down Expand Up @@ -187,19 +188,29 @@ public MutableMigrationState getMigrationState() {
return mutableMigrationState;
}

@Override
public MutablePendingMessageSubscriptionState getPendingMessageSubscriptionState() {
return messageSubscriptionState;
}

@Override
public MutablePendingProcessMessageSubscriptionState getPendingProcessMessageSubscriptionState() {
return processMessageSubscriptionState;
}

@Override
public KeyGenerator getKeyGenerator() {
return keyGenerator;
}

@Override
public MutablePendingMessageSubscriptionState getPendingMessageSubscriptionState() {
return messageSubscriptionState;
public MutableLastProcessedPositionState getLastProcessedPositionState() {
return lastProcessedPositionState;
}

@Override
public MutablePendingProcessMessageSubscriptionState getPendingProcessMessageSubscriptionState() {
return processMessageSubscriptionState;
public MutableDecisionState getDecisionState() {
return decisionState;
}

@Override
Expand Down Expand Up @@ -241,14 +252,4 @@ public <KeyType extends DbKey, ValueType extends DbValue> void forEach(
public KeyGeneratorControls getKeyGeneratorControls() {
return keyGenerator;
}

@Override
public MutableLastProcessedPositionState getLastProcessedPositionState() {
return lastProcessedPositionState;
}

@Override
public MutableDecisionState getDecisionState() {
return decisionState;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
*/
package io.camunda.zeebe.engine.state.immutable;

import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;

public interface BlackListState {
public interface BlackListState extends StreamProcessorLifecycleAware {

boolean isOnBlacklist(final TypedRecord record);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
import io.camunda.zeebe.db.impl.DbNil;
import io.camunda.zeebe.engine.Loggers;
import io.camunda.zeebe.engine.metrics.BlacklistMetrics;
import io.camunda.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.camunda.zeebe.engine.state.ZbColumnFamilies;
import io.camunda.zeebe.engine.state.mutable.MutableBlackListState;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceRelatedIntent;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRelated;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.slf4j.Logger;

Expand All @@ -46,6 +48,13 @@ public DbBlackListState(
blacklistMetrics = new BlacklistMetrics(partitionId);
}

@Override
public void onRecovered(final ReadonlyProcessingContext context) {
final var counter = new AtomicInteger(0);
blackListColumnFamily.forEach(ignore -> counter.getAndIncrement());
blacklistMetrics.setBlacklistInstanceCounter(counter.get());
}

private void blacklist(final long key) {
if (key >= 0) {
LOG.warn(BLACKLIST_INSTANCE_MESSAGE, key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ public final class RocksDbConfiguration {
public static final int DEFAULT_MIN_WRITE_BUFFER_NUMBER_TO_MERGE = 3;
public static final boolean DEFAULT_STATISTICS_ENABLED = false;
public static final boolean DEFAULT_WAL_DISABLED = false;

/**
* This is an experimental feature, it is not 100% clear yet what the implications are besides
* having much better performance (shown in several benchmarks) and generating more SST files.
*
* <p>There will be files created for each virtual colum family.
*/
public static final boolean DEFAULT_SST_PARTITIONING_ENABLED = false;

public static final int DEFAULT_IO_RATE_BYTES_PER_SECOND = 0;

private Properties columnFamilyOptions = new Properties();
Expand All @@ -26,6 +35,8 @@ public final class RocksDbConfiguration {
private int minWriteBufferNumberToMerge = DEFAULT_MIN_WRITE_BUFFER_NUMBER_TO_MERGE;
private boolean walDisabled = DEFAULT_WAL_DISABLED;

private boolean sstPartitioningEnabled = DEFAULT_SST_PARTITIONING_ENABLED;

/**
* Defines how many files are kept open by RocksDB, per default it is unlimited (-1). This is done
* for performance reasons, if we set a value higher then zero it needs to keep track of open
Expand Down Expand Up @@ -120,4 +131,13 @@ public RocksDbConfiguration setWalDisabled(final boolean walDisabled) {
this.walDisabled = walDisabled;
return this;
}

public boolean isSstPartitioningEnabled() {
return sstPartitioningEnabled;
}

public RocksDbConfiguration setSstPartitioningEnabled(final boolean sstPartitioningEnabled) {
this.sstPartitioningEnabled = sstPartitioningEnabled;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.rocksdb.RateLimiter;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstPartitionerFixedPrefixFactory;
import org.rocksdb.Statistics;
import org.rocksdb.StatsLevel;
import org.rocksdb.TableFormatConfig;
Expand Down Expand Up @@ -181,6 +182,11 @@ private ColumnFamilyOptions createDefaultColumnFamilyOptions(

final var tableConfig = createTableFormatConfig(closeables, blockCacheMemory);

if (rocksDbConfiguration.isSstPartitioningEnabled()) {
columnFamilyOptions.setSstPartitionerFactory(
new SstPartitionerFixedPrefixFactory(Long.BYTES));
}

return columnFamilyOptions
// to extract our column family type (used as prefix) and seek faster
.useFixedLengthPrefixExtractor(Long.BYTES)
Expand Down

0 comments on commit fd2076a

Please sign in to comment.