Skip to content

Commit

Permalink
merge: #10450 #10458
Browse files Browse the repository at this point in the history
10450: fix(raft): handle exceptions on partition server init r=megglos a=megglos

## Description

Previously any RuntimeException happening in RaftPartitionServer#initServer lead to a broken future chain during start which lead to a stale node without any logs on the actual exception occurred during init. Ultimately flying silently till [here](https://github.com/camunda/zeebe/blob/main/broker/src/main/java/io/camunda/zeebe/broker/bootstrap/PartitionManagerStep.java#L42) bringing the startup to a halt.

With this change issues are transparent, see this [log](https://console.cloud.google.com/logs/query;cursorTimestamp=2022-09-22T11:20:44.904454673Z;query=resource.labels.namespace_name%3D%22medic-cw-37-de38e9e086-benchmark-mixed%22%0Aresource.labels.pod_name%3D%22medic-cw-37-de38e9e086-benchmark-mixed-zeebe-2%22%0A-resource.labels.container_name%3D%22debugger-9q4tw%22%0A-logName%3D%22projects%2Fzeebe-io%2Flogs%2Fevents%22%0Atimestamp%3D%222022-09-22T11:20:44.904454673Z%22%0AinsertId%3D%2238ntsbk0c2ikn344%22%0Atimestamp%3D%222022-09-22T11:20:44.904454673Z%22%0AinsertId%3D%2238ntsbk0c2ikn344%22;summaryFields=:false:32:beginning;timeRange=2022-09-22T10:20:44.905Z%2F2022-09-22T11:20:44.905Z?project=zeebe-io) from a pod created with this change. 

This bug was hiding the underlying issue a node not being able to start due to #10451 .

## Related issues

relates to #10451



10458: Reorganize stream processor and engine tests r=Zelldon a=Zelldon

## Description

Moved some tests around to make it easier to detect which need to be migrated for #10455 and to make it easier to create the new module and copy the tests, which are part of the StreamProcessor see #10130 

<!-- Please explain the changes you made here. -->

## Related issues

<!-- Which issues are closed by this PR or are related -->

related to #10455 
related to #10130 




Co-authored-by: Meggle (Sebastian Bathke) <sebastian.bathke@camunda.com>
Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
  • Loading branch information
3 people committed Sep 26, 2022
3 parents f29e236 + 5d60fcc + 78266e9 commit e64ea10
Show file tree
Hide file tree
Showing 15 changed files with 252 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import io.atomix.raft.partition.RaftStorageConfig;
import io.atomix.raft.roles.RaftRole;
import io.atomix.raft.storage.RaftStorage;
import io.atomix.raft.storage.StorageException;
import io.atomix.raft.storage.log.RaftLogReader;
import io.atomix.raft.zeebe.ZeebeLogAppender;
import io.atomix.utils.Managed;
Expand Down Expand Up @@ -113,8 +112,7 @@ public CompletableFuture<RaftPartitionServer> start() {
synchronized (this) {
try {
initServer();

} catch (final StorageException e) {
} catch (final RuntimeException e) {
return Futures.exceptionalFuture(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.atomix.raft.partition.impl;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.atomix.primitive.partition.PartitionId;
import io.atomix.primitive.partition.PartitionMetadata;
import io.atomix.raft.partition.RaftPartition;
import io.atomix.raft.partition.RaftPartitionConfig;
import io.atomix.raft.partition.RaftPartitionGroupConfig;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.junit.jupiter.api.Test;

public class RaftPartitionServerTest {

private final RaftPartition raftPartition = mock(RaftPartition.class);
private final RaftPartitionGroupConfig partitionGroupConfig =
mock(RaftPartitionGroupConfig.class);

@Test
public void testInitServerRuntimeExceptionReturnsExceptionalFuture() {
// given
final MemberId localMemberId = new MemberId("1");
when(raftPartition.members()).thenReturn(List.of(localMemberId));
when(raftPartition.id())
.thenReturn(PartitionId.from("group", Integer.parseInt(localMemberId.id())));

when(partitionGroupConfig.getPartitionConfig()).thenReturn(new RaftPartitionConfig());

final RaftPartitionServer raftPartitionServer =
new RaftPartitionServer(
raftPartition,
partitionGroupConfig,
localMemberId,
mock(ClusterMembershipService.class),
mock(ClusterCommunicationService.class),
mock(PartitionMetadata.class));

// this is called internally by #initServer which we need to ensure does not prevent
// a completableFuture to be returned on failure
when(partitionGroupConfig.getStorageConfig()).thenThrow(RuntimeException.class);

// when
final CompletableFuture<RaftPartitionServer> raftServerStartFuture =
raftPartitionServer.start();

// then
assertThat(raftServerStartFuture)
.failsWithin(Duration.ZERO)
.withThrowableOfType(ExecutionException.class)
.withCauseInstanceOf(RuntimeException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class PartitionManagerStep extends AbstractBrokerStartupStep {
@Override
Expand All @@ -36,72 +37,67 @@ void startupInternal(
brokerStartupContext.getExporterRepository(),
brokerStartupContext.getGatewayBrokerTransport());

CompletableFuture.runAsync(
() ->
partitionManager
.start()
.whenComplete(
(ok, error) -> {
if (error != null) {
startupFuture.completeExceptionally(error);
return;
}
CompletableFuture.supplyAsync(partitionManager::start)
.thenCompose(Function.identity())
.whenComplete(
(ok, error) -> {
if (error != null) {
startupFuture.completeExceptionally(error);
return;
}

forwardExceptions(
forwardExceptions(
() ->
concurrencyControl.run(
() ->
concurrencyControl.run(
() ->
forwardExceptions(
() -> {
final var adminService =
brokerStartupContext.getBrokerAdminService();
adminService.injectAdminAccess(
partitionManager.createAdminAccess(adminService));
adminService.injectPartitionInfoSource(
partitionManager.getPartitions());
forwardExceptions(
() -> {
final var adminService =
brokerStartupContext.getBrokerAdminService();
adminService.injectAdminAccess(
partitionManager.createAdminAccess(adminService));
adminService.injectPartitionInfoSource(
partitionManager.getPartitions());

brokerStartupContext.setPartitionManager(
partitionManager);
brokerStartupContext.setPartitionManager(partitionManager);

startupFuture.complete(brokerStartupContext);
},
startupFuture)),
startupFuture);
}));
startupFuture.complete(brokerStartupContext);
},
startupFuture)),
startupFuture);
});
}

@Override
void shutdownInternal(
final BrokerStartupContext brokerShutdownContext,
final ConcurrencyControl concurrencyControl,
final ActorFuture<BrokerStartupContext> shutdownFuture) {
final var partitionManger = brokerShutdownContext.getPartitionManager();
if (partitionManger == null) {
final var partitionManager = brokerShutdownContext.getPartitionManager();
if (partitionManager == null) {
shutdownFuture.complete(null);
return;
}

CompletableFuture.runAsync(
() ->
partitionManger
.stop()
.whenComplete(
(ok, error) -> {
if (error != null) {
shutdownFuture.completeExceptionally(error);
return;
}
forwardExceptions(
CompletableFuture.supplyAsync(partitionManager::stop)
.thenCompose(Function.identity())
.whenComplete(
(ok, error) -> {
if (error != null) {
shutdownFuture.completeExceptionally(error);
return;
}
forwardExceptions(
() ->
concurrencyControl.run(
() ->
concurrencyControl.run(
() ->
forwardExceptions(
() -> {
brokerShutdownContext.setPartitionManager(null);
shutdownFuture.complete(brokerShutdownContext);
},
shutdownFuture)),
shutdownFuture);
}));
forwardExceptions(
() -> {
brokerShutdownContext.setPartitionManager(null);
shutdownFuture.complete(brokerShutdownContext);
},
shutdownFuture)),
shutdownFuture);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,14 @@
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PartitionManagerStepTest {
private static final TestConcurrencyControl CONCURRENCY_CONTROL = new TestConcurrencyControl();
Expand All @@ -48,6 +52,7 @@ class PartitionManagerStepTest {
networkCfg.setHost("localhost");
}

private final Logger log = LoggerFactory.getLogger(PartitionManagerStepTest.class);
private final PartitionManagerStep sut = new PartitionManagerStep();
private BrokerStartupContextImpl testBrokerStartupContext;

Expand Down Expand Up @@ -105,7 +110,11 @@ void tearDown() {
if (partitionManager != null) {
partitionManager.stop().join();
}
actorScheduler.stop();
try {
actorScheduler.stop();
} catch (final IllegalStateException e) {
log.debug("ActorScheduler was already stopped.");
}
}

@Test
Expand All @@ -128,6 +137,22 @@ void shouldStartAndInstallEmbeddedGatewayService() {
final var partitionManager = testBrokerStartupContext.getPartitionManager();
assertThat(partitionManager).isNotNull();
}

@Test
void shouldHandleSyncFailOfStart() throws Exception {
// given
actorScheduler.close();

// when
sut.startupInternal(testBrokerStartupContext, CONCURRENCY_CONTROL, startupFuture);

// then
assertThat(startupFuture)
.failsWithin(Duration.ZERO)
.withThrowableOfType(ExecutionException.class)
.withCauseInstanceOf(CompletionException.class)
.withRootCauseInstanceOf(IllegalStateException.class);
}
}

@Nested
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor;
package io.camunda.zeebe.engine.processing;

import io.camunda.zeebe.engine.state.ZbColumnFamilies;
import io.camunda.zeebe.engine.util.EngineRule;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor;
package io.camunda.zeebe.engine.processing.randomized;

import io.camunda.zeebe.test.util.bpmn.random.TestDataGenerator.TestDataRecord;
import java.util.function.Supplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor;
package io.camunda.zeebe.engine.processing.randomized;

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.engine.util.ProcessExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor;
package io.camunda.zeebe.engine.processing.randomized;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor;
package io.camunda.zeebe.engine.state;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor;
package io.camunda.zeebe.engine.state;

import static org.assertj.core.api.Assertions.assertThat;

import io.camunda.zeebe.engine.processing.message.MessageObserver;
import io.camunda.zeebe.engine.state.ZbColumnFamilies;
import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor;
package io.camunda.zeebe.streamprocessor;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.streamprocessor.EventFilter;
import org.junit.Test;

public final class EventFilterTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor;
package io.camunda.zeebe.streamprocessor;

import static io.camunda.zeebe.engine.util.RecordToWrite.command;
import static io.camunda.zeebe.engine.util.RecordToWrite.event;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor;
package io.camunda.zeebe.streamprocessor;

import static io.camunda.zeebe.engine.util.RecordToWrite.command;
import static io.camunda.zeebe.engine.util.RecordToWrite.event;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor;
package io.camunda.zeebe.streamprocessor;

import static io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent.ACTIVATE_ELEMENT;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand Down Expand Up @@ -34,7 +34,6 @@
import io.camunda.zeebe.scheduler.clock.ControlledActorClock;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.streamprocessor.ProcessingScheduleServiceImpl;
import io.camunda.zeebe.streamprocessor.StreamProcessor.Phase;
import io.camunda.zeebe.test.util.junit.RegressionTest;
import java.time.Duration;
Expand Down

0 comments on commit e64ea10

Please sign in to comment.