Skip to content

Commit

Permalink
refactor: apply codestyle
Browse files Browse the repository at this point in the history
(cherry picked from commit 4cd49f3)
  • Loading branch information
Zelldon authored and github-actions[bot] committed Sep 24, 2021
1 parent 5f2dd6a commit a988182
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 190 deletions.
Expand Up @@ -94,103 +94,6 @@ public PartitionAdminAccess createAdminAccess() {
return new ZeebePartitionAdminAccess(actor, adminControl);
}

/**
* Called by atomix on role change.
*
* @param newRole the new role of the raft partition
*/
@Override
@Deprecated // will be removed from public API of ZeebePartition
public void onNewRole(final Role newRole, final long newTerm) {
actor.run(() -> onRoleChange(newRole, newTerm));
}

private void onRoleChange(final Role newRole, final long newTerm) {
ActorFuture<Void> nextTransitionFuture = null;
switch (newRole) {
case LEADER:
if (raftRole != Role.LEADER) {
nextTransitionFuture = leaderTransition(newTerm);
}
break;
case INACTIVE:
nextTransitionFuture = transitionToInactive();
break;
case PASSIVE:
case PROMOTABLE:
case CANDIDATE:
case FOLLOWER:
default:
if (raftRole == null || raftRole == Role.LEADER) {
nextTransitionFuture = followerTransition(newTerm);
}
break;
}

if (nextTransitionFuture != null) {
currentTransitionFuture = nextTransitionFuture;
}
LOG.debug("Partition role transitioning from {} to {} in term {}", raftRole, newRole, newTerm);
raftRole = newRole;
}

private ActorFuture<Void> leaderTransition(final long newTerm) {
final var installStartTime = ActorClock.currentTimeMillis();
final var leaderTransitionFuture = transition.toLeader(newTerm);
leaderTransitionFuture.onComplete(
(success, error) -> {
if (error == null) {
final var leaderTransitionLatency = ActorClock.currentTimeMillis() - installStartTime;
roleMetrics.setLeaderTransitionLatency(leaderTransitionLatency);
final List<ActorFuture<Void>> listenerFutures =
context.notifyListenersOfBecomingLeader(newTerm);
actor.runOnCompletion(
listenerFutures,
t -> {
if (t != null) {
onInstallFailure(t);
}
});
onRecoveredInternal();
} else {
LOG.error("Failed to install leader partition {}", context.getPartitionId(), error);
onInstallFailure(error);
}
});
return leaderTransitionFuture;
}

private ActorFuture<Void> followerTransition(final long newTerm) {
final var followerTransitionFuture = transition.toFollower(newTerm);
followerTransitionFuture.onComplete(
(success, error) -> {
if (error == null) {
final List<ActorFuture<Void>> listenerFutures =
context.notifyListenersOfBecomingFollower(newTerm);
actor.runOnCompletion(
listenerFutures,
t -> {
// Compare with the current term in case a new role transition happened
if (t != null) {
onInstallFailure(t);
}
});
onRecoveredInternal();
} else {
LOG.error("Failed to install follower partition {}", context.getPartitionId(), error);
onInstallFailure(error);
}
});
return followerTransitionFuture;
}

private ActorFuture<Void> transitionToInactive() {
zeebePartitionHealth.setServicesInstalled(false);
final var inactiveTransitionFuture = transition.toInactive();
currentTransitionFuture = inactiveTransitionFuture;
return inactiveTransitionFuture;
}

