Adding scalability to hazelcast when exporting records from zeebe #261

Open
shahamit opened this issue May 4, 2023 · 6 comments
Labels
enhancement New feature or request

Comments

@shahamit

shahamit commented May 4, 2023

This continues our Slack discussion about an enhancement to this exporter: being able to scale when the Zeebe engine is configured with multiple partitions. Right now, when a Hazelcast cluster is deployed alongside a Zeebe cluster of around 8 brokers (with 8 partitions) and 2 gateways, we observe that the Hazelcast nodes crash when the Zeebe cluster is under load. In our discussion we realised that this is because the exporter writes all Zeebe records to a single ringbuffer.

I created this issue to discuss the enhancement: exporting to as many ringbuffers as there are Zeebe partitions, so that we get the scalability benefits of Hazelcast.

Code question @saig0 - While digging into the exporter code, I tried to convert the ringbuffer instance variable to a Map<partitionId, Ringbuffer>, but can you please share some insight: how do we get to know the number of zeebe partitions in the exporter code? Should we accept it as part of the configuration (that is probably not the right approach)?

Thanks.

@saig0 saig0 self-assigned this May 15, 2023
@saig0 saig0 added the enhancement New feature or request label May 16, 2023
@saig0
Collaborator

saig0 commented May 16, 2023

how do we get to know the number of zeebe partitions in the exporter code?

Good question. 🤔

Currently, the exporter can't access this kind of Zeebe configuration. Only while exporting can the exporter read the partition id from the record.

One solution could be to initialize the ring-buffer lazily: when reading the first record of a partition, initialize the ring-buffer for that partition.
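
A minimal sketch of that lazy initialization, assuming the exporter keeps one buffer per partition in a map. `LazyPartitionBuffers` and its method names are hypothetical; the factory stands in for whatever creates the real ring-buffer, for example a call to `HazelcastInstance.getRingbuffer(...)` with a per-partition name.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntFunction;

/**
 * Hypothetical helper: creates one buffer per Zeebe partition the first
 * time a record from that partition is seen.
 */
final class LazyPartitionBuffers<B> {
    private final Map<Integer, B> buffers = new ConcurrentHashMap<>();
    // Assumption: in the exporter this would look up a Hazelcast ring-buffer
    // by a per-partition name; here it is any function of the partition id.
    private final IntFunction<B> factory;

    LazyPartitionBuffers(IntFunction<B> factory) {
        this.factory = factory;
    }

    /** Returns the buffer for the partition, creating it on first use only. */
    B forPartition(int partitionId) {
        return buffers.computeIfAbsent(partitionId, id -> factory.apply(id));
    }

    /** Number of partitions seen so far. */
    int knownPartitions() {
        return buffers.size();
    }
}
```

In the exporter's export method this would be called with the partition id read from the record, so no partition count has to be configured up front.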

Another question is, how does the consumer know about the existing partitions to subscribe from?

  • The simple solution would be a static configuration in the client.
  • The dynamic solution would require a management data type in Hazelcast to store and read the available ring-buffers for the different partitions.
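
The dynamic option could be sketched as a shared set of ring-buffer names that the exporter adds to and the consumer polls. This is only an illustration: `RingbufferDirectory`, the name prefix, and the method names are hypothetical. The code is written against `java.util.Set`, so a Hazelcast `ISet<String>` (which implements `Set`) could be passed in as the shared set, but that wiring is an assumption and not part of the exporter today.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical directory of ring-buffer names, one name per Zeebe partition. */
final class RingbufferDirectory {
    // Assumption: in a real deployment this would be a distributed set
    // visible to both the exporter and the consumers.
    private final Set<String> names;

    RingbufferDirectory(Set<String> names) {
        this.names = names;
    }

    /** Exporter side: announce the ring-buffer of a partition and return its name. */
    String announce(String prefix, int partitionId) {
        String name = prefix + "-" + partitionId;
        names.add(name); // adding an already announced name is a no-op
        return name;
    }

    /** Consumer side: names that were announced but are not subscribed to yet. */
    Set<String> notYetSubscribed(Set<String> alreadySubscribed) {
        Set<String> fresh = new TreeSet<>(names);
        fresh.removeAll(alreadySubscribed);
        return fresh;
    }
}
```

A consumer would call `notYetSubscribed` periodically and start a reader for each returned name, which avoids a static partition list in the client configuration.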

@saig0 saig0 removed their assignment May 16, 2023
@shahamit
Author

We started the code changes and deployed some basic code that creates multiple ring buffers (as many as there are Zeebe partitions). The cluster configuration is 2 zeebe-gateways, 8 brokers, 5 Hazelcast instances, 1 zeeqs, 1 postgres.

The number of Hazelcast instance restarts has reduced, but we still see over-utilised and under-utilised Hazelcast instances. Please find the resource utilisation screenshots attached.

Quick question @saig0 - Since the ringbuffer is a replicated data structure, should the memory usage of all Hazelcast instances be the same? Some instances show a memory usage of 300 MB while the over-utilised ones show 6 GB.

[5 screenshots: resource utilisation of the Hazelcast instances]

@saig0 saig0 self-assigned this Jul 7, 2023
@shahamit
Author

@saig0 - We made progress and took a different track. We exported the data to different ring buffers (a 1:1 relationship with partitions) and added zeeqs replicas. Each zeeqs instance reads from one ring buffer. We had to make changes in zeeqs to support this.

This improved the import speed, but not by much. When running the Camunda 8 benchmarking tool, we observed that the import speed is 0.05x for 5 partitions, 5 ring buffers and 5 zeeqs instances. As the number of partitions increases, we have to add as many zeeqs instances, which doesn't sound great.
Even earlier, when we tried reading different ring buffers in different threads, the import speed was still slow.

The main problem looks like the ringBuffer.readOne(sequence) API call in the reader thread here, which then processes the record and updates the postProcessListener sequentially. We attempted it with the readManyAsync API call, but that led to other problems.

With the slow import speed, zeeqs takes more than a day or two to sync up the data from Hazelcast, which leads to other problems such as Hazelcast crashes.

Any inputs on how can we improve the zeeqs import speed?

Thanks.

@saig0
Collaborator

saig0 commented Jul 14, 2023

Any inputs on how can we improve the zeeqs import speed?

I can't give you a definite answer because I never optimized these tools for performance.

First, you should measure where these tools spend the most time. It could be on exporting (transform to Protobuf, add to the Hazelcast ring-buffer) or on importing (read from the Hazelcast ring-buffer, transform to a record DTO, insert into the Postgres database).
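
As a rough starting point for such a measurement, the sketch below accumulates wall-clock time per stage. `StageTimer` and the stage names are hypothetical and not part of either tool; a profiler or existing metrics would work just as well. The class is not thread-safe, so it assumes one timer per reader or exporter thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical helper that sums the time spent in named stages. */
final class StageTimer {
    private final Map<String, Long> totalNanos = new LinkedHashMap<>();
    private final Map<String, Long> calls = new LinkedHashMap<>();

    /** Runs the work, records how long it took under the stage name, returns its result. */
    <T> T time(String stage, Supplier<T> work) {
        long start = System.nanoTime();
        try {
            return work.get();
        } finally {
            // Recorded even if the work throws, so failures still show up.
            totalNanos.merge(stage, System.nanoTime() - start, Long::sum);
            calls.merge(stage, 1L, Long::sum);
        }
    }

    long calls(String stage) {
        return calls.getOrDefault(stage, 0L);
    }

    long totalNanos(String stage) {
        return totalNanos.getOrDefault(stage, 0L);
    }

    /** One line per stage, in the order the stages were first seen. */
    String report() {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<String, Long> e : totalNanos.entrySet()) {
            out.append(String.format("%s: %d calls, %.3f ms total%n",
                    e.getKey(), calls.get(e.getKey()), e.getValue() / 1_000_000.0));
        }
        return out.toString();
    }
}
```

Wrapping each step (transform, ring-buffer read or write, database insert) in `time(...)` and logging `report()` periodically should show which stage dominates. Calls that throw checked exceptions need a small wrapper, because `Supplier` does not allow them.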

Depending on the measurement result, you could optimize this part. For example,

  • exporting: add batches to the Hazelcast ring-buffer
  • importing: read batches from the Hazelcast ring-buffer
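
The batching idea can be sketched independently of Hazelcast. `Batcher` below is a hypothetical helper: it collects items and hands them to a sink in groups. On the export side the sink could be a bulk add to the ring-buffer (Hazelcast's `Ringbuffer` offers `addAllAsync` for this), and on the import side a multi-row insert into Postgres; both wirings are assumptions, not existing code in these tools.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper that groups single items into batches of a fixed maximum size. */
final class Batcher<T> {
    private final int maxSize;
    private final Consumer<List<T>> sink;
    private List<T> pending = new ArrayList<>();

    Batcher(int maxSize, Consumer<List<T>> sink) {
        if (maxSize < 1) {
            throw new IllegalArgumentException("maxSize must be >= 1");
        }
        this.maxSize = maxSize;
        this.sink = sink;
    }

    /** Buffers the item and flushes automatically once the batch is full. */
    void add(T item) {
        pending.add(item);
        if (pending.size() >= maxSize) {
            flush();
        }
    }

    /** Hands the buffered items to the sink; does nothing if the buffer is empty. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<T> batch = pending;
        pending = new ArrayList<>(); // the sink keeps ownership of the old list
        sink.accept(batch);
    }
}
```

Two caveats for the exporter side: a partially filled batch needs a periodic `flush()` so records are not held back under low load, and the last exported position should only be acknowledged to Zeebe after the batch containing that record was written.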

@saig0 saig0 removed their assignment Jul 17, 2023
@shahamit
Author

Right, I have been trying to measure the code and find out which operation(s) take the time. Since we would have to modify the import code, I tried running the ExporterRecordTest test class, but the test case doesn't pass. It fails with the error below. Any idea what could be wrong, @saig0, and how to make it pass?

org.awaitility.core.ConditionTimeoutException: Assertion condition defined as a io.zeebe.hazelcast.ExporterRecordTest 
Expecting size of:
  []
to be greater than or equal to 3 but was 0 within 10 seconds.

	at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)
	at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:119)
	at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:31)
	at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:985)
	at org.awaitility.core.ConditionFactory.untilAsserted(ConditionFactory.java:769)
	at io.zeebe.hazelcast.ExporterRecordTest.shouldExportRecords(ExporterRecordTest.java:152)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
	at java.base/java.lang.reflect.Method.invoke(Method.java:577)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
	at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
	at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
	at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
	at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:232)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:55)
Caused by: java.lang.AssertionError: 
Expecting size of:
  []
to be greater than or equal to 3 but was 0
	at io.zeebe.hazelcast.ExporterRecordTest.lambda$shouldExportRecords$34(ExporterRecordTest.java:155)
	at org.awaitility.core.AssertionCondition.lambda$new$0(AssertionCondition.java:53)
	at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:248)
	at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:235)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

@saig0
Collaborator

saig0 commented Sep 5, 2023

@shahamit I don't know why the test is not working for you. I suggest checking the log output. Maybe something is wrong with the Zeebe Testcontainer or the Hazelcast client.
