From 10ed8e7f8f34d2e61f51f1d37102124223cc60fa Mon Sep 17 00:00:00 2001 From: Clara Xiong Date: Wed, 18 Oct 2023 00:51:33 -0700 Subject: [PATCH 1/3] [FLINK-33187] using hashcode for parallelism map comparison review feedback --- .../flink/autoscaler/JobAutoScalerImpl.java | 7 +- .../flink/autoscaler/JobVertexScaler.java | 7 +- .../flink/autoscaler/ScalingExecutor.java | 46 +-- .../event/AutoScalerEventHandler.java | 58 +++- .../flink/autoscaler/ScalingExecutorTest.java | 11 +- .../event/TestingEventCollector.java | 32 +-- .../KubernetesAutoScalerEventHandler.java | 90 ++++-- .../operator/utils/EventRecorder.java | 17 +- .../kubernetes/operator/utils/EventUtils.java | 68 ++--- .../KubernetesAutoScalerEventHandlerTest.java | 268 ++++++++++++++++++ .../operator/utils/EventUtilsTest.java | 172 +++++------ 11 files changed, 547 insertions(+), 229 deletions(-) create mode 100644 flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandlerTest.java diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java index 3196a3de26..11fc8facb9 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java @@ -194,12 +194,7 @@ private void onError(Context ctx, AutoscalerFlinkMetrics autoscalerMetrics, Thro LOG.error("Error while scaling job", e); autoscalerMetrics.incrementError(); eventHandler.handleEvent( - ctx, - AutoScalerEventHandler.Type.Warning, - AUTOSCALER_ERROR, - e.getMessage(), - null, - null); + ctx, AutoScalerEventHandler.Type.Warning, AUTOSCALER_ERROR, e.getMessage(), null); } private AutoscalerFlinkMetrics getOrInitAutoscalerFlinkMetrics(Context ctx) { diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java index 9bc46b7e57..6be8c47ad0 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java @@ -214,12 +214,7 @@ private boolean detectIneffectiveScaleUp( var message = String.format(INEFFECTIVE_MESSAGE_FORMAT, vertex); autoScalerEventHandler.handleEvent( - context, - AutoScalerEventHandler.Type.Normal, - INEFFECTIVE_SCALING, - message, - null, - null); + context, AutoScalerEventHandler.Type.Normal, INEFFECTIVE_SCALING, message, null); if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) { LOG.warn( diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java index 406b17d734..c9b35ed1d1 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java @@ -40,21 +40,12 @@ import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED; import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.addToScalingHistoryAndStore; import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.getTrimmedScalingHistory; -import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD; import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD; -import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; /** Class responsible for executing scaling decisions. 
*/ public class ScalingExecutor> { - public static final String SCALING_SUMMARY_ENTRY = - " Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f"; - public static final String SCALING_SUMMARY_HEADER_SCALING_DISABLED = - "Recommended parallelism change:"; - public static final String SCALING_SUMMARY_HEADER_SCALING_ENABLED = "Scaling vertices:"; - @VisibleForTesting static final String SCALING_REPORT_REASON = "ScalingReport"; - private static final Logger LOG = LoggerFactory.getLogger(ScalingExecutor.class); private final JobVertexScaler jobVertexScaler; @@ -100,18 +91,9 @@ public boolean scaleResource( updateRecommendedParallelism(evaluatedMetrics, scalingSummaries); - var scalingEnabled = conf.get(SCALING_ENABLED); - - var scalingReport = scalingReport(scalingSummaries, scalingEnabled); - autoScalerEventHandler.handleEvent( - context, - AutoScalerEventHandler.Type.Normal, - SCALING_REPORT_REASON, - scalingReport, - "ScalingExecutor", - scalingEnabled ? null : conf.get(AutoScalerOptions.SCALING_REPORT_INTERVAL)); + autoScalerEventHandler.handleScalingEvent(context, scalingSummaries); - if (!scalingEnabled) { + if (!conf.get(SCALING_ENABLED)) { return false; } @@ -136,27 +118,6 @@ private void updateRecommendedParallelism( scalingSummary.getNewParallelism()))); } - private static String scalingReport( - Map scalingSummaries, boolean scalingEnabled) { - StringBuilder sb = - new StringBuilder( - scalingEnabled - ? 
SCALING_SUMMARY_HEADER_SCALING_ENABLED - : SCALING_SUMMARY_HEADER_SCALING_DISABLED); - scalingSummaries.forEach( - (v, s) -> - sb.append( - String.format( - SCALING_SUMMARY_ENTRY, - v, - s.getCurrentParallelism(), - s.getNewParallelism(), - s.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(), - s.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(), - s.getMetrics().get(TARGET_DATA_RATE).getAverage()))); - return sb.toString(); - } - protected static boolean allVerticesWithinUtilizationTarget( Map> evaluatedMetrics, Map scalingSummaries) { @@ -190,7 +151,8 @@ protected static boolean allVerticesWithinUtilizationTarget( return true; } - private Map computeScalingSummary( + @VisibleForTesting + Map computeScalingSummary( Context context, Map> evaluatedMetrics, Map> scalingHistory) { diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java index a5a0edfefe..a0d971cddc 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java @@ -19,10 +19,17 @@ import org.apache.flink.annotation.Experimental; import org.apache.flink.autoscaler.JobAutoScalerContext; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.runtime.jobgraph.JobVertexID; import javax.annotation.Nullable; -import java.time.Duration; +import java.util.Map; + +import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; /** * Handler for autoscaler events. 
@@ -32,20 +39,45 @@ */ @Experimental public interface AutoScalerEventHandler> { + String SCALING_SUMMARY_ENTRY = + " Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f"; + String SCALING_SUMMARY_HEADER_SCALING_DISABLED = "Recommended parallelism change:"; + String SCALING_SUMMARY_HEADER_SCALING_ENABLED = "Scaling vertices:"; + String SCALING_REPORT_REASON = "ScalingReport"; + String EVENT_MESSAGE_KEY = "ScalingExecutor"; - /** - * Handle the event. - * - * @param interval When interval is great than 0, events that repeat within the interval will be - * ignored. - */ + /** Handle the event. */ void handleEvent( - Context context, - Type type, - String reason, - String message, - @Nullable String messageKey, - @Nullable Duration interval); + Context context, Type type, String reason, String message, @Nullable String messageKey); + + default void handleScalingEvent( + Context context, Map scalingSummaries) { + // Provide default implementation without proper deduplication + var scalingReport = + scalingReport(scalingSummaries, context.getConfiguration().get(SCALING_ENABLED)); + handleEvent(context, Type.Normal, SCALING_REPORT_REASON, scalingReport, EVENT_MESSAGE_KEY); + } + + static String scalingReport( + Map scalingSummaries, boolean scalingEnabled) { + StringBuilder sb = + new StringBuilder( + scalingEnabled + ? SCALING_SUMMARY_HEADER_SCALING_ENABLED + : SCALING_SUMMARY_HEADER_SCALING_DISABLED); + scalingSummaries.forEach( + (v, s) -> + sb.append( + String.format( + SCALING_SUMMARY_ENTRY, + v, + s.getCurrentParallelism(), + s.getNewParallelism(), + s.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(), + s.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(), + s.getMetrics().get(TARGET_DATA_RATE).getAverage()))); + return sb.toString(); + } /** The type of the events. 
*/ enum Type { diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java index 36fb6e31db..3135766fd5 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java @@ -37,8 +37,11 @@ import java.util.Map; import java.util.stream.Collectors; -import static org.apache.flink.autoscaler.ScalingExecutor.SCALING_SUMMARY_ENTRY; import static org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext; +import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_REPORT_REASON; +import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_ENTRY; +import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_DISABLED; +import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_ENABLED; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -208,9 +211,9 @@ private void testScalingEvents(boolean scalingEnabled, Duration interval) throws event.getMessage() .contains( scalingEnabled - ? ScalingExecutor.SCALING_SUMMARY_HEADER_SCALING_ENABLED - : ScalingExecutor.SCALING_SUMMARY_HEADER_SCALING_DISABLED)); - assertEquals(ScalingExecutor.SCALING_REPORT_REASON, event.getReason()); + ? 
SCALING_SUMMARY_HEADER_SCALING_ENABLED + : SCALING_SUMMARY_HEADER_SCALING_DISABLED)); + assertEquals(SCALING_REPORT_REASON, event.getReason()); metrics = Map.of(jobVertexID, evaluated(1, 110, 101)); diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java index bd742cde5e..39367283bc 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java @@ -20,16 +20,19 @@ import org.apache.flink.autoscaler.JobAutoScalerContext; import lombok.Getter; +import lombok.Setter; import javax.annotation.Nullable; -import java.time.Duration; import java.time.Instant; import java.util.LinkedList; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_REPORT_INTERVAL; + /** Testing {@link AutoScalerEventHandler} implementation. */ public class TestingEventCollector> implements AutoScalerEventHandler { @@ -44,18 +47,19 @@ public void handleEvent( Type type, String reason, String message, - @Nullable String messageKey, - @Nullable Duration interval) { + @Nullable String messageKey) { String eventKey = generateEventKey(context, type, reason, messageKey != null ? 
messageKey : message); Event event = eventMap.get(eventKey); + var interval = context.getConfiguration().get(SCALING_REPORT_INTERVAL); + var scaleEnabled = context.getConfiguration().get(SCALING_ENABLED); if (event == null) { - Event newEvent = new Event<>(context, type, reason, message, messageKey); + Event newEvent = new Event<>(context, reason, message, messageKey); events.add(newEvent); eventMap.put(eventKey, newEvent); return; - } - if (Objects.equals(event.getMessage(), message) + } else if (!scaleEnabled + && Objects.equals(event.getMessage(), message) && interval != null && Instant.now() .isBefore(event.getLastUpdateTimestamp().plusMillis(interval.toMillis()))) { @@ -63,6 +67,8 @@ public void handleEvent( return; } event.incrementCount(); + event.setMessage(message); + event.setLastUpdateTimestamp(Instant.now()); events.add(event); } @@ -73,29 +79,21 @@ private String generateEventKey(Context context, Type type, String reason, Strin /** The collected event. */ public static class Event> { - @Getter private Instant lastUpdateTimestamp; + @Getter @Setter private Instant lastUpdateTimestamp; @Getter private final Context context; - @Getter private final Type type; - @Getter private final String reason; - @Getter private final String message; + @Getter @Setter private String message; @Getter @Nullable private final String messageKey; @Getter private int count; - public Event( - Context context, - Type type, - String reason, - String message, - @Nullable String messageKey) { + public Event(Context context, String reason, String message, @Nullable String messageKey) { this.lastUpdateTimestamp = Instant.now(); this.context = context; - this.type = type; this.reason = reason; this.message = message; this.messageKey = messageKey; diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java index 54fe84ae25..74e2ef9b1d 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java @@ -17,19 +17,29 @@ package org.apache.flink.kubernetes.operator.autoscaler; +import org.apache.flink.autoscaler.ScalingSummary; import org.apache.flink.autoscaler.event.AutoScalerEventHandler; import org.apache.flink.kubernetes.operator.utils.EventRecorder; +import org.apache.flink.runtime.jobgraph.JobVertexID; import io.javaoperatorsdk.operator.processing.event.ResourceID; import javax.annotation.Nullable; -import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.Objects; +import java.util.function.BiPredicate; +import java.util.stream.Collectors; + +import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_REPORT_INTERVAL; /** An event handler which posts events to the Kubernetes events API. 
*/ public class KubernetesAutoScalerEventHandler implements AutoScalerEventHandler { + public static final String PARALLELISM_MAP_KEY = "parallelismMap"; private final EventRecorder eventRecorder; public KubernetesAutoScalerEventHandler(EventRecorder eventRecorder) { @@ -42,27 +52,71 @@ public void handleEvent( Type type, String reason, String message, - @Nullable String messageKey, - @Nullable Duration interval) { - if (interval == null) { - eventRecorder.triggerEvent( - context.getResource(), - EventRecorder.Type.valueOf(type.name()), - reason, - message, - EventRecorder.Component.Operator, - messageKey, - context.getKubernetesClient()); + @Nullable String messageKey) { + eventRecorder.triggerEvent( + context.getResource(), + EventRecorder.Type.valueOf(type.name()), + reason, + message, + EventRecorder.Component.Operator, + messageKey, + context.getKubernetesClient()); + } + + @Override + public void handleScalingEvent( + KubernetesJobAutoScalerContext context, + Map scalingSummaries) { + var scalingEnabled = context.getConfiguration().get(SCALING_ENABLED); + if (scalingEnabled) { + AutoScalerEventHandler.super.handleScalingEvent(context, scalingSummaries); } else { - eventRecorder.triggerEventByInterval( + var conf = context.getConfiguration(); + var scalingReport = + AutoScalerEventHandler.scalingReport(scalingSummaries, scalingEnabled); + var labels = Map.of(PARALLELISM_MAP_KEY, getParallelismHashCode(scalingSummaries)); + var interval = context.getConfiguration().get(SCALING_REPORT_INTERVAL); + + @Nullable + BiPredicate, Instant> suppressionPredicate = + new BiPredicate, Instant>() { + @Override + public boolean test(Map stringStringMap, Instant instant) { + return Instant.now().isBefore(instant.plusMillis(interval.toMillis())) + && stringStringMap != null + && Objects.equals( + stringStringMap.get(PARALLELISM_MAP_KEY), + getParallelismHashCode(scalingSummaries)); + } + }; + + eventRecorder.triggerEventWithLabels( context.getResource(), - 
EventRecorder.Type.valueOf(type.name()), - reason, + EventRecorder.Type.Normal, + AutoScalerEventHandler.SCALING_REPORT_REASON, + scalingReport, EventRecorder.Component.Operator, - message, - messageKey, + AutoScalerEventHandler.EVENT_MESSAGE_KEY, context.getKubernetesClient(), - interval); + suppressionPredicate, + labels); } } + + private static String getParallelismHashCode( + Map scalingSummaryHashMap) { + return Integer.toString( + scalingSummaryHashMap.entrySet().stream() + .collect( + Collectors.toMap( + e -> e.getKey().toString(), + e -> + String.format( + "Parallelism %d -> %d", + e.getValue() + .getCurrentParallelism(), + e.getValue().getNewParallelism()))) + .hashCode() + & 0x7FFFFFFF); + } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java index f29b3a0051..595c6d681e 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java @@ -26,11 +26,14 @@ import io.fabric8.kubernetes.api.model.Event; import io.fabric8.kubernetes.client.KubernetesClient; +import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.time.Duration; +import java.time.Instant; import java.util.Collection; +import java.util.Map; import java.util.function.BiConsumer; +import java.util.function.BiPredicate; /** Helper class for creating Kubernetes events for Flink resources. 
*/ public class EventRecorder { @@ -113,16 +116,17 @@ public boolean triggerEventOnce( messageKey); } - public boolean triggerEventByInterval( + public boolean triggerEventWithLabels( AbstractFlinkResource resource, Type type, String reason, - Component component, String message, + Component component, @Nullable String messageKey, KubernetesClient client, - Duration interval) { - return EventUtils.createByInterval( + @Nonnull BiPredicate, Instant> suppressionPredicate, + @Nonnull Map labels) { + return EventUtils.createOrUpdateEventWithLabels( client, resource, type, @@ -131,7 +135,8 @@ public boolean triggerEventByInterval( component, e -> eventListener.accept(resource, e), messageKey, - interval); + suppressionPredicate, + labels); } public boolean triggerEvent( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java index dfdbdd9e79..e7c8adf348 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java @@ -20,14 +20,15 @@ import io.fabric8.kubernetes.api.model.Event; import io.fabric8.kubernetes.api.model.EventBuilder; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.api.model.ObjectReferenceBuilder; import io.fabric8.kubernetes.client.KubernetesClient; import javax.annotation.Nullable; -import java.time.Duration; import java.time.Instant; -import java.util.Objects; +import java.util.Map; +import java.util.function.BiPredicate; import java.util.function.Consumer; /** @@ -63,7 +64,7 @@ public static boolean createOrUpdateEvent( EventRecorder.Component component, Consumer eventListener, @Nullable String messageKey) { - return createByInterval( + return 
createOrUpdateEventWithLabels( client, target, type, @@ -72,10 +73,11 @@ public static boolean createOrUpdateEvent( component, eventListener, messageKey, - Duration.ofSeconds(0)); + null, + Map.of()); } - private static Event findExistingEvent( + public static Event findExistingEvent( KubernetesClient client, HasMetadata target, String eventName) { return client.v1() .events() @@ -102,13 +104,13 @@ public static boolean createIfNotExists( if (existing != null) { return false; } else { - createNewEvent( - client, target, type, reason, message, component, eventListener, eventName); + Event event = buildEvent(target, type, reason, message, component, eventName); + eventListener.accept(client.resource(event).createOrReplace()); return true; } } - public static boolean createByInterval( + public static boolean createOrUpdateEventWithLabels( KubernetesClient client, HasMetadata target, EventRecorder.Type type, @@ -117,53 +119,53 @@ public static boolean createByInterval( EventRecorder.Component component, Consumer eventListener, @Nullable String messageKey, - Duration interval) { - + @Nullable BiPredicate, Instant> suppressionPredicate, + @Nullable Map labels) { String eventName = generateEventName( target, type, reason, messageKey != null ? 
messageKey : message, component); Event existing = findExistingEvent(client, target, eventName); if (existing != null) { - if (Objects.equals(existing.getMessage(), message) - && Instant.now() - .isBefore( - Instant.parse(existing.getLastTimestamp()) - .plusMillis(interval.toMillis()))) { - return false; - } else { - createUpdatedEvent(existing, client, message, eventListener); + if (suppressionPredicate != null + && existing.getMetadata() != null + && suppressionPredicate.test( + existing.getMetadata().getLabels(), + Instant.parse(existing.getLastTimestamp()))) { return false; } + updatedEventWithLabels(existing, client, message, eventListener, labels); + return false; } else { - createNewEvent( - client, target, type, reason, message, component, eventListener, eventName); + Event event = buildEvent(target, type, reason, message, component, eventName); + setLabels(event, labels); + eventListener.accept(client.resource(event).createOrReplace()); return true; } } - private static void createUpdatedEvent( + private static void updatedEventWithLabels( Event existing, KubernetesClient client, String message, - Consumer eventListener) { + Consumer eventListener, + @Nullable Map labels) { existing.setLastTimestamp(Instant.now().toString()); existing.setCount(existing.getCount() + 1); existing.setMessage(message); + setLabels(existing, labels); eventListener.accept(client.resource(existing).createOrReplace()); } - private static void createNewEvent( - KubernetesClient client, - HasMetadata target, - EventRecorder.Type type, - String reason, - String message, - EventRecorder.Component component, - Consumer eventListener, - String eventName) { - Event event = buildEvent(target, type, reason, message, component, eventName); - eventListener.accept(client.resource(event).createOrReplace()); + private static void setLabels(Event existing, @Nullable Map labels) { + if (existing.getMetadata() == null) { + var metaData = new ObjectMeta(); + metaData.setLabels(labels); + } else if 
(existing.getMetadata().getLabels() == null) { + existing.getMetadata().setLabels(labels); + } else { + existing.getMetadata().setLabels(labels); + } } private static Event buildEvent( diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandlerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandlerTest.java new file mode 100644 index 0000000000..a0fbe77b51 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandlerTest.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.operator.autoscaler; + +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.kubernetes.operator.utils.EventCollector; +import org.apache.flink.kubernetes.operator.utils.EventRecorder; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import io.fabric8.kubernetes.api.model.Event; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.time.Duration; +import java.util.Map; + +import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_ENTRY; +import static org.apache.flink.kubernetes.operator.autoscaler.KubernetesAutoScalerEventHandler.PARALLELISM_MAP_KEY; +import static org.apache.flink.kubernetes.operator.autoscaler.TestingKubernetesAutoscalerUtils.createContext; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Test for {@link KubernetesAutoScalerEventHandler}. 
*/ +@EnableKubernetesMockClient(crud = true) +public class KubernetesAutoScalerEventHandlerTest { + + private KubernetesClient kubernetesClient; + + private KubernetesAutoScalerEventHandler eventHandler; + + private KubernetesJobAutoScalerContext ctx; + + ConfigMapStore configMapStore; + + KubernetesAutoScalerStateStore stateStore; + + private EventCollector eventCollector; + + @BeforeEach + void setup() { + eventCollector = new EventCollector(); + var eventRecorder = new EventRecorder(eventCollector); + ctx = createContext("cr1", kubernetesClient); + eventHandler = new KubernetesAutoScalerEventHandler(eventRecorder); + stateStore = new KubernetesAutoScalerStateStore(configMapStore); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testHandleScalingEventsWith0Interval(boolean scalingEnabled) { + testHandleScalingEvents(scalingEnabled, Duration.ofSeconds(0)); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testHandleScalingEventsWithInterval(boolean scalingEnabled) { + testHandleScalingEvents(scalingEnabled, Duration.ofSeconds(1800)); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testHandleScalingEventsWithDefaultInterval(boolean scalingEnabled) { + testHandleScalingEvents(scalingEnabled, null); + } + + private void testHandleScalingEvents(boolean scalingEnabled, Duration interval) { + var jobVertexID = JobVertexID.fromHexString("1b51e99e55e89e404d9a0443fd98d9e2"); + var conf = ctx.getConfiguration(); + conf.set(AutoScalerOptions.SCALING_ENABLED, scalingEnabled); + if (interval != null) { + conf.set(AutoScalerOptions.SCALING_REPORT_INTERVAL, interval); + } + + var evaluatedScalingMetric = new EvaluatedScalingMetric(); + evaluatedScalingMetric.setAverage(1); + evaluatedScalingMetric.setCurrent(2); + Map scalingSummaries1 = + Map.of( + jobVertexID, + new ScalingSummary( + 1, + 2, + Map.of( + ScalingMetric.TRUE_PROCESSING_RATE, + evaluatedScalingMetric, + 
ScalingMetric.EXPECTED_PROCESSING_RATE, + evaluatedScalingMetric, + ScalingMetric.TARGET_DATA_RATE, + evaluatedScalingMetric))); + + eventHandler.handleScalingEvent(ctx, scalingSummaries1); + var event = eventCollector.events.poll(); + assertTrue( + event.getMessage() + .contains( + String.format( + SCALING_SUMMARY_ENTRY, + jobVertexID, + 1, + 2, + 1.00, + 2.00, + 1.00))); + + assertEquals(EventRecorder.Reason.ScalingReport.name(), event.getReason()); + assertEquals( + scalingEnabled ? null : "1286380436", + event.getMetadata().getLabels().get(PARALLELISM_MAP_KEY)); + assertEquals(1, event.getCount()); + + // Parallelism map doesn't change. + eventHandler.handleScalingEvent(ctx, scalingSummaries1); + Event newEvent; + if ((interval == null || (!interval.isNegative() && !interval.isZero())) + && !scalingEnabled) { + assertEquals(0, eventCollector.events.size()); + } else { + assertEquals(1, eventCollector.events.size()); + newEvent = eventCollector.events.poll(); + assertEquals(event.getMetadata().getUid(), newEvent.getMetadata().getUid()); + assertEquals(2, newEvent.getCount()); + } + + // Parallelism map changes. New recommendation + Map scalingSummaries2 = + Map.of( + jobVertexID, + new ScalingSummary( + 1, + 3, + Map.of( + ScalingMetric.TRUE_PROCESSING_RATE, + evaluatedScalingMetric, + ScalingMetric.EXPECTED_PROCESSING_RATE, + evaluatedScalingMetric, + ScalingMetric.TARGET_DATA_RATE, + evaluatedScalingMetric))); + eventHandler.handleScalingEvent(ctx, scalingSummaries2); + + assertEquals(1, eventCollector.events.size()); + + newEvent = eventCollector.events.poll(); + assertEquals(event.getMetadata().getUid(), newEvent.getMetadata().getUid()); + assertEquals( + (interval == null || (!interval.isNegative() && !interval.isZero())) + && !scalingEnabled + ? 2 + : 3, + newEvent.getCount()); + + // Parallelism map doesn't change but metrics changed. 
+ evaluatedScalingMetric.setCurrent(3); + Map scalingSummaries3 = + Map.of( + jobVertexID, + new ScalingSummary( + 1, + 3, + Map.of( + ScalingMetric.TRUE_PROCESSING_RATE, + evaluatedScalingMetric, + ScalingMetric.EXPECTED_PROCESSING_RATE, + evaluatedScalingMetric, + ScalingMetric.TARGET_DATA_RATE, + evaluatedScalingMetric))); + eventHandler.handleScalingEvent(ctx, scalingSummaries2); + + if ((interval == null || (!interval.isNegative() && !interval.isZero())) + && !scalingEnabled) { + assertEquals(0, eventCollector.events.size()); + } else { + assertEquals(1, eventCollector.events.size()); + newEvent = eventCollector.events.poll(); + assertEquals(4, newEvent.getCount()); + } + } + + @Test + public void testSwitchingScalingEnabled() { + var jobVertexID = JobVertexID.fromHexString("1b51e99e55e89e404d9a0443fd98d9e2"); + var evaluatedScalingMetric = new EvaluatedScalingMetric(); + evaluatedScalingMetric.setAverage(1); + evaluatedScalingMetric.setCurrent(2); + Map scalingSummaries1 = + Map.of( + jobVertexID, + new ScalingSummary( + 1, + 2, + Map.of( + ScalingMetric.TRUE_PROCESSING_RATE, + evaluatedScalingMetric, + ScalingMetric.EXPECTED_PROCESSING_RATE, + evaluatedScalingMetric, + ScalingMetric.TARGET_DATA_RATE, + evaluatedScalingMetric))); + + ctx.getConfiguration().set(AutoScalerOptions.SCALING_ENABLED, true); + eventHandler.handleScalingEvent(ctx, scalingSummaries1); + var event = eventCollector.events.poll(); + assertEquals(null, event.getMetadata().getLabels().get(PARALLELISM_MAP_KEY)); + assertEquals(1, event.getCount()); + + // Get recommendation event even parallelism map doesn't change and within suppression + // interval + ctx.getConfiguration().set(AutoScalerOptions.SCALING_ENABLED, false); + eventHandler.handleScalingEvent(ctx, scalingSummaries1); + assertEquals(1, eventCollector.events.size()); + event = eventCollector.events.poll(); + assertTrue( + event.getMessage() + .contains( + String.format( + SCALING_SUMMARY_ENTRY, + jobVertexID, + 1, + 2, + 1.00, +
2.00, + 1.00))); + + assertEquals("1286380436", event.getMetadata().getLabels().get(PARALLELISM_MAP_KEY)); + assertEquals(2, event.getCount()); + + // Get recommendation event even parallelism map doesn't change and within supression + // interval + ctx.getConfiguration().set(AutoScalerOptions.SCALING_ENABLED, true); + eventHandler.handleScalingEvent(ctx, scalingSummaries1); + assertEquals(1, eventCollector.events.size()); + event = eventCollector.events.poll(); + assertTrue( + event.getMessage() + .contains( + String.format( + SCALING_SUMMARY_ENTRY, + jobVertexID, + 1, + 2, + 1.00, + 2.00, + 1.00))); + + assertEquals(null, event.getMetadata().getLabels().get(PARALLELISM_MAP_KEY)); + assertEquals(3, event.getCount()); + } +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java index c58b020b35..311ddfa7a9 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java @@ -26,7 +26,11 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.time.Duration; +import javax.annotation.Nullable; + +import java.time.Instant; +import java.util.Map; +import java.util.function.BiPredicate; import java.util.function.Consumer; /** Test for {@link EventUtils}. 
*/ @@ -113,7 +117,7 @@ public void accept(Event event) { } @Test - public void testCreateUpdatedEvent() { + public void testCreateWithMessageKey() { var consumer = new Consumer() { @Override @@ -123,25 +127,24 @@ public void accept(Event event) { }; var flinkApp = TestUtils.buildApplicationCluster(); var reason = "Cleanup"; - var message = "message"; var eventName = EventUtils.generateEventName( flinkApp, EventRecorder.Type.Warning, reason, - message, + "mk", EventRecorder.Component.Operator); + Assertions.assertTrue( - EventUtils.createByInterval( + EventUtils.createOrUpdateEvent( kubernetesClient, flinkApp, EventRecorder.Type.Warning, reason, - message, + "message1", EventRecorder.Component.Operator, consumer, - null, - Duration.ofSeconds(1800))); + "mk")); var event = kubernetesClient .v1() @@ -150,22 +153,20 @@ public void accept(Event event) { .withName(eventName) .get(); Assertions.assertNotNull(event); - Assertions.assertEquals(eventConsumed, event); + Assertions.assertEquals("message1", event.getMessage()); Assertions.assertEquals(1, event.getCount()); - Assertions.assertEquals(reason, event.getReason()); - eventConsumed = null; Assertions.assertFalse( - EventUtils.createByInterval( + EventUtils.createOrUpdateEvent( kubernetesClient, flinkApp, EventRecorder.Type.Warning, reason, - message, + "message2", EventRecorder.Component.Operator, consumer, - null, - Duration.ofSeconds(1800))); + "mk")); + event = kubernetesClient .v1() @@ -174,24 +175,12 @@ public void accept(Event event) { .withName(eventName) .get(); Assertions.assertNotNull(event); - Assertions.assertNull(eventConsumed); - Assertions.assertEquals(1, event.getCount()); - - Assertions.assertTrue( - EventUtils.createByInterval( - kubernetesClient, - flinkApp, - EventRecorder.Type.Warning, - reason, - null, - EventRecorder.Component.Operator, - consumer, - null, - Duration.ofSeconds(1800))); + Assertions.assertEquals("message2", event.getMessage()); + Assertions.assertEquals(2, event.getCount()); } 
@Test - public void testCreateWithMessageKey() { + public void testCreateWithLabels() { var consumer = new Consumer() { @Override @@ -199,6 +188,15 @@ public void accept(Event event) { eventConsumed = event; } }; + @Nullable + BiPredicate, Instant> suppressionPredicate = + new BiPredicate, Instant>() { + @Override + public boolean test(Map stringStringMap, Instant instant) { + return true; + } + }; + var flinkApp = TestUtils.buildApplicationCluster(); var reason = "Cleanup"; var eventName = @@ -209,16 +207,31 @@ public void accept(Event event) { "mk", EventRecorder.Component.Operator); + // Set up an event with empty labels Assertions.assertTrue( - EventUtils.createOrUpdateEvent( + EventUtils.createIfNotExists( kubernetesClient, flinkApp, EventRecorder.Type.Warning, reason, - "message1", + "message", EventRecorder.Component.Operator, consumer, "mk")); + + // Update the event with label + Assertions.assertFalse( + EventUtils.createOrUpdateEventWithLabels( + kubernetesClient, + flinkApp, + EventRecorder.Type.Warning, + reason, + "message1", + EventRecorder.Component.Operator, + consumer, + "mk", + null, + Map.of("a", "b"))); var event = kubernetesClient .v1() @@ -228,19 +241,22 @@ public void accept(Event event) { .get(); Assertions.assertNotNull(event); Assertions.assertEquals("message1", event.getMessage()); - Assertions.assertEquals(1, event.getCount()); + Assertions.assertEquals(2, event.getCount()); + Assertions.assertEquals(event.getMetadata().getLabels().get("a"), "b"); + // Suppress event Assertions.assertFalse( - EventUtils.createOrUpdateEvent( + EventUtils.createOrUpdateEventWithLabels( kubernetesClient, flinkApp, EventRecorder.Type.Warning, reason, - "message2", + "message1", EventRecorder.Component.Operator, consumer, - "mk")); - + "mk", + suppressionPredicate, + Map.of())); event = kubernetesClient .v1() @@ -249,42 +265,25 @@ public void accept(Event event) { .withName(eventName) .get(); Assertions.assertNotNull(event); - 
Assertions.assertEquals("message2", event.getMessage()); + Assertions.assertEquals("message1", event.getMessage()); Assertions.assertEquals(2, event.getCount()); - } - - @Test - public void testCreateByIntervalWithMessageKey() { - var consumer = - new Consumer() { - @Override - public void accept(Event event) { - eventConsumed = event; - } - }; - var flinkApp = TestUtils.buildApplicationCluster(); - var reason = "Cleanup"; - var eventName = - EventUtils.generateEventName( - flinkApp, - EventRecorder.Type.Warning, - reason, - "mk", - EventRecorder.Component.Operator); + Assertions.assertEquals(event.getMetadata().getLabels().get("a"), "b"); - eventConsumed = null; - Assertions.assertTrue( - EventUtils.createByInterval( + // Update the event with empty label + Assertions.assertFalse( + EventUtils.createOrUpdateEventWithLabels( kubernetesClient, flinkApp, EventRecorder.Type.Warning, reason, - "message1", + "message2", EventRecorder.Component.Operator, consumer, "mk", - Duration.ofSeconds(1800))); - var event = + null, + Map.of())); + + event = kubernetesClient .v1() .events() @@ -292,22 +291,23 @@ public void accept(Event event) { .withName(eventName) .get(); Assertions.assertNotNull(event); - Assertions.assertEquals(eventConsumed, event); - Assertions.assertEquals("message1", event.getMessage()); - Assertions.assertEquals(1, event.getCount()); + Assertions.assertEquals("message2", event.getMessage()); + Assertions.assertEquals(3, event.getCount()); + Assertions.assertEquals(event.getMetadata().getLabels().get("a"), null); - eventConsumed = null; + // Update the event with null label Assertions.assertFalse( - EventUtils.createByInterval( + EventUtils.createOrUpdateEventWithLabels( kubernetesClient, flinkApp, EventRecorder.Type.Warning, reason, - "message2", + "message4", EventRecorder.Component.Operator, consumer, "mk", - Duration.ofSeconds(1800))); + null, + null)); event = kubernetesClient @@ -317,13 +317,13 @@ public void accept(Event event) { .withName(eventName) 
.get(); Assertions.assertNotNull(event); - Assertions.assertEquals(eventConsumed, event); - Assertions.assertEquals("message2", event.getMessage()); - Assertions.assertEquals(2, event.getCount()); + Assertions.assertEquals("message4", event.getMessage()); + Assertions.assertEquals(4, event.getCount()); + Assertions.assertEquals(event.getMetadata().getLabels().get("a"), null); - eventConsumed = null; + // Suppress the event Assertions.assertFalse( - EventUtils.createByInterval( + EventUtils.createOrUpdateEventWithLabels( kubernetesClient, flinkApp, EventRecorder.Type.Warning, @@ -332,7 +332,8 @@ public void accept(Event event) { EventRecorder.Component.Operator, consumer, "mk", - Duration.ofSeconds(1800))); + suppressionPredicate, + Map.of("a", "b"))); event = kubernetesClient @@ -342,20 +343,23 @@ public void accept(Event event) { .withName(eventName) .get(); Assertions.assertNotNull(event); - Assertions.assertNull(eventConsumed); + Assertions.assertEquals("message4", event.getMessage()); + Assertions.assertEquals(4, event.getCount()); + Assertions.assertEquals(event.getMetadata().getLabels().get("a"), null); - eventConsumed = null; + // Create a new event Assertions.assertTrue( - EventUtils.createByInterval( + EventUtils.createOrUpdateEventWithLabels( kubernetesClient, flinkApp, EventRecorder.Type.Warning, reason, - "message2", + "message1", EventRecorder.Component.Operator, consumer, "mk2", - Duration.ofSeconds(1800))); + suppressionPredicate, + Map.of("a", "b"))); eventName = EventUtils.generateEventName( flinkApp, @@ -371,9 +375,9 @@ public void accept(Event event) { .withName(eventName) .get(); Assertions.assertNotNull(event); - Assertions.assertEquals(eventConsumed, event); - Assertions.assertEquals("message2", event.getMessage()); + Assertions.assertEquals("message1", event.getMessage()); Assertions.assertEquals(1, event.getCount()); + Assertions.assertEquals(event.getMetadata().getLabels().get("a"), "b"); } @Test From 
ee8d932415a3a271c19381d0ad0edf125caa6f68 Mon Sep 17 00:00:00 2001 From: Clara Xiong Date: Fri, 20 Oct 2023 11:29:12 -0700 Subject: [PATCH 2/3] review feedback --- .../generated/auto_scaler_configuration.html | 2 +- .../flink/autoscaler/JobAutoScalerImpl.java | 7 +- .../flink/autoscaler/JobVertexScaler.java | 8 +- .../flink/autoscaler/ScalingExecutor.java | 7 +- .../autoscaler/config/AutoScalerOptions.java | 4 +- .../event/AutoScalerEventHandler.java | 40 +++- .../flink/autoscaler/JobVertexScalerTest.java | 33 +++ .../flink/autoscaler/ScalingExecutorTest.java | 15 +- .../event/TestingEventCollector.java | 12 +- .../KubernetesAutoScalerEventHandler.java | 43 ++-- .../AbstractFlinkResourceReconciler.java | 2 +- .../operator/utils/EventRecorder.java | 55 ++++- .../kubernetes/operator/utils/EventUtils.java | 44 ++-- .../KubernetesAutoScalerEventHandlerTest.java | 118 +++++++--- .../operator/utils/EventUtilsTest.java | 205 +++++++++++------- 15 files changed, 415 insertions(+), 180 deletions(-) diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html index 0221b56741..09aee96a78 100644 --- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html +++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html @@ -117,7 +117,7 @@ Enable vertex scaling execution by the autoscaler. If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs. -
job.autoscaler.scaling.report.interval
+
job.autoscaler.scaling.event.interval
30 min Duration Time interval to resend the identical event diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java index 11fc8facb9..3196a3de26 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java @@ -194,7 +194,12 @@ private void onError(Context ctx, AutoscalerFlinkMetrics autoscalerMetrics, Thro LOG.error("Error while scaling job", e); autoscalerMetrics.incrementError(); eventHandler.handleEvent( - ctx, AutoScalerEventHandler.Type.Warning, AUTOSCALER_ERROR, e.getMessage(), null); + ctx, + AutoScalerEventHandler.Type.Warning, + AUTOSCALER_ERROR, + e.getMessage(), + null, + null); } private AutoscalerFlinkMetrics getOrInitAutoscalerFlinkMetrics(Context ctx) { diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java index 6be8c47ad0..aba6ac6749 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java @@ -39,6 +39,7 @@ import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR; import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR; import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL; import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM; import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM; @@ -214,7 +215,12 @@ private boolean 
detectIneffectiveScaleUp( var message = String.format(INEFFECTIVE_MESSAGE_FORMAT, vertex); autoScalerEventHandler.handleEvent( - context, AutoScalerEventHandler.Type.Normal, INEFFECTIVE_SCALING, message, null); + context, + AutoScalerEventHandler.Type.Normal, + INEFFECTIVE_SCALING, + message, + null, + conf.get(SCALING_EVENT_INTERVAL)); if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) { LOG.warn( diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java index c9b35ed1d1..0e1a47b8e7 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java @@ -38,6 +38,7 @@ import java.util.SortedMap; import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL; import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.addToScalingHistoryAndStore; import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.getTrimmedScalingHistory; import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD; @@ -91,9 +92,11 @@ public boolean scaleResource( updateRecommendedParallelism(evaluatedMetrics, scalingSummaries); - autoScalerEventHandler.handleScalingEvent(context, scalingSummaries); + var scaleEnabled = conf.get(SCALING_ENABLED); + autoScalerEventHandler.handleScalingEvent( + context, scalingSummaries, scaleEnabled, conf.get(SCALING_EVENT_INTERVAL)); - if (!conf.get(SCALING_ENABLED)) { + if (!scaleEnabled) { return false; } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java index 626921bcb6..576037a48d 100644 --- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java @@ -231,8 +231,8 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .withDescription( "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented."); - public static final ConfigOption SCALING_REPORT_INTERVAL = - autoScalerConfig("scaling.report.interval") + public static final ConfigOption SCALING_EVENT_INTERVAL = + autoScalerConfig("scaling.event.interval") .durationType() .defaultValue(Duration.ofSeconds(1800)) .withDescription("Time interval to resend the identical event"); diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java index a0d971cddc..5c49f9c550 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java @@ -24,9 +24,9 @@ import javax.annotation.Nullable; +import java.time.Duration; import java.util.Map; -import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED; import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; @@ -44,18 +44,42 @@ public interface AutoScalerEventHandler scalingSummaries) { + Context context, + Map scalingSummaries, + boolean scaled, + Duration interval) { // Provide default implementation without proper deduplication - var scalingReport 
= - scalingReport(scalingSummaries, context.getConfiguration().get(SCALING_ENABLED)); - handleEvent(context, Type.Normal, SCALING_REPORT_REASON, scalingReport, EVENT_MESSAGE_KEY); + var scalingReport = scalingReport(scalingSummaries, scaled); + handleEvent( + context, + Type.Normal, + SCALING_REPORT_REASON, + scalingReport, + SCALING_REPORT_KEY, + interval); } static String scalingReport( diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java index d70db971dd..1b01087c8a 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java @@ -358,6 +358,39 @@ public void testSendingIneffectiveScalingEvents() { assertThat(event.getMessage()) .isEqualTo(String.format(INEFFECTIVE_MESSAGE_FORMAT, jobVertexID)); assertThat(event.getReason()).isEqualTo(INEFFECTIVE_SCALING); + assertEquals(1, event.getCount()); + + // Repeat ineffective scale with default interval, no event is triggered + assertEquals( + 20, + vertexScaler.computeScaleTargetParallelism( + context, jobVertexID, evaluated, history)); + assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE)); + assertEquals(0, eventCollector.events.size()); + + // Repeat ineffective scale with postive interval, no event is triggered + conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ofSeconds(1800)); + assertEquals( + 20, + vertexScaler.computeScaleTargetParallelism( + context, jobVertexID, evaluated, history)); + assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE)); + assertEquals(0, eventCollector.events.size()); + + // Ineffective scale with interval set to 0, an event is triggered + conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ZERO); + assertEquals( + 20, + vertexScaler.computeScaleTargetParallelism( + context, 
jobVertexID, evaluated, history)); + assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE)); + assertEquals(1, eventCollector.events.size()); + event = eventCollector.events.poll(); + assertThat(event).isNotNull(); + assertThat(event.getMessage()) + .isEqualTo(String.format(INEFFECTIVE_MESSAGE_FORMAT, jobVertexID)); + assertThat(event.getReason()).isEqualTo(INEFFECTIVE_SCALING); + assertEquals(2, event.getCount()); } private Map evaluated( diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java index 3135766fd5..bae6f36b2a 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java @@ -154,19 +154,20 @@ public void testVertexesExclusionForScaling() throws Exception { @ParameterizedTest @ValueSource(booleans = {true, false}) - public void testScalingEventsWith0Interval(boolean scalingEnabled) throws Exception { + public void testScalingEventsWith0IntervalConfig(boolean scalingEnabled) throws Exception { testScalingEvents(scalingEnabled, Duration.ofSeconds(0)); } @ParameterizedTest @ValueSource(booleans = {true, false}) - public void testScalingEventsWithInterval(boolean scalingEnabled) throws Exception { + public void testScalingEventsWithIntervalConfig(boolean scalingEnabled) throws Exception { testScalingEvents(scalingEnabled, Duration.ofSeconds(1800)); } @ParameterizedTest @ValueSource(booleans = {true, false}) - public void testScalingEventsWithDefaultInterval(boolean scalingEnabled) throws Exception { + public void testScalingEventsWithDefaultIntervalConfig(boolean scalingEnabled) + throws Exception { testScalingEvents(scalingEnabled, null); } @@ -178,17 +179,13 @@ private void testScalingEvents(boolean scalingEnabled, Duration interval) throws var metrics = Map.of(jobVertexID, evaluated(1, 
110, 100)); if (interval != null) { - conf.set(AutoScalerOptions.SCALING_REPORT_INTERVAL, interval); + conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, interval); } assertEquals(scalingEnabled, scalingDecisionExecutor.scaleResource(context, metrics)); assertEquals(scalingEnabled, scalingDecisionExecutor.scaleResource(context, metrics)); - int expectedSize = - (interval == null || (!interval.isNegative() && !interval.isZero())) - && !scalingEnabled - ? 1 - : 2; + int expectedSize = (interval == null || interval.toMillis() > 0) && !scalingEnabled ? 1 : 2; assertEquals(expectedSize, eventCollector.events.size()); TestingEventCollector.Event> event; diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java index 39367283bc..485f9c7e22 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java @@ -24,6 +24,7 @@ import javax.annotation.Nullable; +import java.time.Duration; import java.time.Instant; import java.util.LinkedList; import java.util.Map; @@ -31,7 +32,6 @@ import java.util.concurrent.ConcurrentHashMap; import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED; -import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_REPORT_INTERVAL; /** Testing {@link AutoScalerEventHandler} implementation. */ public class TestingEventCollector> @@ -47,19 +47,19 @@ public void handleEvent( Type type, String reason, String message, - @Nullable String messageKey) { + @Nullable String messageKey, + Duration interval) { String eventKey = generateEventKey(context, type, reason, messageKey != null ? 
messageKey : message); Event event = eventMap.get(eventKey); - var interval = context.getConfiguration().get(SCALING_REPORT_INTERVAL); - var scaleEnabled = context.getConfiguration().get(SCALING_ENABLED); + var scaled = context.getConfiguration().get(SCALING_ENABLED); if (event == null) { Event newEvent = new Event<>(context, reason, message, messageKey); events.add(newEvent); eventMap.put(eventKey, newEvent); return; - } else if (!scaleEnabled - && Objects.equals(event.getMessage(), message) + } else if (((!scaled && Objects.equals(event.getMessage(), message)) + || !Objects.equals(reason, SCALING_REPORT_REASON)) && interval != null && Instant.now() .isBefore(event.getLastUpdateTimestamp().plusMillis(interval.toMillis()))) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java index 74e2ef9b1d..d83e2db4d0 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java @@ -26,15 +26,12 @@ import javax.annotation.Nullable; -import java.time.Instant; +import java.time.Duration; import java.util.Map; import java.util.Objects; -import java.util.function.BiPredicate; +import java.util.function.Predicate; import java.util.stream.Collectors; -import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED; -import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_REPORT_INTERVAL; - /** An event handler which posts events to the Kubernetes events API. 
*/ public class KubernetesAutoScalerEventHandler implements AutoScalerEventHandler { @@ -52,38 +49,39 @@ public void handleEvent( Type type, String reason, String message, - @Nullable String messageKey) { - eventRecorder.triggerEvent( + @Nullable String messageKey, + @Nullable Duration interval) { + eventRecorder.triggerEventWithInterval( context.getResource(), EventRecorder.Type.valueOf(type.name()), reason, message, EventRecorder.Component.Operator, messageKey, - context.getKubernetesClient()); + context.getKubernetesClient(), + interval); } @Override public void handleScalingEvent( KubernetesJobAutoScalerContext context, - Map scalingSummaries) { - var scalingEnabled = context.getConfiguration().get(SCALING_ENABLED); - if (scalingEnabled) { - AutoScalerEventHandler.super.handleScalingEvent(context, scalingSummaries); + Map scalingSummaries, + boolean scaled, + Duration interval) { + if (scaled) { + AutoScalerEventHandler.super.handleScalingEvent( + context, scalingSummaries, scaled, null); } else { var conf = context.getConfiguration(); - var scalingReport = - AutoScalerEventHandler.scalingReport(scalingSummaries, scalingEnabled); + var scalingReport = AutoScalerEventHandler.scalingReport(scalingSummaries, scaled); var labels = Map.of(PARALLELISM_MAP_KEY, getParallelismHashCode(scalingSummaries)); - var interval = context.getConfiguration().get(SCALING_REPORT_INTERVAL); @Nullable - BiPredicate, Instant> suppressionPredicate = - new BiPredicate, Instant>() { + Predicate> dedupePredicate = + new Predicate>() { @Override - public boolean test(Map stringStringMap, Instant instant) { - return Instant.now().isBefore(instant.plusMillis(interval.toMillis())) - && stringStringMap != null + public boolean test(Map stringStringMap) { + return stringStringMap != null && Objects.equals( stringStringMap.get(PARALLELISM_MAP_KEY), getParallelismHashCode(scalingSummaries)); @@ -96,9 +94,10 @@ public boolean test(Map stringStringMap, Instant instant) { 
AutoScalerEventHandler.SCALING_REPORT_REASON, scalingReport, EventRecorder.Component.Operator, - AutoScalerEventHandler.EVENT_MESSAGE_KEY, + AutoScalerEventHandler.SCALING_REPORT_KEY, context.getKubernetesClient(), - suppressionPredicate, + interval, + dedupePredicate, labels); } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java index 6f39582a4d..dfe98a8968 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java @@ -203,8 +203,8 @@ private void triggerSpecChangeEvent(CR cr, DiffResult specDiff, Kubernetes cr, EventRecorder.Type.Normal, EventRecorder.Reason.SpecChanged, - EventRecorder.Component.JobManagerDeployment, String.format(MSG_SPEC_CHANGED, specDiff.getType(), specDiff), + EventRecorder.Component.JobManagerDeployment, "SpecChange: " + cr.getMetadata().getGeneration(), client); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java index 595c6d681e..e99c8ef9d2 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java @@ -26,14 +26,13 @@ import io.fabric8.kubernetes.api.model.Event; import io.fabric8.kubernetes.client.KubernetesClient; -import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.time.Instant; +import java.time.Duration; import 
java.util.Collection; import java.util.Map; import java.util.function.BiConsumer; -import java.util.function.BiPredicate; +import java.util.function.Predicate; /** Helper class for creating Kubernetes events for Flink resources. */ public class EventRecorder { @@ -51,15 +50,15 @@ public boolean triggerEvent( Component component, String message, KubernetesClient client) { - return triggerEvent(resource, type, reason, component, message, null, client); + return triggerEvent(resource, type, reason, message, component, null, client); } public boolean triggerEventOnce( AbstractFlinkResource resource, Type type, Reason reason, - Component component, String message, + Component component, String messageKey, KubernetesClient client) { return triggerEventOnce( @@ -70,8 +69,8 @@ public boolean triggerEvent( AbstractFlinkResource resource, Type type, Reason reason, - Component component, String message, + Component component, @Nullable String messageKey, KubernetesClient client) { return triggerEvent( @@ -86,7 +85,7 @@ public boolean triggerEvent( Component component, String messageKey, KubernetesClient client) { - return EventUtils.createOrUpdateEvent( + return EventUtils.createOrUpdateEventWithInterval( client, resource, type, @@ -94,7 +93,33 @@ public boolean triggerEvent( message, component, e -> eventListener.accept(resource, e), - messageKey); + messageKey, + null); + } + + /** + * @param interval Interval for dedupe. Null mean no dedupe. 
+ * @return + */ + public boolean triggerEventWithInterval( + AbstractFlinkResource resource, + Type type, + String reason, + String message, + Component component, + String messageKey, + KubernetesClient client, + @Nullable Duration interval) { + return EventUtils.createOrUpdateEventWithInterval( + client, + resource, + type, + reason, + message, + component, + e -> eventListener.accept(resource, e), + messageKey, + interval); } public boolean triggerEventOnce( @@ -116,6 +141,12 @@ public boolean triggerEventOnce( messageKey); } + /** + * @param interval Interval for dedupe. Null mean no dedupe. + * @param dedupePredicate Predicate for dedupe algorithm.. + * @param labels Labels to store in meta data for dedupe. Do nothing if null. + * @return + */ public boolean triggerEventWithLabels( AbstractFlinkResource resource, Type type, @@ -124,8 +155,9 @@ public boolean triggerEventWithLabels( Component component, @Nullable String messageKey, KubernetesClient client, - @Nonnull BiPredicate, Instant> suppressionPredicate, - @Nonnull Map labels) { + @Nullable Duration interval, + @Nullable Predicate> dedupePredicate, + @Nullable Map labels) { return EventUtils.createOrUpdateEventWithLabels( client, resource, @@ -135,7 +167,8 @@ public boolean triggerEventWithLabels( component, e -> eventListener.accept(resource, e), messageKey, - suppressionPredicate, + interval, + dedupePredicate, labels); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java index e7c8adf348..ef32cd5915 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java @@ -26,10 +26,11 @@ import javax.annotation.Nullable; +import java.time.Duration; import java.time.Instant; import java.util.Map; 
-import java.util.function.BiPredicate; import java.util.function.Consumer; +import java.util.function.Predicate; /** * The util to generate an event for the target resource. It is copied from @@ -55,7 +56,7 @@ public static String generateEventName( & 0x7FFFFFFF); } - public static boolean createOrUpdateEvent( + public static boolean createOrUpdateEventWithInterval( KubernetesClient client, HasMetadata target, EventRecorder.Type type, @@ -63,7 +64,8 @@ public static boolean createOrUpdateEvent( String message, EventRecorder.Component component, Consumer eventListener, - @Nullable String messageKey) { + @Nullable String messageKey, + @Nullable Duration interval) { return createOrUpdateEventWithLabels( client, target, @@ -73,6 +75,7 @@ public static boolean createOrUpdateEvent( component, eventListener, messageKey, + interval, null, Map.of()); } @@ -119,7 +122,8 @@ public static boolean createOrUpdateEventWithLabels( EventRecorder.Component component, Consumer eventListener, @Nullable String messageKey, - @Nullable BiPredicate, Instant> suppressionPredicate, + @Nullable Duration interval, + @Nullable Predicate> dedupePredicate, @Nullable Map labels) { String eventName = generateEventName( @@ -127,11 +131,7 @@ public static boolean createOrUpdateEventWithLabels( Event existing = findExistingEvent(client, target, eventName); if (existing != null) { - if (suppressionPredicate != null - && existing.getMetadata() != null - && suppressionPredicate.test( - existing.getMetadata().getLabels(), - Instant.parse(existing.getLastTimestamp()))) { + if (labelCheck(existing, dedupePredicate) && intervalCheck(existing, interval)) { return false; } updatedEventWithLabels(existing, client, message, eventListener, labels); @@ -158,14 +158,13 @@ private static void updatedEventWithLabels( } private static void setLabels(Event existing, @Nullable Map labels) { + if (labels == null) { + return; + } if (existing.getMetadata() == null) { - var metaData = new ObjectMeta(); - 
metaData.setLabels(labels); - } else if (existing.getMetadata().getLabels() == null) { - existing.getMetadata().setLabels(labels); - } else { - existing.getMetadata().setLabels(labels); + existing.setMetadata(new ObjectMeta()); } + existing.getMetadata().setLabels(labels); } private static Event buildEvent( @@ -199,4 +198,19 @@ private static Event buildEvent( .endMetadata() .build(); } + + private static boolean intervalCheck(Event existing, @Nullable Duration interval) { + return interval != null + && Instant.now() + .isBefore( + Instant.parse(existing.getLastTimestamp()) + .plusMillis(interval.toMillis())); + } + + private static boolean labelCheck( + Event existing, Predicate> dedupePredicate) { + return dedupePredicate == null + || (existing.getMetadata() != null + && dedupePredicate.test(existing.getMetadata().getLabels())); + } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandlerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandlerTest.java index a0fbe77b51..bed6a6ab2c 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandlerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandlerTest.java @@ -18,7 +18,7 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.autoscaler.ScalingSummary; -import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.event.AutoScalerEventHandler; import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.kubernetes.operator.utils.EventCollector; @@ -67,6 +67,82 @@ void setup() { stateStore = new KubernetesAutoScalerStateStore(configMapStore); } + @ParameterizedTest 
+ @ValueSource(strings = {"", "0", "1800"}) + void testHandEventsWithNoMessageKey(String intervalString) { + testHandEvents(intervalString, null); + } + + @ParameterizedTest + @ValueSource(strings = {"", "0", "1800"}) + void testHandEventsWithMessageKey(String intervalString) { + testHandEvents(intervalString, "key"); + } + + private void testHandEvents(String intervalString, String messageKey) { + Duration interval = + intervalString.isBlank() ? null : Duration.ofSeconds(Long.valueOf(intervalString)); + var jobVertexID = new JobVertexID(); + + eventHandler.handleEvent( + ctx, + AutoScalerEventHandler.Type.Normal, + EventRecorder.Reason.IneffectiveScaling.name(), + "message", + messageKey, + interval); + var event = eventCollector.events.poll(); + assertEquals(EventRecorder.Reason.IneffectiveScaling.name(), event.getReason()); + assertEquals(1, event.getCount()); + + // Resend + eventHandler.handleEvent( + ctx, + AutoScalerEventHandler.Type.Normal, + EventRecorder.Reason.IneffectiveScaling.name(), + "message", + messageKey, + interval); + if (interval != null && interval.toMillis() > 0) { + assertEquals(0, eventCollector.events.size()); + } else { + assertEquals(1, eventCollector.events.size()); + event = eventCollector.events.poll(); + assertEquals("message", event.getMessage()); + assertEquals(2, event.getCount()); + } + + // Message changed + eventHandler.handleEvent( + ctx, + AutoScalerEventHandler.Type.Normal, + EventRecorder.Reason.IneffectiveScaling.name(), + "message1", + messageKey, + interval); + if (messageKey != null && interval != null && interval.toMillis() > 0) { + assertEquals(0, eventCollector.events.size()); + } else { + assertEquals(1, eventCollector.events.size()); + event = eventCollector.events.poll(); + assertEquals("message1", event.getMessage()); + assertEquals(messageKey == null ? 
1 : 3, event.getCount()); + } + + // Message key changed + eventHandler.handleEvent( + ctx, + AutoScalerEventHandler.Type.Normal, + EventRecorder.Reason.IneffectiveScaling.name(), + "message1", + "newKey", + interval); + assertEquals(1, eventCollector.events.size()); + event = eventCollector.events.poll(); + assertEquals("message1", event.getMessage()); + assertEquals(1, event.getCount()); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testHandleScalingEventsWith0Interval(boolean scalingEnabled) { @@ -81,17 +157,12 @@ public void testHandleScalingEventsWithInterval(boolean scalingEnabled) { @ParameterizedTest @ValueSource(booleans = {true, false}) - public void testHandleScalingEventsWithDefaultInterval(boolean scalingEnabled) { + public void testHandleScalingEventsWithNullInterval(boolean scalingEnabled) { testHandleScalingEvents(scalingEnabled, null); } - private void testHandleScalingEvents(boolean scalingEnabled, Duration interval) { + private void testHandleScalingEvents(boolean scaled, Duration interval) { var jobVertexID = JobVertexID.fromHexString("1b51e99e55e89e404d9a0443fd98d9e2"); - var conf = ctx.getConfiguration(); - conf.set(AutoScalerOptions.SCALING_ENABLED, scalingEnabled); - if (interval != null) { - conf.set(AutoScalerOptions.SCALING_REPORT_INTERVAL, interval); - } var evaluatedScalingMetric = new EvaluatedScalingMetric(); evaluatedScalingMetric.setAverage(1); @@ -110,7 +181,7 @@ private void testHandleScalingEvents(boolean scalingEnabled, Duration interval) ScalingMetric.TARGET_DATA_RATE, evaluatedScalingMetric))); - eventHandler.handleScalingEvent(ctx, scalingSummaries1); + eventHandler.handleScalingEvent(ctx, scalingSummaries1, scaled, interval); var event = eventCollector.events.poll(); assertTrue( event.getMessage() @@ -126,15 +197,14 @@ private void testHandleScalingEvents(boolean scalingEnabled, Duration interval) assertEquals(EventRecorder.Reason.ScalingReport.name(), event.getReason()); assertEquals( - 
scalingEnabled ? null : "1286380436", + scaled ? null : "1286380436", event.getMetadata().getLabels().get(PARALLELISM_MAP_KEY)); assertEquals(1, event.getCount()); // Parallelism map doesn't change. - eventHandler.handleScalingEvent(ctx, scalingSummaries1); + eventHandler.handleScalingEvent(ctx, scalingSummaries1, scaled, interval); Event newEvent; - if ((interval == null || (!interval.isNegative() && !interval.isZero())) - && !scalingEnabled) { + if (interval != null && interval.toMillis() > 0 && !scaled) { assertEquals(0, eventCollector.events.size()); } else { assertEquals(1, eventCollector.events.size()); @@ -157,17 +227,14 @@ private void testHandleScalingEvents(boolean scalingEnabled, Duration interval) evaluatedScalingMetric, ScalingMetric.TARGET_DATA_RATE, evaluatedScalingMetric))); - eventHandler.handleScalingEvent(ctx, scalingSummaries2); + eventHandler.handleScalingEvent(ctx, scalingSummaries2, scaled, interval); assertEquals(1, eventCollector.events.size()); newEvent = eventCollector.events.poll(); assertEquals(event.getMetadata().getUid(), newEvent.getMetadata().getUid()); assertEquals( - (interval == null || (!interval.isNegative() && !interval.isZero())) - && !scalingEnabled - ? 2 - : 3, + interval != null && interval.toMillis() > 0 && !scaled ? 2 : 3, newEvent.getCount()); // Parallelism map doesn't change but metrics changed. 
@@ -185,10 +252,9 @@ private void testHandleScalingEvents(boolean scalingEnabled, Duration interval) evaluatedScalingMetric, ScalingMetric.TARGET_DATA_RATE, evaluatedScalingMetric))); - eventHandler.handleScalingEvent(ctx, scalingSummaries2); + eventHandler.handleScalingEvent(ctx, scalingSummaries2, scaled, interval); - if ((interval == null || (!interval.isNegative() && !interval.isZero())) - && !scalingEnabled) { + if (interval != null && interval.toMillis() > 0 && !scaled) { assertEquals(0, eventCollector.events.size()); } else { assertEquals(1, eventCollector.events.size()); @@ -201,6 +267,7 @@ private void testHandleScalingEvents(boolean scalingEnabled, Duration interval) public void testSwitchingScalingEnabled() { var jobVertexID = JobVertexID.fromHexString("1b51e99e55e89e404d9a0443fd98d9e2"); var evaluatedScalingMetric = new EvaluatedScalingMetric(); + var interval = Duration.ofSeconds(1800); evaluatedScalingMetric.setAverage(1); evaluatedScalingMetric.setCurrent(2); Map scalingSummaries1 = @@ -217,16 +284,14 @@ public void testSwitchingScalingEnabled() { ScalingMetric.TARGET_DATA_RATE, evaluatedScalingMetric))); - ctx.getConfiguration().set(AutoScalerOptions.SCALING_ENABLED, true); - eventHandler.handleScalingEvent(ctx, scalingSummaries1); + eventHandler.handleScalingEvent(ctx, scalingSummaries1, true, interval); var event = eventCollector.events.poll(); assertEquals(null, event.getMetadata().getLabels().get(PARALLELISM_MAP_KEY)); assertEquals(1, event.getCount()); // Get recommendation event even parallelism map doesn't change and within supression // interval - ctx.getConfiguration().set(AutoScalerOptions.SCALING_ENABLED, false); - eventHandler.handleScalingEvent(ctx, scalingSummaries1); + eventHandler.handleScalingEvent(ctx, scalingSummaries1, false, interval); assertEquals(1, eventCollector.events.size()); event = eventCollector.events.poll(); assertTrue( @@ -246,8 +311,7 @@ public void testSwitchingScalingEnabled() { // Get recommendation event even 
parallelism map doesn't change and within supression // interval - ctx.getConfiguration().set(AutoScalerOptions.SCALING_ENABLED, true); - eventHandler.handleScalingEvent(ctx, scalingSummaries1); + eventHandler.handleScalingEvent(ctx, scalingSummaries1, true, interval); assertEquals(1, eventCollector.events.size()); event = eventCollector.events.poll(); assertTrue( diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java index 311ddfa7a9..24a4c99939 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java @@ -25,13 +25,15 @@ import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import javax.annotation.Nullable; -import java.time.Instant; +import java.time.Duration; import java.util.Map; -import java.util.function.BiPredicate; import java.util.function.Consumer; +import java.util.function.Predicate; /** Test for {@link EventUtils}. 
*/ @EnableKubernetesMockClient(crud = true) @@ -61,7 +63,7 @@ public void accept(Event event) { message, EventRecorder.Component.Operator); Assertions.assertTrue( - EventUtils.createOrUpdateEvent( + EventUtils.createOrUpdateEventWithInterval( kubernetesClient, flinkApp, EventRecorder.Type.Warning, @@ -69,6 +71,7 @@ public void accept(Event event) { message, EventRecorder.Component.Operator, consumer, + null, null)); var event = kubernetesClient @@ -84,7 +87,7 @@ public void accept(Event event) { eventConsumed = null; Assertions.assertFalse( - EventUtils.createOrUpdateEvent( + EventUtils.createOrUpdateEventWithInterval( kubernetesClient, flinkApp, EventRecorder.Type.Warning, @@ -92,7 +95,9 @@ public void accept(Event event) { message, EventRecorder.Component.Operator, consumer, + null, null)); + event = kubernetesClient .v1() @@ -105,7 +110,7 @@ public void accept(Event event) { Assertions.assertEquals(2, event.getCount()); Assertions.assertTrue( - EventUtils.createOrUpdateEvent( + EventUtils.createOrUpdateEventWithInterval( kubernetesClient, flinkApp, EventRecorder.Type.Warning, @@ -113,11 +118,15 @@ public void accept(Event event) { null, EventRecorder.Component.Operator, consumer, + null, null)); } - @Test - public void testCreateWithMessageKey() { + @ParameterizedTest + @ValueSource(strings = {"", "0", "1800"}) + public void testCreateWithInterval(String intervalString) { + Duration interval = + intervalString.isBlank() ? 
null : Duration.ofSeconds(Long.valueOf(intervalString)); var consumer = new Consumer() { @Override @@ -136,7 +145,7 @@ public void accept(Event event) { EventRecorder.Component.Operator); Assertions.assertTrue( - EventUtils.createOrUpdateEvent( + EventUtils.createOrUpdateEventWithInterval( kubernetesClient, flinkApp, EventRecorder.Type.Warning, @@ -144,7 +153,8 @@ public void accept(Event event) { "message1", EventRecorder.Component.Operator, consumer, - "mk")); + "mk", + interval)); var event = kubernetesClient .v1() @@ -157,7 +167,7 @@ public void accept(Event event) { Assertions.assertEquals(1, event.getCount()); Assertions.assertFalse( - EventUtils.createOrUpdateEvent( + EventUtils.createOrUpdateEventWithInterval( kubernetesClient, flinkApp, EventRecorder.Type.Warning, @@ -165,7 +175,8 @@ public void accept(Event event) { "message2", EventRecorder.Component.Operator, consumer, - "mk")); + "mk", + null)); event = kubernetesClient @@ -179,24 +190,51 @@ public void accept(Event event) { Assertions.assertEquals(2, event.getCount()); } - @Test - public void testCreateWithLabels() { - var consumer = - new Consumer() { + @ParameterizedTest + @ValueSource(strings = {"", "0", "1800"}) + public void testCreateWithLabelsAndAllTruePredicate(String intervalString) { + @Nullable + Predicate> dedupePredicate = + new Predicate>() { @Override - public void accept(Event event) { - eventConsumed = event; + public boolean test(Map stringStringMap) { + return true; } }; + testCreateWithIntervalLabelsAndPredicate(intervalString, dedupePredicate); + } + + @ParameterizedTest + @ValueSource(strings = {"", "0", "1800"}) + public void testCreateWithLabelsAndAllFalsePredicate(String intervalString) { @Nullable - BiPredicate, Instant> suppressionPredicate = - new BiPredicate, Instant>() { + Predicate> dedupePredicate = + new Predicate>() { @Override - public boolean test(Map stringStringMap, Instant instant) { - return true; + public boolean test(Map stringStringMap) { + return false; } }; 
+ testCreateWithIntervalLabelsAndPredicate(intervalString, dedupePredicate); + } + @ParameterizedTest + @ValueSource(strings = {"", "0", "1800"}) + public void testCreateWithLabelsAndNullPredicate(String intervalString) { + testCreateWithIntervalLabelsAndPredicate(intervalString, null); + } + + private void testCreateWithIntervalLabelsAndPredicate( + String intervalString, @Nullable Predicate> dedupePredicate) { + Duration interval = + intervalString.isBlank() ? null : Duration.ofSeconds(Long.valueOf(intervalString)); + var consumer = + new Consumer() { + @Override + public void accept(Event event) { + eventConsumed = event; + } + }; var flinkApp = TestUtils.buildApplicationCluster(); var reason = "Cleanup"; var eventName = @@ -219,7 +257,8 @@ public boolean test(Map stringStringMap, Instant instant) { consumer, "mk")); - // Update the event with label + // Update the event with label. + var labels = Map.of("a", "b"); Assertions.assertFalse( EventUtils.createOrUpdateEventWithLabels( kubernetesClient, @@ -230,8 +269,9 @@ public boolean test(Map stringStringMap, Instant instant) { EventRecorder.Component.Operator, consumer, "mk", - null, - Map.of("a", "b"))); + interval, + dedupePredicate, + labels)); var event = kubernetesClient .v1() @@ -240,23 +280,33 @@ public boolean test(Map stringStringMap, Instant instant) { .withName(eventName) .get(); Assertions.assertNotNull(event); - Assertions.assertEquals("message1", event.getMessage()); - Assertions.assertEquals(2, event.getCount()); - Assertions.assertEquals(event.getMetadata().getLabels().get("a"), "b"); + if ((dedupePredicate == null || (dedupePredicate.test(labels))) + && interval != null + && interval.toMillis() > 0) { + Assertions.assertEquals("message", event.getMessage()); + Assertions.assertEquals(1, event.getCount()); + Assertions.assertEquals(null, event.getMetadata().getLabels().get("a")); + } else { + Assertions.assertEquals("message1", event.getMessage()); + Assertions.assertEquals(2, event.getCount()); + 
Assertions.assertEquals("b", event.getMetadata().getLabels().get("a")); + } - // Suppress event + // Update with duplicate labels. Assertions.assertFalse( EventUtils.createOrUpdateEventWithLabels( kubernetesClient, flinkApp, EventRecorder.Type.Warning, reason, - "message1", + "message2", EventRecorder.Component.Operator, consumer, "mk", - suppressionPredicate, - Map.of())); + interval, + dedupePredicate, + labels)); + event = kubernetesClient .v1() @@ -265,24 +315,33 @@ public boolean test(Map stringStringMap, Instant instant) { .withName(eventName) .get(); Assertions.assertNotNull(event); - Assertions.assertEquals("message1", event.getMessage()); - Assertions.assertEquals(2, event.getCount()); - Assertions.assertEquals(event.getMetadata().getLabels().get("a"), "b"); + if ((dedupePredicate == null || (dedupePredicate.test(labels))) + && interval != null + && interval.toMillis() > 0) { + Assertions.assertEquals("message", event.getMessage()); + Assertions.assertEquals(1, event.getCount()); + Assertions.assertEquals(null, event.getMetadata().getLabels().get("a")); + } else { + Assertions.assertEquals("message2", event.getMessage()); + Assertions.assertEquals(3, event.getCount()); + Assertions.assertEquals("b", event.getMetadata().getLabels().get("a")); + } - // Update the event with empty label + // Update with empty label. 
+ labels = Map.of(); Assertions.assertFalse( EventUtils.createOrUpdateEventWithLabels( kubernetesClient, flinkApp, EventRecorder.Type.Warning, reason, - "message2", + "message3", EventRecorder.Component.Operator, consumer, "mk", - null, - Map.of())); - + interval, + dedupePredicate, + labels)); event = kubernetesClient .v1() @@ -291,9 +350,17 @@ public boolean test(Map stringStringMap, Instant instant) { .withName(eventName) .get(); Assertions.assertNotNull(event); - Assertions.assertEquals("message2", event.getMessage()); - Assertions.assertEquals(3, event.getCount()); - Assertions.assertEquals(event.getMetadata().getLabels().get("a"), null); + if ((dedupePredicate == null || (dedupePredicate.test(labels))) + && interval != null + && interval.toMillis() > 0) { + Assertions.assertEquals("message", event.getMessage()); + Assertions.assertEquals(1, event.getCount()); + Assertions.assertEquals(null, event.getMetadata().getLabels().get("a")); + } else { + Assertions.assertEquals("message3", event.getMessage()); + Assertions.assertEquals(4, event.getCount()); + Assertions.assertEquals(null, event.getMetadata().getLabels().get("a")); + } // Update the event with null label Assertions.assertFalse( @@ -306,7 +373,8 @@ public boolean test(Map stringStringMap, Instant instant) { EventRecorder.Component.Operator, consumer, "mk", - null, + interval, + dedupePredicate, null)); event = @@ -317,36 +385,24 @@ public boolean test(Map stringStringMap, Instant instant) { .withName(eventName) .get(); Assertions.assertNotNull(event); - Assertions.assertEquals("message4", event.getMessage()); - Assertions.assertEquals(4, event.getCount()); - Assertions.assertEquals(event.getMetadata().getLabels().get("a"), null); - - // Suppress the event - Assertions.assertFalse( - EventUtils.createOrUpdateEventWithLabels( - kubernetesClient, - flinkApp, - EventRecorder.Type.Warning, - reason, - "message2", - EventRecorder.Component.Operator, - consumer, - "mk", - suppressionPredicate, - Map.of("a", 
"b"))); - - event = - kubernetesClient - .v1() - .events() - .inNamespace(flinkApp.getMetadata().getNamespace()) - .withName(eventName) - .get(); - Assertions.assertNotNull(event); - Assertions.assertEquals("message4", event.getMessage()); - Assertions.assertEquals(4, event.getCount()); - Assertions.assertEquals(event.getMetadata().getLabels().get("a"), null); - + if ((dedupePredicate == null || (dedupePredicate.test(labels))) + && interval != null + && interval.toMillis() > 0) { + Assertions.assertEquals("message", event.getMessage()); + Assertions.assertEquals(1, event.getCount()); + Assertions.assertEquals(null, event.getMetadata().getLabels().get("a")); + } else { + Assertions.assertEquals("message4", event.getMessage()); + Assertions.assertEquals( + dedupePredicate != null + && dedupePredicate.test(labels) + && interval != null + && interval.toMillis() > 0 + ? 4 + : 5, + event.getCount()); + Assertions.assertEquals(null, event.getMetadata().getLabels().get("a")); + } // Create a new event Assertions.assertTrue( EventUtils.createOrUpdateEventWithLabels( @@ -358,7 +414,8 @@ public boolean test(Map stringStringMap, Instant instant) { EventRecorder.Component.Operator, consumer, "mk2", - suppressionPredicate, + interval, + dedupePredicate, Map.of("a", "b"))); eventName = EventUtils.generateEventName( From ec5937700b3cd0da34ec1a5838362f0e41d2b5be Mon Sep 17 00:00:00 2001 From: Clara Xiong Date: Tue, 24 Oct 2023 08:40:33 -0700 Subject: [PATCH 3/3] Regenerated doc --- .../layouts/shortcodes/generated/auto_scaler_configuration.html | 2 +- .../org/apache/flink/autoscaler/config/AutoScalerOptions.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html index 09aee96a78..0b5f33a8e2 100644 --- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html +++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html 
@@ -117,7 +117,7 @@ Enable vertex scaling execution by the autoscaler. If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs. -
job.autoscaler.event.report.interval
+
job.autoscaler.scaling.event.interval
30 min Duration Time interval to resend the identical event diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java index 576037a48d..e8872a35c5 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java @@ -235,6 +235,7 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { autoScalerConfig("scaling.event.interval") .durationType() .defaultValue(Duration.ofSeconds(1800)) + .withDeprecatedKeys(deprecatedOperatorConfigKey("scaling.event.interval")) .withDescription("Time interval to resend the identical event"); public static final ConfigOption FLINK_CLIENT_TIMEOUT =