Skip to content

Commit

Permalink
Moves rescale decision process back into state implementation
Browse files Browse the repository at this point in the history
We're planning to reuse the RescaleManager in WaitingForResources but the decision on whether a state transition should happen varies between the two states Executing (based on parallelism diff) and WaitingForResources (based on static info from JobGraph and JobInformation)
  • Loading branch information
XComp committed Jun 18, 2024
1 parent 2c102f8 commit 7ab6f11
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,8 @@ public void goToExecuting(
this,
userCodeClassLoader,
failureCollection,
DefaultRescaleManager.Factory.fromSettings(settings)));
DefaultRescaleManager.Factory.fromSettings(settings),
settings.getMinParallelismChangeForDesiredRescale()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@
package org.apache.flink.runtime.scheduler.adaptive;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.EnforceMinimalIncreaseRescalingController;
import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.EnforceParallelismChangeRescalingController;
import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.RescalingController;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
Expand Down Expand Up @@ -61,41 +57,30 @@ public class DefaultRescaleManager implements RescaleManager {
@VisibleForTesting final Duration scalingIntervalMin;
@VisibleForTesting @Nullable final Duration scalingIntervalMax;

@VisibleForTesting final int minParallelismChange;
private final RescalingController softRescalingController;
private final RescalingController hardRescalingController;

private final RescaleManager.Context rescaleContext;
private final VertexParallelism currentVertexParallelism;

private boolean rescaleScheduled = false;

DefaultRescaleManager(
Temporal initializationTime,
RescaleManager.Context rescaleContext,
VertexParallelism currentVertexParallelism,
Duration scalingIntervalMin,
@Nullable Duration scalingIntervalMax,
int minParallelismChange) {
@Nullable Duration scalingIntervalMax) {
this(
initializationTime,
Instant::now,
rescaleContext,
currentVertexParallelism,
scalingIntervalMin,
scalingIntervalMax,
minParallelismChange);
scalingIntervalMax);
}

