Skip to content

Commit

Permalink
fix(importer): prevent OOM by filtering out older process versions ea…
Browse files Browse the repository at this point in the history
…rly on (#2038)

* fix(importer): prevent oom by filtering out older process versions early on

* fix tests
  • Loading branch information
chillleader committed Mar 5, 2024
1 parent 6981ed9 commit 6713bd7
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,16 @@ public ProcessDefinitionImporter(
@Scheduled(fixedDelayString = "${camunda.connector.polling.interval:5000}")
public synchronized void scheduleImport() {
try {
search.query(this::handleImportedDefinitions);
var result = search.query();
handleImportedDefinitions(result);
ready = true;
} catch (Exception e) {
LOG.error("Failed to import process definitions", e);
ready = false;
}
}

public void handleImportedDefinitions(List<ProcessDefinition> unprocessedDefinitions) {
var definitions = keepOnlyLatestVersions(unprocessedDefinitions);

public void handleImportedDefinitions(List<ProcessDefinition> definitions) {
var notYetRegistered =
definitions.stream()
.filter(d -> !registeredProcessDefinitionKeys.contains(d.getKey()))
Expand Down Expand Up @@ -113,17 +112,6 @@ public void handleImportedDefinitions(List<ProcessDefinition> unprocessedDefinit
registeredProcessDefinitionKeys.removeAll(oldProcessDefinitionKeys);
}

private List<ProcessDefinition> keepOnlyLatestVersions(List<ProcessDefinition> unprocessed) {
Map<String, ProcessDefinition> versionsByBpmnProcessId = new HashMap<>();
for (ProcessDefinition pd : unprocessed) {
var currentVersion = versionsByBpmnProcessId.get(pd.getBpmnProcessId());
if (currentVersion == null || currentVersion.getVersion() < pd.getVersion()) {
versionsByBpmnProcessId.put(pd.getBpmnProcessId(), pd);
}
}
return versionsByBpmnProcessId.values().stream().toList();
}

private void logResult(
Set<ProcessDefinition> brandNew, Set<ProcessDefinition> upgraded, Set<Long> deleted) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import io.camunda.operate.search.Sort;
import io.camunda.operate.search.SortOrder;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.function.Consumer;
import java.util.Optional;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
Expand All @@ -45,13 +47,19 @@ public ProcessDefinitionSearch(CamundaOperateClient camundaOperateClient) {
this.camundaOperateClient = camundaOperateClient;
}

public void query(Consumer<List<ProcessDefinition>> resultHandler) {
/**
* Query process definitions from Camunda Operate. Guaranteed to return only the latest deployed
* version of each process definition.
*/
public List<ProcessDefinition> query() {
LOG.trace("Query process deployments...");
List<ProcessDefinition> processDefinitions = new ArrayList<>();
SearchResult<ProcessDefinition> processDefinitionResult;
LOG.trace("Running paginated query");

List<Object> paginationIndex = null;
final Set<String> encounteredBpmnProcessIds = new HashSet<>();

do {
try {
SearchQuery processDefinitionQuery =
Expand All @@ -71,14 +79,21 @@ public void query(Consumer<List<ProcessDefinition>> resultHandler) {
paginationIndex = newPaginationIdx;
}

var items = processDefinitionResult.getItems();
if (items != null) {
processDefinitions.addAll(items);
}
// result is sorted by key in descending order, so we will always encounter the latest
// version first

var items =
Optional.ofNullable(processDefinitionResult.getItems()).orElse(List.of()).stream()
.filter(
definition -> !encounteredBpmnProcessIds.contains(definition.getBpmnProcessId()))
.peek(definition -> encounteredBpmnProcessIds.add(definition.getBpmnProcessId()))
.toList();

processDefinitions.addAll(items);

} while (processDefinitionResult.getItems() != null
&& !processDefinitionResult.getItems().isEmpty());

resultHandler.accept(processDefinitions);
return processDefinitions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import io.camunda.connector.runtime.inbound.lifecycle.InboundConnectorManager;
import io.camunda.operate.model.ProcessDefinition;
Expand All @@ -36,11 +37,12 @@ public class ProcessDefinitionImporterTest {

private ProcessDefinitionImporter importer;
private InboundConnectorManager manager;
private ProcessDefinitionSearch search;

@BeforeEach
public void init() {
manager = mock(InboundConnectorManager.class);
var search = mock(ProcessDefinitionSearch.class);
search = mock(ProcessDefinitionSearch.class);
importer = new ProcessDefinitionImporter(manager, search, new DefaultNoopMetricsRecorder());
}

Expand All @@ -51,9 +53,11 @@ void newProcessDefinitionDeployed_shouldRegister() {
List<ProcessDefinition> second =
List.of(getProcessDefinition("process1", 1, 1), getProcessDefinition("process2", 1, 2));

when(search.query()).thenReturn(first).thenReturn(second);

// when
importer.handleImportedDefinitions(first);
importer.handleImportedDefinitions(second);
importer.scheduleImport();
importer.scheduleImport();

// then
verify(manager, times(1)).handleNewProcessDefinitions(eq(new HashSet<>(first)));
Expand All @@ -65,20 +69,22 @@ void newProcessDefinitionDeployed_shouldRegister() {
void newVersionDeployed_shouldRegister() {
// given
List<ProcessDefinition> first = List.of(getProcessDefinition("process1", 1, 1));
List<ProcessDefinition> second =
List.of(getProcessDefinition("process1", 1, 1), getProcessDefinition("process1", 2, 2));
List<ProcessDefinition> second = List.of(getProcessDefinition("process1", 2, 2));

when(search.query()).thenReturn(first).thenReturn(second).thenReturn(second);

// when
importer.handleImportedDefinitions(first);
importer.handleImportedDefinitions(second);
importer.scheduleImport();
importer.scheduleImport();

// then
verify(manager, times(1)).handleNewProcessDefinitions(new HashSet<>(first));
verify(manager, times(1)).handleDeletedProcessDefinitions(Set.of(first.get(0).getVersion()));
verify(manager, times(1)).handleNewProcessDefinitions(Set.of(second.get(1)));
verify(manager, times(1))
.handleDeletedProcessDefinitions(Set.of(first.getFirst().getVersion()));
verify(manager, times(1)).handleNewProcessDefinitions(Set.of(second.getFirst()));

// verify old version was deregistered and no action is taken on the next polling iteration
importer.handleImportedDefinitions(second);
importer.scheduleImport();
verifyNoMoreInteractions(manager);
}

Expand All @@ -88,9 +94,11 @@ void versionNotChanged_shouldNotRegister() {
List<ProcessDefinition> definitions =
List.of(getProcessDefinition("process1", 1, 1), getProcessDefinition("process2", 1, 2));

when(search.query()).thenReturn(definitions);

// when
importer.handleImportedDefinitions(definitions);
importer.handleImportedDefinitions(definitions);
importer.scheduleImport();
importer.scheduleImport();

// then
verify(manager, times(1)).handleNewProcessDefinitions(new HashSet<>(definitions));
Expand All @@ -105,9 +113,11 @@ void processDefinitionDeleted_shouldDeregister() {

List<ProcessDefinition> second = List.of(getProcessDefinition("process1", 1, 1));

when(search.query()).thenReturn(first).thenReturn(second);

// when
importer.handleImportedDefinitions(first);
importer.handleImportedDefinitions(second);
importer.scheduleImport();
importer.scheduleImport();

// then
verify(manager, times(1)).handleNewProcessDefinitions(new HashSet<>(first));
Expand All @@ -117,18 +127,20 @@ void processDefinitionDeleted_shouldDeregister() {
@Test
void newerVersionDeleted_shouldRegisterOldOne() {
// given
List<ProcessDefinition> first =
List.of(getProcessDefinition("process1", 1, 1), getProcessDefinition("process1", 2, 2));
List<ProcessDefinition> first = List.of(getProcessDefinition("process1", 2, 2));

List<ProcessDefinition> second = List.of(getProcessDefinition("process1", 1, 1));

when(search.query()).thenReturn(first).thenReturn(second);

// when
importer.handleImportedDefinitions(first);
importer.handleImportedDefinitions(second);
importer.scheduleImport();
importer.scheduleImport();

// then
verify(manager, times(1)).handleNewProcessDefinitions(Set.of(first.get(1)));
verify(manager, times(1)).handleDeletedProcessDefinitions(Set.of(first.get(1).getVersion()));
verify(manager, times(1)).handleNewProcessDefinitions(Set.of(first.getFirst()));
verify(manager, times(1))
.handleDeletedProcessDefinitions(Set.of(first.getFirst().getVersion()));
verify(manager, times(1)).handleNewProcessDefinitions(new HashSet<>(second));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
package io.camunda.connector.e2e;

import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;

import com.github.tomakehurst.wiremock.junit5.WireMockExtension;
import io.camunda.connector.runtime.inbound.importer.ProcessDefinitionSearch;
import io.camunda.operate.CamundaOperateClient;
import io.camunda.zeebe.client.ZeebeClient;
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -106,6 +106,6 @@ public abstract class BaseAutomationAnywhereTest {

@BeforeEach
void beforeEach() {
doNothing().when(processDefinitionSearch).query(any());
when(processDefinitionSearch.query()).thenReturn(Collections.emptyList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
import static io.camunda.connector.e2e.AwsService.LAMBDA;
import static io.camunda.connector.e2e.AwsService.SNS;
import static io.camunda.connector.e2e.AwsService.SQS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;

import io.camunda.connector.e2e.app.TestConnectorRuntimeApplication;
import io.camunda.connector.runtime.inbound.importer.ProcessDefinitionSearch;
import io.camunda.operate.CamundaOperateClient;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.spring.test.ZeebeSpringTest;
import java.io.File;
import java.util.Collections;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -85,7 +85,7 @@ static void initializeLocalStackContainer() throws InterruptedException {

@BeforeEach
void beforeEach() {
doNothing().when(processDefinitionSearch).query(any());
when(processDefinitionSearch.query()).thenReturn(Collections.emptyList());
}

/** Stops the LocalStack container and cleans up any associated resources. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
package io.camunda.connector.e2e;

import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;

import com.github.tomakehurst.wiremock.junit5.WireMockExtension;
import io.camunda.connector.runtime.inbound.importer.ProcessDefinitionSearch;
Expand All @@ -28,6 +27,7 @@
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import java.io.File;
import java.util.Base64;
import java.util.Collections;
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension;
Expand Down Expand Up @@ -77,7 +77,7 @@ public abstract class BaseEasyPostTest {

@BeforeEach
void beforeEach() {
doNothing().when(processDefinitionSearch).query(any());
when(processDefinitionSearch.query()).thenReturn(Collections.emptyList());
}

protected ZeebeTest setupTestWithBpmnModel(String taskName, File elementTemplate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import static io.camunda.connector.e2e.BpmnFile.Replace.replace;
import static io.camunda.connector.e2e.BpmnFile.replace;
import static io.camunda.zeebe.process.test.assertions.BpmnAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -48,6 +46,7 @@
import io.camunda.zeebe.model.bpmn.instance.Process;
import io.camunda.zeebe.spring.test.ZeebeSpringTest;
import java.io.File;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -91,7 +90,7 @@ public class HttpTests {

@BeforeEach
void beforeAll() {
doNothing().when(processDefinitionSearch).query(any());
when(processDefinitionSearch.query()).thenReturn(Collections.emptyList());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@
package io.camunda.connector.e2e;

import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;

import io.camunda.connector.runtime.inbound.importer.ProcessDefinitionSearch;
import io.camunda.connector.runtime.inbound.lifecycle.InboundConnectorManager;
import io.camunda.operate.CamundaOperateClient;
import io.camunda.zeebe.client.ZeebeClient;
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -85,7 +85,7 @@ static void tearDown() {

@BeforeEach
void beforeEach() {
doNothing().when(processDefinitionSearch).query(any());
when(processDefinitionSearch.query()).thenReturn(Collections.emptyList());
}

private static void createTopics(String... topics) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
*/
package io.camunda.connector.e2e;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;

import io.camunda.connector.runtime.inbound.importer.ProcessDefinitionSearch;
import io.camunda.operate.CamundaOperateClient;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import java.io.File;
import java.util.Collections;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -47,7 +47,7 @@ public abstract class BaseRabbitMqTest {

@BeforeEach
void beforeEach() {
doNothing().when(processDefinitionSearch).query(any());
when(processDefinitionSearch.query()).thenReturn(Collections.emptyList());
}

protected ZeebeTest setupTestWithBpmnModel(String taskName, File elementTemplate) {
Expand Down
Loading

0 comments on commit 6713bd7

Please sign in to comment.