Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(importer): prevent OOM by filtering out older process versions eerly on #2756

Merged
merged 4 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,16 @@ public ProcessDefinitionImporter(
@Scheduled(fixedDelayString = "${camunda.connector.polling.interval:5000}")
public synchronized void scheduleImport() {
try {
search.query(this::handleImportedDefinitions);
var result = search.query();
handleImportedDefinitions(result);
ready = true;
} catch (Exception e) {
LOG.error("Failed to import process definitions", e);
ready = false;
}
}

public void handleImportedDefinitions(List<ProcessDefinition> unprocessedDefinitions) {
var definitions = keepOnlyLatestVersions(unprocessedDefinitions);

public void handleImportedDefinitions(List<ProcessDefinition> definitions) {
var notYetRegistered =
definitions.stream()
.filter(d -> !registeredProcessDefinitionKeys.contains(d.getKey()))
Expand Down Expand Up @@ -113,17 +112,6 @@ public void handleImportedDefinitions(List<ProcessDefinition> unprocessedDefinit
registeredProcessDefinitionKeys.removeAll(oldProcessDefinitionKeys);
}

private List<ProcessDefinition> keepOnlyLatestVersions(List<ProcessDefinition> unprocessed) {
Map<String, ProcessDefinition> versionsByBpmnProcessId = new HashMap<>();
for (ProcessDefinition pd : unprocessed) {
var currentVersion = versionsByBpmnProcessId.get(pd.getBpmnProcessId());
if (currentVersion == null || currentVersion.getVersion() < pd.getVersion()) {
versionsByBpmnProcessId.put(pd.getBpmnProcessId(), pd);
}
}
return versionsByBpmnProcessId.values().stream().toList();
}

private void logResult(
Set<ProcessDefinition> brandNew, Set<ProcessDefinition> upgraded, Set<Long> deleted) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import io.camunda.operate.search.Sort;
import io.camunda.operate.search.SortOrder;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.function.Consumer;
import java.util.Optional;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
Expand All @@ -45,13 +47,19 @@ public ProcessDefinitionSearch(CamundaOperateClient camundaOperateClient) {
this.camundaOperateClient = camundaOperateClient;
}

public void query(Consumer<List<ProcessDefinition>> resultHandler) {
/**
* Query process definitions from Camunda Operate. Guaranteed to return only the latest deployed
* version of each process definition.
*/
public List<ProcessDefinition> query() {
LOG.trace("Query process deployments...");
List<ProcessDefinition> processDefinitions = new ArrayList<>();
SearchResult<ProcessDefinition> processDefinitionResult;
LOG.trace("Running paginated query");

List<Object> paginationIndex = null;
final Set<String> encounteredBpmnProcessIds = new HashSet<>();

do {
try {
SearchQuery processDefinitionQuery =
Expand All @@ -71,14 +79,21 @@ public void query(Consumer<List<ProcessDefinition>> resultHandler) {
paginationIndex = newPaginationIdx;
}

var items = processDefinitionResult.getItems();
if (items != null) {
processDefinitions.addAll(items);
}
// result is sorted by key in descending order, so we will always encounter the latest
// version first

var items =
Optional.ofNullable(processDefinitionResult.getItems()).orElse(List.of()).stream()
.filter(
definition -> !encounteredBpmnProcessIds.contains(definition.getBpmnProcessId()))
.peek(definition -> encounteredBpmnProcessIds.add(definition.getBpmnProcessId()))
.toList();

processDefinitions.addAll(items);

} while (processDefinitionResult.getItems() != null
&& !processDefinitionResult.getItems().isEmpty());

resultHandler.accept(processDefinitions);
return processDefinitions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import io.camunda.connector.runtime.inbound.lifecycle.InboundConnectorManager;
import io.camunda.operate.model.ProcessDefinition;
Expand All @@ -36,11 +37,12 @@ public class ProcessDefinitionImporterTest {

private ProcessDefinitionImporter importer;
private InboundConnectorManager manager;
private ProcessDefinitionSearch search;

@BeforeEach
public void init() {
manager = mock(InboundConnectorManager.class);
var search = mock(ProcessDefinitionSearch.class);
search = mock(ProcessDefinitionSearch.class);
importer = new ProcessDefinitionImporter(manager, search, new DefaultNoopMetricsRecorder());
}

Expand All @@ -51,9 +53,11 @@ void newProcessDefinitionDeployed_shouldRegister() {
List<ProcessDefinition> second =
List.of(getProcessDefinition("process1", 1, 1), getProcessDefinition("process2", 1, 2));

when(search.query()).thenReturn(first).thenReturn(second);

// when
importer.handleImportedDefinitions(first);
importer.handleImportedDefinitions(second);
importer.scheduleImport();
importer.scheduleImport();

// then
verify(manager, times(1)).handleNewProcessDefinitions(eq(new HashSet<>(first)));
Expand All @@ -65,20 +69,21 @@ void newProcessDefinitionDeployed_shouldRegister() {
void newVersionDeployed_shouldRegister() {
// given
List<ProcessDefinition> first = List.of(getProcessDefinition("process1", 1, 1));
List<ProcessDefinition> second =
List.of(getProcessDefinition("process1", 1, 1), getProcessDefinition("process1", 2, 2));
List<ProcessDefinition> second = List.of(getProcessDefinition("process1", 2, 2));

when(search.query()).thenReturn(first).thenReturn(second).thenReturn(second);

// when
importer.handleImportedDefinitions(first);
importer.handleImportedDefinitions(second);
importer.scheduleImport();
importer.scheduleImport();

// then
verify(manager, times(1)).handleNewProcessDefinitions(new HashSet<>(first));
verify(manager, times(1)).handleDeletedProcessDefinitions(Set.of(first.get(0).getVersion()));
verify(manager, times(1)).handleNewProcessDefinitions(Set.of(second.get(1)));
verify(manager, times(1)).handleNewProcessDefinitions(Set.of(second.get(0)));

// verify old version was deregistered and no action is taken on the next polling iteration
importer.handleImportedDefinitions(second);
importer.scheduleImport();
verifyNoMoreInteractions(manager);
}

Expand All @@ -88,9 +93,11 @@ void versionNotChanged_shouldNotRegister() {
List<ProcessDefinition> definitions =
List.of(getProcessDefinition("process1", 1, 1), getProcessDefinition("process2", 1, 2));

when(search.query()).thenReturn(definitions);

// when
importer.handleImportedDefinitions(definitions);
importer.handleImportedDefinitions(definitions);
importer.scheduleImport();
importer.scheduleImport();

// then
verify(manager, times(1)).handleNewProcessDefinitions(new HashSet<>(definitions));
Expand All @@ -105,9 +112,11 @@ void processDefinitionDeleted_shouldDeregister() {

List<ProcessDefinition> second = List.of(getProcessDefinition("process1", 1, 1));

when(search.query()).thenReturn(first).thenReturn(second);

// when
importer.handleImportedDefinitions(first);
importer.handleImportedDefinitions(second);
importer.scheduleImport();
importer.scheduleImport();

// then
verify(manager, times(1)).handleNewProcessDefinitions(new HashSet<>(first));
Expand All @@ -117,18 +126,19 @@ void processDefinitionDeleted_shouldDeregister() {
@Test
void newerVersionDeleted_shouldRegisterOldOne() {
// given
List<ProcessDefinition> first =
List.of(getProcessDefinition("process1", 1, 1), getProcessDefinition("process1", 2, 2));
List<ProcessDefinition> first = List.of(getProcessDefinition("process1", 2, 2));

List<ProcessDefinition> second = List.of(getProcessDefinition("process1", 1, 1));

when(search.query()).thenReturn(first).thenReturn(second);

// when
importer.handleImportedDefinitions(first);
importer.handleImportedDefinitions(second);
importer.scheduleImport();
importer.scheduleImport();

// then
verify(manager, times(1)).handleNewProcessDefinitions(Set.of(first.get(1)));
verify(manager, times(1)).handleDeletedProcessDefinitions(Set.of(first.get(1).getVersion()));
verify(manager, times(1)).handleNewProcessDefinitions(Set.of(first.get(0)));
verify(manager, times(1)).handleDeletedProcessDefinitions(Set.of(first.get(0).getVersion()));
verify(manager, times(1)).handleNewProcessDefinitions(new HashSet<>(second));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import static io.camunda.connector.e2e.BpmnFile.Replace.replace;
import static io.camunda.connector.e2e.BpmnFile.replace;
import static io.camunda.zeebe.process.test.assertions.BpmnAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -44,6 +42,7 @@
import io.camunda.zeebe.model.bpmn.instance.Process;
import io.camunda.zeebe.spring.test.ZeebeSpringTest;
import java.io.File;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -87,7 +86,7 @@ public class HttpTests {

@BeforeEach
void beforeAll() {
doNothing().when(processDefinitionSearch).query(any());
when(processDefinitionSearch.query()).thenReturn(Collections.emptyList());
}

@Test
Expand Down