-
Notifications
You must be signed in to change notification settings - Fork 561
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
13121: Performance test for large state and stable performance r=Zelldon a=Zelldon ## Description `@oleschoenburg` I created this PR in order to get the first increment merged and to not overwhelm you with more changes. Most of the changes here are refactorings and preparation for the JMH test, plus of course the JMH test itself. I applied already several things we have discussed, like creation of large state in setup phase, keeping the test in engine module etc. In an upcoming PR I will execute the JMH benchmark inside a unit test, which is executed in a separate CI job, please see [related comment](#12241 (comment)). <!-- Please explain the changes you made here. --> ## Related issues <!-- Which issues are closed by this PR or are related --> related to #12241 Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
- Loading branch information
Showing
20 changed files
with
575 additions
and
281 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
121 changes: 121 additions & 0 deletions
121
engine/src/test/java/io/camunda/zeebe/engine/perf/EngineLargeStatePerformanceTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
/* | ||
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under | ||
* one or more contributor license agreements. See the NOTICE file distributed | ||
* with this work for additional information regarding copyright ownership. | ||
* Licensed under the Zeebe Community License 1.1. You may not use this file | ||
* except in compliance with the Zeebe Community License 1.1. | ||
*/ | ||
package io.camunda.zeebe.engine.perf; | ||
|
||
import io.camunda.zeebe.engine.perf.TestEngine.TestContext; | ||
import io.camunda.zeebe.engine.util.client.ProcessInstanceClient; | ||
import io.camunda.zeebe.model.bpmn.Bpmn; | ||
import io.camunda.zeebe.protocol.record.Record; | ||
import io.camunda.zeebe.protocol.record.intent.JobIntent; | ||
import io.camunda.zeebe.protocol.record.value.JobRecordValue; | ||
import io.camunda.zeebe.scheduler.ActorScheduler; | ||
import io.camunda.zeebe.scheduler.clock.DefaultActorClock; | ||
import io.camunda.zeebe.test.util.AutoCloseableRule; | ||
import io.camunda.zeebe.test.util.record.RecordingExporter; | ||
import java.io.IOException; | ||
import java.util.concurrent.TimeUnit; | ||
import org.junit.rules.TemporaryFolder; | ||
import org.openjdk.jmh.annotations.Benchmark; | ||
import org.openjdk.jmh.annotations.BenchmarkMode; | ||
import org.openjdk.jmh.annotations.Fork; | ||
import org.openjdk.jmh.annotations.Measurement; | ||
import org.openjdk.jmh.annotations.Mode; | ||
import org.openjdk.jmh.annotations.OutputTimeUnit; | ||
import org.openjdk.jmh.annotations.Setup; | ||
import org.openjdk.jmh.annotations.State; | ||
import org.openjdk.jmh.annotations.TearDown; | ||
import org.openjdk.jmh.annotations.Warmup; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
@Warmup(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS) | ||
@Measurement(iterations = 50, time = 1, timeUnit = TimeUnit.SECONDS) | ||
@Fork(value = 1) | ||
@BenchmarkMode(Mode.Throughput) | ||
@OutputTimeUnit(TimeUnit.SECONDS) | ||
@State(org.openjdk.jmh.annotations.Scope.Benchmark) | ||
public class EngineLargeStatePerformanceTest { | ||
public static final Logger LOG = | ||
LoggerFactory.getLogger(EngineLargeStatePerformanceTest.class.getName()); | ||
|
||
private long count; | ||
private ProcessInstanceClient processInstanceClient; | ||
private TestEngine.TestContext testContext; | ||
|
||
@Setup | ||
public void setup() throws Throwable { | ||
testContext = createTestContext(); | ||
|
||
final var singlePartitionEngine = TestEngine.createSinglePartitionEngine(testContext); | ||
|
||
setupState(singlePartitionEngine); | ||
} | ||
|
||
/** Will build up a state for the large state performance test */ | ||
private void setupState(final TestEngine singlePartitionEngine) { | ||
singlePartitionEngine | ||
.createDeploymentClient() | ||
.withXmlResource( | ||
Bpmn.createExecutableProcess("process") | ||
.startEvent() | ||
.serviceTask("task", (t) -> t.zeebeJobType("task").done()) | ||
.endEvent() | ||
.done()) | ||
.deploy(); | ||
|
||
processInstanceClient = singlePartitionEngine.createProcessInstanceClient(); | ||
|
||
for (int i = 0; i < 200_000; i++) { | ||
processInstanceClient.ofBpmnProcessId("process").create(); | ||
count++; | ||
RecordingExporter.reset(); | ||
} | ||
|
||
LOG.info("Started {} process instances", count); | ||
} | ||
|
||
private TestEngine.TestContext createTestContext() throws IOException { | ||
final var autoCloseableRule = new AutoCloseableRule(); | ||
final var temporaryFolder = new TemporaryFolder(); | ||
temporaryFolder.create(); | ||
|
||
// scheduler | ||
final var builder = | ||
ActorScheduler.newActorScheduler() | ||
.setCpuBoundActorThreadCount(2) | ||
.setIoBoundActorThreadCount(2) | ||
.setActorClock(new DefaultActorClock()); | ||
|
||
final var actorScheduler = builder.build(); | ||
autoCloseableRule.manage(actorScheduler); | ||
actorScheduler.start(); | ||
return new TestContext(actorScheduler, temporaryFolder, autoCloseableRule); | ||
} | ||
|
||
@TearDown | ||
public void tearDown() { | ||
LOG.info("Started {} process instances", count); | ||
testContext.autoCloseableRule().after(); | ||
} | ||
|
||
@Benchmark | ||
public Record<?> measureProcessExecutionTime() { | ||
final long piKey = processInstanceClient.ofBpmnProcessId("process").create(); | ||
|
||
final Record<JobRecordValue> task = | ||
RecordingExporter.jobRecords() | ||
.withIntent(JobIntent.CREATED) | ||
.withType("task") | ||
.withProcessInstanceKey(piKey) | ||
.getFirst(); | ||
|
||
count++; | ||
RecordingExporter.reset(); | ||
return task; | ||
} | ||
} |
109 changes: 109 additions & 0 deletions
109
engine/src/test/java/io/camunda/zeebe/engine/perf/TestEngine.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
/* | ||
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under | ||
* one or more contributor license agreements. See the NOTICE file distributed | ||
* with this work for additional information regarding copyright ownership. | ||
* Licensed under the Zeebe Community License 1.1. You may not use this file | ||
* except in compliance with the Zeebe Community License 1.1. | ||
*/ | ||
package io.camunda.zeebe.engine.perf; | ||
|
||
import io.camunda.zeebe.engine.processing.EngineProcessors; | ||
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender; | ||
import io.camunda.zeebe.engine.processing.streamprocessor.JobStreamer; | ||
import io.camunda.zeebe.engine.state.DefaultZeebeDbFactory; | ||
import io.camunda.zeebe.engine.util.ProcessingExporterTransistor; | ||
import io.camunda.zeebe.engine.util.StreamProcessingComposite; | ||
import io.camunda.zeebe.engine.util.TestInterPartitionCommandSender; | ||
import io.camunda.zeebe.engine.util.TestStreams; | ||
import io.camunda.zeebe.engine.util.client.DeploymentClient; | ||
import io.camunda.zeebe.engine.util.client.ProcessInstanceClient; | ||
import io.camunda.zeebe.scheduler.ActorScheduler; | ||
import io.camunda.zeebe.stream.impl.StreamProcessorMode; | ||
import io.camunda.zeebe.test.util.AutoCloseableRule; | ||
import io.camunda.zeebe.util.FeatureFlags; | ||
import java.util.ArrayList; | ||
import java.util.Optional; | ||
import org.junit.rules.TemporaryFolder; | ||
|
||
/** Helper class which should help to make it easy to create an engine for tests. */ | ||
public final class TestEngine { | ||
|
||
private final StreamProcessingComposite streamProcessingComposite; | ||
|
||
private TestEngine( | ||
final int partitionId, final int partitionCount, final TestContext testContext) { | ||
|
||
final var testStreams = | ||
new TestStreams( | ||
testContext.temporaryFolder(), | ||
testContext.autoCloseableRule(), | ||
testContext.actorScheduler()); | ||
testStreams.withStreamProcessorMode(StreamProcessorMode.PROCESSING); | ||
// for performance reasons we want to enable batch processing | ||
testStreams.maxCommandsInBatch(100); | ||
|
||
testContext | ||
.autoCloseableRule() | ||
.manage( | ||
testStreams.createLogStream( | ||
StreamProcessingComposite.getLogName(partitionId), partitionId)); | ||
|
||
streamProcessingComposite = | ||
new StreamProcessingComposite( | ||
testStreams, | ||
partitionId, | ||
DefaultZeebeDbFactory.defaultFactory(), | ||
testContext.actorScheduler()); | ||
|
||
final var interPartitionCommandSenders = new ArrayList<TestInterPartitionCommandSender>(); | ||
final var featureFlags = FeatureFlags.createDefaultForTests(); | ||
|
||
final var interPartitionCommandSender = | ||
new TestInterPartitionCommandSender(streamProcessingComposite::newLogStreamWriter); | ||
interPartitionCommandSenders.add(interPartitionCommandSender); | ||
testContext | ||
.autoCloseableRule() | ||
.manage( | ||
streamProcessingComposite.startTypedStreamProcessor( | ||
partitionId, | ||
(recordProcessorContext) -> | ||
EngineProcessors.createEngineProcessors( | ||
recordProcessorContext, | ||
partitionCount, | ||
new SubscriptionCommandSender(partitionId, interPartitionCommandSender), | ||
interPartitionCommandSender, | ||
featureFlags, | ||
JobStreamer.noop()) | ||
.withListener( | ||
new ProcessingExporterTransistor( | ||
testStreams.getLogStream( | ||
StreamProcessingComposite.getLogName(partitionId)))), | ||
Optional.empty())); | ||
interPartitionCommandSenders.forEach(s -> s.initializeWriters(partitionCount)); | ||
} | ||
|
||
public DeploymentClient createDeploymentClient() { | ||
return new DeploymentClient(streamProcessingComposite, (p) -> p.accept(1)); | ||
} | ||
|
||
public ProcessInstanceClient createProcessInstanceClient() { | ||
return new ProcessInstanceClient(streamProcessingComposite); | ||
} | ||
|
||
public static TestEngine createSinglePartitionEngine(final TestContext testContext) { | ||
return new TestEngine(1, 1, testContext); | ||
} | ||
|
||
/** | ||
* Containing infrastructure related dependencies which might be shared between TestEngines. | ||
* | ||
* @param actorScheduler the scheduler which is used during tests | ||
* @param temporaryFolder the temporary folder where the log and runtime is written to | ||
* @param autoCloseableRule a collector of all to managed resources, which should be cleaned up | ||
* later | ||
*/ | ||
public record TestContext( | ||
ActorScheduler actorScheduler, | ||
TemporaryFolder temporaryFolder, | ||
AutoCloseableRule autoCloseableRule) {} | ||
} |
Oops, something went wrong.