==== autoscaler configuration table (docs) ====
@@ -117,7 +117,7 @@
 <td>Enable vertex scaling execution by the autoscaler. If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs.</td>
 </tr>
 <tr>
-<td><h5>job.autoscaler.scaling.report.interval</h5></td>
+<td><h5>job.autoscaler.scaling.event.interval</h5></td>
 <td style="word-wrap: break-word;">30 min</td>
 <td>Duration</td>
 <td>Time interval to resend the identical event</td>
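
For readers updating configurations: a minimal sketch of setting the renamed option programmatically, assuming the flink-autoscaler module is on the classpath. The class name ScalingEventIntervalExample is illustrative; AutoScalerOptions.SCALING_EVENT_INTERVAL is the option introduced by this change.

import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.Configuration;

import java.time.Duration;

public class ScalingEventIntervalExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Resend identical scaling events after 15 minutes instead of the 30 minute default.
        conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ofMinutes(15));
        System.out.println(conf.get(AutoScalerOptions.SCALING_EVENT_INTERVAL)); // prints PT15M
    }
}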
==== JobVertexScaler.java ====
@@ -39,6 +39,7 @@
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
@@ -219,7 +220,7 @@ private boolean detectIneffectiveScaleUp(
                 INEFFECTIVE_SCALING,
                 message,
                 null,
-                null);
+                conf.get(SCALING_EVENT_INTERVAL));
 
         if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
             LOG.warn(
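
Passing conf.get(SCALING_EVENT_INTERVAL) instead of null opts ineffective-scaling events into the handler's suppression window. A minimal sketch of what interval-based deduplication looks like; EventDedupSketch, shouldSend, and lastSentByKey are hypothetical names for illustration, not the actual handler implementation.

import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

public class EventDedupSketch {
    // Remembers when an event with a given message key was last emitted.
    private final Map<String, Instant> lastSentByKey = new HashMap<>();

    /** Returns false when an identical event repeats inside the suppression interval. */
    public boolean shouldSend(String messageKey, Duration interval, Instant now) {
        Instant last = lastSentByKey.get(messageKey);
        if (interval != null && last != null && now.isBefore(last.plus(interval))) {
            return false; // duplicate within the window: suppress
        }
        lastSentByKey.put(messageKey, now);
        return true;
    }

    public static void main(String[] args) {
        EventDedupSketch dedup = new EventDedupSketch();
        Instant t0 = Instant.now();
        System.out.println(dedup.shouldSend("IneffectiveScaling", Duration.ofMinutes(30), t0)); // true
        System.out.println(dedup.shouldSend("IneffectiveScaling", Duration.ofMinutes(30), t0.plusSeconds(60))); // false
        System.out.println(dedup.shouldSend("IneffectiveScaling", Duration.ZERO, t0.plusSeconds(61))); // true: zero interval disables dedup
    }
}

This mirrors what the tests below assert: with the default or a positive interval a repeated event is swallowed, while an interval of zero lets every event through.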
==== ScalingExecutor.java ====
@@ -38,23 +38,15 @@
 import java.util.SortedMap;
 
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
 import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.addToScalingHistoryAndStore;
 import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.getTrimmedScalingHistory;
-import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
-import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
-import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 
 /** Class responsible for executing scaling decisions. */
 public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
-    public static final String SCALING_SUMMARY_ENTRY =
-            " Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f";
-    public static final String SCALING_SUMMARY_HEADER_SCALING_DISABLED =
-            "Recommended parallelism change:";
-    public static final String SCALING_SUMMARY_HEADER_SCALING_ENABLED = "Scaling vertices:";
-    @VisibleForTesting static final String SCALING_REPORT_REASON = "ScalingReport";
 
     private static final Logger LOG = LoggerFactory.getLogger(ScalingExecutor.class);
 
     private final JobVertexScaler<KEY, Context> jobVertexScaler;
@@ -100,18 +92,11 @@ public boolean scaleResource(
 
         updateRecommendedParallelism(evaluatedMetrics, scalingSummaries);
 
-        var scalingEnabled = conf.get(SCALING_ENABLED);
-
-        var scalingReport = scalingReport(scalingSummaries, scalingEnabled);
-        autoScalerEventHandler.handleEvent(
-                context,
-                AutoScalerEventHandler.Type.Normal,
-                SCALING_REPORT_REASON,
-                scalingReport,
-                "ScalingExecutor",
-                scalingEnabled ? null : conf.get(AutoScalerOptions.SCALING_REPORT_INTERVAL));
+        var scaleEnabled = conf.get(SCALING_ENABLED);
+        autoScalerEventHandler.handleScalingEvent(
+                context, scalingSummaries, scaleEnabled, conf.get(SCALING_EVENT_INTERVAL));
 
-        if (!scalingEnabled) {
+        if (!scaleEnabled) {
             return false;
         }
 
@@ -136,27 +121,6 @@ private void updateRecommendedParallelism(
                                         scalingSummary.getNewParallelism())));
     }
 
-    private static String scalingReport(
-            Map<JobVertexID, ScalingSummary> scalingSummaries, boolean scalingEnabled) {
-        StringBuilder sb =
-                new StringBuilder(
-                        scalingEnabled
-                                ? SCALING_SUMMARY_HEADER_SCALING_ENABLED
-                                : SCALING_SUMMARY_HEADER_SCALING_DISABLED);
-        scalingSummaries.forEach(
-                (v, s) ->
-                        sb.append(
-                                String.format(
-                                        SCALING_SUMMARY_ENTRY,
-                                        v,
-                                        s.getCurrentParallelism(),
-                                        s.getNewParallelism(),
-                                        s.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(),
-                                        s.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(),
-                                        s.getMetrics().get(TARGET_DATA_RATE).getAverage())));
-        return sb.toString();
-    }
-
     protected static boolean allVerticesWithinUtilizationTarget(
             Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
             Map<JobVertexID, ScalingSummary> scalingSummaries) {
@@ -190,7 +154,8 @@ protected static boolean allVerticesWithinUtilizationTarget(
         return true;
     }
 
-    private Map<JobVertexID, ScalingSummary> computeScalingSummary(
+    @VisibleForTesting
+    Map<JobVertexID, ScalingSummary> computeScalingSummary(
             Context context,
             Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
             Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) {
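
The report that scaleResource now hands to handleScalingEvent is assembled from the SCALING_SUMMARY_* format constants, which move to AutoScalerEventHandler below. A worked example of one report entry; the vertex id and the numbers are made up.

public class ScalingReportEntryExample {
    public static void main(String[] args) {
        // Same pattern as SCALING_SUMMARY_ENTRY in AutoScalerEventHandler.
        String entry = String.format(
                " Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f",
                "0a448493b4782967b150582570326227", 2, 4, 110.0, 220.0, 200.0);
        // Header is "Scaling vertices:" when scaling executed, "Recommended parallelism change:" otherwise.
        System.out.println("Scaling vertices:" + entry);
    }
}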
==== AutoScalerOptions.java ====
@@ -231,10 +231,11 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .withDescription(
                             "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
 
-    public static final ConfigOption<Duration> SCALING_REPORT_INTERVAL =
-            autoScalerConfig("scaling.report.interval")
+    public static final ConfigOption<Duration> SCALING_EVENT_INTERVAL =
+            autoScalerConfig("scaling.event.interval")
                     .durationType()
                     .defaultValue(Duration.ofSeconds(1800))
+                    .withDeprecatedKeys(deprecatedOperatorConfigKey("scaling.event.interval"))
                     .withDescription("Time interval to resend the identical event");
 
     public static final ConfigOption<Duration> FLINK_CLIENT_TIMEOUT =
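
The new withDeprecatedKeys line keeps configurations written against the operator-prefixed key working. A sketch of the fallback, assuming deprecatedOperatorConfigKey prepends "kubernetes.operator." to the "job.autoscaler." prefix; that full key is an inference from the helper's name, so verify it against the module before relying on it.

import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.Configuration;

public class DeprecatedKeyFallbackSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Assumed deprecated key; resolution happens through the option's deprecated-key list.
        conf.setString("kubernetes.operator.job.autoscaler.scaling.event.interval", "15 min");
        System.out.println(conf.get(AutoScalerOptions.SCALING_EVENT_INTERVAL)); // PT15M via fallback
    }
}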
==== AutoScalerEventHandler.java ====
@@ -19,10 +19,17 @@
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import javax.annotation.Nullable;
 
 import java.time.Duration;
+import java.util.Map;
+
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 
 /**
  * Handler for autoscaler events.
@@ -32,12 +39,17 @@
  */
 @Experimental
 public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> {
+    String SCALING_SUMMARY_ENTRY =
+            " Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f";
+    String SCALING_SUMMARY_HEADER_SCALING_DISABLED = "Recommended parallelism change:";
+    String SCALING_SUMMARY_HEADER_SCALING_ENABLED = "Scaling vertices:";
+    String SCALING_REPORT_REASON = "ScalingReport";
+    String SCALING_REPORT_KEY = "ScalingExecutor";
 
     /**
     * Handle the event.
     *
-     * @param interval When interval is greater than 0, events that repeat within the interval
-     *     will be ignored.
+     * @param interval Defines the interval to suppress duplicate events. No dedupe if null.
     */
    void handleEvent(
            Context context,
@@ -47,6 +59,50 @@ void handleEvent(
            @Nullable String messageKey,
            @Nullable Duration interval);
 
+    /**
+     * Handle scaling reports.
+     *
+     * @param interval Defines the interval to suppress duplicate events.
+     * @param scaled Whether the autoscaler actually scaled the Flink job or just generates advice
+     *     for scaling.
+     */
+    default void handleScalingEvent(
+            Context context,
+            Map<JobVertexID, ScalingSummary> scalingSummaries,
+            boolean scaled,
+            Duration interval) {
+        // Provide default implementation without proper deduplication
+        var scalingReport = scalingReport(scalingSummaries, scaled);
+        handleEvent(
+                context,
+                Type.Normal,
+                SCALING_REPORT_REASON,
+                scalingReport,
+                SCALING_REPORT_KEY,
+                interval);
+    }
+
+    static String scalingReport(
+            Map<JobVertexID, ScalingSummary> scalingSummaries, boolean scalingEnabled) {
+        StringBuilder sb =
+                new StringBuilder(
+                        scalingEnabled
+                                ? SCALING_SUMMARY_HEADER_SCALING_ENABLED
+                                : SCALING_SUMMARY_HEADER_SCALING_DISABLED);
+        scalingSummaries.forEach(
+                (v, s) ->
+                        sb.append(
+                                String.format(
+                                        SCALING_SUMMARY_ENTRY,
+                                        v,
+                                        s.getCurrentParallelism(),
+                                        s.getNewParallelism(),
+                                        s.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(),
+                                        s.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(),
+                                        s.getMetrics().get(TARGET_DATA_RATE).getAverage())));
+        return sb.toString();
+    }
+
     /** The type of the events. */
     enum Type {
         Normal,
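
Implementations only have to provide handleEvent; handleScalingEvent and the static scalingReport helper come with the interface. A minimal sketch of a custom handler; LoggingEventHandler is a hypothetical class, and as the default-method comment notes, a production handler should also deduplicate events that repeat within the interval, which this sketch skips.

import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;

import javax.annotation.Nullable;

import java.time.Duration;

public class LoggingEventHandler<KEY, Context extends JobAutoScalerContext<KEY>>
        implements AutoScalerEventHandler<KEY, Context> {

    @Override
    public void handleEvent(
            Context context,
            Type type,
            String reason,
            String message,
            @Nullable String messageKey,
            @Nullable Duration interval) {
        // Ignores messageKey and interval: every event is logged, duplicates included.
        System.out.printf("[%s] %s: %s%n", type, reason, message);
    }

    // handleScalingEvent is inherited: it formats the summaries with scalingReport(...)
    // and forwards them here under SCALING_REPORT_REASON / SCALING_REPORT_KEY.
}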
==== JobVertexScalerTest.java ====
@@ -358,6 +358,39 @@ public void testSendingIneffectiveScalingEvents() {
         assertThat(event.getMessage())
                 .isEqualTo(String.format(INEFFECTIVE_MESSAGE_FORMAT, jobVertexID));
         assertThat(event.getReason()).isEqualTo(INEFFECTIVE_SCALING);
+        assertEquals(1, event.getCount());
+
+        // Repeat ineffective scale with default interval, no event is triggered
+        assertEquals(
+                20,
+                vertexScaler.computeScaleTargetParallelism(
+                        context, jobVertexID, evaluated, history));
+        assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
+        assertEquals(0, eventCollector.events.size());
+
+        // Repeat ineffective scale with positive interval, no event is triggered
+        conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ofSeconds(1800));
+        assertEquals(
+                20,
+                vertexScaler.computeScaleTargetParallelism(
+                        context, jobVertexID, evaluated, history));
+        assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
+        assertEquals(0, eventCollector.events.size());
+
+        // Ineffective scale with interval set to 0, an event is triggered
+        conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ZERO);
+        assertEquals(
+                20,
+                vertexScaler.computeScaleTargetParallelism(
+                        context, jobVertexID, evaluated, history));
+        assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
+        assertEquals(1, eventCollector.events.size());
+        event = eventCollector.events.poll();
+        assertThat(event).isNotNull();
+        assertThat(event.getMessage())
+                .isEqualTo(String.format(INEFFECTIVE_MESSAGE_FORMAT, jobVertexID));
+        assertThat(event.getReason()).isEqualTo(INEFFECTIVE_SCALING);
+        assertEquals(2, event.getCount());
     }
 
     private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
==== ScalingExecutorTest.java ====
@@ -37,8 +37,11 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.autoscaler.ScalingExecutor.SCALING_SUMMARY_ENTRY;
 import static org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
+import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_REPORT_REASON;
+import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_ENTRY;
+import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_DISABLED;
+import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_ENABLED;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -151,19 +154,20 @@ public void testVertexesExclusionForScaling() throws Exception {
 
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
-    public void testScalingEventsWith0Interval(boolean scalingEnabled) throws Exception {
+    public void testScalingEventsWith0IntervalConfig(boolean scalingEnabled) throws Exception {
         testScalingEvents(scalingEnabled, Duration.ofSeconds(0));
     }
 
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
-    public void testScalingEventsWithInterval(boolean scalingEnabled) throws Exception {
+    public void testScalingEventsWithIntervalConfig(boolean scalingEnabled) throws Exception {
         testScalingEvents(scalingEnabled, Duration.ofSeconds(1800));
     }
 
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
-    public void testScalingEventsWithDefaultInterval(boolean scalingEnabled) throws Exception {
+    public void testScalingEventsWithDefaultIntervalConfig(boolean scalingEnabled)
+            throws Exception {
         testScalingEvents(scalingEnabled, null);
     }
 
@@ -175,17 +179,13 @@ private void testScalingEvents(boolean scalingEnabled, Duration interval) throws
         var metrics = Map.of(jobVertexID, evaluated(1, 110, 100));
 
         if (interval != null) {
-            conf.set(AutoScalerOptions.SCALING_REPORT_INTERVAL, interval);
+            conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, interval);
         }
 
         assertEquals(scalingEnabled, scalingDecisionExecutor.scaleResource(context, metrics));
         assertEquals(scalingEnabled, scalingDecisionExecutor.scaleResource(context, metrics));
 
-        int expectedSize =
-                (interval == null || (!interval.isNegative() && !interval.isZero()))
-                                && !scalingEnabled
-                        ? 1
-                        : 2;
+        int expectedSize = (interval == null || interval.toMillis() > 0) && !scalingEnabled ? 1 : 2;
         assertEquals(expectedSize, eventCollector.events.size());
 
         TestingEventCollector.Event<JobID, JobAutoScalerContext<JobID>> event;
@@ -208,9 +208,9 @@ private void testScalingEvents(boolean scalingEnabled, Duration interval) throws
                 event.getMessage()
                         .contains(
                                 scalingEnabled
-                                        ? ScalingExecutor.SCALING_SUMMARY_HEADER_SCALING_ENABLED
-                                        : ScalingExecutor.SCALING_SUMMARY_HEADER_SCALING_DISABLED));
-        assertEquals(ScalingExecutor.SCALING_REPORT_REASON, event.getReason());
+                                        ? SCALING_SUMMARY_HEADER_SCALING_ENABLED
+                                        : SCALING_SUMMARY_HEADER_SCALING_DISABLED));
+        assertEquals(SCALING_REPORT_REASON, event.getReason());
 
         metrics = Map.of(jobVertexID, evaluated(1, 110, 101));
 