Skip to content

Commit

Permalink
feat: allow to decode keys
Browse files Browse the repository at this point in the history
For certain CF we can register decoder functions which allow to decode the keys and print them correctly.
Already done for DEFAULT and KEY
  • Loading branch information
Zelldon committed Jul 10, 2023
1 parent 1fa8344 commit 27636a1
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.zell.zdb.state

import io.camunda.zeebe.db.impl.DbLong
import io.camunda.zeebe.db.impl.DbString
import io.camunda.zeebe.db.impl.ZeebeDbConstants
import io.camunda.zeebe.protocol.ZbColumnFamilies
import org.agrona.concurrent.UnsafeBuffer
import java.util.*

typealias Decoder = (ByteArray) -> String
class ColumnFamilyKeyDecoder {


var cfDecoders: EnumMap<ZbColumnFamilies, Decoder> = EnumMap(ZbColumnFamilies::class.java)

init {
cfDecoders[ZbColumnFamilies.DEFAULT] = ::dbStringToStringDecoder
cfDecoders[ZbColumnFamilies.KEY] = ::dbStringToStringDecoder
}

fun decodeColumnFamilyKey(cf: ZbColumnFamilies, key: ByteArray) : String {
val function = cfDecoders[cf]
return function?.let {
function(key)
} ?: HexFormat.ofDelimiter(" ").formatHex(key)
}

private fun dbLongToStringDecoder(bytes : ByteArray) : String {
val dbLong = DbLong()
dbLong.wrap(UnsafeBuffer(bytes), Long.SIZE_BYTES, bytes.size)
return dbLong.value.toString()
}

private fun dbStringToStringDecoder(bytes : ByteArray) : String {
val dbString = DbString()
dbString.wrap(UnsafeBuffer(bytes), Long.SIZE_BYTES, bytes.size)
return dbString.toString()
}
}
10 changes: 5 additions & 5 deletions backend/src/main/kotlin/io/zell/zdb/state/Experimental.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,25 @@ import java.util.*
class Experimental(private var rocksDb: RocksDB) {
constructor(statePath: Path) : this(OptimisticTransactionDB.openReadOnly(statePath.toString()))


fun interface Visitor {
fun visit(cf: ZbColumnFamilies, key: ByteArray, value: ByteArray)
fun visit(cf: ZbColumnFamilies, key: String, value: ByteArray)
}

fun interface JsonVisitor {
fun visit(cf: ZbColumnFamilies, key: ByteArray, valueJson: String)
fun visit(cf: ZbColumnFamilies, key: String, valueJson: String)
}

fun visitDB(visitor: Visitor) {
rocksDb.newIterator(rocksDb.defaultColumnFamily, ReadOptions()).use {
val columnFamilyKeyDecoder = ColumnFamilyKeyDecoder()
rocksDb.newIterator(rocksDb.defaultColumnFamily, ReadOptions()).use {
it.seekToFirst()
while (it.isValid) {
val key: ByteArray = it.key()
val value: ByteArray = it.value()
val unsafeBuffer = UnsafeBuffer(key)
val enumValue = unsafeBuffer.getLong(0, ZeebeDbConstants.ZB_DB_BYTE_ORDER)
val cf = ZbColumnFamilies.values()[enumValue.toInt()]
visitor.visit(cf, key, value)
visitor.visit(cf, columnFamilyKeyDecoder.decodeColumnFamilyKey(cf, key), value)
it.next()
}
}
Expand Down
28 changes: 23 additions & 5 deletions backend/src/test/kotlin/ZeebeStateTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,36 @@ public void shouldCreateStatsForCompleteState() {
public void shouldVisitValuesAsJson() {
// given
final var experimental = new Experimental(ZeebePaths.Companion.getRuntimePath(tempDir, "1"));
final var incidentMap = new HashMap<String, String>();
HashMap<ZbColumnFamilies, HashMap<String, String>> columnFamiliesMap = new HashMap<>();
Experimental.JsonVisitor jsonVisitor = (cf, k, v) -> {
if (cf == ZbColumnFamilies.INCIDENTS) {
incidentMap.put(new String(k), v);
}
final var cfMap = columnFamiliesMap.computeIfAbsent(cf, (columnFamily) -> new HashMap<>());
cfMap.put(k, v);
};

// when
experimental.visitDBWithJsonValues(jsonVisitor);

// then
assertThat(incidentMap).containsValue("{\"incidentRecord\":{\"errorType\":\"IO_MAPPING_ERROR\",\"errorMessage\":\"failed to evaluate expression '{bar:foo}': no variable found for name 'foo'\",\"bpmnProcessId\":\"process\",\"processDefinitionKey\":2251799813685249,\"processInstanceKey\":2251799813685251,\"elementId\":\"incidentTask\",\"elementInstanceKey\":2251799813685260,\"jobKey\":-1,\"variableScopeKey\":2251799813685260}}");
assertThat(columnFamiliesMap.get(ZbColumnFamilies.INCIDENTS)).containsValue("{\"incidentRecord\":{\"errorType\":\"IO_MAPPING_ERROR\",\"errorMessage\":\"failed to evaluate expression '{bar:foo}': no variable found for name 'foo'\",\"bpmnProcessId\":\"process\",\"processDefinitionKey\":2251799813685249,\"processInstanceKey\":2251799813685251,\"elementId\":\"incidentTask\",\"elementInstanceKey\":2251799813685260,\"jobKey\":-1,\"variableScopeKey\":2251799813685260}}");
assertThat(columnFamiliesMap.get(ZbColumnFamilies.INCIDENT_PROCESS_INSTANCES)).containsValue("{\"key\":2251799813685263}");
}

@Test
public void shouldDecodeKeyIfPossible() {
// given
final var experimental = new Experimental(ZeebePaths.Companion.getRuntimePath(tempDir, "1"));
HashMap<ZbColumnFamilies, HashMap<String, String>> columnFamiliesMap = new HashMap<>();
Experimental.JsonVisitor jsonVisitor = (cf, k, v) -> {
final var cfMap = columnFamiliesMap.computeIfAbsent(cf, (columnFamily) -> new HashMap<>());
cfMap.put(k, v);
};

// when
experimental.visitDBWithJsonValues(jsonVisitor);

// then
assertThat(columnFamiliesMap.get(ZbColumnFamilies.DEFAULT)).containsKey("LAST_PROCESSED_EVENT_KEY");
assertThat(columnFamiliesMap.get(ZbColumnFamilies.KEY)).containsKey("latestKey");
}

@Test
Expand Down
2 changes: 1 addition & 1 deletion cli/src/main/java/io/zell/zdb/StateCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public int list(
}
System.out.printf(
"\n{\"cf\":\"%s\",\"key\":\"%s\",\"value\":%s}",
cf, HexFormat.ofDelimiter(" ").formatHex(key), valueJson);
cf, key, valueJson);
}
}));
System.out.print("]}");
Expand Down

0 comments on commit 27636a1

Please sign in to comment.