Skip to content

Commit

Permalink
Merge #6426
Browse files Browse the repository at this point in the history
6426: [Backport 0.26] Use always new appender r=Zelldon a=Zelldon

## Description

Backports #6392 

> Previous if the leader close transition took to long and the broker become leader again it could happen that the new appender was used by the old log storage appender. This is now prevented by not using a supplier. It requests the appender once on leader transition and always uses the same object.
>
> @deepthidevaki you mentioned that you want to prevent stuff based on different term. I think it is already something checked in the ZeebePartition regarding that. If this is not enough for your we can create a follow up issue.
<!-- Please explain the changes you made here. -->

There were some changes necessary since here the journal changes are not available, which means we need to use the ZeebeIndexMapping etc.

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #6346 

## Definition of Done

_Not all items need to be done depending on the issue and the pull request._

Code changes:
* [ ] The changes are backwards compatibility with previous versions
* [ ] If it fixes a bug then PRs are created to [backport](https://github.com/zeebe-io/zeebe/compare/stable/0.24...develop?expand=1&template=backport_template.md&title=[Backport%200.24]) the fix to the last two minor versions. You can trigger a backport by assigning labels (e.g. `backport stable/0.25`) to the PR, in case that fails you need to create backports manually.

Testing:
* [ ] There are unit/integration tests that verify all acceptance criterias of the issue
* [ ] New tests are written to ensure backwards compatibility with further versions
* [ ] The behavior is tested manually
* [ ] The change has been verified by a QA run
* [ ] The impact of the changes is verified by a benchmark 

Documentation: 
* [ ] The documentation is updated (e.g. BPMN reference, configuration, examples, get-started guides, etc.)
* [ ] New content is added to the [release announcement](https://drive.google.com/drive/u/0/folders/1DTIeswnEEq-NggJ25rm2BsDjcCQpDape)


Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
  • Loading branch information
zeebe-bors[bot] and Zelldon committed Feb 24, 2021
2 parents 49290da + 95e89d3 commit 988adf7
Show file tree
Hide file tree
Showing 11 changed files with 135 additions and 92 deletions.
2 changes: 2 additions & 0 deletions broker/src/main/java/io/zeebe/broker/Broker.java
Expand Up @@ -50,6 +50,7 @@
import io.zeebe.broker.system.partitions.impl.AtomixPartitionMessagingService;
import io.zeebe.broker.system.partitions.impl.PartitionProcessingState;
import io.zeebe.broker.system.partitions.impl.PartitionTransitionImpl;
import io.zeebe.broker.system.partitions.impl.steps.AtomixLogStoragePartitionStep;
import io.zeebe.broker.system.partitions.impl.steps.ExporterDirectorPartitionStep;
import io.zeebe.broker.system.partitions.impl.steps.FollowerPostStoragePartitionStep;
import io.zeebe.broker.system.partitions.impl.steps.LeaderPostStoragePartitionStep;
Expand Down Expand Up @@ -97,6 +98,7 @@ public final class Broker implements AutoCloseable {
public static final Logger LOG = Loggers.SYSTEM_LOGGER;
private static final List<PartitionStep> LEADER_STEPS =
List.of(
new AtomixLogStoragePartitionStep(),
new LogStreamPartitionStep(),
new RaftLogReaderPartitionStep(),
new SnapshotReplicationPartitionStep(),
Expand Down
Expand Up @@ -8,6 +8,7 @@
package io.zeebe.broker.system.partitions;

import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;

/**
* A PartitionStep is an action to be taken while opening or closing a partition (e.g.,
Expand All @@ -21,10 +22,25 @@ public interface PartitionStep {
* components (e.g., logstream), setting their values in {@link PartitionContext}, etc. The
* subsequent partition steps will only be opened after the returned future is completed.
*
* @param currentTerm the current term of the transition
* @param context the partition context
* @return future
*/
ActorFuture<Void> open(final PartitionContext context);
default ActorFuture<Void> open(final long currentTerm, final PartitionContext context) {
return open(context);
}

/**
* Performs some action required for the partition to function. This may include opening
* components (e.g., logstream), setting their values in {@link PartitionContext}, etc. The
* subsequent partition steps will only be opened after the returned future is completed.
*
* @param context the partition context
* @return future
*/
default ActorFuture<Void> open(final PartitionContext context) {
return CompletableActorFuture.completed(null);
}

/**
* Perform tear-down actions to clear the partition and prepare for another one to be installed.
Expand Down
Expand Up @@ -15,17 +15,19 @@ public interface PartitionTransition {
* Transitions to follower asynchronously by closing the current partition's components and
* opening a follower partition.
*
* @param currentTerm the current term on which the transition happens
* @return an ActorFuture to be completed when the transition is complete
*/
ActorFuture<Void> toFollower();
ActorFuture<Void> toFollower(final long currentTerm);

/**
* Transitions to leader asynchronously by closing the current partition's components and opening
* a leader partition.
*
* @param currentTerm the current term on which the transition happens
* @return an ActorFuture to be completed when the transition is complete
*/
ActorFuture<Void> toLeader();
ActorFuture<Void> toLeader(final long currentTerm);

/**
* Closes the current partition's components asynchronously.
Expand Down
Expand Up @@ -14,7 +14,6 @@
import io.zeebe.broker.system.monitoring.DiskSpaceUsageListener;
import io.zeebe.broker.system.monitoring.HealthMetrics;
import io.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.zeebe.logstreams.storage.atomix.AtomixLogStorage;
import io.zeebe.snapshots.raft.PersistedSnapshotStore;
import io.zeebe.util.health.CriticalComponentsHealthMonitor;
import io.zeebe.util.health.FailureListener;
Expand Down Expand Up @@ -102,12 +101,12 @@ private void onRoleChange(final Role newRole, final long newTerm) {
if (nextTransitionFuture != null) {
currentTransitionFuture = nextTransitionFuture;
}
LOG.debug("Partition role transitioning from {} to {}", raftRole, newRole);
LOG.debug("Partition role transitioning from {} to {} in term {}", raftRole, newRole, term);
raftRole = newRole;
}

private ActorFuture<Void> leaderTransition(final long newTerm) {
final var leaderTransitionFuture = transition.toLeader();
final var leaderTransitionFuture = transition.toLeader(newTerm);
leaderTransitionFuture.onComplete(
(success, error) -> {
if (error == null) {
Expand Down Expand Up @@ -136,7 +135,7 @@ private ActorFuture<Void> leaderTransition(final long newTerm) {
}

private ActorFuture<Void> followerTransition(final long newTerm) {
final var followerTransitionFuture = transition.toFollower();
final var followerTransitionFuture = transition.toFollower(newTerm);
followerTransitionFuture.onComplete(
(success, error) -> {
if (error == null) {
Expand Down Expand Up @@ -192,8 +191,6 @@ public String getName() {

@Override
public void onActorStarting() {
context.setAtomixLogStorage(
AtomixLogStorage.ofPartition(context.getZeebeIndexMapping(), context.getRaftPartition()));
context.getRaftPartition().addRoleChangeListener(this);
context.getComponentHealthMonitor().addFailureListener(this);
onRoleChange(context.getRaftPartition().getRole(), context.getRaftPartition().term());
Expand Down
Expand Up @@ -22,6 +22,7 @@ public class PartitionTransitionImpl implements PartitionTransition {

private static final Logger LOG = Loggers.SYSTEM_LOGGER;
private static final List<PartitionStep> EMPTY_LIST = Collections.emptyList();
private static final int INACTIVE_TERM = -1;

private final PartitionContext context;
private final List<PartitionStep> leaderSteps;
Expand All @@ -39,43 +40,51 @@ public PartitionTransitionImpl(
}

@Override
public ActorFuture<Void> toFollower() {
return enqueueTransition(followerSteps);
public ActorFuture<Void> toFollower(final long currentTerm) {
return enqueueTransition(currentTerm, followerSteps);
}

@Override
public ActorFuture<Void> toLeader() {
return enqueueTransition(leaderSteps);
public ActorFuture<Void> toLeader(final long currentTerm) {
return enqueueTransition(currentTerm, leaderSteps);
}

@Override
public ActorFuture<Void> toInactive() {
return enqueueTransition(EMPTY_LIST);
return enqueueTransition(INACTIVE_TERM, EMPTY_LIST);
}

/**
* This method allows to enqueue the next transition, such that the transitions are executed in
* order. Previous we had the issue that all transitions have subscribe to the current transition,
* which lead to undefined behavior.
*
* @param currentTerm
* @param partitionStepList the steps which should be installed on the transition
*/
private ActorFuture<Void> enqueueTransition(final List<PartitionStep> partitionStepList) {
private ActorFuture<Void> enqueueTransition(
final long currentTerm, final List<PartitionStep> partitionStepList) {
final var nextTransitionFuture = new CompletableActorFuture<Void>();
final var nextCurrentTransition = currentTransition;
currentTransition = nextTransitionFuture;
nextCurrentTransition.onComplete(
(nothing, err) -> transition(nextTransitionFuture, partitionStepList));
(nothing, err) -> transition(currentTerm, nextTransitionFuture, partitionStepList));
return nextTransitionFuture;
}

private void transition(
final CompletableActorFuture<Void> future, final List<PartitionStep> steps) {
closePartition().onComplete((nothing, err) -> installPartition(future, new ArrayList<>(steps)));
final long currentTerm,
final CompletableActorFuture<Void> future,
final List<PartitionStep> steps) {
closePartition()
.onComplete(
(nothing, err) -> installPartition(currentTerm, future, new ArrayList<>(steps)));
}

private void installPartition(
final CompletableActorFuture<Void> future, final List<PartitionStep> steps) {
final long currentTerm,
final CompletableActorFuture<Void> future,
final List<PartitionStep> steps) {
if (steps.isEmpty()) {
LOG.debug(
"Partition {} transition complete, installed {} resources!",
Expand All @@ -86,7 +95,7 @@ private void installPartition(
}

final PartitionStep step = steps.remove(0);
step.open(context)
step.open(currentTerm, context)
.onComplete(
(value, err) -> {
if (err != null) {
Expand All @@ -95,7 +104,7 @@ private void installPartition(
future.completeExceptionally(err);
} else {
openedSteps.add(step);
installPartition(future, steps);
installPartition(currentTerm, future, steps);
}
});
}
Expand Down
@@ -0,0 +1,59 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.0. You may not use this file
* except in compliance with the Zeebe Community License 1.0.
*/
package io.zeebe.broker.system.partitions.impl.steps;

import io.zeebe.broker.system.partitions.PartitionContext;
import io.zeebe.broker.system.partitions.PartitionStep;
import io.zeebe.logstreams.storage.atomix.AtomixLogStorage;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;

public class AtomixLogStoragePartitionStep implements PartitionStep {
private static final String WRONG_TERM_ERROR_MSG =
"Expected that current term '%d' is same as raft term '%d', but was not. Failing installation of 'AtomixLogStoragePartitionStep' on partition %d.";

@Override
public ActorFuture<Void> open(final long currentTerm, final PartitionContext context) {
final var openFuture = new CompletableActorFuture<Void>();
final var server = context.getRaftPartition().getServer();

final var appenderOptional = server.getAppender();
appenderOptional.ifPresentOrElse(
logAppender -> {
final var raftTerm = server.getTerm();

if (raftTerm != currentTerm) {
openFuture.completeExceptionally(
new IllegalStateException(
String.format(
WRONG_TERM_ERROR_MSG, currentTerm, raftTerm, context.getPartitionId())));
} else {
context.setAtomixLogStorage(
AtomixLogStorage.ofPartition(
context.getZeebeIndexMapping(), server::openReader, logAppender));
openFuture.complete(null);
}
},
() ->
openFuture.completeExceptionally(
new IllegalStateException("Not leader of partition " + context.getPartitionId())));

return openFuture;
}

@Override
public ActorFuture<Void> close(final PartitionContext context) {
context.setAtomixLogStorage(null);
return CompletableActorFuture.completed(null);
}

@Override
public String getName() {
return "AtomixLogStorage";
}
}
Expand Up @@ -8,6 +8,7 @@
package io.zeebe.broker.system.partitions;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -68,7 +69,7 @@ public void shouldInstallLeaderPartition() {
schedulerRule.workUntilDone();

// then
verify(transition).toLeader();
verify(transition).toLeader(1);
}

@Test
Expand Down Expand Up @@ -114,9 +115,9 @@ public void shouldStepDownAfterFailedLeaderTransition() throws InterruptedExcept
schedulerRule.submitActor(partition);
final CountDownLatch latch = new CountDownLatch(1);

when(transition.toLeader())
when(transition.toLeader(anyLong()))
.thenReturn(CompletableActorFuture.completedExceptionally(new Exception("expected")));
when(transition.toFollower())
when(transition.toFollower(anyLong()))
.then(
invocation -> {
latch.countDown();
Expand All @@ -137,9 +138,9 @@ public void shouldStepDownAfterFailedLeaderTransition() throws InterruptedExcept

// then
final InOrder order = inOrder(transition, raft);
order.verify(transition).toLeader();
order.verify(transition).toLeader(1);
order.verify(raft).stepDown();
order.verify(transition).toFollower();
order.verify(transition).toFollower(2);
}

@Test
Expand All @@ -149,7 +150,7 @@ public void shouldGoInactiveAfterFailedFollowerTransition() throws InterruptedEx
schedulerRule.submitActor(partition);
final CountDownLatch latch = new CountDownLatch(1);

when(transition.toFollower())
when(transition.toFollower(anyLong()))
.thenReturn(CompletableActorFuture.completedExceptionally(new Exception("expected")));
when(transition.toInactive())
.then(
Expand All @@ -166,26 +167,25 @@ public void shouldGoInactiveAfterFailedFollowerTransition() throws InterruptedEx
});

// when
partition.onNewRole(Role.FOLLOWER, 1);
schedulerRule.workUntilDone();
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();

// then
final InOrder order = inOrder(transition, raft);
order.verify(transition).toFollower();
order.verify(transition).toFollower(0L);
order.verify(raft).goInactive();
order.verify(transition).toInactive();
}

private static class NoopTransition implements PartitionTransition {

@Override
public ActorFuture<Void> toFollower() {
public ActorFuture<Void> toFollower(final long currentTerm) {
return CompletableActorFuture.completed(null);
}

@Override
public ActorFuture<Void> toLeader() {
public ActorFuture<Void> toLeader(final long currentTerm) {
return CompletableActorFuture.completed(null);
}

Expand Down
Expand Up @@ -65,8 +65,8 @@ public void shouldTransitionToAndCloseInSequence() {
// then
final InOrder inOrder = Mockito.inOrder(transition);
inOrder.verify(transition).toInactive();
inOrder.verify(transition).toLeader();
inOrder.verify(transition).toFollower();
inOrder.verify(transition).toLeader(1);
inOrder.verify(transition).toFollower(1);
inOrder.verify(transition).toInactive();
}
}

0 comments on commit 988adf7

Please sign in to comment.