Skip to content

Commit

Permalink
merge: #12475
Browse files Browse the repository at this point in the history
12475: Introduce cache for process versions r=megglos a=Zelldon

## Description

When executing a process instance we often have to get the process model and related version to it.

When running a cluster for a while (creating a lot of state etc.) the process version will eventually be migrated to a low level of RocksDB (potentially L3), because most of the time process models are not deployed that often. In other words, if a key is not updated it will be moved to a lower level by RocksDB. Accessing lower levels of RocksDB is slower than accessing higher levels or mem tables.

You might ask why is it slow, even if we repeatedly access via RocksDB, why is it not in the cache? There are multiple reasons for it. 

1. We only have caches for L0, and L1 configured (not for lower levels)
2. We have limited the cache sizes to a certain amount which might cause continuous eviction

In order to avoid running into issues with cold data, which is mostly static data, we can introduce our own caches, to work around this. This allows us to avoid unnecessary RocksDB access, unnecessary io, etc.

**This PR does the following:**

 * Refactors the NextValueManager, including renaming as its only purpose is to be used for ProcessVersions
 * Introduce a new cache for the version of each process, in order to avoid access to cold data.

 **Performance:**

We run again a JMH benchmark with the changes and can see that the performance _slightly_ increased (potentially 8%), not significant but it will likely come into play with other changes later.

[See more details here ](#12241 (comment))
<!-- Please explain the changes you made here. -->

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #12034



Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and Zelldon committed Apr 20, 2023
2 parents 9743a0a + 743bec2 commit 59d5639
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public final class DbProcessState implements MutableProcessState {
private final ColumnFamily<DbForeignKey<DbString>, Digest> digestByIdColumnFamily;
private final Digest digest = new Digest();

private final NextValueManager versionManager;
private final ProcessVersionManager versionManager;

public DbProcessState(
final ZeebeDb<ZbColumnFamilies> zeebeDb, final TransactionContext transactionContext) {
Expand Down Expand Up @@ -101,9 +101,7 @@ public DbProcessState(

processesByKey = new Long2ObjectHashMap<>();

versionManager =
new NextValueManager(
DEFAULT_VERSION_VALUE, zeebeDb, transactionContext, ZbColumnFamilies.PROCESS_VERSION);
versionManager = new ProcessVersionManager(DEFAULT_VERSION_VALUE, zeebeDb, transactionContext);
}

@Override
Expand Down Expand Up @@ -151,11 +149,11 @@ private void updateLatestVersion(final ProcessRecord processRecord) {
processId.wrapBuffer(processRecord.getBpmnProcessIdBuffer());
final var bpmnProcessId = processRecord.getBpmnProcessId();

final var currentVersion = versionManager.getCurrentValue(bpmnProcessId);
final var currentVersion = versionManager.getCurrentProcessVersion(bpmnProcessId);
final var nextVersion = processRecord.getVersion();

if (nextVersion > currentVersion) {
versionManager.setValue(bpmnProcessId, nextVersion);
versionManager.setProcessVersion(bpmnProcessId, nextVersion);
}
}

Expand Down Expand Up @@ -218,7 +216,7 @@ public DeployedProcess getLatestProcessVersionByProcessId(final DirectBuffer pro
processesByProcessIdAndVersion.get(processIdBuffer);

processId.wrapBuffer(processIdBuffer);
final long latestVersion = versionManager.getCurrentValue(processIdBuffer);
final long latestVersion = versionManager.getCurrentProcessVersion(processIdBuffer);

DeployedProcess deployedProcess;
if (versionMap == null) {
Expand Down Expand Up @@ -285,7 +283,7 @@ public DirectBuffer getLatestVersionDigest(final DirectBuffer processIdBuffer) {

@Override
public int getProcessVersion(final String bpmnProcessId) {
return (int) versionManager.getCurrentValue(bpmnProcessId);
return (int) versionManager.getCurrentProcessVersion(bpmnProcessId);
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.state.deployment;

import io.camunda.zeebe.db.ColumnFamily;
import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.db.impl.DbString;
import io.camunda.zeebe.protocol.ZbColumnFamilies;
import java.util.function.ToLongFunction;
import org.agrona.DirectBuffer;
import org.agrona.collections.Object2LongHashMap;

public final class ProcessVersionManager {

private final long initialValue;

private final ColumnFamily<DbString, NextValue> nextValueColumnFamily;
private final DbString processIdKey;
private final NextValue nextVersion = new NextValue();
private final Object2LongHashMap<String> versionCache;

public ProcessVersionManager(
final long initialValue,
final ZeebeDb<ZbColumnFamilies> zeebeDb,
final TransactionContext transactionContext) {
this.initialValue = initialValue;

processIdKey = new DbString();
nextValueColumnFamily =
zeebeDb.createColumnFamily(
ZbColumnFamilies.PROCESS_VERSION, transactionContext, processIdKey, nextVersion);
versionCache = new Object2LongHashMap<>(initialValue);
}

public void setProcessVersion(final String processId, final long value) {
processIdKey.wrapString(processId);
nextVersion.set(value);
nextValueColumnFamily.upsert(processIdKey, nextVersion);
versionCache.put(processId, value);
}

public long getCurrentProcessVersion(final String processId) {
processIdKey.wrapString(processId);
return getCurrentProcessVersion();
}

public long getCurrentProcessVersion(final DirectBuffer processId) {
processIdKey.wrapBuffer(processId);
return getCurrentProcessVersion();
}

private long getCurrentProcessVersion() {
return versionCache.computeIfAbsent(
processIdKey.toString(), (ToLongFunction<String>) (key) -> getProcessVersionFromDB());
}

private long getProcessVersionFromDB() {
final NextValue readValue = nextValueColumnFamily.get(processIdKey);

long currentValue = initialValue;
if (readValue != null) {
currentValue = readValue.get();
}
return currentValue;
}
}

0 comments on commit 59d5639

Please sign in to comment.