@Override
public String getName() {
return actorName;
Expand Down Expand Up @@ -302,6 +205,103 @@ protected void handleFailure(final Exception failure) {
onInstallFailure(failure);
}

/**
* Called by atomix on role change.
*
* @param newRole the new role of the raft partition
*/
@Override
@Deprecated // will be removed from public API of ZeebePartition
public void onNewRole(final Role newRole, final long newTerm) {
actor.run(() -> onRoleChange(newRole, newTerm));
}

private void onRoleChange(final Role newRole, final long newTerm) {
ActorFuture<Void> nextTransitionFuture = null;
switch (newRole) {
case LEADER:
if (raftRole != Role.LEADER) {
nextTransitionFuture = leaderTransition(newTerm);
}
break;
case INACTIVE:
nextTransitionFuture = transitionToInactive();
break;
case PASSIVE:
case PROMOTABLE:
case CANDIDATE:
case FOLLOWER:
default:
if (raftRole == null || raftRole == Role.LEADER) {
nextTransitionFuture = followerTransition(newTerm);
}
break;
}

if (nextTransitionFuture != null) {
currentTransitionFuture = nextTransitionFuture;
}
LOG.debug("Partition role transitioning from {} to {} in term {}", raftRole, newRole, newTerm);
raftRole = newRole;
}

private ActorFuture<Void> leaderTransition(final long newTerm) {
final var installStartTime = ActorClock.currentTimeMillis();
final var leaderTransitionFuture = transition.toLeader(newTerm);
leaderTransitionFuture.onComplete(
(success, error) -> {
if (error == null) {
final var leaderTransitionLatency = ActorClock.currentTimeMillis() - installStartTime;
roleMetrics.setLeaderTransitionLatency(leaderTransitionLatency);
final List<ActorFuture<Void>> listenerFutures =
context.notifyListenersOfBecomingLeader(newTerm);
actor.runOnCompletion(
listenerFutures,
t -> {
if (t != null) {
onInstallFailure(t);
}
});
onRecoveredInternal();
} else {
LOG.error("Failed to install leader partition {}", context.getPartitionId(), error);
onInstallFailure(error);
}
});
return leaderTransitionFuture;
}

private ActorFuture<Void> followerTransition(final long newTerm) {
final var followerTransitionFuture = transition.toFollower(newTerm);
followerTransitionFuture.onComplete(
(success, error) -> {
if (error == null) {
final List<ActorFuture<Void>> listenerFutures =
context.notifyListenersOfBecomingFollower(newTerm);
actor.runOnCompletion(
listenerFutures,
t -> {
// Compare with the current term in case a new role transition happened
if (t != null) {
onInstallFailure(t);
}
});
onRecoveredInternal();
} else {
LOG.error("Failed to install follower partition {}", context.getPartitionId(), error);
onInstallFailure(error);
}
});
return followerTransitionFuture;
}

private ActorFuture<Void> transitionToInactive() {
zeebePartitionHealth.setServicesInstalled(false);
final var inactiveTransitionFuture = transition.toInactive();
currentTransitionFuture = inactiveTransitionFuture;
return inactiveTransitionFuture;
}

private void registerListenersAndTriggerRoleChange() {
context.getRaftPartition().addRoleChangeListener(this);
context.getComponentHealthMonitor().addFailureListener(this);
Expand Down
Expand Up @@ -77,6 +77,46 @@ private AsyncSnapshotDirector(
}
}

@Override
public String getName() {
return actorName;
}

@Override
protected void onActorStarting() {
actor.setSchedulingHints(SchedulingHints.ioBound());
final var firstSnapshotTime =
RandomDuration.getRandomDurationMinuteBased(MINIMUM_SNAPSHOT_PERIOD, snapshotRate);
actor.runDelayed(firstSnapshotTime, this::scheduleSnapshotOnRate);

lastWrittenEventPosition = null;
}

@Override
public ActorFuture<Void> closeAsync() {
if (actor.isClosed()) {
return CompletableActorFuture.completed(null);
}

return super.closeAsync();
}

@Override
protected void handleFailure(final Exception failure) {
LOG.error(
"No snapshot was taken due to failure in '{}'. Will try to take snapshot after snapshot period {}. {}",
actorName,
snapshotRate,
failure);

resetStateOnFailure();
healthStatus = HealthStatus.UNHEALTHY;

for (final var listener : listeners) {
listener.onFailure();
}
}

/**
* Create an AsyncSnapshotDirector that can take snapshot when the Streamprocessor is in
* continuous replay mode.
Expand Down Expand Up @@ -129,46 +169,6 @@ public static AsyncSnapshotDirector ofProcessingMode(
StreamProcessorMode.PROCESSING);
}

@Override
public String getName() {
return actorName;
}

@Override
protected void onActorStarting() {
actor.setSchedulingHints(SchedulingHints.ioBound());
final var firstSnapshotTime =
RandomDuration.getRandomDurationMinuteBased(MINIMUM_SNAPSHOT_PERIOD, snapshotRate);
actor.runDelayed(firstSnapshotTime, this::scheduleSnapshotOnRate);

lastWrittenEventPosition = null;
}

@Override
public ActorFuture<Void> closeAsync() {
if (actor.isClosed()) {
return CompletableActorFuture.completed(null);
}

return super.closeAsync();
}

@Override
protected void handleFailure(final Exception failure) {
LOG.error(
"No snapshot was taken due to failure in '{}'. Will try to take snapshot after snapshot period {}. {}",
actorName,
snapshotRate,
failure);

resetStateOnFailure();
healthStatus = HealthStatus.UNHEALTHY;

for (final var listener : listeners) {
listener.onFailure();
}
}

private void scheduleSnapshotOnRate() {
actor.runAtFixedRate(snapshotRate, this::prepareTakingSnapshot);
prepareTakingSnapshot();
Expand Down

0 comments on commit a988182

Please sign in to comment.