@VisibleForTesting
DefaultRescaleManager(
Temporal initializationTime,
Supplier<Temporal> clock,
RescaleManager.Context rescaleContext,
VertexParallelism currentVertexParallelism,
Duration scalingIntervalMin,
@Nullable Duration scalingIntervalMax,
int minParallelismChange) {
@Nullable Duration scalingIntervalMax) {
this.initializationTime = initializationTime;
this.clock = clock;

Expand All @@ -106,12 +91,6 @@ public class DefaultRescaleManager implements RescaleManager {
this.scalingIntervalMax = scalingIntervalMax;

this.rescaleContext = rescaleContext;
this.currentVertexParallelism = currentVertexParallelism;

this.minParallelismChange = minParallelismChange;
this.softRescalingController =
new EnforceMinimalIncreaseRescalingController(minParallelismChange);
this.hardRescalingController = new EnforceParallelismChangeRescalingController();
}

@Override
Expand All @@ -130,51 +109,39 @@ private Duration timeSinceLastRescale() {

private void maybeRescale() {
rescaleScheduled = false;
if (shouldRescale(softRescalingController)) {
if (rescaleContext.hasDesiredResources()) {
LOG.info("Desired parallelism for job was reached: Rescaling will be triggered.");
rescaleContext.rescale();
} else if (scalingIntervalMax != null) {
LOG.info(
"The longer the pipeline runs, the more the (small) resource gain is worth the restarting time. "
+ "Last resource added does not meet the configured minimal parallelism change of {}. Forced rescaling will be triggered after {} if the resource is still there.",
minParallelismChange,
+ "Last resource added does not meet the configured minimal parallelism change. Forced rescaling will be triggered after {} if the resource is still there.",
scalingIntervalMax);

// reasoning for inconsistent scheduling:
// https://lists.apache.org/thread/m2w2xzfjpxlw63j0k7tfxfgs0rshhwwr
if (timeSinceLastRescale().compareTo(scalingIntervalMax) > 0) {
rescaleEvenIfDesiredParallelismCannotBeMatched();
rescaleWithSufficientResources();
} else {
rescaleContext.scheduleOperation(
this::rescaleEvenIfDesiredParallelismCannotBeMatched, scalingIntervalMax);
this::rescaleWithSufficientResources, scalingIntervalMax);
}
}
}

private void rescaleEvenIfDesiredParallelismCannotBeMatched() {
if (shouldRescale(hardRescalingController)) {
private void rescaleWithSufficientResources() {
if (rescaleContext.hasSufficientResources()) {
LOG.info(
"Resources for desired job parallelism couldn't be collected after {}: Rescaling will be enforced.",
scalingIntervalMax);
rescaleContext.rescale();
}
}

private boolean shouldRescale(RescalingController rescalingController) {
return rescaleContext
.getAvailableVertexParallelism()
.filter(
availableVertexParallelism ->
rescalingController.shouldRescale(
this.currentVertexParallelism, availableVertexParallelism))
.isPresent();
}

public static class Factory implements RescaleManager.Factory {

private final Duration scalingIntervalMin;
@Nullable private final Duration scalingIntervalMax;
private final int minParallelismChange;

/**
* Creates a {@code Factory} instance based on the {@link AdaptiveScheduler}'s {@code
Expand All @@ -183,33 +150,18 @@ public static class Factory implements RescaleManager.Factory {
public static Factory fromSettings(AdaptiveScheduler.Settings settings) {
// it's not ideal that we use a AdaptiveScheduler internal class here. We might want to
// change that as part of a more general alignment of the rescaling configuration.
return new Factory(
settings.getScalingIntervalMin(),
settings.getScalingIntervalMax(),
settings.getMinParallelismChangeForDesiredRescale());
return new Factory(settings.getScalingIntervalMin(), settings.getScalingIntervalMax());
}

private Factory(
Duration scalingIntervalMin,
@Nullable Duration scalingIntervalMax,
int minParallelismChange) {
private Factory(Duration scalingIntervalMin, @Nullable Duration scalingIntervalMax) {
this.scalingIntervalMin = scalingIntervalMin;
this.scalingIntervalMax = scalingIntervalMax;
this.minParallelismChange = minParallelismChange;
}

@Override
public DefaultRescaleManager create(
Context rescaleContext,
VertexParallelism currentVertexParallelism,
Instant lastRescale) {
public DefaultRescaleManager create(Context rescaleContext, Instant lastRescale) {
return new DefaultRescaleManager(
lastRescale,
rescaleContext,
currentVertexParallelism,
scalingIntervalMin,
scalingIntervalMax,
minParallelismChange);
lastRescale, rescaleContext, scalingIntervalMin, scalingIntervalMax);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.EnforceMinimalIncreaseRescalingController;
import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.EnforceParallelismChangeRescalingController;
import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.RescalingController;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
import org.apache.flink.util.Preconditions;
Expand All @@ -57,6 +60,8 @@ class Executing extends StateWithExecutionGraph

private final Context context;

private final RescalingController sufficientResourcesController;
private final RescalingController desiredResourcesController;
private final RescaleManager rescaleManager;

Executing(
Expand All @@ -68,6 +73,7 @@ class Executing extends StateWithExecutionGraph
ClassLoader userCodeClassLoader,
List<ExceptionHistoryEntry> failureCollection,
RescaleManager.Factory rescaleManagerFactory,
int minParallelismChangeForRescale,
Instant lastRescale) {
super(
context,
Expand All @@ -80,9 +86,11 @@ class Executing extends StateWithExecutionGraph
this.context = context;
Preconditions.checkState(
executionGraph.getState() == JobStatus.RUNNING, "Assuming running execution graph");
this.rescaleManager =
rescaleManagerFactory.create(
this, extractCurrentVertexParallelism(executionGraph), lastRescale);

this.sufficientResourcesController = new EnforceParallelismChangeRescalingController();
this.desiredResourcesController =
new EnforceMinimalIncreaseRescalingController(minParallelismChangeForRescale);
this.rescaleManager = rescaleManagerFactory.create(this, lastRescale);

deploy();

Expand All @@ -91,13 +99,23 @@ class Executing extends StateWithExecutionGraph
}

@Override
public Optional<VertexParallelism> getAvailableVertexParallelism() {
return context.getAvailableVertexParallelism();
public boolean hasSufficientResources() {
return shouldRescale(sufficientResourcesController);
}

@Override
public void scheduleOperation(Runnable callback, Duration delay) {
context.runIfState(this, callback, delay);
public boolean hasDesiredResources() {
return shouldRescale(desiredResourcesController);
}

private boolean shouldRescale(RescalingController rescalingController) {
return context.getAvailableVertexParallelism()
.filter(
availableVertexParallelism ->
rescalingController.shouldRescale(
extractCurrentVertexParallelism(getExecutionGraph()),
availableVertexParallelism))
.isPresent();
}

private static VertexParallelism extractCurrentVertexParallelism(
Expand All @@ -110,6 +128,11 @@ private static VertexParallelism extractCurrentVertexParallelism(
AccessExecutionJobVertex::getParallelism)));
}

@Override
public void scheduleOperation(Runnable callback, Duration delay) {
context.runIfState(this, callback, delay);
}

@Override
public void rescale() {
context.goToRestarting(
Expand Down Expand Up @@ -256,6 +279,7 @@ static class Factory implements StateFactory<Executing> {
private final ClassLoader userCodeClassLoader;
private final List<ExceptionHistoryEntry> failureCollection;
private final RescaleManager.Factory rescaleManagerFactory;
private final int minParallelismChangeForRescale;

Factory(
ExecutionGraph executionGraph,
Expand All @@ -265,7 +289,8 @@ static class Factory implements StateFactory<Executing> {
Context context,
ClassLoader userCodeClassLoader,
List<ExceptionHistoryEntry> failureCollection,
RescaleManager.Factory rescaleManagerFactory) {
RescaleManager.Factory rescaleManagerFactory,
int minParallelismChangeForRescale) {
this.context = context;
this.log = log;
this.executionGraph = executionGraph;
Expand All @@ -274,6 +299,7 @@ static class Factory implements StateFactory<Executing> {
this.userCodeClassLoader = userCodeClassLoader;
this.failureCollection = failureCollection;
this.rescaleManagerFactory = rescaleManagerFactory;
this.minParallelismChangeForRescale = minParallelismChangeForRescale;
}

public Class<Executing> getStateClass() {
Expand All @@ -290,6 +316,7 @@ public Executing getState() {
userCodeClassLoader,
failureCollection,
rescaleManagerFactory,
minParallelismChangeForRescale,
Instant.now());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@

package org.apache.flink.runtime.scheduler.adaptive;

import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

/** The {@code RescaleManager} decides on whether rescaling should happen or not. */
public interface RescaleManager {
Expand All @@ -37,10 +34,20 @@ public interface RescaleManager {
interface Context {

/**
* Returns the {@link VertexParallelism} that could be achieved by the currently available
* free slots for the job or an empty {@code Optional} if there are not enough free slots.
* Returns {@code true} if the available resources are sufficient enough for a state
* transition; otherwise {@code false}.
*
* @see #hasDesiredResources()
*/
boolean hasSufficientResources();

/**
* Returns {@code true} if the available resources meet the desired resources for the job;
* otherwise {@code false}.
*
* @see #hasSufficientResources()
*/
Optional<VertexParallelism> getAvailableVertexParallelism();
boolean hasDesiredResources();

/** Triggers the rescaling of the job. */
void rescale();
Expand All @@ -56,9 +63,6 @@ interface Factory {
* Creates a {@code RescaleManager} instance for the given {@code rescaleContext} and
* previous rescale time.
*/
RescaleManager create(
Context rescaleContext,
VertexParallelism currentVertexParallelism,
Instant lastRescale);
RescaleManager create(Context rescaleContext, Instant lastRescale);
}
}
Loading

0 comments on commit 7ab6f11

Please sign in to comment.